Implement Runtime::block_on using oneshot (#391)

This commit is contained in:
Jon Gjengset 2018-06-04 23:09:17 -04:00 committed by Carl Lerche
parent 9caec1c15d
commit 3d7263d3a0
2 changed files with 128 additions and 0 deletions

View File

@ -127,6 +127,7 @@ use std::io;
use tokio_threadpool as threadpool;
use futures;
use futures::future::Future;
#[cfg(feature = "unstable-futures")]
use futures2;
@ -365,6 +366,29 @@ impl Runtime {
self
}
/// Run a future to completion on the Tokio runtime.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// This method should not be called from an asynchrounous context.
///
/// # Panics
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let (tx, rx) = futures::sync::oneshot::channel();
self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
rx.wait().unwrap()
}
/// Signals the runtime to shutdown once it becomes idle.
///
/// Returns a future that completes once the shutdown operation has

View File

@ -1,9 +1,15 @@
extern crate tokio;
extern crate env_logger;
extern crate futures;
use futures::sync::oneshot;
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::io;
use tokio::net::{TcpStream, TcpListener};
use tokio::prelude::future::lazy;
use tokio::prelude::*;
use tokio::runtime::Runtime;
macro_rules! t {
($e:expr) => (match $e {
@ -69,3 +75,101 @@ fn runtime_multi_threaded() {
runtime.spawn(create_client_server_future());
runtime.shutdown_on_idle().wait().unwrap();
}
#[test]
fn block_on_timer() {
use std::time::{Duration, Instant};
use tokio::timer::{Delay, Error};
fn after_1s<T>(x: T) -> Box<Future<Item = T, Error = Error> + Send>
where
T: Send + 'static,
{
Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x))
}
let mut runtime = Runtime::new().unwrap();
assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42);
runtime.shutdown_on_idle().wait().unwrap();
}
#[test]
fn spawn_from_block_on() {
let cnt = Arc::new(Mutex::new(0));
let c = cnt.clone();
let mut runtime = Runtime::new().unwrap();
let msg = runtime
.block_on(lazy(move || {
{
let mut x = c.lock().unwrap();
*x = 1 + *x;
}
// Spawn!
tokio::spawn(lazy(move || {
{
let mut x = c.lock().unwrap();
*x = 1 + *x;
}
Ok::<(), ()>(())
}));
Ok::<_, ()>("hello")
}))
.unwrap();
runtime.shutdown_on_idle().wait().unwrap();
assert_eq!(2, *cnt.lock().unwrap());
assert_eq!(msg, "hello");
}
#[test]
fn block_waits() {
let (tx, rx) = oneshot::channel();
thread::spawn(|| {
use std::time::Duration;
thread::sleep(Duration::from_millis(1000));
tx.send(()).unwrap();
});
let cnt = Arc::new(Mutex::new(0));
let c = cnt.clone();
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(rx.then(move |_| {
{
let mut x = c.lock().unwrap();
*x = 1 + *x;
}
Ok::<_, ()>(())
}))
.unwrap();
assert_eq!(1, *cnt.lock().unwrap());
runtime.shutdown_on_idle().wait().unwrap();
}
#[test]
fn spawn_many() {
const ITER: usize = 200;
let cnt = Arc::new(Mutex::new(0));
let mut runtime = Runtime::new().unwrap();
for _ in 0..ITER {
let c = cnt.clone();
runtime.spawn(lazy(move || {
{
let mut x = c.lock().unwrap();
*x = 1 + *x;
}
Ok::<(), ()>(())
}));
}
runtime.shutdown_on_idle().wait().unwrap();
assert_eq!(ITER, *cnt.lock().unwrap());
}