From 3d7263d3a0b73ab35d63b45a6524bde7251851e8 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Mon, 4 Jun 2018 23:09:17 -0400 Subject: [PATCH] Implement Runtime::block_on using oneshot (#391) --- src/runtime/mod.rs | 24 +++++++++++ tests/runtime.rs | 104 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index ff61d8700..ed416fd19 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -127,6 +127,7 @@ use std::io; use tokio_threadpool as threadpool; +use futures; use futures::future::Future; #[cfg(feature = "unstable-futures")] use futures2; @@ -365,6 +366,29 @@ impl Runtime { self } + /// Run a future to completion on the Tokio runtime. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchrounous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on(&mut self, future: F) -> Result + where + F: Send + 'static + Future, + R: Send + 'static, + E: Send + 'static, + { + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + rx.wait().unwrap() + } + /// Signals the runtime to shutdown once it becomes idle. /// /// Returns a future that completes once the shutdown operation has diff --git a/tests/runtime.rs b/tests/runtime.rs index 9c76b08e8..7012a1783 100644 --- a/tests/runtime.rs +++ b/tests/runtime.rs @@ -1,9 +1,15 @@ extern crate tokio; extern crate env_logger; +extern crate futures; +use futures::sync::oneshot; +use std::sync::{Arc, Mutex}; +use std::thread; use tokio::io; use tokio::net::{TcpStream, TcpListener}; +use tokio::prelude::future::lazy; use tokio::prelude::*; +use tokio::runtime::Runtime; macro_rules! t { ($e:expr) => (match $e { @@ -69,3 +75,101 @@ fn runtime_multi_threaded() { runtime.spawn(create_client_server_future()); runtime.shutdown_on_idle().wait().unwrap(); } + +#[test] +fn block_on_timer() { + use std::time::{Duration, Instant}; + use tokio::timer::{Delay, Error}; + + fn after_1s(x: T) -> Box + Send> + where + T: Send + 'static, + { + Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x)) + } + + let mut runtime = Runtime::new().unwrap(); + assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42); + runtime.shutdown_on_idle().wait().unwrap(); +} + +#[test] +fn spawn_from_block_on() { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + tokio::spawn(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + })); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); +} + +#[test] +fn block_waits() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + use std::time::Duration; + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + runtime + .block_on(rx.then(move |_| { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<_, ()>(()) + })) + .unwrap(); + + assert_eq!(1, *cnt.lock().unwrap()); + runtime.shutdown_on_idle().wait().unwrap(); +} + +#[test] +fn spawn_many() { + const ITER: usize = 200; + + let cnt = Arc::new(Mutex::new(0)); + let mut runtime = Runtime::new().unwrap(); + + for _ in 0..ITER { + let c = cnt.clone(); + runtime.spawn(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + })); + } + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(ITER, *cnt.lock().unwrap()); +}