From 9d3e5aac083fd7369bcc48b7f75472b6f8f87ea8 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 19 Jul 2019 17:25:04 -0700 Subject: [PATCH] tokio: remove `Send + 'static` requirement from `block_on` (#1329) Removes the `Send` requirement to futures passed to `Runtime::block_on`. Previously, `block_on` was implemented by sending the future to a runtime thread. In order to do this, the future must be Send. The reason why the future is sent to the pool is because we cannot guarantee, while off the pool, that a reactor / timer thread is running. This is due to a limitation in the current version of tokio-threadpool. There is a plan to fix this (#1177), but the proper fix is non trivial. In order to unblock APIs that require this, this patch updates the runtime to spawn an always running thread containing a reactor and timer. All calls to `block_on` will use that reactor and timer. --- tokio/src/runtime/threadpool/background.rs | 61 ++++++++++++++++++++++ tokio/src/runtime/threadpool/builder.rs | 11 +++- tokio/src/runtime/threadpool/mod.rs | 37 ++++++++++--- tokio/tests/runtime_threaded.rs | 22 ++++++++ 4 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 tokio/src/runtime/threadpool/background.rs diff --git a/tokio/src/runtime/threadpool/background.rs b/tokio/src/runtime/threadpool/background.rs new file mode 100644 index 000000000..64ab54415 --- /dev/null +++ b/tokio/src/runtime/threadpool/background.rs @@ -0,0 +1,61 @@ +//! Temporary reactor + timer that runs on a background thread. This it to make +//! `block_on` work. + +use tokio_current_thread::CurrentThread; +use tokio_reactor::Reactor; +use tokio_sync::oneshot; +use tokio_timer::clock::Clock; +use tokio_timer::timer::{self, Timer}; + +use std::{io, thread}; + +#[derive(Debug)] +pub struct Background { + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + shutdown_tx: Option>, + thread: Option>, +} + +pub fn spawn(clock: &Clock) -> io::Result { + let clock = clock.clone(); + + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + + let timer = Timer::new_with_now(reactor, clock); + let timer_handle = timer.handle(); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let shutdown_tx = Some(shutdown_tx); + + let thread = thread::spawn(move || { + let mut rt = CurrentThread::new_with_park(timer); + let _ = rt.block_on(shutdown_rx); + }); + let thread = Some(thread); + + Ok(Background { + reactor_handle, + timer_handle, + shutdown_tx, + thread, + }) +} + +impl Background { + pub(super) fn reactor(&self) -> &tokio_reactor::Handle { + &self.reactor_handle + } + + pub(super) fn timer(&self) -> &timer::Handle { + &self.timer_handle + } +} + +impl Drop for Background { + fn drop(&mut self) { + let _ = self.shutdown_tx.take().unwrap().send(()); + let _ = self.thread.take().unwrap().join(); + } +} diff --git a/tokio/src/runtime/threadpool/builder.rs b/tokio/src/runtime/threadpool/builder.rs index 420d035f7..54c37691d 100644 --- a/tokio/src/runtime/threadpool/builder.rs +++ b/tokio/src/runtime/threadpool/builder.rs @@ -1,15 +1,17 @@ +use super::{background, Inner, Runtime}; use crate::reactor::Reactor; -use num_cpus; + use tokio_reactor; use tokio_threadpool::Builder as ThreadPoolBuilder; use tokio_timer::clock::{self, Clock}; use tokio_timer::timer::{self, Timer}; + +use num_cpus; use tracing_core as trace; use std::io; use std::sync::Mutex; use std::time::Duration; use std::any::Any; -use super::{Inner, Runtime}; /// Builds Tokio Runtime with custom configuration values. /// @@ -333,6 +335,9 @@ impl Builder { // public API dependency, we should allow users to set a custom // subscriber for the runtime. let dispatch = trace::dispatcher::get_default(trace::Dispatch::clone); + let trace = dispatch.clone(); + + let background = background::spawn(&clock)?; let pool = self .threadpool_builder @@ -363,6 +368,8 @@ impl Builder { Ok(Runtime { inner: Some(Inner { pool, + background, + trace, }), }) } diff --git a/tokio/src/runtime/threadpool/mod.rs b/tokio/src/runtime/threadpool/mod.rs index e69bd0817..5955210af 100644 --- a/tokio/src/runtime/threadpool/mod.rs +++ b/tokio/src/runtime/threadpool/mod.rs @@ -1,11 +1,15 @@ +mod background; mod builder; mod task_executor; pub use self::builder::Builder; pub use self::task_executor::TaskExecutor; +use background::Background; use tokio_executor::enter; +use tokio_timer::timer; +use tracing_core as trace; use std::future::Future; use std::io; @@ -32,6 +36,19 @@ pub struct Runtime { struct Inner { /// Task execution pool. pool: tokio_threadpool::ThreadPool, + + /// Tracing dispatcher + trace: trace::Dispatch, + + /// Maintains a reactor and timer that are always running on a background + /// thread. This is to support `runtime.block_on` w/o requiring the future + /// to be `Send`. + /// + /// A dedicated background thread is required as the threadpool threads + /// might not be running. However, this is a temporary work around. + /// + /// TODO: Delete this + background: Background, } // ===== impl Runtime ===== @@ -146,18 +163,22 @@ impl Runtime { /// future panics, or if called within an asynchronous execution context. pub fn block_on(&self, future: F) -> F::Output where - F: Send + 'static + Future, - F::Output: Send + 'static, + F: Future, { let mut entered = enter().expect("nested block_on"); - let (tx, rx) = crate::sync::oneshot::channel(); - self.spawn(async move { - let res = future.await; - let _ = tx.send(res); - }); + let bg = &self.inner().background; + let trace = &self.inner().trace; - entered.block_on(rx).expect("blocked on future paniced") + tokio_executor::with_default(&mut self.inner().pool.sender(), || { + tokio_reactor::with_default(bg.reactor(), || { + timer::with_default(bg.timer(), || { + trace::dispatcher::with_default(trace, || { + entered.block_on(future) + }) + }) + }) + }) } /// Signals the runtime to shutdown once it becomes idle. diff --git a/tokio/tests/runtime_threaded.rs b/tokio/tests/runtime_threaded.rs index ec8423d3b..cdec008b9 100644 --- a/tokio/tests/runtime_threaded.rs +++ b/tokio/tests/runtime_threaded.rs @@ -76,6 +76,28 @@ fn block_on_timer() { e.block_on(rt.shutdown_on_idle()); } +#[test] +fn block_on_socket() { + let rt = Runtime::new().unwrap(); + + rt.block_on(async move { + let addr = "127.0.0.1:0".parse().unwrap(); + + let (tx, rx) = oneshot::channel(); + + let mut listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let _ = listener.accept().await; + tx.send(()).unwrap(); + }); + + TcpStream::connect(&addr).await.unwrap(); + rx.await.unwrap(); + }); +} + #[test] fn block_waits() { let (a_tx, a_rx) = oneshot::channel();