diff --git a/tokio-executor/src/enter.rs b/tokio-executor/src/enter.rs index c10675fce..37a09bfb8 100644 --- a/tokio-executor/src/enter.rs +++ b/tokio-executor/src/enter.rs @@ -61,6 +61,42 @@ pub fn enter() -> Result { }) } +// Forces the current "entered" state to be cleared while the closure +// is executed. +// +// # Warning +// +// This is hidden for a reason. Do not use without fully understanding +// executors. Misuing can easily cause your program to deadlock. +#[doc(hidden)] +pub fn exit R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset; + impl Drop for Reset { + fn drop(&mut self) { + ENTERED.with(|c| { + c.set(true); + }); + } + } + + ENTERED.with(|c| { + debug_assert!(c.get()); + c.set(false); + }); + + let reset = Reset; + let ret = f(); + ::std::mem::forget(reset); + + ENTERED.with(|c| { + assert!(!c.get(), "closure claimed permanent executor"); + c.set(true); + }); + + ret +} + impl Enter { /// Blocks the thread on the specified future, returning the value with /// which that future completes. diff --git a/tokio-executor/src/lib.rs b/tokio-executor/src/lib.rs index 8f15d603e..df5519c91 100644 --- a/tokio-executor/src/lib.rs +++ b/tokio-executor/src/lib.rs @@ -64,7 +64,7 @@ mod global; pub mod park; mod typed; -pub use crate::enter::{enter, Enter, EnterError}; +pub use crate::enter::{enter, exit, Enter, EnterError}; pub use crate::error::SpawnError; pub use crate::executor::Executor; pub use crate::global::{spawn, with_default, DefaultExecutor}; diff --git a/tokio-threadpool/src/blocking.rs b/tokio-threadpool/src/blocking.rs index 0fad2d847..3b59545c2 100644 --- a/tokio-threadpool/src/blocking.rs +++ b/tokio-threadpool/src/blocking.rs @@ -4,6 +4,7 @@ use futures_core::ready; use std::error::Error; use std::fmt; use std::task::Poll; +use tokio_executor; /// Error raised by `blocking`. pub struct BlockingError { @@ -140,7 +141,10 @@ where ready!(res)?; // Currently in blocking mode, so call the inner closure - let ret = f(); + // + // "Exit" the current executor in case the blocking function wants + // to call a different executor. + let ret = tokio_executor::exit(move || f()); // Try to transition out of blocking mode. This is a fast path that takes // back ownership of the worker if the worker handoff didn't complete yet. diff --git a/tokio-threadpool/tests/blocking.rs b/tokio-threadpool/tests/blocking.rs index 4b10affc9..b2c48493d 100644 --- a/tokio-threadpool/tests/blocking.rs +++ b/tokio-threadpool/tests/blocking.rs @@ -39,6 +39,26 @@ fn basic() { rx2.recv().unwrap(); } +#[test] +fn other_executors_can_run_inside_blocking() { + let _ = ::env_logger::try_init(); + + let pool = Builder::new().pool_size(1).max_blocking(1).build(); + + let (tx, rx) = mpsc::channel(); + + pool.spawn(async move { + let res = blocking(|| { + let _e = tokio_executor::enter().expect("nested blocking enter"); + tx.send(()).unwrap(); + }); + + assert_ready!(res).unwrap(); + }); + + rx.recv().unwrap(); +} + #[test] fn notify_task_on_capacity() { const BLOCKING: usize = 10;