rt: add a Handle::current() (#2040)

Adds `Handle::current()` for accessing a handle to the runtime
associated with the current thread. This handle can then be
passed to other threads in order to spawn or perform other
runtime related tasks.
This commit is contained in:
Benjamin Fry 2020-01-06 11:32:21 -08:00 committed by Carl Lerche
parent 5930acef73
commit 0193df3a59
4 changed files with 92 additions and 45 deletions

View File

@ -56,6 +56,12 @@ cfg_not_blocking_impl! {
self self
} }
#[cfg(any(
feature = "blocking",
feature = "dns",
feature = "fs",
feature = "io-std",
))]
pub(crate) fn enter<F, R>(&self, f: F) -> R pub(crate) fn enter<F, R>(&self, f: F) -> R
where where
F: FnOnce() -> R, F: FnOnce() -> R,

View File

@ -8,7 +8,6 @@ use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback}; use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback};
use crate::task::{self, JoinHandle}; use crate::task::{self, JoinHandle};
use std::cell::Cell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt; use std::fmt;
use std::time::Duration; use std::time::Duration;
@ -68,11 +67,6 @@ struct Shared {
type Task = task::Task<NoopSchedule>; type Task = task::Task<NoopSchedule>;
thread_local! {
/// Thread-local tracking the current executor
static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None)
}
const KEEP_ALIVE: Duration = Duration::from_secs(10); const KEEP_ALIVE: Duration = Duration::from_secs(10);
/// Run the provided function on an executor dedicated to blocking operations. /// Run the provided function on an executor dedicated to blocking operations.
@ -80,16 +74,14 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where where
F: FnOnce() -> R + Send + 'static, F: FnOnce() -> R + Send + 'static,
{ {
BLOCKING.with(|cell| { use crate::runtime::context::ThreadContext;
let schedule = match cell.get() {
Some(ptr) => unsafe { &*ptr },
None => panic!("not currently running on the Tokio runtime."),
};
let (task, handle) = task::joinable(BlockingTask::new(func)); let schedule =
schedule.schedule(task); ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime.");
handle
}) let (task, handle) = task::joinable(BlockingTask::new(func));
schedule.schedule(task);
handle
} }
// ===== impl BlockingPool ===== // ===== impl BlockingPool =====
@ -168,30 +160,10 @@ impl Spawner {
where where
F: FnOnce() -> R, F: FnOnce() -> R,
{ {
// While scary, this is safe. The function takes a `&BlockingPool`, let ctx = crate::runtime::context::ThreadContext::clone_current();
// which guarantees that the reference lives for the duration of let _e = ctx.with_blocking_spawner(self.clone()).enter();
// `with_pool`.
//
// Because we are always clearing the TLS value at the end of the
// function, we can cast the reference to 'static which thread-local
// cells require.
BLOCKING.with(|cell| {
let was = cell.replace(None);
// Ensure that the pool is removed from the thread-local context f()
// when leaving the scope. This handles cases that involve panicking.
struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>);
impl Drop for Reset<'_> {
fn drop(&mut self) {
self.0.set(self.1);
}
}
let _reset = Reset(cell, was);
cell.set(Some(self as *const Spawner));
f()
})
} }
fn schedule(&self, task: Task) { fn schedule(&self, task: Task) {
@ -248,6 +220,7 @@ impl Spawner {
self.inner.io_handle.clone(), self.inner.io_handle.clone(),
self.inner.time_handle.clone(), self.inner.time_handle.clone(),
Some(self.inner.clock.clone()), Some(self.inner.clock.clone()),
Some(self.clone()),
); );
let spawner = self.clone(); let spawner = self.clone();
builder builder

View File

@ -20,6 +20,9 @@ pub(crate) struct ThreadContext {
/// Source of `Instant::now()` /// Source of `Instant::now()`
clock: Option<crate::runtime::time::Clock>, clock: Option<crate::runtime::time::Clock>,
/// Blocking pool spawner
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
} }
impl Default for ThreadContext { impl Default for ThreadContext {
@ -35,6 +38,7 @@ impl Default for ThreadContext {
#[cfg(any(not(feature = "time"), loom))] #[cfg(any(not(feature = "time"), loom))]
time_handle: (), time_handle: (),
clock: None, clock: None,
blocking_spawner: None,
} }
} }
} }
@ -48,6 +52,7 @@ impl ThreadContext {
io_handle: crate::runtime::io::Handle, io_handle: crate::runtime::io::Handle,
time_handle: crate::runtime::time::Handle, time_handle: crate::runtime::time::Handle,
clock: Option<crate::runtime::time::Clock>, clock: Option<crate::runtime::time::Clock>,
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
) -> Self { ) -> Self {
ThreadContext { ThreadContext {
spawner, spawner,
@ -60,6 +65,7 @@ impl ThreadContext {
#[cfg(any(not(feature = "time"), loom))] #[cfg(any(not(feature = "time"), loom))]
time_handle, time_handle,
clock, clock,
blocking_spawner,
} }
} }
@ -81,23 +87,20 @@ impl ThreadContext {
}) })
} }
#[cfg(all(feature = "io-driver", not(loom)))]
pub(crate) fn io_handle() -> crate::runtime::io::Handle { pub(crate) fn io_handle() -> crate::runtime::io::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() { CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(), Some(ref ctx) => ctx.io_handle.clone(),
None => None, None => Default::default(),
}) })
} }
#[cfg(all(feature = "time", not(loom)))]
pub(crate) fn time_handle() -> crate::runtime::time::Handle { pub(crate) fn time_handle() -> crate::runtime::time::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() { CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(), Some(ref ctx) => ctx.time_handle.clone(),
None => None, None => Default::default(),
}) })
} }
#[cfg(feature = "rt-core")]
pub(crate) fn spawn_handle() -> Option<Spawner> { pub(crate) fn spawn_handle() -> Option<Spawner> {
CONTEXT.with(|ctx| match *ctx.borrow() { CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.spawner.clone()), Some(ref ctx) => Some(ctx.spawner.clone()),
@ -105,7 +108,6 @@ impl ThreadContext {
}) })
} }
#[cfg(all(feature = "test-util", feature = "time"))]
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> { pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
CONTEXT.with( CONTEXT.with(
|ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) { |ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) {
@ -114,6 +116,31 @@ impl ThreadContext {
}, },
) )
} }
pub(crate) fn blocking_spawner() -> Option<crate::runtime::blocking::Spawner> {
CONTEXT.with(|ctx| {
match ctx
.borrow()
.as_ref()
.map(|ctx| ctx.blocking_spawner.clone())
{
Some(Some(blocking_spawner)) => Some(blocking_spawner),
_ => None,
}
})
}
}
cfg_blocking_impl! {
impl ThreadContext {
pub(crate) fn with_blocking_spawner(
mut self,
blocking_spawner: crate::runtime::blocking::Spawner,
) -> Self {
self.blocking_spawner.replace(blocking_spawner);
self
}
}
} }
/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop. /// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop.

View File

@ -35,9 +35,50 @@ impl Handle {
self.io_handle.clone(), self.io_handle.clone(),
self.time_handle.clone(), self.time_handle.clone(),
Some(self.clock.clone()), Some(self.clock.clone()),
Some(self.blocking_spawner.clone()),
) )
.enter(); .enter();
self.blocking_spawner.enter(|| f())
f()
}
/// Returns a Handle view over the currently running Runtime
///
/// # Panic
///
/// A Runtime must have been started or this will panic
///
/// # Examples
///
/// This allows for the current handle to be gotten when running in a `#`
///
/// ```
/// # use tokio::runtime::Runtime;
///
/// # fn dox() {
/// # let rt = Runtime::new().unwrap();
/// # rt.spawn(async {
/// use tokio::runtime::Handle;
///
/// let handle = Handle::current();
/// handle.spawn(async {
/// println!("now running in the existing Runtime");
/// })
/// # });
/// # }
/// ```
pub fn current() -> Self {
use crate::runtime::context::ThreadContext;
Handle {
spawner: ThreadContext::spawn_handle()
.expect("not currently running on the Tokio runtime."),
io_handle: ThreadContext::io_handle(),
time_handle: ThreadContext::time_handle(),
clock: ThreadContext::clock().expect("not currently running on the Tokio runtime."),
blocking_spawner: ThreadContext::blocking_spawner()
.expect("not currently running on the Tokio runtime."),
}
} }
} }