From 19500f7df892b616d1b33ee51aad7b318bc36228 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 28 Mar 2018 22:26:47 -0700 Subject: [PATCH] Provide a timer implementation (#249) This patch adds a new crate: tokio-timer. This crate provides an efficient timer implemeentation designed for use in Tokio based applications. The timer users a hierarchical hashed timer wheel algorithm with six levels, each having 64 slots. This allows the timer to have a resolution of 1ms while maintaining O(1) complexity for insert, removal, and firing of timeouts. There already exists a tokio-timer crate. This is a complete rewrite which solves the outstanding problems with the existing tokio-timer library. Closes #146. --- .travis.yml | 17 + Cargo.toml | 1 + ci/tsan | 5 + tokio-timer/CHANGELOG.md | 19 + tokio-timer/Cargo.toml | 19 + tokio-timer/LICENSE | 25 + tokio-timer/README.md | 19 + tokio-timer/src/atomic.rs | 87 ++++ tokio-timer/src/deadline.rs | 183 ++++++++ tokio-timer/src/error.rs | 60 +++ tokio-timer/src/interval.rs | 58 +++ tokio-timer/src/lib.rs | 41 ++ tokio-timer/src/sleep.rs | 110 +++++ tokio-timer/src/timer/entry.rs | 559 ++++++++++++++++++++++ tokio-timer/src/timer/handle.rs | 127 +++++ tokio-timer/src/timer/level.rs | 201 ++++++++ tokio-timer/src/timer/mod.rs | 645 ++++++++++++++++++++++++++ tokio-timer/src/timer/now.rs | 27 ++ tokio-timer/src/timer/registration.rs | 82 ++++ tokio-timer/tests/deadline.rs | 105 +++++ tokio-timer/tests/hammer.rs | 240 ++++++++++ tokio-timer/tests/interval.rs | 46 ++ tokio-timer/tests/sleep.rs | 488 +++++++++++++++++++ tokio-timer/tests/support/mod.rs | 234 ++++++++++ 24 files changed, 3398 insertions(+) create mode 100644 ci/tsan create mode 100644 tokio-timer/CHANGELOG.md create mode 100644 tokio-timer/Cargo.toml create mode 100644 tokio-timer/LICENSE create mode 100644 tokio-timer/README.md create mode 100644 tokio-timer/src/atomic.rs create mode 100644 tokio-timer/src/deadline.rs create mode 100644 tokio-timer/src/error.rs create mode 100644 tokio-timer/src/interval.rs create mode 100644 tokio-timer/src/lib.rs create mode 100644 tokio-timer/src/sleep.rs create mode 100644 tokio-timer/src/timer/entry.rs create mode 100644 tokio-timer/src/timer/handle.rs create mode 100644 tokio-timer/src/timer/level.rs create mode 100644 tokio-timer/src/timer/mod.rs create mode 100644 tokio-timer/src/timer/now.rs create mode 100644 tokio-timer/src/timer/registration.rs create mode 100644 tokio-timer/tests/deadline.rs create mode 100644 tokio-timer/tests/hammer.rs create mode 100644 tokio-timer/tests/interval.rs create mode 100644 tokio-timer/tests/sleep.rs create mode 100644 tokio-timer/tests/support/mod.rs diff --git a/.travis.yml b/.travis.yml index 082298b03..140840048 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,9 @@ sudo: false matrix: include: + # This represents the minimum Rust version supported by Tokio. Updating this + # should be done in a dedicated PR and cannot be greater than two 0.x + # releases prior to the current stable. - rust: 1.21.0 - rust: stable - os: osx @@ -16,7 +19,21 @@ script: set -e if [[ "$TRAVIS_RUST_VERSION" == nightly ]] then + # Pin the nightly version until rust-lang/rust#49436 is resolved. + rustup override set nightly-2018-03-26 + + # Make sure the benchmarks compile cargo build --benches --all + + # Run address sanitizer + ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0" \ + RUSTFLAGS="-Z sanitizer=address" \ + cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu + + # Run thread sanitizer + TSAN_OPTIONS="suppressions=`pwd`/ci/tsan" \ + RUSTFLAGS="-Z sanitizer=thread" \ + cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu fi - | set -e diff --git a/Cargo.toml b/Cargo.toml index 3e0e2560a..6f6d30fc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ members = [ "tokio-io", "tokio-reactor", "tokio-threadpool", + "tokio-timer", "tokio-tcp", "tokio-udp", "futures2", diff --git a/ci/tsan b/ci/tsan new file mode 100644 index 000000000..bc9867f5b --- /dev/null +++ b/ci/tsan @@ -0,0 +1,5 @@ +# TSAN suppressions file for Tokio + +# TSAN does not understand fences and `Arc::drop` is implemented using a fence. +# This causes many false positives. +race:Arc*drop diff --git a/tokio-timer/CHANGELOG.md b/tokio-timer/CHANGELOG.md new file mode 100644 index 000000000..6490e85e4 --- /dev/null +++ b/tokio-timer/CHANGELOG.md @@ -0,0 +1,19 @@ +# 0.2.0 (unreleased) + +* Rewrite from scratch using a hierarchical wheel strategy (#249). + +# 0.1.2 (Jun 27, 2017) + +* Allow naming timer thread. +* Track changes in dependencies. + +# 0.1.1 (Apr 6, 2017) + +* Set Rust v1.14 as the minimum supported version. +* Fix bug related to intervals. +* Impl `PartialEq + Eq` for TimerError. +* Add `Debug` implementations. + +# 0.1.0 (Jan 11, 2017) + +* Initial Release diff --git a/tokio-timer/Cargo.toml b/tokio-timer/Cargo.toml new file mode 100644 index 000000000..f4593b4f7 --- /dev/null +++ b/tokio-timer/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "tokio-timer" +version = "0.2.0" +authors = ["Carl Lerche "] +license = "MIT" +readme = "README.md" +repository = "https://github.com/tokio-rs/tokio-timer" +homepage = "https://github.com/tokio-rs/tokio-timer" +documentation = "https://docs.rs/tokio-timer" +description = """ +Timer facilities for Tokio +""" + +[dependencies] +futures = "0.1.19" +tokio-executor = { version = "0.1.1", path = "../tokio-executor" } + +[dev-dependencies] +rand = "0.4.2" diff --git a/tokio-timer/LICENSE b/tokio-timer/LICENSE new file mode 100644 index 000000000..38c1e27b8 --- /dev/null +++ b/tokio-timer/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/tokio-timer/README.md b/tokio-timer/README.md new file mode 100644 index 000000000..8ccdc06a8 --- /dev/null +++ b/tokio-timer/README.md @@ -0,0 +1,19 @@ +# tokio-timer + +Timer facilities for Tokio + +[Documentation](https://tokio-rs.github.io/tokio/tokio_timer/) + +## Overview + +This crate provides timer facilities for usage with Tokio. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/tokio-timer/src/atomic.rs b/tokio-timer/src/atomic.rs new file mode 100644 index 000000000..ad2233b02 --- /dev/null +++ b/tokio-timer/src/atomic.rs @@ -0,0 +1,87 @@ +//! Implementation of an atomic u64 cell. On 64 bit platforms, this is a wrapper +//! around `AtomicUsize`. On 32 bit platforms, this is implemented using a +//! `Mutex`. +//! +//! This file can be removed if/when `AtomicU64` lands in `std`. + +pub use self::imp::AtomicU64; + +#[cfg(target_pointer_width = "64")] +mod imp { + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[derive(Debug)] + pub struct AtomicU64 { + inner: AtomicUsize, + } + + impl AtomicU64 { + pub fn new(val: u64) -> AtomicU64 { + AtomicU64 { + inner: AtomicUsize::new(val as usize), + } + } + + pub fn load(&self, ordering: Ordering) -> u64 { + self.inner.load(ordering) as u64 + } + + pub fn store(&self, val: u64, ordering: Ordering) { + self.inner.store(val as usize, ordering) + } + + pub fn fetch_or(&self, val: u64, ordering: Ordering) -> u64 { + self.inner.fetch_or(val as usize, ordering) as u64 + } + + pub fn compare_and_swap(&self, old: u64, new: u64, ordering: Ordering) -> u64 { + self.inner.compare_and_swap( + old as usize, new as usize, ordering) as u64 + } + } +} + +#[cfg(not(target_pointer_width = "64"))] +mod imp { + use std::sync::Mutex; + + #[derive(Debug)] + pub struct AtomicU64 { + inner: Mutex, + } + + impl AtomicU64 { + pub fn new(val: u64) -> AtomicU64 { + AtomicU64 { + inner: Mutex::new(val), + } + } + + pub fn load(&self, _: Ordering) -> u64 { + *self.inner.lock().unwrap() + } + + pub fn store(&self, val: u64, _: Ordering) { + *self.inner.lock().unwrap() = val; + } + + pub fn fetch_or(&self, val: u64, _: Ordering) -> u64 { + let mut lock = self.inner.lock().unwrap(); + let prev = *lock; + *lock = prev | val; + prev + } + + pub fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 { + let mut lock = self.inner.lock().unwrap(); + let prev = *lock; + + if prev != old { + return prev; + } + + *lock = new; + prev + } + } +} diff --git a/tokio-timer/src/deadline.rs b/tokio-timer/src/deadline.rs new file mode 100644 index 000000000..7ba8a5433 --- /dev/null +++ b/tokio-timer/src/deadline.rs @@ -0,0 +1,183 @@ +//! Docs + +use Sleep; + +use futures::{Future, Poll, Async}; + +use std::error; +use std::fmt; +use std::time::Instant; + +/// Allows a given `Future` to execute until the specified deadline. +/// +/// If the inner future completes before the deadline is reached, then +/// `Deadline` completes with that value. Otherwise, `Deadline` completes with a +/// [`DeadlineError`]. +/// +/// [`DeadlineError`]: struct.DeadlineError.html +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Deadline { + future: T, + sleep: Sleep, +} + +/// Error returned by `Deadline` future. +#[derive(Debug)] +pub struct DeadlineError(Kind); + +/// Deadline error variants +#[derive(Debug)] +enum Kind { + /// Inner future returned an error + Inner(T), + + /// The deadline elapsed. + Elapsed, + + /// Timer returned an error. + Timer(::Error), +} + +impl Deadline { + /// Create a new `Deadline` that completes when `future` completes or when + /// `deadline` is reached. + pub fn new(future: T, deadline: Instant) -> Deadline { + Deadline::new_with_sleep(future, Sleep::new(deadline)) + } + + pub(crate) fn new_with_sleep(future: T, sleep: Sleep) -> Deadline { + Deadline { + future, + sleep, + } + } + + /// Gets a reference to the underlying future in this deadline. + pub fn get_ref(&self) -> &T { + &self.future + } + + /// Gets a mutable reference to the underlying future in this deadline. + pub fn get_mut(&mut self) -> &mut T { + &mut self.future + } + + /// Consumes this deadline, returning the underlying future. + pub fn into_inner(self) -> T { + self.future + } +} + +impl Future for Deadline +where T: Future, +{ + type Item = T::Item; + type Error = DeadlineError; + + fn poll(&mut self) -> Poll { + // First, try polling the future + match self.future.poll() { + Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), + Ok(Async::NotReady) => {} + Err(e) => return Err(DeadlineError::inner(e)), + } + + // Now check the timer + match self.sleep.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => { + Err(DeadlineError::elapsed()) + }, + Err(e) => Err(DeadlineError::timer(e)), + } + } +} + +// ===== impl DeadlineError ===== + +impl DeadlineError { + /// Create a new `DeadlineError` representing the inner future completing + /// with `Err`. + pub fn inner(err: T) -> DeadlineError { + DeadlineError(Kind::Inner(err)) + } + + /// Returns `true` if the error was caused by the inner future completing + /// with `Err`. + pub fn is_inner(&self) -> bool { + match self.0 { + Kind::Inner(_) => true, + _ => false, + } + } + + /// Consumes `self`, returning the inner future error. + pub fn into_inner(self) -> Option { + match self.0 { + Kind::Inner(err) => Some(err), + _ => None, + } + } + + /// Create a new `DeadlineError` representing the inner future not + /// completing before the deadline is reached. + pub fn elapsed() -> DeadlineError { + DeadlineError(Kind::Elapsed) + } + + /// Returns `true` if the error was caused by the inner future not + /// completing before the deadline is reached. + pub fn is_elapsed(&self) -> bool { + match self.0 { + Kind::Elapsed => true, + _ => false, + } + } + + /// Creates a new `DeadlineError` representing an error encountered by the + /// timer implementation + pub fn timer(err: ::Error) -> DeadlineError { + DeadlineError(Kind::Timer(err)) + } + + /// Returns `true` if the error was caused by the timer. + pub fn is_timer(&self) -> bool { + match self.0 { + Kind::Timer(_) => true, + _ => false, + } + } + + /// Consumes `self`, returning the error raised by the timer implementation. + pub fn into_timer(self) -> Option<::Error> { + match self.0 { + Kind::Timer(err) => Some(err), + _ => None, + } + } +} + +impl error::Error for DeadlineError { + fn description(&self) -> &str { + use self::Kind::*; + + match self.0 { + Inner(ref e) => e.description(), + Elapsed => "deadline has elapsed", + Timer(ref e) => e.description(), + } + } +} + +impl fmt::Display for DeadlineError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use self::Kind::*; + + match self.0 { + Inner(ref e) => e.fmt(fmt), + Elapsed => "deadline has elapsed".fmt(fmt), + Timer(ref e) => e.fmt(fmt), + } + } +} diff --git a/tokio-timer/src/error.rs b/tokio-timer/src/error.rs new file mode 100644 index 000000000..8f0956956 --- /dev/null +++ b/tokio-timer/src/error.rs @@ -0,0 +1,60 @@ +use self::Kind::*; + +use std::error; +use std::fmt; + +/// Errors encountered by the timer implementation. +#[derive(Debug)] +pub struct Error(Kind); + +#[derive(Debug)] +enum Kind { + Shutdown, + AtCapacity, +} + +impl Error { + /// Create an error representing a shutdown timer. + pub fn shutdown() -> Error { + Error(Shutdown) + } + + /// Returns `true` if the error was caused by the timer being shutdown. + pub fn is_shutdown(&self) -> bool { + match self.0 { + Kind::Shutdown => true, + _ => false, + } + } + + /// Create an error representing a timer at capacity. + pub fn at_capacity() -> Error { + Error(AtCapacity) + } + + /// Returns `true` if the error was caused by the timer being at capacity. + pub fn is_at_capacity(&self) -> bool { + match self.0 { + Kind::AtCapacity => true, + _ => false, + } + } +} + +impl error::Error for Error { + fn description(&self) -> &str { + use self::Kind::*; + + match self.0 { + Shutdown => "timer is shutdown", + AtCapacity => "timer is at capacity and cannot create a new entry", + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use std::error::Error; + self.description().fmt(fmt) + } +} diff --git a/tokio-timer/src/interval.rs b/tokio-timer/src/interval.rs new file mode 100644 index 000000000..4365285d1 --- /dev/null +++ b/tokio-timer/src/interval.rs @@ -0,0 +1,58 @@ +use Sleep; + +use futures::{Future, Stream, Poll}; + +use std::time::{Instant, Duration}; + +/// A stream representing notifications at fixed interval +#[derive(Debug)] +pub struct Interval { + /// Future that completes the next time the `Interval` yields a value. + sleep: Sleep, + + /// The duration between values yielded by `Interval`. + duration: Duration, +} + +impl Interval { + /// Create a new `Interval` that starts at `at` and yields every `duration` + /// interval after that. + /// + /// The `duration` argument must be a non-zero duration. + /// + /// # Panics + /// + /// This function panics if `duration` is zero. + pub fn new(at: Instant, duration: Duration) -> Interval { + assert!(duration > Duration::new(0, 0), "`duration` must be non-zero."); + + Interval::new_with_sleep(Sleep::new(at), duration) + } + + pub(crate) fn new_with_sleep(sleep: Sleep, duration: Duration) -> Interval { + Interval { + sleep, + duration, + } + } +} + +impl Stream for Interval { + type Item = Instant; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // Wait for the sleep to be done + let _ = try_ready!(self.sleep.poll()); + + // Get the `now` by looking at the `sleep` deadline + let now = self.sleep.deadline(); + + // The next interval value is `duration` after the one that just + // yielded. + self.sleep.reset(now + self.duration); + + // Return the current instant + Ok(Some(now).into()) + } +} diff --git a/tokio-timer/src/lib.rs b/tokio-timer/src/lib.rs new file mode 100644 index 000000000..caeef270a --- /dev/null +++ b/tokio-timer/src/lib.rs @@ -0,0 +1,41 @@ +//! Utilities for scheduling work to happen after a period of time. +//! +//! This crate provides a number of utilities for working with periods of time: +//! +//! * [`Sleep`]: A future that completes at a specified instant in time. +//! +//! * [`Interval`] A stream that yields at fixed time intervals. +//! +//! * [`Deadline`]: Wraps a future, requiring it to complete before a specified +//! instant in time, erroring if the future takes too long. +//! +//! These three types are backed by a [`Timer`] instance. In order for +//! [`Sleep`], [`Interval`], and [`Deadline`] to function, the associated +//! [`Timer`] instance must be running on some thread. +//! +//! [`Sleep`]: struct.Sleep.html +//! [`Deadline`]: struct.Deadline.html +//! [`Interval`]: struct.Interval.html +//! [`Timer`]: timer/struct.Timer.html + +#![doc(html_root_url = "https://docs.rs/tokio-timer/0.2.0")] +#![deny(missing_docs, warnings, missing_debug_implementations)] + +extern crate tokio_executor; + +#[macro_use] +extern crate futures; + +pub mod timer; + +mod atomic; +mod deadline; +mod error; +mod interval; +mod sleep; + +pub use self::deadline::{Deadline, DeadlineError}; +pub use self::error::Error; +pub use self::interval::Interval; +pub use self::timer::{Timer, with_default}; +pub use self::sleep::Sleep; diff --git a/tokio-timer/src/sleep.rs b/tokio-timer/src/sleep.rs new file mode 100644 index 000000000..e2960921b --- /dev/null +++ b/tokio-timer/src/sleep.rs @@ -0,0 +1,110 @@ +use Error; +use timer::Registration; + +use futures::{Future, Poll}; + +use std::time::Instant; + +/// A future that completes at a specified instant in time. +/// +/// Instances of `Sleep` perform no work and complete with `()` once the +/// specified deadline has been reached. +/// +/// `Sleep` has a resolution of one millisecond and should not be used for tasks +/// that require high-resolution timers. +/// +/// [`new`]: #method.new +#[derive(Debug)] +pub struct Sleep { + /// The instant at which the future completes. + deadline: Instant, + + /// The link between the `Sleep` instance at the timer that drives it. + /// + /// When `Sleep` is created with `new`, this is initialized to `None` and is + /// lazily set in `poll`. When `poll` is called, the default for the current + /// execution context is used (obtained via `Handle::current`). + /// + /// When `sleep` is created with `new_with_registration`, the value is set. + /// + /// Once `registration` is set to `Some`, it is never changed. + registration: Option, +} + +// ===== impl Sleep ===== + +impl Sleep { + /// Create a new `Sleep` instance that elapses at `deadline`. + /// + /// Only millisecond level resolution is guaranteed. There is no guarantee + /// as to how the sub-millisecond portion of `deadline` will be handled. + /// `Sleep` should not be used for high-resolution timer use cases. + pub fn new(deadline: Instant) -> Sleep { + Sleep { + deadline, + registration: None, + } + } + + pub(crate) fn new_with_registration( + deadline: Instant, + registration: Registration) -> Sleep + { + Sleep { + deadline, + registration: Some(registration), + } + } + + /// Returns the instant at which the future will complete. + pub fn deadline(&self) -> Instant { + self.deadline + } + + /// Returns true if the `Sleep` has elapsed + /// + /// A `Sleep` is elapsed when the requested duration has elapsed. + pub fn is_elapsed(&self) -> bool { + self.registration.as_ref() + .map(|r| r.is_elapsed()) + .unwrap_or(false) + } + + /// Reset the `Sleep` instance to a new deadline. + /// + /// Calling this function allows changing the instant at which the `Sleep` + /// future completes without having to create new associated state. + /// + /// This function can be called both before and after the future has + /// completed. + pub fn reset(&mut self, deadline: Instant) { + self.deadline = deadline; + + if let Some(registration) = self.registration.as_ref() { + registration.reset(deadline); + } + } + + /// Register the sleep with the timer instance for the current execution + /// context. + fn register(&mut self) { + if self.registration.is_some() { + return; + } + + self.registration = Some(Registration::new(self.deadline)); + } +} + +impl Future for Sleep { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + // Ensure the `Sleep` instance is associated with a timer. + self.register(); + + self.registration.as_ref().unwrap() + .poll_elapsed() + } +} diff --git a/tokio-timer/src/timer/entry.rs b/tokio-timer/src/timer/entry.rs new file mode 100644 index 000000000..7ea1e68e2 --- /dev/null +++ b/tokio-timer/src/timer/entry.rs @@ -0,0 +1,559 @@ +use Error; +use atomic::AtomicU64; +use timer::{Handle, Inner}; + +use futures::Poll; +use futures::task::AtomicTask; + +use std::cell::UnsafeCell; +use std::ptr; +use std::sync::{Arc, Weak}; +use std::sync::atomic::{AtomicBool, AtomicPtr}; +use std::sync::atomic::Ordering::SeqCst; +use std::time::Instant; +use std::u64; + +/// Internal state shared between a `Sleep` instance and the timer. +/// +/// This struct is used as a node in two intrusive data structures: +/// +/// * An atomic stack used to signal to the timer thread that the entry state +/// has changed. The timer thread will observe the entry on this stack and +/// perform any actions as necessary. +/// +/// * A doubly linked list used **only** by the timer thread. Each slot in the +/// timer wheel is a head pointer to the list of entries that must be +/// processed during that timer tick. +#[derive(Debug)] +pub(crate) struct Entry { + /// Timer internals. Using a weak pointer allows the timer to shutdown + /// without all `Sleep` instances having completed. + inner: Weak, + + /// Task to notify once the deadline is reached. + task: AtomicTask, + + /// Tracks the entry state. This value contains the following information: + /// + /// * The deadline at which the entry must be "fired". + /// * A flag indicating if the entry has already been fired. + /// * Whether or not the entry transitioned to the error state. + /// + /// When an `Entry` is created, `state` is initialized to the instant at + /// which the entry must be fired. When a timer is reset to a different + /// instant, this value is changed. + state: AtomicU64, + + /// When true, the entry is counted by `Inner` towards the max oustanding + /// timeouts. The drop fn uses this to know if it should decrement the + /// counter. + /// + /// One might think that it would be easier to just not create the `Entry`. + /// The problem is that `Sleep` expects creating a `Registration` to always + /// return a `Registration` instance. This simplifying factor allows it to + /// improve the struct layout. To do this, we must always allocate the node. + counted: bool, + + /// True wheen the entry is queued in the "process" stack. This value + /// is set before pushing the value and unset after popping the value. + queued: AtomicBool, + + /// Next entry in the "process" linked list. + /// + /// Represents a strong Arc ref. + next_atomic: UnsafeCell<*mut Entry>, + + /// When the entry expires, relative to the `start` of the timer + /// (Inner::start). This is only used by the timer. + /// + /// A `Sleep` instance can be reset to a different deadline by the thread + /// that owns the `Sleep` instance. In this case, the timer thread will not + /// immediately know that this has happened. The timer thread must know the + /// last deadline that it saw as it uses this value to locate the entry in + /// its wheel. + /// + /// Once the timer thread observes that the instant has changed, it updates + /// the wheel and sets this value. The idea is that this value eventually + /// converges to the value of `state` as the timer thread makes updates. + when: UnsafeCell>, + + /// Next entry in the State's linked list. + /// + /// This is only accessed by the timer + next_stack: UnsafeCell>>, + + /// Previous entry in the State's linked list. + /// + /// This is only accessed by the timer and is used to unlink a canceled + /// entry. + /// + /// This is a weak reference. + prev_stack: UnsafeCell<*const Entry>, +} + +/// A doubly linked stack +pub(crate) struct Stack { + head: Option>, +} + +/// A stack of `Entry` nodes +#[derive(Debug)] +pub(crate) struct AtomicStack { + /// Stack head + head: AtomicPtr, +} + +/// Entries that were removed from the stack +#[derive(Debug)] +pub(crate) struct AtomicStackEntries { + ptr: *mut Entry, +} + +/// Flag indicating a timer entry has elapsed +const ELAPSED: u64 = 1 << 63; + +/// Flag indicating a timer entry has reached an error state +const ERROR: u64 = u64::MAX; + +/// Used to indicate that the timer has shutdown. +const SHUTDOWN: *mut Entry = 1 as *mut _; + +// ===== impl Entry ===== + +impl Entry { + pub fn new(when: u64, handle: Handle) -> Entry { + assert!(when > 0 && when < u64::MAX); + + Entry { + inner: handle.into_inner(), + task: AtomicTask::new(), + state: AtomicU64::new(when), + counted: true, + queued: AtomicBool::new(false), + next_atomic: UnsafeCell::new(ptr::null_mut()), + when: UnsafeCell::new(None), + next_stack: UnsafeCell::new(None), + prev_stack: UnsafeCell::new(ptr::null_mut()), + } + } + + pub fn new_elapsed(handle: Handle) -> Entry { + Entry { + inner: handle.into_inner(), + task: AtomicTask::new(), + state: AtomicU64::new(ELAPSED), + counted: true, + queued: AtomicBool::new(false), + next_atomic: UnsafeCell::new(ptr::null_mut()), + when: UnsafeCell::new(None), + next_stack: UnsafeCell::new(None), + prev_stack: UnsafeCell::new(ptr::null_mut()), + } + } + + /// Create a new `Entry` that is in the error state. Calling `poll_elapsed` on + /// this `Entry` will always result in `Err` being returned. + pub fn new_error() -> Entry { + Entry { + inner: Weak::new(), + task: AtomicTask::new(), + state: AtomicU64::new(ERROR), + counted: false, + queued: AtomicBool::new(false), + next_atomic: UnsafeCell::new(ptr::null_mut()), + when: UnsafeCell::new(None), + next_stack: UnsafeCell::new(None), + prev_stack: UnsafeCell::new(ptr::null_mut()), + } + } + + /// The current entry state as known by the timer. This is not the value of + /// `state`, but lets the timer know how to converge its state to `state`. + pub fn when_internal(&self) -> Option { + unsafe { (*self.when.get()) } + } + + pub fn set_when_internal(&self, when: Option) { + unsafe { (*self.when.get()) = when; } + } + + /// Called by `Timer` to load the current value of `state` for processing + pub fn load_state(&self) -> Option { + let state = self.state.load(SeqCst); + + if is_elapsed(state) { + None + } else { + Some(state) + } + } + + pub fn is_elapsed(&self) -> bool { + let state = self.state.load(SeqCst); + is_elapsed(state) + } + + pub fn fire(&self, when: u64) { + let mut curr = self.state.load(SeqCst); + + loop { + if is_elapsed(curr) || curr > when { + return; + } + + let next = ELAPSED | curr; + let actual = self.state.compare_and_swap(curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + self.task.notify(); + } + + pub fn error(&self) { + // Only transition to the error state if not currently elapsed + let mut curr = self.state.load(SeqCst); + + loop { + if is_elapsed(curr) { + return; + } + + let next = ERROR; + + let actual = self.state.compare_and_swap(curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + self.task.notify(); + } + + pub fn cancel(entry: &Arc) { + let state = entry.state.fetch_or(ELAPSED, SeqCst); + + if is_elapsed(state) { + // Nothing more to do + return; + } + + let inner = match entry.inner.upgrade() { + Some(inner) => inner, + None => return, + }; + + let _ = inner.queue(entry); + } + + pub fn poll_elapsed(&self) -> Poll<(), Error> { + use futures::Async::NotReady; + + let mut curr = self.state.load(SeqCst); + + if is_elapsed(curr) { + if curr == ERROR { + return Err(Error::shutdown()); + } else { + return Ok(().into()); + } + } + + self.task.register(); + + curr = self.state.load(SeqCst).into(); + + if is_elapsed(curr) { + if curr == ERROR { + return Err(Error::shutdown()); + } else { + return Ok(().into()); + } + } + + Ok(NotReady) + } + + pub fn reset(entry: &Arc, deadline: Instant) { + let inner = match entry.inner.upgrade() { + Some(inner) => inner, + None => return, + }; + + let when = inner.normalize_deadline(deadline); + let elapsed = inner.elapsed(); + + let mut curr = entry.state.load(SeqCst); + let mut notify; + + loop { + // In these two cases, there is no work to do when resetting the + // timer. If the `Entry` is in an error state, then it cannot be + // used anymore. If resetting the entry to the current value, then + // the reset is a noop. + if curr == ERROR || curr == when { + return; + } + + let next; + + if when <= elapsed { + next = ELAPSED; + notify = !is_elapsed(curr); + } else { + next = when; + notify = true; + } + + let actual = entry.state.compare_and_swap( + curr, next, SeqCst); + + if curr == actual { + break; + } + + curr = actual; + } + + if notify { + let _ = inner.queue(entry); + } + } +} + +fn is_elapsed(state: u64) -> bool { + state & ELAPSED == ELAPSED +} + +impl Drop for Entry { + fn drop(&mut self) { + if !self.counted { + return; + } + + let inner = match self.inner.upgrade() { + Some(inner) => inner, + None => return, + }; + + inner.decrement(); + } +} + +unsafe impl Send for Entry {} +unsafe impl Sync for Entry {} + +// ===== impl Stack ===== + +impl Stack { + pub fn new() -> Stack { + Stack { head: None } + } + + pub fn is_empty(&self) -> bool { + self.head.is_none() + } + + /// Push an entry to the head of the linked list + pub fn push(&mut self, entry: Arc) { + // Get a pointer to the entry to for the prev link + let ptr: *const Entry = &*entry as *const _; + + // Remove the old head entry + let old = self.head.take(); + + unsafe { + // Ensure the entry is not already in a stack. + debug_assert!((*entry.next_stack.get()).is_none()); + debug_assert!((*entry.prev_stack.get()).is_null()); + + if let Some(ref entry) = old.as_ref() { + debug_assert!({ + // The head is not already set to the entry + ptr != &***entry as *const _ + }); + + // Set the previous link on the old head + *entry.prev_stack.get() = ptr; + } + + // Set this entry's next pointer + *entry.next_stack.get() = old; + + } + + // Update the head pointer + self.head = Some(entry); + } + + /// Pop the head of the linked list + pub fn pop(&mut self) -> Option> { + let entry = self.head.take(); + + unsafe { + if let Some(entry) = entry.as_ref() { + self.head = (*entry.next_stack.get()).take(); + + if let Some(entry) = self.head.as_ref() { + *entry.prev_stack.get() = ptr::null(); + } + + *entry.prev_stack.get() = ptr::null(); + } + } + + entry + } + + /// Remove the entry from the linked list + /// + /// The caller must ensure that the entry actually is contained by the list. + pub fn remove(&mut self, entry: &Entry) { + unsafe { + // Ensure that the entry is in fact contained by the stack + debug_assert!({ + // This walks the full linked list even if an entry is found. + let mut next = self.head.as_ref(); + let mut contains = false; + + while let Some(n) = next { + if entry as *const _ == &**n as *const _ { + debug_assert!(!contains); + contains = true; + } + + next = (*n.next_stack.get()).as_ref(); + } + + contains + }); + + // Unlink `entry` from the next node + let next = (*entry.next_stack.get()).take(); + + if let Some(next) = next.as_ref() { + (*next.prev_stack.get()) = *entry.prev_stack.get(); + } + + // Unlink `entry` from the prev node + + if let Some(prev) = (*entry.prev_stack.get()).as_ref() { + *prev.next_stack.get() = next; + } else { + // It is the head + self.head = next; + } + + // Unset the prev pointer + *entry.prev_stack.get() = ptr::null(); + } + } +} + +// ===== impl AtomicStack ===== + +impl AtomicStack { + pub fn new() -> AtomicStack { + AtomicStack { head: AtomicPtr::new(ptr::null_mut()) } + } + + /// Push an entry onto the stack. + /// + /// Returns `true` if the entry was pushed, `false` if the entry is already + /// on the stack, `Err` if the timer is shutdown. + pub fn push(&self, entry: &Arc) -> Result { + // First, set the queued bit on the entry + let queued = entry.queued.fetch_or(true, SeqCst).into(); + + if queued { + // Already queued, nothing more to do + return Ok(false); + } + + let ptr = Arc::into_raw(entry.clone()) as *mut _; + + let mut curr = self.head.load(SeqCst); + + loop { + if curr == SHUTDOWN { + // Don't leak the entry node + let _ = unsafe { Arc::from_raw(ptr) }; + + return Err(Error::shutdown()); + } + + // Update the `next` pointer. This is safe because setting the queued + // bit is a "lock" on this field. + unsafe { + *(entry.next_atomic.get()) = curr; + } + + let actual = self.head.compare_and_swap(curr, ptr, SeqCst); + + if actual == curr { + break; + } + + curr = actual; + } + + Ok(true) + } + + /// Take all entries from the stack + pub fn take(&self) -> AtomicStackEntries { + let ptr = self.head.swap(ptr::null_mut(), SeqCst); + AtomicStackEntries { ptr } + } + + /// Drain all remaining nodes in the stack and prevent any new nodes from + /// being pushed onto the stack. + pub fn shutdown(&self) { + // Shutdown the processing queue + let ptr = self.head.swap(SHUTDOWN, SeqCst); + + // Let the drop fn of `AtomicStackEntries` handle draining the stack + drop(AtomicStackEntries { ptr }); + } +} + +// ===== impl AtomicStackEntries ===== + +impl Iterator for AtomicStackEntries { + type Item = Arc; + + fn next(&mut self) -> Option { + if self.ptr.is_null() { + return None; + } + + // Convert the pointer to an `Arc` + let entry = unsafe { Arc::from_raw(self.ptr) }; + + // Update `self.ptr` to point to the next element of the stack + self.ptr = unsafe { (*entry.next_atomic.get()) }; + + // Unset the queued flag + let res = entry.queued.fetch_and(false, SeqCst); + debug_assert!(res); + + // Return the entry + Some(entry) + } +} + +impl Drop for AtomicStackEntries { + fn drop(&mut self) { + while let Some(entry) = self.next() { + // Flag the entry as errored + entry.error(); + } + } +} diff --git a/tokio-timer/src/timer/handle.rs b/tokio-timer/src/timer/handle.rs new file mode 100644 index 000000000..9f6794fe7 --- /dev/null +++ b/tokio-timer/src/timer/handle.rs @@ -0,0 +1,127 @@ +use {Error, Sleep, Deadline, Interval}; +use timer::{Registration, Inner}; + +use tokio_executor::Enter; + +use std::cell::RefCell; +use std::sync::{Arc, Weak}; +use std::time::{Duration, Instant}; + +/// Handle to timer instance. +/// +/// The `Handle` allows creating `Sleep` instances that are driven by the +/// associated timer. +/// +/// A `Handle` is obtained by calling [`Timer::handle`]. +/// +/// [`Timer::handle`]: struct.Timer.html#method.handle +#[derive(Debug, Clone)] +pub struct Handle { + inner: Weak, +} + +/// Tracks the timer for the current execution context. +thread_local!(static CURRENT_TIMER: RefCell> = RefCell::new(None)); + +/// Set the default timer for the duration of the closure. +/// +/// From within the closure, [`Sleep`] instances that are created via +/// [`Sleep::new`] can be used. +/// +/// # Panics +/// +/// This function panics if there already is a default timer set. +/// +/// [`Sleep`]: ../struct.Sleep.html +pub fn with_default(handle: &Handle, enter: &mut Enter, f: F) -> R +where F: FnOnce(&mut Enter) -> R +{ + // Ensure that the timer is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset; + + impl Drop for Reset { + fn drop(&mut self) { + CURRENT_TIMER.with(|current| { + let mut current = current.borrow_mut(); + *current = None; + }); + } + } + + // This ensures the value for the current timer gets reset even if there is + // a panic. + let _r = Reset; + + CURRENT_TIMER.with(|current| { + { + let mut current = current.borrow_mut(); + assert!(current.is_none(), "default Tokio timer already set \ + for execution context"); + *current = Some(handle.clone()); + } + + f(enter) + }) +} + +impl Handle { + pub(crate) fn new(inner: Weak) -> Handle { + Handle { inner } + } + + /// Returns a handle to the current timer. + /// + /// The current timer is the timer that is currently set as default using + /// [`with_default`]. + /// + /// This function should only be called from within the context of + /// [`with_default`]. Calling this function from outside of this context + /// will return a `Handle` that does not reference a timer. `Sleep` + /// instances created with this handle will error. + /// + /// [`with_default`]: ../fn.with_default.html + pub fn current() -> Handle { + Handle::try_current() + .unwrap_or(Handle { inner: Weak::new() }) + } + + /// Create a `Sleep` driven by this handle's associated `Timer`. + pub fn sleep(&self, deadline: Instant) -> Sleep { + let registration = Registration::new_with_handle(deadline, self.clone()); + Sleep::new_with_registration(deadline, registration) + } + + /// Create a `Deadline` driven by this handle's associated `Timer`. + pub fn deadline(&self, future: T, deadline: Instant) -> Deadline { + Deadline::new_with_sleep(future, self.sleep(deadline)) + } + + /// Create a new `Interval` that starts at `at` and yields every `duration` + /// interval after that. + pub fn interval(&self, at: Instant, duration: Duration) -> Interval { + Interval::new_with_sleep(self.sleep(at), duration) + } + + /// Try to get a handle to the current timer. + /// + /// Returns `Err` if no handle is found. + pub(crate) fn try_current() -> Result { + CURRENT_TIMER.with(|current| { + match *current.borrow() { + Some(ref handle) => Ok(handle.clone()), + None => Err(Error::shutdown()), + } + }) + } + + /// Try to return a strong ref to the inner + pub(crate) fn inner(&self) -> Option> { + self.inner.upgrade() + } + + /// Consume the handle, returning the weak Inner ref. + pub(crate) fn into_inner(self) -> Weak { + self.inner + } +} diff --git a/tokio-timer/src/timer/level.rs b/tokio-timer/src/timer/level.rs new file mode 100644 index 000000000..8be0ba180 --- /dev/null +++ b/tokio-timer/src/timer/level.rs @@ -0,0 +1,201 @@ +use timer::{entry, Entry}; + +use std::fmt; +use std::sync::Arc; + +/// Wheel for a single level in the timer. This wheel contains 64 slots. +pub(crate) struct Level { + level: usize, + + /// Bit field tracking which slots currently contain entries. + /// + /// Using a bit field to track slots that contain entries allows avoiding a + /// scan to find entries. This field is updated when entries are added or + /// removed from a slot. + /// + /// The least-significant bit represents slot zero. + occupied: u64, + + /// Slots + slot: [entry::Stack; LEVEL_MULT], +} + +/// Indicates when a slot must be processed next. +#[derive(Debug)] +pub struct Expiration { + /// The level containing the slot. + pub level: usize, + + /// The slot index. + pub slot: usize, + + /// The instant at which the slot needs to be processed. + pub deadline: u64, +} + +/// Level multiplier. +/// +/// Being a power of 2 is very important. +const LEVEL_MULT: usize = 64; + +impl Level { + pub fn new(level: usize) -> Level { + // Rust's derived implementations for arrays require that the value + // contained by the array be `Copy`. So, here we have to manually + // initialize every single slot. + macro_rules! s { + () => { entry::Stack::new() }; + }; + + Level { + level, + occupied: 0, + slot: [ + // It does not look like the necessary traits are + // derived for [T; 64]. + s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(), + s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(), + s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(), + s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(), + s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(), + s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(), + s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(), + s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(), + ], + } + } + + /// Finds the slot that needs to be processed next and returns the slot and + /// `Instant` at which this slot must be processed. + pub fn next_expiration(&self, now: u64) -> Option { + // Use the `occupied` bit field to get the index of the next slot that + // needs to be processed. + let slot = match self.next_occupied_slot(now) { + Some(slot) => slot, + None => return None, + }; + + // From the slot index, calculate the `Instant` at which it needs to be + // processed. This value *must* be in the future with respect to `now`. + + let level_range = level_range(self.level); + let slot_range = slot_range(self.level); + + // TODO: This can probably be simplified w/ power of 2 math + let level_start = now - (now % level_range); + let deadline = level_start + slot as u64 * slot_range; + + debug_assert!(deadline >= now, "deadline={}; now={}; level={}; slot={}; occupied={:b}", + deadline, now, self.level, slot, self.occupied); + + Some(Expiration { + level: self.level, + slot, + deadline, + }) + } + + fn next_occupied_slot(&self, now: u64) -> Option { + if self.occupied == 0 { + return None; + } + + // Get the slot for now using Maths + let now_slot = (now / slot_range(self.level)) as usize; + let occupied = self.occupied.rotate_right(now_slot as u32); + let zeros = occupied.trailing_zeros() as usize; + let slot = (zeros + now_slot) % 64; + + Some(slot) + } + + pub fn add_entry(&mut self, entry: Arc, when: u64) { + let slot = slot_for(when, self.level); + + self.slot[slot].push(entry); + self.occupied |= occupied_bit(slot); + } + + pub fn remove_entry(&mut self, entry: &Entry, when: u64) { + let slot = slot_for(when, self.level); + + self.slot[slot].remove(entry); + + if self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + // Unset the bit + self.occupied ^= occupied_bit(slot); + } + } + + pub fn pop_entry_slot(&mut self, slot: usize) -> Option> { + let ret = self.slot[slot].pop(); + + if ret.is_some() && self.slot[slot].is_empty() { + // The bit is currently set + debug_assert!(self.occupied & occupied_bit(slot) != 0); + + self.occupied ^= occupied_bit(slot); + } + + ret + } +} + +impl Drop for Level { + fn drop(&mut self) { + while let Some(slot) = self.next_occupied_slot(0) { + // This should always have one + let entry = self.pop_entry_slot(slot) + .expect("occupied bit set invalid"); + + entry.error(); + } + } +} + +impl fmt::Debug for Level { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Level") + .field("occupied", &self.occupied) + .finish() + } +} + +fn occupied_bit(slot: usize) -> u64 { + (1 << slot) +} + +fn slot_range(level: usize) -> u64 { + LEVEL_MULT.pow(level as u32) as u64 +} + +fn level_range(level: usize) -> u64 { + LEVEL_MULT as u64 * slot_range(level) +} + +/// Convert a duration (milliseconds) and a level to a slot position +fn slot_for(duration: u64, level: usize) -> usize { + ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_slot_for() { + for pos in 1..64 { + assert_eq!(pos as usize, slot_for(pos, 0)); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!(pos as usize, slot_for(a as u64, level)); + } + } + } +} diff --git a/tokio-timer/src/timer/mod.rs b/tokio-timer/src/timer/mod.rs new file mode 100644 index 000000000..a0f00c375 --- /dev/null +++ b/tokio-timer/src/timer/mod.rs @@ -0,0 +1,645 @@ +//! Timer implementation. +//! +//! This module contains the types needed to run a timer. +//! +//! The [`Timer`] type runs the timer logic. It holds all the necessary state +//! to track all associated [`Sleep`] instances and delivering notifications +//! once the deadlines are reached. +//! +//! The [`Handle`] type is a reference to a [`Timer`] instance. This type is +//! `Clone`, `Send`, and `Sync`. This type is used to create instances of +//! [`Sleep`]. +//! +//! The [`Now`] trait describes how to get an `Instance` representing the +//! current moment in time. [`SystemNow`] is the default implementation, where +//! [`Now::now`] is implemented by calling `Instant::now`. +//! +//! [`Timer`] is generic over [`Now`]. This allows the source of time to be +//! customized. This ability is especially useful in tests and any environment +//! where determinism is necessary. +//! +//! Note, when using the Tokio runtime, the `Timer` does not need to be manually +//! setup as the runtime comes pre-configured with a `Timer` instance. +//! +//! [`Timer`]: struct.Timer.html +//! [`Handle`]: struct.Handle.html +//! [`Sleep`]: ../struct.Sleep.html +//! [`Now`]: trait.Now.html +//! [`Now::now`]: trait.Now.html#method.now + +mod entry; +mod handle; +mod level; +mod now; +mod registration; + +use self::entry::Entry; +use self::level::{Level, Expiration}; + +pub use self::handle::{Handle, with_default}; +pub use self::now::{Now, SystemNow}; +pub(crate) use self::registration::Registration; + +use Error; +use atomic::AtomicU64; + +use tokio_executor::park::{Park, Unpark, ParkThread}; + +use std::{cmp, fmt}; +use std::time::{Duration, Instant}; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::usize; + +/// Timer implementation that drives [`Sleep`], [`Interval`], and [`Deadline`]. +/// +/// A `Timer` instance tracks the state necessary for managing time and +/// notifying the [`Sleep`] instances once their deadlines are reached. +/// +/// It is expected that a single `Timer` instance manages many individual +/// `Sleep` instances. The `Timer` implementation is thread-safe and, as such, +/// is able to handle callers from across threads. +/// +/// Callers do not use `Timer` directly to create `Sleep` instances. Instead, +/// [`Handle`] is used. A handle for the timer instance is obtained by calling +/// [`handle`]. [`Handle`] is the type that implements `Clone` and is `Send + +/// Sync`. +/// +/// After creating the `Timer` instance, the caller must repeatedly call +/// [`turn`]. The timer will perform no work unless [`turn`] is called +/// repeatedly. +/// +/// The `Timer` has a resolution of one millisecond. Any unit of time that falls +/// between milliseconds are rounded up to the next millisecond. +/// +/// When the `Timer` instance is dropped, any outstanding `Sleep` instance that +/// has not elapsed will be notified with an error. At this point, calling +/// `poll` on the sleep instance will result in `Err` being returned. +/// +/// # Implementation +/// +/// `Timer` is based on the [paper by Varghese and Lauck][paper]. +/// +/// A hashed timing wheel is a vector of slots, where each slot handles a time +/// slice. As time progresses, the timer walks over the slot for the current +/// instant, and processes each entry for that slot. When the timer reaches the +/// end of the wheel, it starts again at the beginning. +/// +/// The `Timer` implementation maintains six wheels arranged in a set of levels. +/// As the levels go up, the slots of the associated wheel represent larger +/// intervals of time. At each level, the wheel has 64 slots. Each slot covers a +/// range of time equal to the wheel at the lower level. At level zero, each +/// slot represents one millisecond of time. +/// +/// The wheels are: +/// +/// * Level 0: 64 x 1 millisecond slots. +/// * Level 1: 64 x 64 millisecond slots. +/// * Level 2: 64 x ~4 second slots. +/// * Level 3: 64 x ~4 minute slots. +/// * Level 4: 64 x ~4 hour slots. +/// * Level 5: 64 x ~12 day slots. +/// +/// When the timer processes entries at level zero, it will notify all the +/// [`Sleep`] instances as their deadlines have been reached. For all higher +/// levels, all entries will be redistributed across the wheel at the next level +/// down. Eventually, as time progresses, entries will `Sleep` instances will +/// either be canceled (dropped) or their associated entries will reach level +/// zero and be notified. +/// +/// [`Sleep`]: ../struct.Sleep.html +/// [`Interval`]: ../struct.Interval.html +/// [`Deadline`]: ../struct.Deadline.html +/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf +/// [`handle`]: #method.handle +/// [`turn`]: #method.turn +/// [`Handle`]: struct.Handle.html +#[derive(Debug)] +pub struct Timer { + /// Shared state + inner: Arc, + + /// The number of milliseconds elapsed since the timer started. + elapsed: u64, + + /// Timer wheel. + /// + /// Levels: + /// + /// * 1 ms slots / 64 ms range + /// * 64 ms slots / ~ 4 sec range + /// * ~ 4 sec slots / ~ 4 min range + /// * ~ 4 min slots / ~ 4 hr range + /// * ~ 4 hr slots / ~ 12 day range + /// * ~ 12 day slots / ~ 2 yr range + levels: Vec, + + /// Thread parker. The `Timer` park implementation delegates to this. + park: T, + + /// Source of "now" instances + now: N, +} + +/// Return value from the `turn` method on `Timer`. +/// +/// Currently this value doesn't actually provide any functionality, but it may +/// in the future give insight into what happened during `turn`. +#[derive(Debug)] +pub struct Turn(()); + +/// Timer state shared between `Timer`, `Handle`, and `Registration`. +pub(crate) struct Inner { + /// The instant at which the timer started running. + start: Instant, + + /// The last published timer `elapsed` value. + elapsed: AtomicU64, + + /// Number of active timeouts + num: AtomicUsize, + + /// Head of the "process" linked list. + process: entry::AtomicStack, + + /// Unparks the timer thread. + unpark: Box, +} + +/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots +/// each, the timer is able to track time up to 2 years into the future with a +/// precision of 1 millisecond. +const NUM_LEVELS: usize = 6; + +/// The maximum duration of a sleep +const MAX_DURATION: u64 = 1 << (6 * NUM_LEVELS); + +/// Maximum number of timeouts the system can handle concurrently. +const MAX_TIMEOUTS: usize = usize::MAX >> 1; + +// ===== impl Timer ===== + +impl Timer +where T: Park +{ + /// Create a new `Timer` instance that uses `park` to block the current + /// thread. + /// + /// Once the timer has been created, a handle can be obtained using + /// [`handle`]. The handle is used to create `Sleep` instances. + /// + /// Use `default` when constructing a `Timer` using the default `park` + /// instance. + /// + /// [`handle`]: #method.handle + pub fn new(park: T) -> Self { + Timer::new_with_now(park, SystemNow::new()) + } +} + +impl Timer { + /// Returns a reference to the underlying `Park` instance. + pub fn get_park(&self) -> &T { + &self.park + } + + /// Returns a mutable reference to the underlying `Park` instance. + pub fn get_park_mut(&mut self) -> &mut T { + &mut self.park + } +} + +impl Timer +where T: Park, + N: Now, +{ + /// Create a new `Timer` instance that uses `park` to block the current + /// thread and `now` to get the current `Instant`. + /// + /// Specifying the source of time is useful when testing. + pub fn new_with_now(park: T, mut now: N) -> Self { + let unpark = Box::new(park.unpark()); + + let levels = (0..NUM_LEVELS) + .map(Level::new) + .collect(); + + Timer { + inner: Arc::new(Inner::new(now.now(), unpark)), + elapsed: 0, + levels, + park, + now, + } + } + + /// Returns a handle to the timer. + /// + /// The `Handle` is how `Sleep` instances are created. The `Sleep` instances + /// can either be created directly or the `Handle` instance can be passed to + /// `with_default`, setting the timer as the default timer for the execution + /// context. + pub fn handle(&self) -> Handle { + Handle::new(Arc::downgrade(&self.inner)) + } + + /// Performs one iteration of the timer loop. + /// + /// This function must be called repeatedly in order for the `Timer` + /// instance to make progress. This is where the work happens. + /// + /// The `Timer` will use the `Park` instance that was specified in [`new`] + /// to block the current thread until the next `Sleep` instance elapses. One + /// call to `turn` results in at most one call to `park.park()`. + /// + /// # Return + /// + /// On success, `Ok(Turn)` is returned, where `Turn` is a placeholder type + /// that currently does nothing but may, in the future, have functions add + /// to provide information about the call to `turn`. + /// + /// If the call to `park.park()` fails, then `Err` is returned with the + /// error. + /// + /// [`new`]: #method.new + pub fn turn(&mut self, max_wait: Option) -> Result { + match max_wait { + Some(timeout) => self.park_timeout(timeout)?, + None => self.park()?, + } + + Ok(Turn(())) + } + + /// Returns the instant at which the next timeout expires. + fn next_expiration(&self) -> Option { + // Check all levels + for level in 0..NUM_LEVELS { + if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) { + // There cannot be any expirations at a higher level that happen + // before this one. + debug_assert!({ + let mut res = true; + + for l2 in (level+1)..NUM_LEVELS { + if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) { + if e2.deadline < expiration.deadline { + res = false; + } + } + } + + res + }); + + return Some(expiration); + } + } + + None + } + + /// Converts an `Expiration` to an `Instant`. + fn expiration_instant(&self, expiration: &Expiration) -> Instant { + self.inner.start + Duration::from_millis(expiration.deadline) + } + + /// Run timer related logic + fn process(&mut self) { + let now = ms(self.now.now() - self.inner.start, Round::Down); + + loop { + let expiration = match self.next_expiration() { + Some(expiration) => expiration, + None => break, + }; + + if expiration.deadline > now { + // This expiration should not fire on this tick + break; + } + + // Prcess the slot, either moving it down a level or firing the + // timeout if currently at the final (boss) level. + self.process_expiration(&expiration); + + self.set_elapsed(expiration.deadline); + } + + self.set_elapsed(now); + } + + fn set_elapsed(&mut self, when: u64) { + assert!(self.elapsed <= when, "elapsed={:?}; when={:?}", self.elapsed, when); + + if when > self.elapsed { + self.elapsed = when; + self.inner.elapsed.store(when, SeqCst); + } else { + assert_eq!(self.elapsed, when); + } + } + + fn process_expiration(&mut self, expiration: &Expiration) { + while let Some(entry) = self.pop_entry(expiration) { + if expiration.level == 0 { + let when = entry.when_internal() + .expect("invalid internal entry state"); + + debug_assert_eq!(when, expiration.deadline); + + // Fire the entry + entry.fire(when); + + // Track that the entry has been fired + entry.set_when_internal(None); + } else { + let when = entry.when_internal() + .expect("entry not tracked"); + + let next_level = expiration.level - 1; + + self.levels[next_level] + .add_entry(entry, when); + } + } + } + + fn pop_entry(&mut self, expiration: &Expiration) -> Option> { + self.levels[expiration.level].pop_entry_slot(expiration.slot) + } + + /// Process the entry queue + /// + /// This handles adding and canceling timeouts. + fn process_queue(&mut self) { + for entry in self.inner.process.take() { + match (entry.when_internal(), entry.load_state()) { + (None, None) => { + // Nothing to do + } + (Some(when), None) => { + // Remove the entry + self.clear_entry(&entry, when); + } + (None, Some(when)) => { + // Queue the entry + self.add_entry(entry, when); + } + (Some(curr), Some(next)) => { + self.clear_entry(&entry, curr); + self.add_entry(entry, next); + } + } + } + } + + fn clear_entry(&mut self, entry: &Arc, when: u64) { + // Get the level at which the entry should be stored + let level = self.level_for(when); + self.levels[level].remove_entry(entry, when); + + entry.set_when_internal(None); + } + + /// Fire the entry if it needs to, otherwise queue it to be processed later. + /// + /// Returns `None` if the entry was fired. + fn add_entry(&mut self, entry: Arc, when: u64) { + if when <= self.elapsed { + // The entry's deadline has elapsed, so fire it and update the + // internal state accordingly. + entry.set_when_internal(None); + entry.fire(when); + + return; + } else if when - self.elapsed > MAX_DURATION { + // The entry's deadline is invalid, so error it and update the + // internal state accordingly. + entry.set_when_internal(None); + entry.error(); + + return; + } + + // Get the level at which the entry should be stored + let level = self.level_for(when); + + entry.set_when_internal(Some(when)); + self.levels[level].add_entry(entry, when); + + debug_assert!({ + self.levels[level].next_expiration(self.elapsed) + .map(|e| e.deadline >= self.elapsed) + .unwrap_or(true) + }); + } + + fn level_for(&self, when: u64) -> usize { + level_for(self.elapsed, when) + } +} + +fn level_for(elapsed: u64, when: u64) -> usize { + let masked = elapsed ^ when; + + assert!(masked != 0, "elapsed={}; when={}", elapsed, when); + + let leading_zeros = masked.leading_zeros() as usize; + let significant = 63 - leading_zeros; + significant / 6 +} + +impl Default for Timer { + fn default() -> Self { + Timer::new(ParkThread::new()) + } +} + +impl Park for Timer +where T: Park, + N: Now, +{ + type Unpark = T::Unpark; + type Error = T::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.process_queue(); + + match self.next_expiration() { + Some(expiration) => { + let now = self.now.now(); + let deadline = self.expiration_instant(&expiration); + + if deadline > now { + self.park.park_timeout(deadline - now)?; + } + } + None => { + self.park.park()?; + } + } + + self.process(); + + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.process_queue(); + + match self.next_expiration() { + Some(expiration) => { + let now = self.now.now(); + let deadline = self.expiration_instant(&expiration); + + if deadline > now { + self.park.park_timeout(cmp::min(deadline - now, duration))?; + } + } + None => { + self.park.park_timeout(duration)?; + } + } + + self.process(); + + Ok(()) + } +} + +impl Drop for Timer { + fn drop(&mut self) { + // Shutdown the stack of entries to process, preventing any new entries + // from being pushed. + self.inner.process.shutdown(); + } +} + +// ===== impl Inner ===== + +impl Inner { + fn new(start: Instant, unpark: Box) -> Inner { + Inner { + num: AtomicUsize::new(0), + elapsed: AtomicU64::new(0), + process: entry::AtomicStack::new(), + start, + unpark, + } + } + + fn elapsed(&self) -> u64 { + self.elapsed.load(SeqCst) + } + + /// Increment the number of active timeouts + fn increment(&self) -> Result<(), Error> { + let mut curr = self.num.load(SeqCst); + + loop { + if curr == MAX_TIMEOUTS { + return Err(Error::at_capacity()); + } + + let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst); + + if curr == actual { + return Ok(()); + } + + curr = actual; + } + } + + /// Decrement the number of active timeouts + fn decrement(&self) { + let prev = self.num.fetch_sub(1, SeqCst); + debug_assert!(prev <= MAX_TIMEOUTS); + } + + fn queue(&self, entry: &Arc) -> Result<(), Error> { + if self.process.push(entry)? { + // The timer is notified so that it can process the timeout + self.unpark.unpark(); + } + + Ok(()) + } + + fn normalize_deadline(&self, deadline: Instant) -> u64 { + if deadline < self.start { + return 0; + } + + ms(deadline - self.start, Round::Up) + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Inner") + .finish() + } +} + +enum Round { + Up, + Down, +} + +/// Convert a `Duration` to milliseconds, rounding up and saturating at +/// `u64::MAX`. +/// +/// The saturating is fine because `u64::MAX` milliseconds are still many +/// million years. +#[inline] +fn ms(duration: Duration, round: Round) -> u64 { + const NANOS_PER_MILLI: u32 = 1_000_000; + const MILLIS_PER_SEC: u64 = 1_000; + + // Round up. + let millis = match round { + Round::Up => (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI, + Round::Down => duration.subsec_nanos() / NANOS_PER_MILLI, + }; + + duration.as_secs().saturating_mul(MILLIS_PER_SEC).saturating_add(millis as u64) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_level_for() { + for pos in 1..64 { + assert_eq!(0, level_for(0, pos), "level_for({}) -- binary = {:b}", pos, pos); + } + + for level in 1..5 { + for pos in level..64 { + let a = pos * 64_usize.pow(level as u32); + assert_eq!(level, level_for(0, a as u64), + "level_for({}) -- binary = {:b}", a, a); + + if pos > level { + let a = a - 1; + assert_eq!(level, level_for(0, a as u64), + "level_for({}) -- binary = {:b}", a, a); + } + + if pos < 64 { + let a = a + 1; + assert_eq!(level, level_for(0, a as u64), + "level_for({}) -- binary = {:b}", a, a); + } + } + } + } +} diff --git a/tokio-timer/src/timer/now.rs b/tokio-timer/src/timer/now.rs new file mode 100644 index 000000000..244cb650a --- /dev/null +++ b/tokio-timer/src/timer/now.rs @@ -0,0 +1,27 @@ +use std::time::Instant; + +/// Returns `Instant` values representing the current instant in time. +/// +/// This allows customizing the source of time which is especially useful for +/// testing. +pub trait Now { + /// Returns an instant corresponding to "now". + fn now(&mut self) -> Instant; +} + +/// Returns the instant corresponding to now using a monotonic clock. +#[derive(Debug)] +pub struct SystemNow(()); + +impl SystemNow { + /// Create a new `SystemNow`. + pub fn new() -> SystemNow { + SystemNow(()) + } +} + +impl Now for SystemNow { + fn now(&mut self) -> Instant { + Instant::now() + } +} diff --git a/tokio-timer/src/timer/registration.rs b/tokio-timer/src/timer/registration.rs new file mode 100644 index 000000000..8364d58aa --- /dev/null +++ b/tokio-timer/src/timer/registration.rs @@ -0,0 +1,82 @@ +use Error; +use timer::{Handle, Entry}; + +use futures::Poll; + +use std::sync::Arc; +use std::time::Instant; + +/// Registration with a timer. +/// +/// The association between a `Sleep` instance and a timer is done lazily in +/// `poll` +#[derive(Debug)] +pub(crate) struct Registration { + entry: Arc, +} + +impl Registration { + pub fn new(deadline: Instant) -> Registration { + fn is_send() {} + is_send::(); + + match Handle::try_current() { + Ok(handle) => Registration::new_with_handle(deadline, handle), + Err(_) => Registration::new_error(), + } + } + + pub fn new_with_handle(deadline: Instant, handle: Handle) -> Registration { + let inner = match handle.inner() { + Some(inner) => inner, + None => return Registration::new_error(), + }; + + // Increment the number of active timeouts + if inner.increment().is_err() { + return Registration::new_error(); + } + + let when = inner.normalize_deadline(deadline); + + if when <= inner.elapsed() { + // The deadline has already elapsed, ther eis no point creating the + // structures. + return Registration { + entry: Arc::new(Entry::new_elapsed(handle)), + }; + } + + let entry = Arc::new(Entry::new(when, handle)); + + if inner.queue(&entry).is_err() { + // The timer has shutdown, transition the entry to the error state. + entry.error(); + } + + Registration { entry } + } + + pub fn reset(&self, deadline: Instant) { + Entry::reset(&self.entry, deadline); + } + + fn new_error() -> Registration { + let entry = Arc::new(Entry::new_error()); + Registration { entry } + } + + pub fn is_elapsed(&self) -> bool { + self.entry.is_elapsed() + } + + pub fn poll_elapsed(&self) -> Poll<(), Error> { + self.entry.poll_elapsed() + } +} + +impl Drop for Registration { + fn drop(&mut self) { + Entry::cancel(&self.entry); + } +} diff --git a/tokio-timer/tests/deadline.rs b/tokio-timer/tests/deadline.rs new file mode 100644 index 000000000..229a23cbe --- /dev/null +++ b/tokio-timer/tests/deadline.rs @@ -0,0 +1,105 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_timer; + +#[macro_use] +mod support; +use support::*; + +use tokio_timer::*; + +use futures::{future, Future}; +use futures::sync::oneshot; + +#[test] +fn simultaneous_deadline_future_completion() { + mocked(|_, time| { + // Create a future that is immediately ready + let fut = future::ok::<_, ()>(()); + + // Wrap it with a deadline + let mut fut = Deadline::new(fut, time.now()); + + // Ready! + assert_ready!(fut); + }); +} + +#[test] +fn completed_future_past_deadline() { + mocked(|_, time| { + // Create a future that is immediately ready + let fut = future::ok::<_, ()>(()); + + // Wrap it with a deadline + let mut fut = Deadline::new(fut, time.now() - ms(1000)); + + // Ready! + assert_ready!(fut); + }); +} + +#[test] +fn future_and_deadline_in_future() { + mocked(|timer, time| { + // Not yet complete + let (tx, rx) = oneshot::channel(); + + // Wrap it with a deadline + let mut fut = Deadline::new(rx, time.now() + ms(100)); + + // Ready! + assert_not_ready!(fut); + + // Turn the timer, it runs for the elapsed time + advance(timer, ms(90)); + + assert_not_ready!(fut); + + // Complete the future + tx.send(()).unwrap(); + + assert_ready!(fut); + }); +} + +#[test] +fn deadline_now_elapses() { + mocked(|_, time| { + let fut = future::empty::<(), ()>(); + + // Wrap it with a deadline + let mut fut = Deadline::new(fut, time.now()); + + assert_elapsed!(fut); + }); +} + +#[test] +fn deadline_future_elapses() { + mocked(|timer, time| { + let fut = future::empty::<(), ()>(); + + // Wrap it with a deadline + let mut fut = Deadline::new(fut, time.now() + ms(300)); + + assert_not_ready!(fut); + + advance(timer, ms(300)); + + assert_elapsed!(fut); + }); +} + +#[test] +fn future_errors_first() { + mocked(|_, time| { + let fut = future::err::<(), ()>(()); + + // Wrap it with a deadline + let mut fut = Deadline::new(fut, time.now() + ms(100)); + + // Ready! + assert!(fut.poll().unwrap_err().is_inner()); + }); +} diff --git a/tokio-timer/tests/hammer.rs b/tokio-timer/tests/hammer.rs new file mode 100644 index 000000000..0f17d761d --- /dev/null +++ b/tokio-timer/tests/hammer.rs @@ -0,0 +1,240 @@ +extern crate futures; +extern crate rand; +extern crate tokio_executor; +extern crate tokio_timer; + +use tokio_executor::park::{Park, Unpark, UnparkThread}; +use tokio_timer::*; + +use futures::{Future, Stream}; +use futures::stream::FuturesUnordered; +use rand::Rng; + +use std::cmp; +use std::sync::{Arc, Barrier}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::thread; +use std::time::{Duration, Instant}; + +struct Signal { + rem: AtomicUsize, + unpark: UnparkThread, +} + +#[test] +fn hammer_complete() { + const ITERS: usize = 5; + const THREADS: usize = 4; + const PER_THREAD: usize = 40; + const MIN_DELAY: u64 = 1; + const MAX_DELAY: u64 = 5_000; + + for _ in 0..ITERS { + let mut timer = Timer::default(); + let handle = timer.handle(); + let barrier = Arc::new(Barrier::new(THREADS)); + + let done = Arc::new(Signal { + rem: AtomicUsize::new(THREADS), + unpark: timer.get_park().unpark(), + }); + + for _ in 0..THREADS { + let handle = handle.clone(); + let barrier = barrier.clone(); + let done = done.clone(); + + thread::spawn(move || { + let mut exec = FuturesUnordered::new(); + let mut rng = rand::thread_rng(); + + barrier.wait(); + + for _ in 0..PER_THREAD { + let deadline = Instant::now() + Duration::from_millis( + rng.gen_range(MIN_DELAY, MAX_DELAY)); + + exec.push({ + handle.sleep(deadline) + .and_then(move |_| { + let now = Instant::now(); + assert!(now >= deadline, "deadline greater by {:?}", deadline - now); + Ok(()) + }) + }); + } + + // Run the logic + exec.for_each(|_| Ok(())) + .wait() + .unwrap(); + + if 1 == done.rem.fetch_sub(1, SeqCst) { + done.unpark.unpark(); + } + }); + } + + while done.rem.load(SeqCst) > 0 { + timer.turn(None).unwrap(); + } + } +} + +#[test] +fn hammer_cancel() { + const ITERS: usize = 5; + const THREADS: usize = 4; + const PER_THREAD: usize = 40; + const MIN_DELAY: u64 = 1; + const MAX_DELAY: u64 = 5_000; + + for _ in 0..ITERS { + let mut timer = Timer::default(); + let handle = timer.handle(); + let barrier = Arc::new(Barrier::new(THREADS)); + + let done = Arc::new(Signal { + rem: AtomicUsize::new(THREADS), + unpark: timer.get_park().unpark(), + }); + + for _ in 0..THREADS { + let handle = handle.clone(); + let barrier = barrier.clone(); + let done = done.clone(); + + thread::spawn(move || { + let mut exec = FuturesUnordered::new(); + let mut rng = rand::thread_rng(); + + barrier.wait(); + + for _ in 0..PER_THREAD { + let deadline1 = Instant::now() + Duration::from_millis( + rng.gen_range(MIN_DELAY, MAX_DELAY)); + + let deadline2 = Instant::now() + Duration::from_millis( + rng.gen_range(MIN_DELAY, MAX_DELAY)); + + let deadline = cmp::min(deadline1, deadline2); + + let sleep = handle.sleep(deadline1); + let join = handle.deadline(sleep, deadline2); + + exec.push({ + join + .and_then(move |_| { + let now = Instant::now(); + assert!(now >= deadline, "deadline greater by {:?}", deadline - now); + Ok(()) + }) + }); + } + + // Run the logic + exec + .or_else(|e| { + assert!(e.is_elapsed()); + Ok::<_, ()>(()) + }) + .for_each(|_| Ok(())) + .wait() + .unwrap(); + + if 1 == done.rem.fetch_sub(1, SeqCst) { + done.unpark.unpark(); + } + }); + } + + while done.rem.load(SeqCst) > 0 { + timer.turn(None).unwrap(); + } + } +} + +#[test] +fn hammer_reset() { + const ITERS: usize = 5; + const THREADS: usize = 4; + const PER_THREAD: usize = 40; + const MIN_DELAY: u64 = 1; + const MAX_DELAY: u64 = 250; + + for _ in 0..ITERS { + let mut timer = Timer::default(); + let handle = timer.handle(); + let barrier = Arc::new(Barrier::new(THREADS)); + + let done = Arc::new(Signal { + rem: AtomicUsize::new(THREADS), + unpark: timer.get_park().unpark(), + }); + + for _ in 0..THREADS { + let handle = handle.clone(); + let barrier = barrier.clone(); + let done = done.clone(); + + thread::spawn(move || { + let mut exec = FuturesUnordered::new(); + let mut rng = rand::thread_rng(); + + barrier.wait(); + + for _ in 0..PER_THREAD { + let deadline1 = Instant::now() + Duration::from_millis( + rng.gen_range(MIN_DELAY, MAX_DELAY)); + + let deadline2 = deadline1 + Duration::from_millis( + rng.gen_range(MIN_DELAY, MAX_DELAY)); + + let deadline3 = deadline2 + Duration::from_millis( + rng.gen_range(MIN_DELAY, MAX_DELAY)); + + exec.push({ + handle.sleep(deadline1) + // Select over a second sleep + .select2(handle.sleep(deadline2)) + .map_err(|e| panic!("boom; err={:?}", e)) + .and_then(move |res| { + use futures::future::Either::*; + + let now = Instant::now(); + assert!(now >= deadline1, "deadline greater by {:?}", deadline1 - now); + + let mut other = match res { + A((_, other)) => other, + B((_, other)) => other, + }; + + other.reset(deadline3); + other + }) + .and_then(move |_| { + let now = Instant::now(); + assert!(now >= deadline3, "deadline greater by {:?}", deadline3 - now); + Ok(()) + }) + }); + } + + // Run the logic + exec + .for_each(|_| Ok(())) + .wait() + .unwrap(); + + if 1 == done.rem.fetch_sub(1, SeqCst) { + done.unpark.unpark(); + } + }); + } + + while done.rem.load(SeqCst) > 0 { + timer.turn(None).unwrap(); + } + } +} diff --git a/tokio-timer/tests/interval.rs b/tokio-timer/tests/interval.rs new file mode 100644 index 000000000..60d6d8d62 --- /dev/null +++ b/tokio-timer/tests/interval.rs @@ -0,0 +1,46 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_timer; + +#[macro_use] +mod support; +use support::*; + +use tokio_timer::*; + +use futures::{Stream}; + +#[test] +#[should_panic] +fn interval_zero_duration() { + mocked(|_, time| { + let _ = Interval::new(time.now(), ms(0)); + }); +} + +#[test] +fn usage() { + mocked(|timer, time| { + let start = time.now(); + let mut int = Interval::new(start, ms(300)); + + assert_ready!(int, Some(start)); + assert_not_ready!(int); + + advance(timer, ms(100)); + assert_not_ready!(int); + + advance(timer, ms(200)); + assert_ready!(int, Some(start + ms(300))); + assert_not_ready!(int); + + advance(timer, ms(400)); + assert_ready!(int, Some(start + ms(600))); + assert_not_ready!(int); + + advance(timer, ms(500)); + assert_ready!(int, Some(start + ms(900))); + assert_ready!(int, Some(start + ms(1200))); + assert_not_ready!(int); + }); +} diff --git a/tokio-timer/tests/sleep.rs b/tokio-timer/tests/sleep.rs new file mode 100644 index 000000000..472ed9a12 --- /dev/null +++ b/tokio-timer/tests/sleep.rs @@ -0,0 +1,488 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_timer; + +#[macro_use] +mod support; +use support::*; + +use tokio_timer::*; + +use futures::Future; + +use std::time::{Duration, Instant}; + +#[test] +fn immediate_sleep() { + mocked(|timer, time| { + // Create `Sleep` that elapsed immediately. + let mut sleep = Sleep::new(time.now()); + + // Ready! + assert_ready!(sleep); + + // Turn the timer, it runs for the elapsed time + turn(timer, ms(1000)); + + // The time has not advanced. The `turn` completed immediately. + assert_eq!(time.advanced(), ms(1000)); + }); +} + +#[test] +fn delayed_sleep_level_0() { + for &i in &[1, 10, 60] { + mocked(|timer, time| { + // Create a `Sleep` that elapses in the future + let mut sleep = Sleep::new(time.now() + ms(i)); + + // The sleep has not elapsed. + assert_not_ready!(sleep); + + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(i)); + + assert_ready!(sleep); + }); + } +} + +#[test] +fn sub_ms_delayed_sleep() { + mocked(|timer, time| { + for _ in 0..5 { + let deadline = time.now() + + Duration::from_millis(1) + + Duration::new(0, 1); + + let mut sleep = Sleep::new(deadline); + + assert_not_ready!(sleep); + + turn(timer, None); + assert_ready!(sleep); + + assert!(time.now() >= deadline); + + time.advance(Duration::new(0, 1)); + } + }); +} + +#[test] +fn delayed_sleep_wrapping_level_0() { + mocked(|timer, time| { + turn(timer, ms(5)); + assert_eq!(time.advanced(), ms(5)); + + let mut sleep = Sleep::new(time.now() + ms(60)); + + assert_not_ready!(sleep); + + turn(timer, None); + assert_eq!(time.advanced(), ms(64)); + assert_not_ready!(sleep); + + turn(timer, None); + assert_eq!(time.advanced(), ms(65)); + + assert_ready!(sleep); + }); +} + +#[test] +fn timer_wrapping_with_higher_levels() { + mocked(|timer, time| { + // Set sleep to hit level 1 + let mut s1 = Sleep::new(time.now() + ms(64)); + assert_not_ready!(s1); + + // Turn a bit + turn(timer, ms(5)); + + // Set timeout such that it will hit level 0, but wrap + let mut s2 = Sleep::new(time.now() + ms(60)); + assert_not_ready!(s2); + + // This should result in s1 firing + turn(timer, None); + assert_eq!(time.advanced(), ms(64)); + + assert_ready!(s1); + assert_not_ready!(s2); + + turn(timer, None); + assert_eq!(time.advanced(), ms(65)); + + assert_ready!(s2); + }); +} + +#[test] +fn sleep_with_deadline_in_past() { + mocked(|timer, time| { + // Create `Sleep` that elapsed immediately. + let mut sleep = Sleep::new(time.now() - ms(100)); + + // Even though the sleep expires in the past, it is not ready yet + // because the timer must observe it. + assert_ready!(sleep); + + // Turn the timer, it runs for the elapsed time + turn(timer, ms(1000)); + + // The time has not advanced. The `turn` completed immediately. + assert_eq!(time.advanced(), ms(1000)); + }); +} + +#[test] +fn delayed_sleep_level_1() { + mocked(|timer, time| { + // Create a `Sleep` that elapses in the future + let mut sleep = Sleep::new(time.now() + ms(234)); + + // The sleep has not elapsed. + assert_not_ready!(sleep); + + // Turn the timer, this will wake up to cascade the timer down. + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(192)); + + // The sleep has not elapsed. + assert_not_ready!(sleep); + + // Turn the timer again + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(234)); + + // The sleep has elapsed. + assert_ready!(sleep); + }); + + mocked(|timer, time| { + // Create a `Sleep` that elapses in the future + let mut sleep = Sleep::new(time.now() + ms(234)); + + // The sleep has not elapsed. + assert_not_ready!(sleep); + + // Turn the timer with a smaller timeout than the cascade. + turn(timer, ms(100)); + assert_eq!(time.advanced(), ms(100)); + + assert_not_ready!(sleep); + + // Turn the timer, this will wake up to cascade the timer down. + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(192)); + + // The sleep has not elapsed. + assert_not_ready!(sleep); + + // Turn the timer again + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(234)); + + // The sleep has elapsed. + assert_ready!(sleep); + }); +} + +#[test] +fn creating_sleep_outside_of_context() { + let now = Instant::now(); + + // This creates a sleep outside of the context of a mock timer. This tests + // that it will still expire. + let mut sleep = Sleep::new(now + ms(500)); + + mocked_with_now(now, |timer, time| { + // This registers the sleep with the timer + assert_not_ready!(sleep); + + // Wait some time... the timer is cascading + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(448)); + + assert_not_ready!(sleep); + + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(500)); + + // The sleep has elapsed + assert_ready!(sleep); + }); +} + +#[test] +fn concurrently_set_two_timers_second_one_shorter() { + mocked(|timer, time| { + let mut sleep1 = Sleep::new(time.now() + ms(500)); + let mut sleep2 = Sleep::new(time.now() + ms(200)); + + // The sleep has not elapsed + assert_not_ready!(sleep1); + assert_not_ready!(sleep2); + + // Sleep until a cascade + turn(timer, None); + assert_eq!(time.advanced(), ms(192)); + + // Sleep until the second timer. + turn(timer, None); + assert_eq!(time.advanced(), ms(200)); + + // The shorter sleep fires + assert_ready!(sleep2); + assert_not_ready!(sleep1); + + turn(timer, None); + assert_eq!(time.advanced(), ms(448)); + + assert_not_ready!(sleep1); + + // Turn again, this time the time will advance to the second sleep + turn(timer, None); + assert_eq!(time.advanced(), ms(500)); + + assert_ready!(sleep1); + }) +} + +#[test] +fn short_sleep() { + mocked(|timer, time| { + // Create a `Sleep` that elapses in the future + let mut sleep = Sleep::new(time.now() + ms(1)); + + // The sleep has not elapsed. + assert_not_ready!(sleep); + + // Turn the timer, but not enough timee will go by. + turn(timer, None); + + // The sleep has elapsed. + assert_ready!(sleep); + + // The time has advanced to the point of the sleep elapsing. + assert_eq!(time.advanced(), ms(1)); + }) +} + +#[test] +fn sorta_long_sleep() { + const MIN_5: u64 = 5 * 60 * 1000; + + mocked(|timer, time| { + // Create a `Sleep` that elapses in the future + let mut sleep = Sleep::new(time.now() + ms(MIN_5)); + + // The sleep has not elapsed. + assert_not_ready!(sleep); + + let cascades = &[ + 262_144, + 262_144 + 9 * 4096, + 262_144 + 9 * 4096 + 15 * 64, + ]; + + for &elapsed in cascades { + turn(timer, None); + assert_eq!(time.advanced(), ms(elapsed)); + + assert_not_ready!(sleep); + } + + turn(timer, None); + assert_eq!(time.advanced(), ms(MIN_5)); + + // The sleep has elapsed. + assert_ready!(sleep); + }) +} + +#[test] +fn very_long_sleep() { + const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000; + + mocked(|timer, time| { + // Create a `Sleep` that elapses in the future + let mut sleep = Sleep::new(time.now() + ms(MO_5)); + + // The sleep has not elapsed. + assert_not_ready!(sleep); + + let cascades = &[ + 12_884_901_888, + 12_952_010_752, + 12_959_875_072, + 12_959_997_952, + ]; + + for &elapsed in cascades { + turn(timer, None); + assert_eq!(time.advanced(), ms(elapsed)); + + assert_not_ready!(sleep); + } + + // Turn the timer, but not enough time will go by. + turn(timer, None); + + // The time has advanced to the point of the sleep elapsing. + assert_eq!(time.advanced(), ms(MO_5)); + + // The sleep has elapsed. + assert_ready!(sleep); + }) +} + +#[test] +fn greater_than_max() { + const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000; + + mocked(|timer, time| { + // Create a `Sleep` that elapses in the future + let mut sleep = Sleep::new(time.now() + ms(YR_5)); + + assert_not_ready!(sleep); + + turn(timer, ms(0)); + + assert!(sleep.poll().is_err()); + }) +} + +#[test] +fn unpark_is_delayed() { + mocked(|timer, time| { + let mut sleep1 = Sleep::new(time.now() + ms(100)); + let mut sleep2 = Sleep::new(time.now() + ms(101)); + let mut sleep3 = Sleep::new(time.now() + ms(200)); + + assert_not_ready!(sleep1); + assert_not_ready!(sleep2); + assert_not_ready!(sleep3); + + time.park_for(ms(500)); + + turn(timer, None); + + assert_eq!(time.advanced(), ms(500)); + + assert_ready!(sleep1); + assert_ready!(sleep2); + assert_ready!(sleep3); + }) +} + +#[test] +fn set_timeout_at_deadline_greater_than_max_timer() { + const YR_1: u64 = 365 * 24 * 60 * 60 * 1000; + const YR_5: u64 = 5 * YR_1; + + mocked(|timer, time| { + for _ in 0..5 { + turn(timer, ms(YR_1)); + } + + let mut sleep = Sleep::new(time.now() + ms(1)); + assert_not_ready!(sleep); + + turn(timer, ms(1000)); + assert_eq!(time.advanced(), Duration::from_millis(YR_5) + ms(1)); + + assert_ready!(sleep); + }); +} + +#[test] +fn reset_future_sleep_before_fire() { + mocked(|timer, time| { + let mut sleep = Sleep::new(time.now() + ms(100)); + + assert_not_ready!(sleep); + + sleep.reset(time.now() + ms(200)); + + turn(timer, None); + assert_eq!(time.advanced(), ms(192)); + + assert_not_ready!(sleep); + + turn(timer, None); + assert_eq!(time.advanced(), ms(200)); + + assert_ready!(sleep); + }); +} + +#[test] +fn reset_past_sleep_before_turn() { + mocked(|timer, time| { + let mut sleep = Sleep::new(time.now() + ms(100)); + + assert_not_ready!(sleep); + + sleep.reset(time.now() + ms(80)); + + turn(timer, None); + assert_eq!(time.advanced(), ms(64)); + + assert_not_ready!(sleep); + + turn(timer, None); + assert_eq!(time.advanced(), ms(80)); + + assert_ready!(sleep); + }); +} + +#[test] +fn reset_past_sleep_before_fire() { + mocked(|timer, time| { + let mut sleep = Sleep::new(time.now() + ms(100)); + + assert_not_ready!(sleep); + turn(timer, ms(10)); + + assert_not_ready!(sleep); + sleep.reset(time.now() + ms(80)); + + turn(timer, None); + assert_eq!(time.advanced(), ms(64)); + + assert_not_ready!(sleep); + + turn(timer, None); + assert_eq!(time.advanced(), ms(90)); + + assert_ready!(sleep); + }); +} + +#[test] +fn reset_future_sleep_after_fire() { + mocked(|timer, time| { + let mut sleep = Sleep::new(time.now() + ms(100)); + + assert_not_ready!(sleep); + + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(64)); + + turn(timer, None); + assert_eq!(time.advanced(), ms(100)); + + assert_ready!(sleep); + + sleep.reset(time.now() + ms(10)); + assert_not_ready!(sleep); + + turn(timer, ms(1000)); + assert_eq!(time.advanced(), ms(110)); + + assert_ready!(sleep); + }); +} diff --git a/tokio-timer/tests/support/mod.rs b/tokio-timer/tests/support/mod.rs new file mode 100644 index 000000000..8afeb0d90 --- /dev/null +++ b/tokio-timer/tests/support/mod.rs @@ -0,0 +1,234 @@ +#![allow(unused_macros, unused_imports, dead_code)] + +use tokio_executor::park::{Park, Unpark}; +use tokio_timer::timer::{Timer, Now}; + +use futures::future::{lazy, Future}; + +use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use std::time::{Instant, Duration}; + +macro_rules! assert_ready { + ($f:expr) => { + assert!($f.poll().unwrap().is_ready()); + }; + ($f:expr, $expect:expr) => { + assert_eq!($f.poll().unwrap(), ::futures::Async::Ready($expect)); + }; +} + +macro_rules! assert_not_ready { + ($f:expr) => { + assert!(!$f.poll().unwrap().is_ready()); + } +} + +macro_rules! assert_elapsed { + ($f:expr) => { + assert!($f.poll().unwrap_err().is_elapsed()); + } +} + +#[derive(Debug)] +pub struct MockTime { + inner: Inner, + _p: PhantomData>, +} + +#[derive(Debug)] +pub struct MockNow { + inner: Inner, + _p: PhantomData>, +} + +#[derive(Debug)] +pub struct MockPark { + inner: Inner, + _p: PhantomData>, +} + +#[derive(Debug)] +pub struct MockUnpark { + inner: Inner, +} + +type Inner = Arc>; + +#[derive(Debug)] +struct State { + base: Instant, + advance: Duration, + unparked: bool, + park_for: Option, +} + +pub fn ms(num: u64) -> Duration { + Duration::from_millis(num) +} + +pub trait IntoTimeout { + fn into_timeout(self) -> Option; +} + +impl IntoTimeout for Option { + fn into_timeout(self) -> Self { + self + } +} + +impl IntoTimeout for Duration { + fn into_timeout(self) -> Option { + Some(self) + } +} + +/// Turn the timer state once +pub fn turn(timer: &mut Timer, duration: T) { + timer.turn(duration.into_timeout()).unwrap(); +} + +/// Advance the timer the specified amount +pub fn advance(timer: &mut Timer, duration: Duration) { + let inner = timer.get_park().inner.clone(); + let deadline = inner.lock().unwrap().now() + duration; + + while inner.lock().unwrap().now() < deadline { + let dur = deadline - inner.lock().unwrap().now(); + turn(timer, dur); + } +} + +pub fn mocked(f: F) -> R +where F: FnOnce(&mut Timer, &mut MockTime) -> R +{ + mocked_with_now(Instant::now(), f) +} + +pub fn mocked_with_now(now: Instant, f: F) -> R +where F: FnOnce(&mut Timer, &mut MockTime) -> R +{ + let mut time = MockTime::new(now); + let park = time.mock_park(); + let now = time.mock_now(); + + let mut timer = Timer::new_with_now(park, now); + let handle = timer.handle(); + + let mut enter = ::tokio_executor::enter().unwrap(); + + ::tokio_timer::with_default(&handle, &mut enter, |_| { + lazy(|| { + Ok::<_, ()>(f(&mut timer, &mut time)) + }).wait().unwrap() + }) +} + +impl MockTime { + pub fn new(now: Instant) -> MockTime { + let state = State { + base: now, + advance: Duration::default(), + unparked: false, + park_for: None, + }; + + MockTime { + inner: Arc::new(Mutex::new(state)), + _p: PhantomData, + } + } + + pub fn mock_now(&self) -> MockNow { + let inner = self.inner.clone(); + MockNow { + inner, + _p: PhantomData, + } + } + + pub fn mock_park(&self) -> MockPark { + let inner = self.inner.clone(); + MockPark { + inner, + _p: PhantomData, + } + } + + pub fn now(&self) -> Instant { + self.inner.lock().unwrap().now() + } + + /// Returns the total amount of time the time has been advanced. + pub fn advanced(&self) -> Duration { + self.inner.lock().unwrap().advance + } + + pub fn advance(&self, duration: Duration) { + let mut inner = self.inner.lock().unwrap(); + inner.advance(duration); + } + + /// The next call to park_timeout will be for this duration, regardless of + /// the timeout passed to `park_timeout`. + pub fn park_for(&self, duration: Duration) { + self.inner.lock().unwrap().park_for = Some(duration); + } +} + +impl Park for MockPark { + type Unpark = MockUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + let inner = self.inner.clone(); + MockUnpark { inner } + } + + fn park(&mut self) -> Result<(), Self::Error> { + let mut inner = self.inner.lock().map_err(|_| ())?; + + let duration = inner.park_for.take() + .expect("call park_for first"); + + inner.advance(duration); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + let mut inner = self.inner.lock().unwrap(); + + if let Some(duration) = inner.park_for.take() { + inner.advance(duration); + } else { + inner.advance(duration); + } + + Ok(()) + } +} + +impl Unpark for MockUnpark { + fn unpark(&self) { + if let Ok(mut inner) = self.inner.lock() { + inner.unparked = true; + } + } +} + +impl Now for MockNow { + fn now(&mut self) -> Instant { + self.inner.lock().unwrap().now() + } +} + +impl State { + fn now(&self) -> Instant { + self.base + self.advance + } + + fn advance(&mut self, duration: Duration) { + self.advance += duration; + } +}