tokio: remove Send + 'static requirement from block_on (#1329)

Removes the `Send` requirement to futures passed to `Runtime::block_on`.
Previously, `block_on` was implemented by sending the future to a
runtime thread. In order to do this, the future must be Send.

The reason why the future is sent to the pool is because we cannot
guarantee, while off the pool, that a reactor / timer thread is running.
This is due to a limitation in the current version of tokio-threadpool.
There is a plan to fix this (#1177), but the proper fix is non trivial.

In order to unblock APIs that require this, this patch updates the
runtime to spawn an always running thread containing a reactor and
timer. All calls to `block_on` will use that reactor and timer.
This commit is contained in:
Carl Lerche 2019-07-19 17:25:04 -07:00 committed by GitHub
parent a99fa6e096
commit 9d3e5aac08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 10 deletions

View File

@ -0,0 +1,61 @@
//! Temporary reactor + timer that runs on a background thread. This it to make
//! `block_on` work.
use tokio_current_thread::CurrentThread;
use tokio_reactor::Reactor;
use tokio_sync::oneshot;
use tokio_timer::clock::Clock;
use tokio_timer::timer::{self, Timer};
use std::{io, thread};
#[derive(Debug)]
pub struct Background {
reactor_handle: tokio_reactor::Handle,
timer_handle: timer::Handle,
shutdown_tx: Option<oneshot::Sender<()>>,
thread: Option<thread::JoinHandle<()>>,
}
pub fn spawn(clock: &Clock) -> io::Result<Background> {
let clock = clock.clone();
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle();
let timer = Timer::new_with_now(reactor, clock);
let timer_handle = timer.handle();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let shutdown_tx = Some(shutdown_tx);
let thread = thread::spawn(move || {
let mut rt = CurrentThread::new_with_park(timer);
let _ = rt.block_on(shutdown_rx);
});
let thread = Some(thread);
Ok(Background {
reactor_handle,
timer_handle,
shutdown_tx,
thread,
})
}
impl Background {
pub(super) fn reactor(&self) -> &tokio_reactor::Handle {
&self.reactor_handle
}
pub(super) fn timer(&self) -> &timer::Handle {
&self.timer_handle
}
}
impl Drop for Background {
fn drop(&mut self) {
let _ = self.shutdown_tx.take().unwrap().send(());
let _ = self.thread.take().unwrap().join();
}
}

View File

@ -1,15 +1,17 @@
use super::{background, Inner, Runtime};
use crate::reactor::Reactor;
use num_cpus;
use tokio_reactor;
use tokio_threadpool::Builder as ThreadPoolBuilder;
use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};
use num_cpus;
use tracing_core as trace;
use std::io;
use std::sync::Mutex;
use std::time::Duration;
use std::any::Any;
use super::{Inner, Runtime};
/// Builds Tokio Runtime with custom configuration values.
///
@ -333,6 +335,9 @@ impl Builder {
// public API dependency, we should allow users to set a custom
// subscriber for the runtime.
let dispatch = trace::dispatcher::get_default(trace::Dispatch::clone);
let trace = dispatch.clone();
let background = background::spawn(&clock)?;
let pool = self
.threadpool_builder
@ -363,6 +368,8 @@ impl Builder {
Ok(Runtime {
inner: Some(Inner {
pool,
background,
trace,
}),
})
}

View File

@ -1,11 +1,15 @@
mod background;
mod builder;
mod task_executor;
pub use self::builder::Builder;
pub use self::task_executor::TaskExecutor;
use background::Background;
use tokio_executor::enter;
use tokio_timer::timer;
use tracing_core as trace;
use std::future::Future;
use std::io;
@ -32,6 +36,19 @@ pub struct Runtime {
struct Inner {
/// Task execution pool.
pool: tokio_threadpool::ThreadPool,
/// Tracing dispatcher
trace: trace::Dispatch,
/// Maintains a reactor and timer that are always running on a background
/// thread. This is to support `runtime.block_on` w/o requiring the future
/// to be `Send`.
///
/// A dedicated background thread is required as the threadpool threads
/// might not be running. However, this is a temporary work around.
///
/// TODO: Delete this
background: Background,
}
// ===== impl Runtime =====
@ -146,18 +163,22 @@ impl Runtime {
/// future panics, or if called within an asynchronous execution context.
pub fn block_on<F>(&self, future: F) -> F::Output
where
F: Send + 'static + Future,
F::Output: Send + 'static,
F: Future,
{
let mut entered = enter().expect("nested block_on");
let (tx, rx) = crate::sync::oneshot::channel();
self.spawn(async move {
let res = future.await;
let _ = tx.send(res);
});
let bg = &self.inner().background;
let trace = &self.inner().trace;
entered.block_on(rx).expect("blocked on future paniced")
tokio_executor::with_default(&mut self.inner().pool.sender(), || {
tokio_reactor::with_default(bg.reactor(), || {
timer::with_default(bg.timer(), || {
trace::dispatcher::with_default(trace, || {
entered.block_on(future)
})
})
})
})
}
/// Signals the runtime to shutdown once it becomes idle.

View File

@ -76,6 +76,28 @@ fn block_on_timer() {
e.block_on(rt.shutdown_on_idle());
}
#[test]
fn block_on_socket() {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let addr = "127.0.0.1:0".parse().unwrap();
let (tx, rx) = oneshot::channel();
let mut listener = TcpListener::bind(&addr).unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let _ = listener.accept().await;
tx.send(()).unwrap();
});
TcpStream::connect(&addr).await.unwrap();
rx.await.unwrap();
});
}
#[test]
fn block_waits() {
let (a_tx, a_rx) = oneshot::channel();