time: Eagerly bind delays to timer (#1800)

## Motivation

Similar to #1666, it is no longer necessary to lazily register delays with the
executions default timer. All delays are expected to be created from within a
runtime, and should panic if not done so.

## Solution

`tokio::time` now assumes there to be a `CURRENT_TIMER` set when creating a
delay; this can be assumed if called within a tokio runtime. If there is no
current timer, the application will panic with a "no current timer" message.

## Follow-up

Similar to #1666, `HandlePriv` can probably be removed, but this mainly prepares
for 0.2 API changes. Because it is not in the public API, this can be done in a
following change.

Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
This commit is contained in:
Kevin Leimkuhler 2019-11-20 12:24:41 -08:00 committed by Carl Lerche
parent bc150cd0b5
commit 3e643c7b81
8 changed files with 145 additions and 178 deletions

View File

@ -20,5 +20,10 @@ fn async_fn() {
#[test] #[test]
fn test_delay() { fn test_delay() {
let deadline = Instant::now() + Duration::from_millis(100); let deadline = Instant::now() + Duration::from_millis(100);
assert_eq!((), block_on(delay_until(deadline))); assert_eq!(
(),
block_on(async {
delay_until(deadline).await;
})
);
} }

View File

@ -42,7 +42,7 @@ pub fn delay_for(duration: Duration) -> Delay {
#[derive(Debug)] #[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"] #[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Delay { pub struct Delay {
/// The link between the `Delay` instance at the timer that drives it. /// The link between the `Delay` instance and the timer that drives it.
/// ///
/// This also stores the `deadline` value. /// This also stores the `deadline` value.
registration: Registration, registration: Registration,
@ -76,21 +76,12 @@ impl Delay {
pub fn reset(&mut self, deadline: Instant) { pub fn reset(&mut self, deadline: Instant) {
self.registration.reset(deadline); self.registration.reset(deadline);
} }
/// Register the delay with the timer instance for the current execution
/// context.
fn register(&mut self) {
self.registration.register();
}
} }
impl Future for Delay { impl Future for Delay {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// Ensure the `Delay` instance is associated with a timer.
self.register();
// `poll_elapsed` can return an error in two cases: // `poll_elapsed` can return an error in two cases:
// //
// - AtCapacity: this is a pathlogical case where far too many // - AtCapacity: this is a pathlogical case where far too many

View File

@ -228,15 +228,19 @@ impl<T> DelayQueue<T> {
/// ```rust /// ```rust
/// # use tokio::time::DelayQueue; /// # use tokio::time::DelayQueue;
/// # use std::time::Duration; /// # use std::time::Duration;
/// let mut delay_queue = DelayQueue::with_capacity(10);
/// ///
/// // These insertions are done without further allocation /// # #[tokio::main]
/// for i in 0..10 { /// # async fn main() {
/// delay_queue.insert(i, Duration::from_secs(i)); /// let mut delay_queue = DelayQueue::with_capacity(10);
/// }
/// ///
/// // This will make the queue allocate additional storage /// // These insertions are done without further allocation
/// delay_queue.insert(11, Duration::from_secs(11)); /// for i in 0..10 {
/// delay_queue.insert(i, Duration::from_secs(i));
/// }
///
/// // This will make the queue allocate additional storage
/// delay_queue.insert(11, Duration::from_secs(11));
/// # }
/// ``` /// ```
pub fn with_capacity(capacity: usize) -> DelayQueue<T> { pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
DelayQueue { DelayQueue {
@ -279,13 +283,16 @@ impl<T> DelayQueue<T> {
/// ```rust /// ```rust
/// use tokio::time::{DelayQueue, Duration, Instant}; /// use tokio::time::{DelayQueue, Duration, Instant};
/// ///
/// let mut delay_queue = DelayQueue::new(); /// # #[tokio::main]
/// let key = delay_queue.insert_at( /// # async fn main() {
/// "foo", Instant::now() + Duration::from_secs(5)); /// let mut delay_queue = DelayQueue::new();
/// let key = delay_queue.insert_at(
/// "foo", Instant::now() + Duration::from_secs(5));
/// ///
/// // Remove the entry /// // Remove the entry
/// let item = delay_queue.remove(&key); /// let item = delay_queue.remove(&key);
/// assert_eq!(*item.get_ref(), "foo"); /// assert_eq!(*item.get_ref(), "foo");
/// # }
/// ``` /// ```
/// ///
/// [`poll`]: #method.poll /// [`poll`]: #method.poll
@ -380,12 +387,15 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue; /// use tokio::time::DelayQueue;
/// use std::time::Duration; /// use std::time::Duration;
/// ///
/// let mut delay_queue = DelayQueue::new(); /// # #[tokio::main]
/// let key = delay_queue.insert("foo", Duration::from_secs(5)); /// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// let key = delay_queue.insert("foo", Duration::from_secs(5));
/// ///
/// // Remove the entry /// // Remove the entry
/// let item = delay_queue.remove(&key); /// let item = delay_queue.remove(&key);
/// assert_eq!(*item.get_ref(), "foo"); /// assert_eq!(*item.get_ref(), "foo");
/// # }
/// ``` /// ```
/// ///
/// [`poll`]: #method.poll /// [`poll`]: #method.poll
@ -430,12 +440,15 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue; /// use tokio::time::DelayQueue;
/// use std::time::Duration; /// use std::time::Duration;
/// ///
/// let mut delay_queue = DelayQueue::new(); /// # #[tokio::main]
/// let key = delay_queue.insert("foo", Duration::from_secs(5)); /// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// let key = delay_queue.insert("foo", Duration::from_secs(5));
/// ///
/// // Remove the entry /// // Remove the entry
/// let item = delay_queue.remove(&key); /// let item = delay_queue.remove(&key);
/// assert_eq!(*item.get_ref(), "foo"); /// assert_eq!(*item.get_ref(), "foo");
/// # }
/// ``` /// ```
pub fn remove(&mut self, key: &Key) -> Expired<T> { pub fn remove(&mut self, key: &Key) -> Expired<T> {
use crate::time::wheel::Stack; use crate::time::wheel::Stack;
@ -477,14 +490,17 @@ impl<T> DelayQueue<T> {
/// ```rust /// ```rust
/// use tokio::time::{DelayQueue, Duration, Instant}; /// use tokio::time::{DelayQueue, Duration, Instant};
/// ///
/// let mut delay_queue = DelayQueue::new(); /// # #[tokio::main]
/// let key = delay_queue.insert("foo", Duration::from_secs(5)); /// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// let key = delay_queue.insert("foo", Duration::from_secs(5));
/// ///
/// // "foo" is scheduled to be returned in 5 seconds /// // "foo" is scheduled to be returned in 5 seconds
/// ///
/// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10)); /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
/// ///
/// // "foo"is now scheduled to be returned in 10 seconds /// // "foo"is now scheduled to be returned in 10 seconds
/// # }
/// ``` /// ```
pub fn reset_at(&mut self, key: &Key, when: Instant) { pub fn reset_at(&mut self, key: &Key, when: Instant) {
self.wheel.remove(&key.index, &mut self.slab); self.wheel.remove(&key.index, &mut self.slab);
@ -531,14 +547,17 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue; /// use tokio::time::DelayQueue;
/// use std::time::Duration; /// use std::time::Duration;
/// ///
/// let mut delay_queue = DelayQueue::new(); /// # #[tokio::main]
/// let key = delay_queue.insert("foo", Duration::from_secs(5)); /// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// let key = delay_queue.insert("foo", Duration::from_secs(5));
/// ///
/// // "foo" is scheduled to be returned in 5 seconds /// // "foo" is scheduled to be returned in 5 seconds
/// ///
/// delay_queue.reset(&key, Duration::from_secs(10)); /// delay_queue.reset(&key, Duration::from_secs(10));
/// ///
/// // "foo"is now scheduled to be returned in 10 seconds /// // "foo"is now scheduled to be returned in 10 seconds
/// # }
/// ``` /// ```
pub fn reset(&mut self, key: &Key, timeout: Duration) { pub fn reset(&mut self, key: &Key, timeout: Duration) {
self.reset_at(key, Instant::now() + timeout); self.reset_at(key, Instant::now() + timeout);
@ -558,15 +577,18 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue; /// use tokio::time::DelayQueue;
/// use std::time::Duration; /// use std::time::Duration;
/// ///
/// let mut delay_queue = DelayQueue::new(); /// # #[tokio::main]
/// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// ///
/// delay_queue.insert("foo", Duration::from_secs(5)); /// delay_queue.insert("foo", Duration::from_secs(5));
/// ///
/// assert!(!delay_queue.is_empty()); /// assert!(!delay_queue.is_empty());
/// ///
/// delay_queue.clear(); /// delay_queue.clear();
/// ///
/// assert!(delay_queue.is_empty()); /// assert!(delay_queue.is_empty());
/// # }
/// ``` /// ```
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.slab.clear(); self.slab.clear();
@ -612,12 +634,15 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue; /// use tokio::time::DelayQueue;
/// use std::time::Duration; /// use std::time::Duration;
/// ///
/// let mut delay_queue = DelayQueue::new(); /// # #[tokio::main]
/// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// ///
/// delay_queue.insert("hello", Duration::from_secs(10)); /// delay_queue.insert("hello", Duration::from_secs(10));
/// delay_queue.reserve(10); /// delay_queue.reserve(10);
/// ///
/// assert!(delay_queue.capacity() >= 11); /// assert!(delay_queue.capacity() >= 11);
/// # }
/// ``` /// ```
pub fn reserve(&mut self, additional: usize) { pub fn reserve(&mut self, additional: usize) {
self.slab.reserve(additional); self.slab.reserve(additional);
@ -634,11 +659,14 @@ impl<T> DelayQueue<T> {
/// use tokio::time::DelayQueue; /// use tokio::time::DelayQueue;
/// use std::time::Duration; /// use std::time::Duration;
/// ///
/// let mut delay_queue = DelayQueue::new(); /// # #[tokio::main]
/// assert!(delay_queue.is_empty()); /// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// assert!(delay_queue.is_empty());
/// ///
/// delay_queue.insert("hello", Duration::from_secs(5)); /// delay_queue.insert("hello", Duration::from_secs(5));
/// assert!(!delay_queue.is_empty()); /// assert!(!delay_queue.is_empty());
/// # }
/// ``` /// ```
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.slab.is_empty() self.slab.is_empty()

View File

@ -6,7 +6,7 @@ use crate::time::{Duration, Error, Instant};
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::ptr; use std::ptr;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::task::{self, Poll}; use std::task::{self, Poll};
use std::u64; use std::u64;
@ -31,7 +31,7 @@ pub(crate) struct Entry {
/// without all `Delay` instances having completed. /// without all `Delay` instances having completed.
/// ///
/// When `None`, the entry has not yet been linked with a timer instance. /// When `None`, the entry has not yet been linked with a timer instance.
inner: Option<Weak<Inner>>, inner: Weak<Inner>,
/// Tracks the entry state. This value contains the following information: /// Tracks the entry state. This value contains the following information:
/// ///
@ -104,18 +104,41 @@ const ERROR: u64 = u64::MAX;
// ===== impl Entry ===== // ===== impl Entry =====
impl Entry { impl Entry {
pub(crate) fn new(deadline: Instant, duration: Duration) -> Entry { pub(crate) fn new(deadline: Instant, duration: Duration) -> Arc<Entry> {
Entry { let handle_priv = HandlePriv::current();
let handle = handle_priv.inner().unwrap();
// Increment the number of active timeouts
if handle.increment().is_err() {
// TODO(kleimkuhler): Transition to error state instead of
// panicking?
panic!("failed to add entry; timer at capacity");
};
let when = handle.normalize_deadline(deadline);
let state = if when <= handle.elapsed() {
ELAPSED
} else {
when
};
let entry = Arc::new(Entry {
time: CachePadded(UnsafeCell::new(Time { deadline, duration })), time: CachePadded(UnsafeCell::new(Time { deadline, duration })),
inner: None, inner: handle_priv.into_inner(),
waker: AtomicWaker::new(), waker: AtomicWaker::new(),
state: AtomicU64::new(0), state: AtomicU64::new(state),
queued: AtomicBool::new(false), queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()), next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None), when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None), next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()), prev_stack: UnsafeCell::new(ptr::null_mut()),
});
if handle.queue(&entry).is_err() {
entry.error();
} }
entry
} }
/// Only called by `Registration` /// Only called by `Registration`
@ -129,77 +152,6 @@ impl Entry {
&mut *self.time.0.get() &mut *self.time.0.get()
} }
/// Returns `true` if the `Entry` is currently associated with a timer
/// instance.
pub(crate) fn is_registered(&self) -> bool {
self.inner.is_some()
}
/// Only called by `Registration`
pub(crate) fn register(me: &mut Arc<Self>) {
let handle = match HandlePriv::try_current() {
Ok(handle) => handle,
Err(_) => {
// Could not associate the entry with a timer, transition the
// state to error
Arc::get_mut(me).unwrap().transition_to_error();
return;
}
};
Entry::register_with(me, handle)
}
/// Only called by `Registration`
pub(crate) fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
assert!(!me.is_registered(), "only register an entry once");
let deadline = me.time_ref().deadline;
let inner = match handle.inner() {
Some(inner) => inner,
None => {
// Could not associate the entry with a timer, transition the
// state to error
Arc::get_mut(me).unwrap().transition_to_error();
return;
}
};
// Increment the number of active timeouts
if inner.increment().is_err() {
Arc::get_mut(me).unwrap().transition_to_error();
return;
}
// Associate the entry with the timer
Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());
let when = inner.normalize_deadline(deadline);
// Relaxed OK: At this point, there are no other threads that have
// access to this entry.
if when <= inner.elapsed() {
me.state.store(ELAPSED, Relaxed);
return;
} else {
me.state.store(when, Relaxed);
}
if inner.queue(me).is_err() {
// The timer has shutdown, transition the entry to the error state.
me.error();
}
}
fn transition_to_error(&mut self) {
self.inner = Some(Weak::new());
self.state = AtomicU64::new(ERROR);
}
/// The current entry state as known by the timer. This is not the value of /// The current entry state as known by the timer. This is not the value of
/// `state`, but lets the timer know how to converge its state to `state`. /// `state`, but lets the timer know how to converge its state to `state`.
pub(crate) fn when_internal(&self) -> Option<u64> { pub(crate) fn when_internal(&self) -> Option<u64> {
@ -317,10 +269,6 @@ impl Entry {
/// Only called by `Registration` /// Only called by `Registration`
pub(crate) fn reset(entry: &mut Arc<Entry>) { pub(crate) fn reset(entry: &mut Arc<Entry>) {
if !entry.is_registered() {
return;
}
let inner = match entry.upgrade_inner() { let inner = match entry.upgrade_inner() {
Some(inner) => inner, Some(inner) => inner,
None => return, None => return,
@ -367,7 +315,7 @@ impl Entry {
} }
fn upgrade_inner(&self) -> Option<Arc<Inner>> { fn upgrade_inner(&self) -> Option<Arc<Inner>> {
self.inner.as_ref().and_then(|inner| inner.upgrade()) self.inner.upgrade()
} }
} }

View File

@ -1,6 +1,4 @@
use crate::time::driver::Inner; use crate::time::driver::Inner;
use crate::time::Error;
use std::cell::RefCell; use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -24,7 +22,7 @@ thread_local! {
} }
#[derive(Debug)] #[derive(Debug)]
///Unsets default timer handler on drop. /// Guard that unsets the current default timer on drop.
pub(crate) struct DefaultGuard<'a> { pub(crate) struct DefaultGuard<'a> {
prev: Option<HandlePriv>, prev: Option<HandlePriv>,
_lifetime: PhantomData<&'a u8>, _lifetime: PhantomData<&'a u8>,
@ -39,7 +37,7 @@ impl Drop for DefaultGuard<'_> {
} }
} }
///Sets handle to default timer, returning guard that unsets it on drop. /// Sets handle to default timer, returning guard that unsets it on drop.
/// ///
/// # Panics /// # Panics
/// ///
@ -82,11 +80,13 @@ impl Default for Handle {
impl HandlePriv { impl HandlePriv {
/// Try to get a handle to the current timer. /// Try to get a handle to the current timer.
/// ///
/// Returns `Err` if no handle is found. /// # Panics
pub(crate) fn try_current() -> Result<HandlePriv, Error> { ///
/// This function panics if there is no current timer set.
pub(crate) fn current() -> Self {
CURRENT_TIMER.with(|current| match *current.borrow() { CURRENT_TIMER.with(|current| match *current.borrow() {
Some(ref handle) => Ok(handle.clone()), Some(ref handle) => handle.clone(),
None => Err(Error::shutdown()), None => panic!("no current timer"),
}) })
} }

View File

@ -15,11 +15,8 @@ pub(crate) struct Registration {
impl Registration { impl Registration {
pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration { pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration {
fn is_send<T: Send + Sync>() {}
is_send::<Registration>();
Registration { Registration {
entry: Arc::new(Entry::new(deadline, duration)), entry: Entry::new(deadline, duration),
} }
} }
@ -27,12 +24,6 @@ impl Registration {
self.entry.time_ref().deadline self.entry.time_ref().deadline
} }
pub(crate) fn register(&mut self) {
if !self.entry.is_registered() {
Entry::register(&mut self.entry)
}
}
pub(crate) fn reset(&mut self, deadline: Instant) { pub(crate) fn reset(&mut self, deadline: Instant) {
unsafe { unsafe {
self.entry.time_mut().deadline = deadline; self.entry.time_mut().deadline = deadline;

View File

@ -1,4 +1,24 @@
mod mock_clock; mod mock_clock;
mod test_delay; mod test_delay;
mod test_queue; mod test_queue;
use crate::time::{self, Instant};
use std::time::Duration;
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
#[test]
fn registration_is_send_and_sync() {
use crate::time::driver::Registration;
assert_send::<Registration>();
assert_sync::<Registration>();
}
#[test]
#[should_panic]
fn delay_is_eager() {
let when = Instant::now() + Duration::from_millis(100);
let _ = time::delay_until(when);
}

View File

@ -181,29 +181,13 @@ fn delayed_delay_level_1() {
} }
#[test] #[test]
#[should_panic]
fn creating_delay_outside_of_context() { fn creating_delay_outside_of_context() {
let now = Instant::now(); let now = Instant::now();
// This creates a delay outside of the context of a mock timer. This tests // This creates a delay outside of the context of a mock timer. This tests
// that it will still expire. // that it will panic.
let mut fut = task::spawn(delay_until(now + ms(500))); let _fut = task::spawn(delay_until(now + ms(500)));
mock(|clock| {
// This registers the delay with the timer
assert_pending!(fut.poll());
// Wait some time... the timer is cascading
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(448));
assert_pending!(fut.poll());
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(500));
// The delay has elapsed
assert_ready!(fut.poll());
});
} }
#[test] #[test]