rt: cleanup runtime::context (#2063)

Tweak context to remove more fns and usage of `Option`. Remove
`ThreadContext` struct as it is reduced to just `Handle`. Avoid passing
around individual driver handles and instead limit to the
`runtime::Handle` struct.
This commit is contained in:
Carl Lerche 2020-01-07 07:53:40 -08:00 committed by GitHub
parent 855d39f849
commit 45da5f3510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 120 additions and 536 deletions

View File

@ -5,7 +5,6 @@ pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::AtomicUsize;
use crate::park::{Park, Unpark}; use crate::park::{Park, Unpark};
#[cfg(all(feature = "io-driver", not(loom)))]
use crate::runtime::context; use crate::runtime::context;
use crate::util::slab::{Address, Slab}; use crate::util::slab::{Address, Slab};
@ -198,14 +197,8 @@ impl Handle {
/// # Panics /// # Panics
/// ///
/// This function panics if there is no current reactor set. /// This function panics if there is no current reactor set.
#[cfg(all(feature = "io-driver", not(loom)))]
pub(super) fn current() -> Self { pub(super) fn current() -> Self {
context::ThreadContext::io_handle().expect("no current reactor") context::io_handle().expect("no current reactor")
}
#[cfg(any(not(feature = "io-driver"), loom))]
pub(super) fn current() -> Self {
panic!("no current reactor")
} }
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise

View File

@ -4,7 +4,6 @@ macro_rules! cfg_resource_drivers {
($($item:item)*) => { ($($item:item)*) => {
$( $(
#[cfg(any(feature = "io-driver", feature = "time"))] #[cfg(any(feature = "io-driver", feature = "time"))]
#[cfg(not(loom))]
$item $item
)* )*
} }

View File

@ -11,43 +11,23 @@ cfg_blocking_impl! {
mod shutdown; mod shutdown;
mod task; mod task;
use crate::runtime::{self, Builder, io, time}; use crate::runtime::Builder;
pub(crate) fn create_blocking_pool( pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
builder: &Builder, BlockingPool::new(builder, thread_cap)
spawner: &runtime::Spawner,
io: &io::Handle,
time: &time::Handle,
clock: &time::Clock,
thread_cap: usize,
) -> BlockingPool {
BlockingPool::new(
builder,
spawner,
io,
time,
clock,
thread_cap)
} }
} }
cfg_not_blocking_impl! { cfg_not_blocking_impl! {
use crate::runtime::{self, io, time, Builder}; use crate::runtime::Builder;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct BlockingPool {} pub(crate) struct BlockingPool {}
pub(crate) use BlockingPool as Spawner; pub(crate) use BlockingPool as Spawner;
pub(crate) fn create_blocking_pool( pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
_builder: &Builder,
_spawner: &runtime::Spawner,
_io: &io::Handle,
_time: &time::Handle,
_clock: &time::Clock,
_thread_cap: usize,
) -> BlockingPool {
BlockingPool {} BlockingPool {}
} }
@ -55,18 +35,5 @@ cfg_not_blocking_impl! {
pub(crate) fn spawner(&self) -> &BlockingPool { pub(crate) fn spawner(&self) -> &BlockingPool {
self self
} }
#[cfg(any(
feature = "blocking",
feature = "dns",
feature = "fs",
feature = "io-std",
))]
pub(crate) fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
f()
}
} }
} }

View File

@ -5,7 +5,7 @@ use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown; use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask; use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback}; use crate::runtime::{Builder, Callback, Handle};
use crate::task::{self, JoinHandle}; use crate::task::{self, JoinHandle};
use std::collections::VecDeque; use std::collections::VecDeque;
@ -41,18 +41,6 @@ struct Inner {
/// Call before a thread stops /// Call before a thread stops
before_stop: Option<Callback>, before_stop: Option<Callback>,
/// Spawns async tasks
spawner: runtime::Spawner,
/// Runtime I/O driver handle
io_handle: io::Handle,
/// Runtime time driver handle
time_handle: time::Handle,
/// Source of `Instant::now()`
clock: time::Clock,
thread_cap: usize, thread_cap: usize,
} }
@ -74,27 +62,17 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where where
F: FnOnce() -> R + Send + 'static, F: FnOnce() -> R + Send + 'static,
{ {
use crate::runtime::context::ThreadContext; let rt = Handle::current();
let schedule =
ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime.");
let (task, handle) = task::joinable(BlockingTask::new(func)); let (task, handle) = task::joinable(BlockingTask::new(func));
schedule.schedule(task); rt.blocking_spawner.spawn(task, &rt);
handle handle
} }
// ===== impl BlockingPool ===== // ===== impl BlockingPool =====
impl BlockingPool { impl BlockingPool {
pub(crate) fn new( pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
builder: &Builder,
spawner: &runtime::Spawner,
io: &io::Handle,
time: &time::Handle,
clock: &time::Clock,
thread_cap: usize,
) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel(); let (shutdown_tx, shutdown_rx) = shutdown::channel();
BlockingPool { BlockingPool {
@ -113,10 +91,6 @@ impl BlockingPool {
stack_size: builder.thread_stack_size, stack_size: builder.thread_stack_size,
after_start: builder.after_start.clone(), after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(), before_stop: builder.before_stop.clone(),
spawner: spawner.clone(),
io_handle: io.clone(),
time_handle: time.clone(),
clock: clock.clone(),
thread_cap, thread_cap,
}), }),
}, },
@ -152,21 +126,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner ===== // ===== impl Spawner =====
impl Spawner { impl Spawner {
/// Set the blocking pool for the duration of the closure fn spawn(&self, task: Task, rt: &Handle) {
///
/// If a blocking pool is already set, it will be restored when the closure
/// returns or if it panics.
pub(crate) fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
let ctx = crate::runtime::context::ThreadContext::clone_current();
let _e = ctx.with_blocking_spawner(self.clone()).enter();
f()
}
fn schedule(&self, task: Task) {
let shutdown_tx = { let shutdown_tx = {
let mut shared = self.inner.shared.lock().unwrap(); let mut shared = self.inner.shared.lock().unwrap();
@ -205,41 +165,32 @@ impl Spawner {
}; };
if let Some(shutdown_tx) = shutdown_tx { if let Some(shutdown_tx) = shutdown_tx {
self.spawn_thread(shutdown_tx); self.spawn_thread(shutdown_tx, rt);
} }
} }
fn spawn_thread(&self, shutdown_tx: shutdown::Sender) { fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
if let Some(stack_size) = self.inner.stack_size { if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size); builder = builder.stack_size(stack_size);
} }
let thread_context = ThreadContext::new(
self.inner.spawner.clone(), let rt = rt.clone();
self.inner.io_handle.clone(),
self.inner.time_handle.clone(),
Some(self.inner.clock.clone()),
Some(self.clone()),
);
let spawner = self.clone();
builder builder
.spawn(move || { .spawn(move || {
let _e = thread_context.enter(); // Only the reference should be moved into the closure
run_thread(spawner); let rt = &rt;
drop(shutdown_tx); rt.enter(move || {
rt.blocking_spawner.inner.run();
drop(shutdown_tx);
})
}) })
.unwrap(); .unwrap();
} }
} }
fn run_thread(spawner: Spawner) {
spawner.enter(|| {
let inner = &*spawner.inner;
inner.run()
});
}
impl Inner { impl Inner {
fn run(&self) { fn run(&self) {
if let Some(f) = &self.after_start { if let Some(f) = &self.after_start {

View File

@ -325,14 +325,7 @@ impl Builder {
let spawner = Spawner::Shell; let spawner = Spawner::Shell;
let blocking_pool = blocking::create_blocking_pool( let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
self,
&spawner,
&io_handle,
&time_handle,
&clock,
self.max_threads,
);
let blocking_spawner = blocking_pool.spawner().clone(); let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime { Ok(Runtime {
@ -425,7 +418,7 @@ cfg_rt_core! {
let spawner = Spawner::Basic(scheduler.spawner()); let spawner = Spawner::Basic(scheduler.spawner());
// Blocking pool // Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads); let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone(); let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime { Ok(Runtime {
@ -465,21 +458,24 @@ cfg_rt_threaded! {
let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
// Create the blocking pool // Create the blocking pool
let blocking_pool = blocking::create_blocking_pool(self, &spawner, &io_handle, &time_handle, &clock, self.max_threads); let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone(); let blocking_spawner = blocking_pool.spawner().clone();
// Create the runtime handle
let handle = Handle {
spawner,
io_handle,
time_handle,
clock,
blocking_spawner,
};
// Spawn the thread pool workers // Spawn the thread pool workers
workers.spawn(&blocking_spawner); workers.spawn(&handle);
Ok(Runtime { Ok(Runtime {
kind: Kind::ThreadPool(scheduler), kind: Kind::ThreadPool(scheduler),
handle: Handle { handle,
spawner,
io_handle,
time_handle,
clock,
blocking_spawner,
},
blocking_pool, blocking_pool,
}) })
} }

View File

@ -1,99 +1,26 @@
//! Thread local runtime context //! Thread local runtime context
use crate::runtime::Spawner; use crate::runtime::Handle;
use std::cell::RefCell; use std::cell::RefCell;
thread_local! { thread_local! {
static CONTEXT: RefCell<Option<ThreadContext>> = RefCell::new(None) static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None)
} }
/// ThreadContext makes Runtime context accessible to each Runtime thread. pub(crate) fn current() -> Option<Handle> {
#[derive(Debug, Clone)] CONTEXT.with(|ctx| ctx.borrow().clone())
pub(crate) struct ThreadContext {
/// Handles to the executor.
spawner: Spawner,
/// Handles to the I/O drivers
io_handle: crate::runtime::io::Handle,
/// Handles to the time drivers
time_handle: crate::runtime::time::Handle,
/// Source of `Instant::now()`
clock: Option<crate::runtime::time::Clock>,
/// Blocking pool spawner
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
} }
impl Default for ThreadContext { cfg_io_driver! {
fn default() -> Self {
ThreadContext {
spawner: Spawner::Shell,
#[cfg(all(feature = "io-driver", not(loom)))]
io_handle: None,
#[cfg(any(not(feature = "io-driver"), loom))]
io_handle: (),
#[cfg(all(feature = "time", not(loom)))]
time_handle: None,
#[cfg(any(not(feature = "time"), loom))]
time_handle: (),
clock: None,
blocking_spawner: None,
}
}
}
impl ThreadContext {
/// Construct a new [`ThreadContext`]
///
/// [`ThreadContext`]: struct.ThreadContext.html
pub(crate) fn new(
spawner: Spawner,
io_handle: crate::runtime::io::Handle,
time_handle: crate::runtime::time::Handle,
clock: Option<crate::runtime::time::Clock>,
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
) -> Self {
ThreadContext {
spawner,
#[cfg(all(feature = "io-driver", not(loom)))]
io_handle,
#[cfg(any(not(feature = "io-driver"), loom))]
io_handle,
#[cfg(all(feature = "time", not(loom)))]
time_handle,
#[cfg(any(not(feature = "time"), loom))]
time_handle,
clock,
blocking_spawner,
}
}
/// Clone the current [`ThreadContext`] if one is set, otherwise construct a new [`ThreadContext`].
///
/// [`ThreadContext`]: struct.ThreadContext.html
#[allow(dead_code)]
pub(crate) fn clone_current() -> Self {
CONTEXT.with(|ctx| ctx.borrow().clone().unwrap_or_else(Default::default))
}
/// Set this [`ThreadContext`] as the current active [`ThreadContext`].
///
/// [`ThreadContext`]: struct.ThreadContext.html
pub(crate) fn enter(self) -> ThreadContextDropGuard {
CONTEXT.with(|ctx| {
let previous = ctx.borrow_mut().replace(self);
ThreadContextDropGuard { previous }
})
}
pub(crate) fn io_handle() -> crate::runtime::io::Handle { pub(crate) fn io_handle() -> crate::runtime::io::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() { CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(), Some(ref ctx) => ctx.io_handle.clone(),
None => Default::default(), None => Default::default(),
}) })
} }
}
cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::time::Handle { pub(crate) fn time_handle() -> crate::runtime::time::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() { CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(), Some(ref ctx) => ctx.time_handle.clone(),
@ -101,61 +28,46 @@ impl ThreadContext {
}) })
} }
pub(crate) fn spawn_handle() -> Option<Spawner> { cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.clock.clone()),
None => None,
})
}
}
}
cfg_rt_core! {
pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> {
CONTEXT.with(|ctx| match *ctx.borrow() { CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.spawner.clone()), Some(ref ctx) => Some(ctx.spawner.clone()),
None => None, None => None,
}) })
} }
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
CONTEXT.with(
|ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) {
Some(Some(clock)) => Some(clock),
_ => None,
},
)
}
pub(crate) fn blocking_spawner() -> Option<crate::runtime::blocking::Spawner> {
CONTEXT.with(|ctx| {
match ctx
.borrow()
.as_ref()
.map(|ctx| ctx.blocking_spawner.clone())
{
Some(Some(blocking_spawner)) => Some(blocking_spawner),
_ => None,
}
})
}
} }
cfg_blocking_impl! { /// Set this [`ThreadContext`] as the current active [`ThreadContext`].
impl ThreadContext { ///
pub(crate) fn with_blocking_spawner( /// [`ThreadContext`]: struct.ThreadContext.html
mut self, pub(crate) fn enter<F, R>(new: Handle, f: F) -> R
blocking_spawner: crate::runtime::blocking::Spawner, where
) -> Self { F: FnOnce() -> R,
self.blocking_spawner.replace(blocking_spawner); {
self struct DropGuard(Option<Handle>);
impl Drop for DropGuard {
fn drop(&mut self) {
CONTEXT.with(|ctx| {
*ctx.borrow_mut() = self.0.take();
});
} }
} }
}
/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop. let _guard = CONTEXT.with(|ctx| {
/// let old = ctx.borrow_mut().replace(new);
/// [`ThreadContextDropGuard`]: struct.ThreadContextDropGuard.html DropGuard(old)
#[derive(Debug)] });
pub(crate) struct ThreadContextDropGuard {
previous: Option<ThreadContext>,
}
impl Drop for ThreadContextDropGuard { f()
fn drop(&mut self) {
CONTEXT.with(|ctx| match self.previous.clone() {
Some(prev) => ctx.borrow_mut().replace(prev),
None => ctx.borrow_mut().take(),
});
}
} }

View File

@ -30,16 +30,7 @@ impl Handle {
where where
F: FnOnce() -> R, F: FnOnce() -> R,
{ {
let _e = context::ThreadContext::new( context::enter(self.clone(), f)
self.spawner.clone(),
self.io_handle.clone(),
self.time_handle.clone(),
Some(self.clock.clone()),
Some(self.blocking_spawner.clone()),
)
.enter();
f()
} }
/// Returns a Handle view over the currently running Runtime /// Returns a Handle view over the currently running Runtime
@ -68,17 +59,7 @@ impl Handle {
/// # } /// # }
/// ``` /// ```
pub fn current() -> Self { pub fn current() -> Self {
use crate::runtime::context::ThreadContext; context::current().expect("not currently running on the Tokio runtime.")
Handle {
spawner: ThreadContext::spawn_handle()
.expect("not currently running on the Tokio runtime."),
io_handle: ThreadContext::io_handle(),
time_handle: ThreadContext::time_handle(),
clock: ThreadContext::clock().expect("not currently running on the Tokio runtime."),
blocking_spawner: ThreadContext::blocking_spawner()
.expect("not currently running on the Tokio runtime."),
}
} }
} }

View File

@ -8,7 +8,7 @@ pub(crate) use std::io::Result;
pub(crate) use variant::*; pub(crate) use variant::*;
#[cfg(all(feature = "io-driver", not(loom)))] #[cfg(feature = "io-driver")]
mod variant { mod variant {
use crate::io::driver; use crate::io::driver;
use crate::park::{Either, ParkThread}; use crate::park::{Either, ParkThread};
@ -28,6 +28,9 @@ mod variant {
pub(crate) type Handle = Option<driver::Handle>; pub(crate) type Handle = Option<driver::Handle>;
pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> { pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> {
#[cfg(loom)]
assert!(!enable);
if enable { if enable {
let driver = driver::Driver::new()?; let driver = driver::Driver::new()?;
let handle = driver.handle(); let handle = driver.handle();
@ -40,7 +43,7 @@ mod variant {
} }
} }
#[cfg(any(not(feature = "io-driver"), loom))] #[cfg(not(feature = "io-driver"))]
mod variant { mod variant {
use crate::park::ParkThread; use crate::park::ParkThread;

View File

@ -38,7 +38,7 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
#[cfg(loom)] #[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 2; const LOCAL_QUEUE_CAPACITY: usize = 2;
use crate::runtime::{self, blocking, Parker}; use crate::runtime::{self, Parker};
use crate::task::JoinHandle; use crate::task::JoinHandle;
use std::fmt; use std::fmt;
@ -107,11 +107,10 @@ impl Drop for ThreadPool {
} }
impl Workers { impl Workers {
pub(crate) fn spawn(self, blocking_pool: &blocking::Spawner) { pub(crate) fn spawn(self, rt: &runtime::Handle) {
blocking_pool.enter(|| { rt.enter(|| {
for worker in self.workers { for worker in self.workers {
let b = blocking_pool.clone(); runtime::spawn_blocking(move || worker.run());
runtime::spawn_blocking(move || worker.run(b));
} }
}); });
} }

View File

@ -1,9 +1,9 @@
use crate::loom::cell::CausalCell; use crate::loom::cell::CausalCell;
use crate::loom::sync::Arc; use crate::loom::sync::Arc;
use crate::park::Park; use crate::park::Park;
use crate::runtime;
use crate::runtime::park::Parker; use crate::runtime::park::Parker;
use crate::runtime::thread_pool::{current, slice, Owned, Shared}; use crate::runtime::thread_pool::{current, slice, Owned, Shared};
use crate::runtime::{self, blocking};
use crate::task::Task; use crate::task::Task;
use std::cell::Cell; use std::cell::Cell;
@ -119,7 +119,7 @@ impl Worker {
} }
} }
pub(super) fn run(self, blocking_pool: blocking::Spawner) { pub(super) fn run(self) {
// First, acquire a lock on the worker. // First, acquire a lock on the worker.
let guard = match self.acquire_lock() { let guard = match self.acquire_lock() {
Some(guard) => guard, Some(guard) => guard,
@ -131,37 +131,35 @@ impl Worker {
// Enter a runtime context // Enter a runtime context
let _enter = crate::runtime::enter(); let _enter = crate::runtime::enter();
blocking_pool.enter(|| { ON_BLOCK.with(|ob| {
ON_BLOCK.with(|ob| { // Ensure that the ON_BLOCK is removed from the thread-local context
// Ensure that the ON_BLOCK is removed from the thread-local context // when leaving the scope. This handles cases that involve panicking.
// when leaving the scope. This handles cases that involve panicking. struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>);
struct Reset<'a>(&'a Cell<Option<*const dyn Fn()>>);
impl<'a> Drop for Reset<'a> { impl<'a> Drop for Reset<'a> {
fn drop(&mut self) { fn drop(&mut self) {
self.0.set(None); self.0.set(None);
}
} }
}
let _reset = Reset(ob); let _reset = Reset(ob);
let allow_blocking: &dyn Fn() = &|| self.block_in_place(&blocking_pool); let allow_blocking: &dyn Fn() = &|| self.block_in_place();
ob.set(Some(unsafe { ob.set(Some(unsafe {
// NOTE: We cannot use a safe cast to raw pointer here, since we are // NOTE: We cannot use a safe cast to raw pointer here, since we are
// _also_ erasing the lifetime of these pointers. That is safe here, // _also_ erasing the lifetime of these pointers. That is safe here,
// because we know that ob will set back to None before allow_blocking // because we know that ob will set back to None before allow_blocking
// is dropped. // is dropped.
#[allow(clippy::useless_transmute)] #[allow(clippy::useless_transmute)]
std::mem::transmute::<_, *const dyn Fn()>(allow_blocking) std::mem::transmute::<_, *const dyn Fn()>(allow_blocking)
})); }));
let _ = guard.run(); let _ = guard.run();
// Ensure that we reset ob before allow_blocking is dropped. // Ensure that we reset ob before allow_blocking is dropped.
drop(_reset); drop(_reset);
}); });
})
}); });
if self.gone.get() { if self.gone.get() {
@ -206,7 +204,7 @@ impl Worker {
} }
/// Enter an in-place blocking section /// Enter an in-place blocking section
fn block_in_place(&self, blocking_pool: &blocking::Spawner) { fn block_in_place(&self) {
// If our Worker has already been given away, then blocking is fine! // If our Worker has already been given away, then blocking is fine!
if self.gone.get() { if self.gone.get() {
return; return;
@ -259,8 +257,7 @@ impl Worker {
}; };
// Give away the worker // Give away the worker
let b = blocking_pool.clone(); runtime::spawn_blocking(move || worker.run());
runtime::spawn_blocking(move || worker.run(b));
} }
} }

View File

@ -5,7 +5,7 @@
pub(crate) use variant::*; pub(crate) use variant::*;
#[cfg(all(feature = "time", not(loom)))] #[cfg(feature = "time")]
mod variant { mod variant {
use crate::park::Either; use crate::park::Either;
use crate::runtime::io; use crate::runtime::io;
@ -36,7 +36,7 @@ mod variant {
} }
} }
#[cfg(any(not(feature = "time"), loom))] #[cfg(not(feature = "time"))]
mod variant { mod variant {
use crate::runtime::io; use crate::runtime::io;

View File

@ -123,7 +123,7 @@ doc_rt_core! {
T: Future + Send + 'static, T: Future + Send + 'static,
T::Output: Send + 'static, T::Output: Send + 'static,
{ {
let spawn_handle = runtime::context::ThreadContext::spawn_handle() let spawn_handle = runtime::context::spawn_handle()
.expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`"); .expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
spawn_handle.spawn(task) spawn_handle.spawn(task)
} }

View File

@ -64,7 +64,7 @@ cfg_test_util! {
/// Panics if time is already frozen or if called from outside of the Tokio /// Panics if time is already frozen or if called from outside of the Tokio
/// runtime. /// runtime.
pub fn pause() { pub fn pause() {
let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime"); let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut frozen = clock.inner.frozen.lock().unwrap(); let mut frozen = clock.inner.frozen.lock().unwrap();
if frozen.is_some() { if frozen.is_some() {
panic!("time is already frozen"); panic!("time is already frozen");
@ -82,7 +82,7 @@ cfg_test_util! {
/// Panics if time is not frozen or if called from outside of the Tokio /// Panics if time is not frozen or if called from outside of the Tokio
/// runtime. /// runtime.
pub fn resume() { pub fn resume() {
let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime"); let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut frozen = clock.inner.frozen.lock().unwrap(); let mut frozen = clock.inner.frozen.lock().unwrap();
if frozen.is_none() { if frozen.is_none() {
@ -102,14 +102,14 @@ cfg_test_util! {
/// Panics if time is not frozen or if called from outside of the Tokio /// Panics if time is not frozen or if called from outside of the Tokio
/// runtime. /// runtime.
pub async fn advance(duration: Duration) { pub async fn advance(duration: Duration) {
let clock = context::ThreadContext::clock().expect("time cannot be frozen from outside the Tokio runtime"); let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration); clock.advance(duration);
crate::task::yield_now().await; crate::task::yield_now().await;
} }
/// Return the current instant, factoring in frozen time. /// Return the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant { pub(crate) fn now() -> Instant {
if let Some(clock) = context::ThreadContext::clock() { if let Some(clock) = context::clock() {
if let Some(frozen) = *clock.inner.frozen.lock().unwrap() { if let Some(frozen) = *clock.inner.frozen.lock().unwrap() {
Instant::from_std(clock.inner.start + frozen) Instant::from_std(clock.inner.start + frozen)
} else { } else {

View File

@ -21,7 +21,7 @@ impl Handle {
/// ///
/// This function panics if there is no current timer set. /// This function panics if there is no current timer set.
pub(crate) fn current() -> Self { pub(crate) fn current() -> Self {
context::ThreadContext::time_handle().expect("no current timer") context::time_handle().expect("no current timer")
} }
/// Try to return a strong ref to the inner /// Try to return a strong ref to the inner

View File

@ -1,5 +1,3 @@
#![cfg(not(loom))]
//! Utilities for tracking time. //! Utilities for tracking time.
//! //!
//! This module provides a number of types for executing code after a set period //! This module provides a number of types for executing code after a set period

View File

@ -1,212 +0,0 @@
use crate::park::{Park, Unpark};
use crate::runtime::context;
use crate::time::driver::Driver;
use crate::time::{Clock, Duration, Instant};
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
/// Run the provided closure with a `MockClock` that starts at the current time.
pub(crate) fn mock<F, R>(f: F) -> R
where
F: FnOnce(&mut Handle) -> R,
{
let mut mock = MockClock::new();
mock.enter(f)
}
/// Mock clock for use with `tokio-timer` futures.
///
/// A mock timer that is able to advance and wake after a
/// certain duration.
#[derive(Debug)]
pub(crate) struct MockClock {
time: MockTime,
clock: Clock,
}
/// A handle to the `MockClock`.
#[derive(Debug)]
pub(crate) struct Handle {
timer: Driver<MockPark>,
time: MockTime,
clock: Clock,
}
type Inner = Arc<Mutex<State>>;
#[derive(Debug, Clone)]
struct MockTime {
inner: Inner,
_pd: PhantomData<Rc<()>>,
}
#[derive(Debug)]
struct MockNow {
inner: Inner,
}
#[derive(Debug)]
struct MockPark {
inner: Inner,
_pd: PhantomData<Rc<()>>,
}
#[derive(Debug)]
struct MockUnpark {
inner: Inner,
}
#[derive(Debug)]
struct State {
clock: Clock,
unparked: bool,
park_for: Option<Duration>,
}
impl MockClock {
/// Create a new `MockClock` with the current time.
pub(crate) fn new() -> Self {
let clock = Clock::new_frozen();
let time = MockTime::new(clock.clone());
MockClock { time, clock }
}
/// Enter the `MockClock` context.
pub(crate) fn enter<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Handle) -> R,
{
let park = self.time.mock_park();
let timer = Driver::new(park, self.clock.clone());
let handle = timer.handle();
let ctx = context::ThreadContext::clone_current();
let _e = ctx
.with_clock(self.clock.clone())
.with_time_handle(Some(handle))
.enter();
let time = self.time.clone();
let mut handle = Handle::new(timer, time, self.clock.clone());
f(&mut handle)
}
}
impl Default for MockClock {
fn default() -> Self {
Self::new()
}
}
impl Handle {
pub(self) fn new(timer: Driver<MockPark>, time: MockTime, clock: Clock) -> Self {
Handle { timer, time, clock }
}
/// Turn the internal timer and mock park for the provided duration.
pub(crate) fn turn(&mut self) {
self.timer.park().unwrap();
}
/// Turn the internal timer and mock park for the provided duration.
pub(crate) fn turn_for(&mut self, duration: Duration) {
self.timer.park_timeout(duration).unwrap();
}
/// Advance the `MockClock` by the provided duration.
pub(crate) fn advance(&mut self, duration: Duration) {
let now = Instant::now();
let end = now + duration;
while Instant::now() < end {
self.turn_for(end - Instant::now());
}
}
/// Returns the total amount of time the time has been advanced.
pub(crate) fn advanced(&self) -> Duration {
self.clock.advanced()
}
/// Get the currently mocked time
pub(crate) fn now(&mut self) -> Instant {
self.time.now()
}
/// Turn the internal timer once, but force "parking" for `duration` regardless of any pending
/// timeouts
pub(crate) fn park_for(&mut self, duration: Duration) {
self.time.inner.lock().unwrap().park_for = Some(duration);
self.turn()
}
}
impl MockTime {
pub(crate) fn new(clock: Clock) -> MockTime {
let state = State {
clock,
unparked: false,
park_for: None,
};
MockTime {
inner: Arc::new(Mutex::new(state)),
_pd: PhantomData,
}
}
pub(crate) fn mock_park(&self) -> MockPark {
let inner = self.inner.clone();
MockPark {
inner,
_pd: PhantomData,
}
}
pub(crate) fn now(&self) -> Instant {
Instant::now()
}
}
impl State {}
impl Park for MockPark {
type Unpark = MockUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
let inner = self.inner.clone();
MockUnpark { inner }
}
fn park(&mut self) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().map_err(|_| ())?;
let duration = inner.park_for.take().expect("call park_for first");
inner.clock.advance(duration);
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
if let Some(duration) = inner.park_for.take() {
inner.clock.advance(duration);
} else {
inner.clock.advance(duration);
}
Ok(())
}
}
impl Unpark for MockUnpark {
fn unpark(&self) {
if let Ok(mut inner) = self.inner.lock() {
inner.unparked = true;
}
}
}