runtime: refactor thread-local setters (#1449)

This commit is contained in:
Douman 2019-08-15 22:00:57 +02:00 committed by Carl Lerche
parent 8538c25170
commit 37131b2114
10 changed files with 97 additions and 120 deletions

View File

@ -53,6 +53,7 @@ use mio::event::Evented;
use slab::Slab;
use std::cell::RefCell;
use std::io;
use std::marker::PhantomData;
#[cfg(all(unix, not(target_os = "fuchsia")))]
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::AtomicUsize;
@ -160,54 +161,45 @@ fn _assert_kinds() {
// ===== impl Reactor =====
/// Set the default reactor for the duration of the closure
///
/// # Panics
///
/// This function panics if there already is a default reactor set.
pub fn with_default<F, R>(handle: &Handle, f: F) -> R
where
F: FnOnce() -> R,
{
// Ensure that the executor is removed from the thread-local context
// when leaving the scope. This handles cases that involve panicking.
struct Reset;
#[derive(Debug)]
///Guard that resets current reactor on drop.
pub struct DefaultGuard<'a> {
_lifetime: PhantomData<&'a u8>,
}
impl Drop for Reset {
fn drop(&mut self) {
CURRENT_REACTOR.with(|current| {
let mut current = current.borrow_mut();
*current = None;
});
}
}
// This ensures the value for the current reactor gets reset even if there
// is a panic.
let _r = Reset;
CURRENT_REACTOR.with(|current| {
{
impl Drop for DefaultGuard<'_> {
fn drop(&mut self) {
CURRENT_REACTOR.with(|current| {
let mut current = current.borrow_mut();
*current = None;
});
}
}
assert!(
current.is_none(),
"default Tokio reactor already set \
for execution context"
);
///Sets handle for a default reactor, returning guard that unsets it on drop.
pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
CURRENT_REACTOR.with(|current| {
let mut current = current.borrow_mut();
let handle = match handle.as_priv() {
Some(handle) => handle,
None => {
panic!("`handle` does not reference a reactor");
}
};
assert!(
current.is_none(),
"default Tokio reactor already set \
for execution context"
);
*current = Some(handle.clone());
}
let handle = match handle.as_priv() {
Some(handle) => handle,
None => {
panic!("`handle` does not reference a reactor");
}
};
f()
})
*current = Some(handle.clone());
});
DefaultGuard {
_lifetime: PhantomData,
}
}
impl Reactor {

View File

@ -134,11 +134,10 @@ impl MockClock {
let handle = timer.handle();
let time = self.time.clone();
::tokio_timer::with_default(&handle, || {
let mut handle = Handle::new(timer, time);
f(&mut handle)
// lazy(|| Ok::<_, ()>(f(&mut handle))).wait().unwrap()
})
let _timer = ::tokio_timer::set_default(&handle);
let mut handle = Handle::new(timer, time);
f(&mut handle)
// lazy(|| Ok::<_, ()>(f(&mut handle))).wait().unwrap()
})
}
}

View File

@ -55,7 +55,7 @@ pub use error::Error;
pub use interval::Interval;
#[doc(inline)]
pub use timeout::Timeout;
pub use timer::{with_default, Timer};
pub use timer::{set_default, Timer};
use std::time::{Duration, Instant};

View File

@ -3,6 +3,7 @@ use crate::timer::Inner;
use crate::{Delay, Error, /*Interval,*/ Timeout};
use std::cell::RefCell;
use std::fmt;
use std::marker::PhantomData;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
@ -47,57 +48,46 @@ thread_local! {
static CURRENT_TIMER: RefCell<Option<HandlePriv>> = RefCell::new(None)
}
/// Set the default timer for the duration of the closure.
///
/// From within the closure, [`Delay`] instances that are created via
/// [`Delay::new`] can be used.
#[derive(Debug)]
///Unsets default timer handler on drop.
pub struct DefaultGuard<'a> {
_lifetime: PhantomData<&'a u8>,
}
impl Drop for DefaultGuard<'_> {
fn drop(&mut self) {
CURRENT_TIMER.with(|current| {
let mut current = current.borrow_mut();
*current = None;
})
}
}
///Sets handle to default timer, returning guard that unsets it on drop.
///
/// # Panics
///
/// This function panics if there already is a default timer set.
///
/// [`Delay`]: ../struct.Delay.html
/// [`Delay::new`]: ../struct.Delay.html#method.new
pub fn with_default<F, R>(handle: &Handle, f: F) -> R
where
F: FnOnce() -> R,
{
// Ensure that the timer is removed from the thread-local context
// when leaving the scope. This handles cases that involve panicking.
struct Reset;
impl Drop for Reset {
fn drop(&mut self) {
CURRENT_TIMER.with(|current| {
let mut current = current.borrow_mut();
*current = None;
});
}
}
// This ensures the value for the current timer gets reset even if there is
// a panic.
let _r = Reset;
pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
CURRENT_TIMER.with(|current| {
{
let mut current = current.borrow_mut();
let mut current = current.borrow_mut();
assert!(
current.is_none(),
"default Tokio timer already set \
for execution context"
);
assert!(
current.is_none(),
"default Tokio timer already set \
for execution context"
);
let handle = handle
.as_priv()
.unwrap_or_else(|| panic!("`handle` does not reference a timer"));
let handle = handle
.as_priv()
.unwrap_or_else(|| panic!("`handle` does not reference a timer"));
*current = Some(handle.clone());
}
*current = Some(handle.clone());
});
f()
})
DefaultGuard {
_lifetime: PhantomData,
}
}
impl Handle {

View File

@ -45,7 +45,7 @@ use self::entry::Entry;
use self::stack::Stack;
pub(crate) use self::handle::HandlePriv;
pub use self::handle::{with_default, Handle};
pub use self::handle::{set_default, Handle};
pub use self::now::{Now, SystemNow};
pub(crate) use self::registration::Registration;

View File

@ -265,7 +265,7 @@ cfg_if! {
use std::env;
use std::fs::File;
use std::io::Error;
use std::io;
use std::mem;
use std::sync::Once;

View File

@ -197,18 +197,16 @@ impl Runtime {
// This will set the default handle and timer to use inside the closure
// and run the future.
tokio_net::with_default(&reactor_handle, || {
clock::with_default(clock, || {
timer::with_default(&timer_handle, || {
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, || f(executor))
})
})
let _reactor = tokio_net::set_default(&reactor_handle);
clock::with_default(clock, || {
let _timer = timer::set_default(&timer_handle);
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, || f(executor))
})
}
}

View File

@ -343,15 +343,13 @@ impl Builder {
.around_worker(move |w| {
let index = w.id().to_usize();
tokio_net::with_default(&reactor_handles[index], || {
clock::with_default(&clock, || {
timer::with_default(&timer_handles[index], || {
trace::dispatcher::with_default(&dispatch, || {
w.run();
})
});
let _reactor = tokio_net::set_default(&reactor_handles[index]);
clock::with_default(&clock, || {
let _timer = timer::set_default(&timer_handles[index]);
trace::dispatcher::with_default(&dispatch, || {
w.run();
})
});
})
})
.custom_park(move |worker_id| {
let index = worker_id.to_usize();

View File

@ -173,12 +173,10 @@ impl Runtime {
let trace = &self.inner().trace;
tokio_executor::with_default(&mut self.inner().pool.sender(), || {
tokio_net::with_default(bg.reactor(), || {
timer::with_default(bg.timer(), || {
trace::dispatcher::with_default(trace, || {
entered.block_on(future)
})
})
let _reactor = tokio_net::set_default(bg.reactor());
let _timer = timer::set_default(bg.timer());
trace::dispatcher::with_default(trace, || {
entered.block_on(future)
})
})
}

View File

@ -66,11 +66,13 @@ fn test_drop_on_notify() {
let _enter = tokio_executor::enter().unwrap();
tokio_net::with_default(&reactor.handle(), || {
{
let handle = reactor.handle();
let _reactor = tokio_net::set_default(&handle);
let waker = waker_ref(&task);
let mut cx = Context::from_waker(&waker);
assert_pending!(task.future.lock().unwrap().as_mut().poll(&mut cx));
});
}
// Get the address
let addr = addr_rx.recv().unwrap();