Provide a timer implementation (#249)

This patch adds a new crate: tokio-timer. This crate provides an
efficient timer implemeentation designed for use in Tokio based
applications.

The timer users a hierarchical hashed timer wheel algorithm with six
levels, each having 64 slots. This allows the timer to have a resolution
of 1ms while maintaining O(1) complexity for insert, removal, and firing
of timeouts.

There already exists a tokio-timer crate. This is a complete rewrite
which solves the outstanding problems with the existing tokio-timer
library.

Closes #146.
This commit is contained in:
Carl Lerche 2018-03-28 22:26:47 -07:00 committed by GitHub
parent ad189826f4
commit 19500f7df8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 3398 additions and 0 deletions

View File

@ -4,6 +4,9 @@ sudo: false
matrix:
include:
# This represents the minimum Rust version supported by Tokio. Updating this
# should be done in a dedicated PR and cannot be greater than two 0.x
# releases prior to the current stable.
- rust: 1.21.0
- rust: stable
- os: osx
@ -16,7 +19,21 @@ script:
set -e
if [[ "$TRAVIS_RUST_VERSION" == nightly ]]
then
# Pin the nightly version until rust-lang/rust#49436 is resolved.
rustup override set nightly-2018-03-26
# Make sure the benchmarks compile
cargo build --benches --all
# Run address sanitizer
ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0" \
RUSTFLAGS="-Z sanitizer=address" \
cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu
# Run thread sanitizer
TSAN_OPTIONS="suppressions=`pwd`/ci/tsan" \
RUSTFLAGS="-Z sanitizer=thread" \
cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu
fi
- |
set -e

View File

@ -27,6 +27,7 @@ members = [
"tokio-io",
"tokio-reactor",
"tokio-threadpool",
"tokio-timer",
"tokio-tcp",
"tokio-udp",
"futures2",

5
ci/tsan Normal file
View File

@ -0,0 +1,5 @@
# TSAN suppressions file for Tokio
# TSAN does not understand fences and `Arc::drop` is implemented using a fence.
# This causes many false positives.
race:Arc*drop

19
tokio-timer/CHANGELOG.md Normal file
View File

@ -0,0 +1,19 @@
# 0.2.0 (unreleased)
* Rewrite from scratch using a hierarchical wheel strategy (#249).
# 0.1.2 (Jun 27, 2017)
* Allow naming timer thread.
* Track changes in dependencies.
# 0.1.1 (Apr 6, 2017)
* Set Rust v1.14 as the minimum supported version.
* Fix bug related to intervals.
* Impl `PartialEq + Eq` for TimerError.
* Add `Debug` implementations.
# 0.1.0 (Jan 11, 2017)
* Initial Release

19
tokio-timer/Cargo.toml Normal file
View File

@ -0,0 +1,19 @@
[package]
name = "tokio-timer"
version = "0.2.0"
authors = ["Carl Lerche <me@carllerche.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tokio-rs/tokio-timer"
homepage = "https://github.com/tokio-rs/tokio-timer"
documentation = "https://docs.rs/tokio-timer"
description = """
Timer facilities for Tokio
"""
[dependencies]
futures = "0.1.19"
tokio-executor = { version = "0.1.1", path = "../tokio-executor" }
[dev-dependencies]
rand = "0.4.2"

25
tokio-timer/LICENSE Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2018 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

19
tokio-timer/README.md Normal file
View File

@ -0,0 +1,19 @@
# tokio-timer
Timer facilities for Tokio
[Documentation](https://tokio-rs.github.io/tokio/tokio_timer/)
## Overview
This crate provides timer facilities for usage with Tokio.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tokio by you, shall be licensed as MIT, without any additional
terms or conditions.

87
tokio-timer/src/atomic.rs Normal file
View File

@ -0,0 +1,87 @@
//! Implementation of an atomic u64 cell. On 64 bit platforms, this is a wrapper
//! around `AtomicUsize`. On 32 bit platforms, this is implemented using a
//! `Mutex`.
//!
//! This file can be removed if/when `AtomicU64` lands in `std`.
pub use self::imp::AtomicU64;
#[cfg(target_pointer_width = "64")]
mod imp {
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
pub struct AtomicU64 {
inner: AtomicUsize,
}
impl AtomicU64 {
pub fn new(val: u64) -> AtomicU64 {
AtomicU64 {
inner: AtomicUsize::new(val as usize),
}
}
pub fn load(&self, ordering: Ordering) -> u64 {
self.inner.load(ordering) as u64
}
pub fn store(&self, val: u64, ordering: Ordering) {
self.inner.store(val as usize, ordering)
}
pub fn fetch_or(&self, val: u64, ordering: Ordering) -> u64 {
self.inner.fetch_or(val as usize, ordering) as u64
}
pub fn compare_and_swap(&self, old: u64, new: u64, ordering: Ordering) -> u64 {
self.inner.compare_and_swap(
old as usize, new as usize, ordering) as u64
}
}
}
#[cfg(not(target_pointer_width = "64"))]
mod imp {
use std::sync::Mutex;
#[derive(Debug)]
pub struct AtomicU64 {
inner: Mutex<u64>,
}
impl AtomicU64 {
pub fn new(val: u64) -> AtomicU64 {
AtomicU64 {
inner: Mutex::new(val),
}
}
pub fn load(&self, _: Ordering) -> u64 {
*self.inner.lock().unwrap()
}
pub fn store(&self, val: u64, _: Ordering) {
*self.inner.lock().unwrap() = val;
}
pub fn fetch_or(&self, val: u64, _: Ordering) -> u64 {
let mut lock = self.inner.lock().unwrap();
let prev = *lock;
*lock = prev | val;
prev
}
pub fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 {
let mut lock = self.inner.lock().unwrap();
let prev = *lock;
if prev != old {
return prev;
}
*lock = new;
prev
}
}
}

183
tokio-timer/src/deadline.rs Normal file
View File

@ -0,0 +1,183 @@
//! Docs
use Sleep;
use futures::{Future, Poll, Async};
use std::error;
use std::fmt;
use std::time::Instant;
/// Allows a given `Future` to execute until the specified deadline.
///
/// If the inner future completes before the deadline is reached, then
/// `Deadline` completes with that value. Otherwise, `Deadline` completes with a
/// [`DeadlineError`].
///
/// [`DeadlineError`]: struct.DeadlineError.html
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Deadline<T> {
future: T,
sleep: Sleep,
}
/// Error returned by `Deadline` future.
#[derive(Debug)]
pub struct DeadlineError<T>(Kind<T>);
/// Deadline error variants
#[derive(Debug)]
enum Kind<T> {
/// Inner future returned an error
Inner(T),
/// The deadline elapsed.
Elapsed,
/// Timer returned an error.
Timer(::Error),
}
impl<T> Deadline<T> {
/// Create a new `Deadline` that completes when `future` completes or when
/// `deadline` is reached.
pub fn new(future: T, deadline: Instant) -> Deadline<T> {
Deadline::new_with_sleep(future, Sleep::new(deadline))
}
pub(crate) fn new_with_sleep(future: T, sleep: Sleep) -> Deadline<T> {
Deadline {
future,
sleep,
}
}
/// Gets a reference to the underlying future in this deadline.
pub fn get_ref(&self) -> &T {
&self.future
}
/// Gets a mutable reference to the underlying future in this deadline.
pub fn get_mut(&mut self) -> &mut T {
&mut self.future
}
/// Consumes this deadline, returning the underlying future.
pub fn into_inner(self) -> T {
self.future
}
}
impl<T> Future for Deadline<T>
where T: Future,
{
type Item = T::Item;
type Error = DeadlineError<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// First, try polling the future
match self.future.poll() {
Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
Ok(Async::NotReady) => {}
Err(e) => return Err(DeadlineError::inner(e)),
}
// Now check the timer
match self.sleep.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
Err(DeadlineError::elapsed())
},
Err(e) => Err(DeadlineError::timer(e)),
}
}
}
// ===== impl DeadlineError =====
impl<T> DeadlineError<T> {
/// Create a new `DeadlineError` representing the inner future completing
/// with `Err`.
pub fn inner(err: T) -> DeadlineError<T> {
DeadlineError(Kind::Inner(err))
}
/// Returns `true` if the error was caused by the inner future completing
/// with `Err`.
pub fn is_inner(&self) -> bool {
match self.0 {
Kind::Inner(_) => true,
_ => false,
}
}
/// Consumes `self`, returning the inner future error.
pub fn into_inner(self) -> Option<T> {
match self.0 {
Kind::Inner(err) => Some(err),
_ => None,
}
}
/// Create a new `DeadlineError` representing the inner future not
/// completing before the deadline is reached.
pub fn elapsed() -> DeadlineError<T> {
DeadlineError(Kind::Elapsed)
}
/// Returns `true` if the error was caused by the inner future not
/// completing before the deadline is reached.
pub fn is_elapsed(&self) -> bool {
match self.0 {
Kind::Elapsed => true,
_ => false,
}
}
/// Creates a new `DeadlineError` representing an error encountered by the
/// timer implementation
pub fn timer(err: ::Error) -> DeadlineError<T> {
DeadlineError(Kind::Timer(err))
}
/// Returns `true` if the error was caused by the timer.
pub fn is_timer(&self) -> bool {
match self.0 {
Kind::Timer(_) => true,
_ => false,
}
}
/// Consumes `self`, returning the error raised by the timer implementation.
pub fn into_timer(self) -> Option<::Error> {
match self.0 {
Kind::Timer(err) => Some(err),
_ => None,
}
}
}
impl<T: error::Error> error::Error for DeadlineError<T> {
fn description(&self) -> &str {
use self::Kind::*;
match self.0 {
Inner(ref e) => e.description(),
Elapsed => "deadline has elapsed",
Timer(ref e) => e.description(),
}
}
}
impl<T: fmt::Display> fmt::Display for DeadlineError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::Kind::*;
match self.0 {
Inner(ref e) => e.fmt(fmt),
Elapsed => "deadline has elapsed".fmt(fmt),
Timer(ref e) => e.fmt(fmt),
}
}
}

60
tokio-timer/src/error.rs Normal file
View File

@ -0,0 +1,60 @@
use self::Kind::*;
use std::error;
use std::fmt;
/// Errors encountered by the timer implementation.
#[derive(Debug)]
pub struct Error(Kind);
#[derive(Debug)]
enum Kind {
Shutdown,
AtCapacity,
}
impl Error {
/// Create an error representing a shutdown timer.
pub fn shutdown() -> Error {
Error(Shutdown)
}
/// Returns `true` if the error was caused by the timer being shutdown.
pub fn is_shutdown(&self) -> bool {
match self.0 {
Kind::Shutdown => true,
_ => false,
}
}
/// Create an error representing a timer at capacity.
pub fn at_capacity() -> Error {
Error(AtCapacity)
}
/// Returns `true` if the error was caused by the timer being at capacity.
pub fn is_at_capacity(&self) -> bool {
match self.0 {
Kind::AtCapacity => true,
_ => false,
}
}
}
impl error::Error for Error {
fn description(&self) -> &str {
use self::Kind::*;
match self.0 {
Shutdown => "timer is shutdown",
AtCapacity => "timer is at capacity and cannot create a new entry",
}
}
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use std::error::Error;
self.description().fmt(fmt)
}
}

View File

@ -0,0 +1,58 @@
use Sleep;
use futures::{Future, Stream, Poll};
use std::time::{Instant, Duration};
/// A stream representing notifications at fixed interval
#[derive(Debug)]
pub struct Interval {
/// Future that completes the next time the `Interval` yields a value.
sleep: Sleep,
/// The duration between values yielded by `Interval`.
duration: Duration,
}
impl Interval {
/// Create a new `Interval` that starts at `at` and yields every `duration`
/// interval after that.
///
/// The `duration` argument must be a non-zero duration.
///
/// # Panics
///
/// This function panics if `duration` is zero.
pub fn new(at: Instant, duration: Duration) -> Interval {
assert!(duration > Duration::new(0, 0), "`duration` must be non-zero.");
Interval::new_with_sleep(Sleep::new(at), duration)
}
pub(crate) fn new_with_sleep(sleep: Sleep, duration: Duration) -> Interval {
Interval {
sleep,
duration,
}
}
}
impl Stream for Interval {
type Item = Instant;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Wait for the sleep to be done
let _ = try_ready!(self.sleep.poll());
// Get the `now` by looking at the `sleep` deadline
let now = self.sleep.deadline();
// The next interval value is `duration` after the one that just
// yielded.
self.sleep.reset(now + self.duration);
// Return the current instant
Ok(Some(now).into())
}
}

41
tokio-timer/src/lib.rs Normal file
View File

@ -0,0 +1,41 @@
//! Utilities for scheduling work to happen after a period of time.
//!
//! This crate provides a number of utilities for working with periods of time:
//!
//! * [`Sleep`]: A future that completes at a specified instant in time.
//!
//! * [`Interval`] A stream that yields at fixed time intervals.
//!
//! * [`Deadline`]: Wraps a future, requiring it to complete before a specified
//! instant in time, erroring if the future takes too long.
//!
//! These three types are backed by a [`Timer`] instance. In order for
//! [`Sleep`], [`Interval`], and [`Deadline`] to function, the associated
//! [`Timer`] instance must be running on some thread.
//!
//! [`Sleep`]: struct.Sleep.html
//! [`Deadline`]: struct.Deadline.html
//! [`Interval`]: struct.Interval.html
//! [`Timer`]: timer/struct.Timer.html
#![doc(html_root_url = "https://docs.rs/tokio-timer/0.2.0")]
#![deny(missing_docs, warnings, missing_debug_implementations)]
extern crate tokio_executor;
#[macro_use]
extern crate futures;
pub mod timer;
mod atomic;
mod deadline;
mod error;
mod interval;
mod sleep;
pub use self::deadline::{Deadline, DeadlineError};
pub use self::error::Error;
pub use self::interval::Interval;
pub use self::timer::{Timer, with_default};
pub use self::sleep::Sleep;

110
tokio-timer/src/sleep.rs Normal file
View File

@ -0,0 +1,110 @@
use Error;
use timer::Registration;
use futures::{Future, Poll};
use std::time::Instant;
/// A future that completes at a specified instant in time.
///
/// Instances of `Sleep` perform no work and complete with `()` once the
/// specified deadline has been reached.
///
/// `Sleep` has a resolution of one millisecond and should not be used for tasks
/// that require high-resolution timers.
///
/// [`new`]: #method.new
#[derive(Debug)]
pub struct Sleep {
/// The instant at which the future completes.
deadline: Instant,
/// The link between the `Sleep` instance at the timer that drives it.
///
/// When `Sleep` is created with `new`, this is initialized to `None` and is
/// lazily set in `poll`. When `poll` is called, the default for the current
/// execution context is used (obtained via `Handle::current`).
///
/// When `sleep` is created with `new_with_registration`, the value is set.
///
/// Once `registration` is set to `Some`, it is never changed.
registration: Option<Registration>,
}
// ===== impl Sleep =====
impl Sleep {
/// Create a new `Sleep` instance that elapses at `deadline`.
///
/// Only millisecond level resolution is guaranteed. There is no guarantee
/// as to how the sub-millisecond portion of `deadline` will be handled.
/// `Sleep` should not be used for high-resolution timer use cases.
pub fn new(deadline: Instant) -> Sleep {
Sleep {
deadline,
registration: None,
}
}
pub(crate) fn new_with_registration(
deadline: Instant,
registration: Registration) -> Sleep
{
Sleep {
deadline,
registration: Some(registration),
}
}
/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
self.deadline
}
/// Returns true if the `Sleep` has elapsed
///
/// A `Sleep` is elapsed when the requested duration has elapsed.
pub fn is_elapsed(&self) -> bool {
self.registration.as_ref()
.map(|r| r.is_elapsed())
.unwrap_or(false)
}
/// Reset the `Sleep` instance to a new deadline.
///
/// Calling this function allows changing the instant at which the `Sleep`
/// future completes without having to create new associated state.
///
/// This function can be called both before and after the future has
/// completed.
pub fn reset(&mut self, deadline: Instant) {
self.deadline = deadline;
if let Some(registration) = self.registration.as_ref() {
registration.reset(deadline);
}
}
/// Register the sleep with the timer instance for the current execution
/// context.
fn register(&mut self) {
if self.registration.is_some() {
return;
}
self.registration = Some(Registration::new(self.deadline));
}
}
impl Future for Sleep {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Ensure the `Sleep` instance is associated with a timer.
self.register();
self.registration.as_ref().unwrap()
.poll_elapsed()
}
}

View File

@ -0,0 +1,559 @@
use Error;
use atomic::AtomicU64;
use timer::{Handle, Inner};
use futures::Poll;
use futures::task::AtomicTask;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, AtomicPtr};
use std::sync::atomic::Ordering::SeqCst;
use std::time::Instant;
use std::u64;
/// Internal state shared between a `Sleep` instance and the timer.
///
/// This struct is used as a node in two intrusive data structures:
///
/// * An atomic stack used to signal to the timer thread that the entry state
/// has changed. The timer thread will observe the entry on this stack and
/// perform any actions as necessary.
///
/// * A doubly linked list used **only** by the timer thread. Each slot in the
/// timer wheel is a head pointer to the list of entries that must be
/// processed during that timer tick.
#[derive(Debug)]
pub(crate) struct Entry {
/// Timer internals. Using a weak pointer allows the timer to shutdown
/// without all `Sleep` instances having completed.
inner: Weak<Inner>,
/// Task to notify once the deadline is reached.
task: AtomicTask,
/// Tracks the entry state. This value contains the following information:
///
/// * The deadline at which the entry must be "fired".
/// * A flag indicating if the entry has already been fired.
/// * Whether or not the entry transitioned to the error state.
///
/// When an `Entry` is created, `state` is initialized to the instant at
/// which the entry must be fired. When a timer is reset to a different
/// instant, this value is changed.
state: AtomicU64,
/// When true, the entry is counted by `Inner` towards the max oustanding
/// timeouts. The drop fn uses this to know if it should decrement the
/// counter.
///
/// One might think that it would be easier to just not create the `Entry`.
/// The problem is that `Sleep` expects creating a `Registration` to always
/// return a `Registration` instance. This simplifying factor allows it to
/// improve the struct layout. To do this, we must always allocate the node.
counted: bool,
/// True wheen the entry is queued in the "process" stack. This value
/// is set before pushing the value and unset after popping the value.
queued: AtomicBool,
/// Next entry in the "process" linked list.
///
/// Represents a strong Arc ref.
next_atomic: UnsafeCell<*mut Entry>,
/// When the entry expires, relative to the `start` of the timer
/// (Inner::start). This is only used by the timer.
///
/// A `Sleep` instance can be reset to a different deadline by the thread
/// that owns the `Sleep` instance. In this case, the timer thread will not
/// immediately know that this has happened. The timer thread must know the
/// last deadline that it saw as it uses this value to locate the entry in
/// its wheel.
///
/// Once the timer thread observes that the instant has changed, it updates
/// the wheel and sets this value. The idea is that this value eventually
/// converges to the value of `state` as the timer thread makes updates.
when: UnsafeCell<Option<u64>>,
/// Next entry in the State's linked list.
///
/// This is only accessed by the timer
next_stack: UnsafeCell<Option<Arc<Entry>>>,
/// Previous entry in the State's linked list.
///
/// This is only accessed by the timer and is used to unlink a canceled
/// entry.
///
/// This is a weak reference.
prev_stack: UnsafeCell<*const Entry>,
}
/// A doubly linked stack
pub(crate) struct Stack {
head: Option<Arc<Entry>>,
}
/// A stack of `Entry` nodes
#[derive(Debug)]
pub(crate) struct AtomicStack {
/// Stack head
head: AtomicPtr<Entry>,
}
/// Entries that were removed from the stack
#[derive(Debug)]
pub(crate) struct AtomicStackEntries {
ptr: *mut Entry,
}
/// Flag indicating a timer entry has elapsed
const ELAPSED: u64 = 1 << 63;
/// Flag indicating a timer entry has reached an error state
const ERROR: u64 = u64::MAX;
/// Used to indicate that the timer has shutdown.
const SHUTDOWN: *mut Entry = 1 as *mut _;
// ===== impl Entry =====
impl Entry {
pub fn new(when: u64, handle: Handle) -> Entry {
assert!(when > 0 && when < u64::MAX);
Entry {
inner: handle.into_inner(),
task: AtomicTask::new(),
state: AtomicU64::new(when),
counted: true,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
}
pub fn new_elapsed(handle: Handle) -> Entry {
Entry {
inner: handle.into_inner(),
task: AtomicTask::new(),
state: AtomicU64::new(ELAPSED),
counted: true,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
}
/// Create a new `Entry` that is in the error state. Calling `poll_elapsed` on
/// this `Entry` will always result in `Err` being returned.
pub fn new_error() -> Entry {
Entry {
inner: Weak::new(),
task: AtomicTask::new(),
state: AtomicU64::new(ERROR),
counted: false,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
}
/// The current entry state as known by the timer. This is not the value of
/// `state`, but lets the timer know how to converge its state to `state`.
pub fn when_internal(&self) -> Option<u64> {
unsafe { (*self.when.get()) }
}
pub fn set_when_internal(&self, when: Option<u64>) {
unsafe { (*self.when.get()) = when; }
}
/// Called by `Timer` to load the current value of `state` for processing
pub fn load_state(&self) -> Option<u64> {
let state = self.state.load(SeqCst);
if is_elapsed(state) {
None
} else {
Some(state)
}
}
pub fn is_elapsed(&self) -> bool {
let state = self.state.load(SeqCst);
is_elapsed(state)
}
pub fn fire(&self, when: u64) {
let mut curr = self.state.load(SeqCst);
loop {
if is_elapsed(curr) || curr > when {
return;
}
let next = ELAPSED | curr;
let actual = self.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
self.task.notify();
}
pub fn error(&self) {
// Only transition to the error state if not currently elapsed
let mut curr = self.state.load(SeqCst);
loop {
if is_elapsed(curr) {
return;
}
let next = ERROR;
let actual = self.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
self.task.notify();
}
pub fn cancel(entry: &Arc<Entry>) {
let state = entry.state.fetch_or(ELAPSED, SeqCst);
if is_elapsed(state) {
// Nothing more to do
return;
}
let inner = match entry.inner.upgrade() {
Some(inner) => inner,
None => return,
};
let _ = inner.queue(entry);
}
pub fn poll_elapsed(&self) -> Poll<(), Error> {
use futures::Async::NotReady;
let mut curr = self.state.load(SeqCst);
if is_elapsed(curr) {
if curr == ERROR {
return Err(Error::shutdown());
} else {
return Ok(().into());
}
}
self.task.register();
curr = self.state.load(SeqCst).into();
if is_elapsed(curr) {
if curr == ERROR {
return Err(Error::shutdown());
} else {
return Ok(().into());
}
}
Ok(NotReady)
}
pub fn reset(entry: &Arc<Entry>, deadline: Instant) {
let inner = match entry.inner.upgrade() {
Some(inner) => inner,
None => return,
};
let when = inner.normalize_deadline(deadline);
let elapsed = inner.elapsed();
let mut curr = entry.state.load(SeqCst);
let mut notify;
loop {
// In these two cases, there is no work to do when resetting the
// timer. If the `Entry` is in an error state, then it cannot be
// used anymore. If resetting the entry to the current value, then
// the reset is a noop.
if curr == ERROR || curr == when {
return;
}
let next;
if when <= elapsed {
next = ELAPSED;
notify = !is_elapsed(curr);
} else {
next = when;
notify = true;
}
let actual = entry.state.compare_and_swap(
curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
if notify {
let _ = inner.queue(entry);
}
}
}
fn is_elapsed(state: u64) -> bool {
state & ELAPSED == ELAPSED
}
impl Drop for Entry {
fn drop(&mut self) {
if !self.counted {
return;
}
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => return,
};
inner.decrement();
}
}
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
// ===== impl Stack =====
impl Stack {
pub fn new() -> Stack {
Stack { head: None }
}
pub fn is_empty(&self) -> bool {
self.head.is_none()
}
/// Push an entry to the head of the linked list
pub fn push(&mut self, entry: Arc<Entry>) {
// Get a pointer to the entry to for the prev link
let ptr: *const Entry = &*entry as *const _;
// Remove the old head entry
let old = self.head.take();
unsafe {
// Ensure the entry is not already in a stack.
debug_assert!((*entry.next_stack.get()).is_none());
debug_assert!((*entry.prev_stack.get()).is_null());
if let Some(ref entry) = old.as_ref() {
debug_assert!({
// The head is not already set to the entry
ptr != &***entry as *const _
});
// Set the previous link on the old head
*entry.prev_stack.get() = ptr;
}
// Set this entry's next pointer
*entry.next_stack.get() = old;
}
// Update the head pointer
self.head = Some(entry);
}
/// Pop the head of the linked list
pub fn pop(&mut self) -> Option<Arc<Entry>> {
let entry = self.head.take();
unsafe {
if let Some(entry) = entry.as_ref() {
self.head = (*entry.next_stack.get()).take();
if let Some(entry) = self.head.as_ref() {
*entry.prev_stack.get() = ptr::null();
}
*entry.prev_stack.get() = ptr::null();
}
}
entry
}
/// Remove the entry from the linked list
///
/// The caller must ensure that the entry actually is contained by the list.
pub fn remove(&mut self, entry: &Entry) {
unsafe {
// Ensure that the entry is in fact contained by the stack
debug_assert!({
// This walks the full linked list even if an entry is found.
let mut next = self.head.as_ref();
let mut contains = false;
while let Some(n) = next {
if entry as *const _ == &**n as *const _ {
debug_assert!(!contains);
contains = true;
}
next = (*n.next_stack.get()).as_ref();
}
contains
});
// Unlink `entry` from the next node
let next = (*entry.next_stack.get()).take();
if let Some(next) = next.as_ref() {
(*next.prev_stack.get()) = *entry.prev_stack.get();
}
// Unlink `entry` from the prev node
if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
*prev.next_stack.get() = next;
} else {
// It is the head
self.head = next;
}
// Unset the prev pointer
*entry.prev_stack.get() = ptr::null();
}
}
}
// ===== impl AtomicStack =====
impl AtomicStack {
pub fn new() -> AtomicStack {
AtomicStack { head: AtomicPtr::new(ptr::null_mut()) }
}
/// Push an entry onto the stack.
///
/// Returns `true` if the entry was pushed, `false` if the entry is already
/// on the stack, `Err` if the timer is shutdown.
pub fn push(&self, entry: &Arc<Entry>) -> Result<bool, Error> {
// First, set the queued bit on the entry
let queued = entry.queued.fetch_or(true, SeqCst).into();
if queued {
// Already queued, nothing more to do
return Ok(false);
}
let ptr = Arc::into_raw(entry.clone()) as *mut _;
let mut curr = self.head.load(SeqCst);
loop {
if curr == SHUTDOWN {
// Don't leak the entry node
let _ = unsafe { Arc::from_raw(ptr) };
return Err(Error::shutdown());
}
// Update the `next` pointer. This is safe because setting the queued
// bit is a "lock" on this field.
unsafe {
*(entry.next_atomic.get()) = curr;
}
let actual = self.head.compare_and_swap(curr, ptr, SeqCst);
if actual == curr {
break;
}
curr = actual;
}
Ok(true)
}
/// Take all entries from the stack
pub fn take(&self) -> AtomicStackEntries {
let ptr = self.head.swap(ptr::null_mut(), SeqCst);
AtomicStackEntries { ptr }
}
/// Drain all remaining nodes in the stack and prevent any new nodes from
/// being pushed onto the stack.
pub fn shutdown(&self) {
// Shutdown the processing queue
let ptr = self.head.swap(SHUTDOWN, SeqCst);
// Let the drop fn of `AtomicStackEntries` handle draining the stack
drop(AtomicStackEntries { ptr });
}
}
// ===== impl AtomicStackEntries =====
impl Iterator for AtomicStackEntries {
type Item = Arc<Entry>;
fn next(&mut self) -> Option<Self::Item> {
if self.ptr.is_null() {
return None;
}
// Convert the pointer to an `Arc<Entry>`
let entry = unsafe { Arc::from_raw(self.ptr) };
// Update `self.ptr` to point to the next element of the stack
self.ptr = unsafe { (*entry.next_atomic.get()) };
// Unset the queued flag
let res = entry.queued.fetch_and(false, SeqCst);
debug_assert!(res);
// Return the entry
Some(entry)
}
}
impl Drop for AtomicStackEntries {
fn drop(&mut self) {
while let Some(entry) = self.next() {
// Flag the entry as errored
entry.error();
}
}
}

View File

@ -0,0 +1,127 @@
use {Error, Sleep, Deadline, Interval};
use timer::{Registration, Inner};
use tokio_executor::Enter;
use std::cell::RefCell;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
/// Handle to timer instance.
///
/// The `Handle` allows creating `Sleep` instances that are driven by the
/// associated timer.
///
/// A `Handle` is obtained by calling [`Timer::handle`].
///
/// [`Timer::handle`]: struct.Timer.html#method.handle
#[derive(Debug, Clone)]
pub struct Handle {
inner: Weak<Inner>,
}
/// Tracks the timer for the current execution context.
thread_local!(static CURRENT_TIMER: RefCell<Option<Handle>> = RefCell::new(None));
/// Set the default timer for the duration of the closure.
///
/// From within the closure, [`Sleep`] instances that are created via
/// [`Sleep::new`] can be used.
///
/// # Panics
///
/// This function panics if there already is a default timer set.
///
/// [`Sleep`]: ../struct.Sleep.html
pub fn with_default<F, R>(handle: &Handle, enter: &mut Enter, f: F) -> R
where F: FnOnce(&mut Enter) -> R
{
// Ensure that the timer is removed from the thread-local context
// when leaving the scope. This handles cases that involve panicking.
struct Reset;
impl Drop for Reset {
fn drop(&mut self) {
CURRENT_TIMER.with(|current| {
let mut current = current.borrow_mut();
*current = None;
});
}
}
// This ensures the value for the current timer gets reset even if there is
// a panic.
let _r = Reset;
CURRENT_TIMER.with(|current| {
{
let mut current = current.borrow_mut();
assert!(current.is_none(), "default Tokio timer already set \
for execution context");
*current = Some(handle.clone());
}
f(enter)
})
}
impl Handle {
pub(crate) fn new(inner: Weak<Inner>) -> Handle {
Handle { inner }
}
/// Returns a handle to the current timer.
///
/// The current timer is the timer that is currently set as default using
/// [`with_default`].
///
/// This function should only be called from within the context of
/// [`with_default`]. Calling this function from outside of this context
/// will return a `Handle` that does not reference a timer. `Sleep`
/// instances created with this handle will error.
///
/// [`with_default`]: ../fn.with_default.html
pub fn current() -> Handle {
Handle::try_current()
.unwrap_or(Handle { inner: Weak::new() })
}
/// Create a `Sleep` driven by this handle's associated `Timer`.
pub fn sleep(&self, deadline: Instant) -> Sleep {
let registration = Registration::new_with_handle(deadline, self.clone());
Sleep::new_with_registration(deadline, registration)
}
/// Create a `Deadline` driven by this handle's associated `Timer`.
pub fn deadline<T>(&self, future: T, deadline: Instant) -> Deadline<T> {
Deadline::new_with_sleep(future, self.sleep(deadline))
}
/// Create a new `Interval` that starts at `at` and yields every `duration`
/// interval after that.
pub fn interval(&self, at: Instant, duration: Duration) -> Interval {
Interval::new_with_sleep(self.sleep(at), duration)
}
/// Try to get a handle to the current timer.
///
/// Returns `Err` if no handle is found.
pub(crate) fn try_current() -> Result<Handle, Error> {
CURRENT_TIMER.with(|current| {
match *current.borrow() {
Some(ref handle) => Ok(handle.clone()),
None => Err(Error::shutdown()),
}
})
}
/// Try to return a strong ref to the inner
pub(crate) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
/// Consume the handle, returning the weak Inner ref.
pub(crate) fn into_inner(self) -> Weak<Inner> {
self.inner
}
}

View File

@ -0,0 +1,201 @@
use timer::{entry, Entry};
use std::fmt;
use std::sync::Arc;
/// Wheel for a single level in the timer. This wheel contains 64 slots.
pub(crate) struct Level {
level: usize,
/// Bit field tracking which slots currently contain entries.
///
/// Using a bit field to track slots that contain entries allows avoiding a
/// scan to find entries. This field is updated when entries are added or
/// removed from a slot.
///
/// The least-significant bit represents slot zero.
occupied: u64,
/// Slots
slot: [entry::Stack; LEVEL_MULT],
}
/// Indicates when a slot must be processed next.
#[derive(Debug)]
pub struct Expiration {
/// The level containing the slot.
pub level: usize,
/// The slot index.
pub slot: usize,
/// The instant at which the slot needs to be processed.
pub deadline: u64,
}
/// Level multiplier.
///
/// Being a power of 2 is very important.
const LEVEL_MULT: usize = 64;
impl Level {
pub fn new(level: usize) -> Level {
// Rust's derived implementations for arrays require that the value
// contained by the array be `Copy`. So, here we have to manually
// initialize every single slot.
macro_rules! s {
() => { entry::Stack::new() };
};
Level {
level,
occupied: 0,
slot: [
// It does not look like the necessary traits are
// derived for [T; 64].
s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(),
s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(),
s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(),
s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(),
s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(),
s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(),
s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(),
s!(), s!(), s!(), s!(), s!(), s!(), s!(), s!(),
],
}
}
/// Finds the slot that needs to be processed next and returns the slot and
/// `Instant` at which this slot must be processed.
pub fn next_expiration(&self, now: u64) -> Option<Expiration> {
// Use the `occupied` bit field to get the index of the next slot that
// needs to be processed.
let slot = match self.next_occupied_slot(now) {
Some(slot) => slot,
None => return None,
};
// From the slot index, calculate the `Instant` at which it needs to be
// processed. This value *must* be in the future with respect to `now`.
let level_range = level_range(self.level);
let slot_range = slot_range(self.level);
// TODO: This can probably be simplified w/ power of 2 math
let level_start = now - (now % level_range);
let deadline = level_start + slot as u64 * slot_range;
debug_assert!(deadline >= now, "deadline={}; now={}; level={}; slot={}; occupied={:b}",
deadline, now, self.level, slot, self.occupied);
Some(Expiration {
level: self.level,
slot,
deadline,
})
}
fn next_occupied_slot(&self, now: u64) -> Option<usize> {
if self.occupied == 0 {
return None;
}
// Get the slot for now using Maths
let now_slot = (now / slot_range(self.level)) as usize;
let occupied = self.occupied.rotate_right(now_slot as u32);
let zeros = occupied.trailing_zeros() as usize;
let slot = (zeros + now_slot) % 64;
Some(slot)
}
pub fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
let slot = slot_for(when, self.level);
self.slot[slot].push(entry);
self.occupied |= occupied_bit(slot);
}
pub fn remove_entry(&mut self, entry: &Entry, when: u64) {
let slot = slot_for(when, self.level);
self.slot[slot].remove(entry);
if self.slot[slot].is_empty() {
// The bit is currently set
debug_assert!(self.occupied & occupied_bit(slot) != 0);
// Unset the bit
self.occupied ^= occupied_bit(slot);
}
}
pub fn pop_entry_slot(&mut self, slot: usize) -> Option<Arc<Entry>> {
let ret = self.slot[slot].pop();
if ret.is_some() && self.slot[slot].is_empty() {
// The bit is currently set
debug_assert!(self.occupied & occupied_bit(slot) != 0);
self.occupied ^= occupied_bit(slot);
}
ret
}
}
impl Drop for Level {
fn drop(&mut self) {
while let Some(slot) = self.next_occupied_slot(0) {
// This should always have one
let entry = self.pop_entry_slot(slot)
.expect("occupied bit set invalid");
entry.error();
}
}
}
impl fmt::Debug for Level {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Level")
.field("occupied", &self.occupied)
.finish()
}
}
fn occupied_bit(slot: usize) -> u64 {
(1 << slot)
}
fn slot_range(level: usize) -> u64 {
LEVEL_MULT.pow(level as u32) as u64
}
fn level_range(level: usize) -> u64 {
LEVEL_MULT as u64 * slot_range(level)
}
/// Convert a duration (milliseconds) and a level to a slot position
fn slot_for(duration: u64, level: usize) -> usize {
((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_slot_for() {
for pos in 1..64 {
assert_eq!(pos as usize, slot_for(pos, 0));
}
for level in 1..5 {
for pos in level..64 {
let a = pos * 64_usize.pow(level as u32);
assert_eq!(pos as usize, slot_for(a as u64, level));
}
}
}
}

View File

@ -0,0 +1,645 @@
//! Timer implementation.
//!
//! This module contains the types needed to run a timer.
//!
//! The [`Timer`] type runs the timer logic. It holds all the necessary state
//! to track all associated [`Sleep`] instances and delivering notifications
//! once the deadlines are reached.
//!
//! The [`Handle`] type is a reference to a [`Timer`] instance. This type is
//! `Clone`, `Send`, and `Sync`. This type is used to create instances of
//! [`Sleep`].
//!
//! The [`Now`] trait describes how to get an `Instance` representing the
//! current moment in time. [`SystemNow`] is the default implementation, where
//! [`Now::now`] is implemented by calling `Instant::now`.
//!
//! [`Timer`] is generic over [`Now`]. This allows the source of time to be
//! customized. This ability is especially useful in tests and any environment
//! where determinism is necessary.
//!
//! Note, when using the Tokio runtime, the `Timer` does not need to be manually
//! setup as the runtime comes pre-configured with a `Timer` instance.
//!
//! [`Timer`]: struct.Timer.html
//! [`Handle`]: struct.Handle.html
//! [`Sleep`]: ../struct.Sleep.html
//! [`Now`]: trait.Now.html
//! [`Now::now`]: trait.Now.html#method.now
mod entry;
mod handle;
mod level;
mod now;
mod registration;
use self::entry::Entry;
use self::level::{Level, Expiration};
pub use self::handle::{Handle, with_default};
pub use self::now::{Now, SystemNow};
pub(crate) use self::registration::Registration;
use Error;
use atomic::AtomicU64;
use tokio_executor::park::{Park, Unpark, ParkThread};
use std::{cmp, fmt};
use std::time::{Duration, Instant};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::usize;
/// Timer implementation that drives [`Sleep`], [`Interval`], and [`Deadline`].
///
/// A `Timer` instance tracks the state necessary for managing time and
/// notifying the [`Sleep`] instances once their deadlines are reached.
///
/// It is expected that a single `Timer` instance manages many individual
/// `Sleep` instances. The `Timer` implementation is thread-safe and, as such,
/// is able to handle callers from across threads.
///
/// Callers do not use `Timer` directly to create `Sleep` instances. Instead,
/// [`Handle`] is used. A handle for the timer instance is obtained by calling
/// [`handle`]. [`Handle`] is the type that implements `Clone` and is `Send +
/// Sync`.
///
/// After creating the `Timer` instance, the caller must repeatedly call
/// [`turn`]. The timer will perform no work unless [`turn`] is called
/// repeatedly.
///
/// The `Timer` has a resolution of one millisecond. Any unit of time that falls
/// between milliseconds are rounded up to the next millisecond.
///
/// When the `Timer` instance is dropped, any outstanding `Sleep` instance that
/// has not elapsed will be notified with an error. At this point, calling
/// `poll` on the sleep instance will result in `Err` being returned.
///
/// # Implementation
///
/// `Timer` is based on the [paper by Varghese and Lauck][paper].
///
/// A hashed timing wheel is a vector of slots, where each slot handles a time
/// slice. As time progresses, the timer walks over the slot for the current
/// instant, and processes each entry for that slot. When the timer reaches the
/// end of the wheel, it starts again at the beginning.
///
/// The `Timer` implementation maintains six wheels arranged in a set of levels.
/// As the levels go up, the slots of the associated wheel represent larger
/// intervals of time. At each level, the wheel has 64 slots. Each slot covers a
/// range of time equal to the wheel at the lower level. At level zero, each
/// slot represents one millisecond of time.
///
/// The wheels are:
///
/// * Level 0: 64 x 1 millisecond slots.
/// * Level 1: 64 x 64 millisecond slots.
/// * Level 2: 64 x ~4 second slots.
/// * Level 3: 64 x ~4 minute slots.
/// * Level 4: 64 x ~4 hour slots.
/// * Level 5: 64 x ~12 day slots.
///
/// When the timer processes entries at level zero, it will notify all the
/// [`Sleep`] instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
/// down. Eventually, as time progresses, entries will `Sleep` instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified.
///
/// [`Sleep`]: ../struct.Sleep.html
/// [`Interval`]: ../struct.Interval.html
/// [`Deadline`]: ../struct.Deadline.html
/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
/// [`handle`]: #method.handle
/// [`turn`]: #method.turn
/// [`Handle`]: struct.Handle.html
#[derive(Debug)]
pub struct Timer<T, N = SystemNow> {
/// Shared state
inner: Arc<Inner>,
/// The number of milliseconds elapsed since the timer started.
elapsed: u64,
/// Timer wheel.
///
/// Levels:
///
/// * 1 ms slots / 64 ms range
/// * 64 ms slots / ~ 4 sec range
/// * ~ 4 sec slots / ~ 4 min range
/// * ~ 4 min slots / ~ 4 hr range
/// * ~ 4 hr slots / ~ 12 day range
/// * ~ 12 day slots / ~ 2 yr range
levels: Vec<Level>,
/// Thread parker. The `Timer` park implementation delegates to this.
park: T,
/// Source of "now" instances
now: N,
}
/// Return value from the `turn` method on `Timer`.
///
/// Currently this value doesn't actually provide any functionality, but it may
/// in the future give insight into what happened during `turn`.
#[derive(Debug)]
pub struct Turn(());
/// Timer state shared between `Timer`, `Handle`, and `Registration`.
pub(crate) struct Inner {
/// The instant at which the timer started running.
start: Instant,
/// The last published timer `elapsed` value.
elapsed: AtomicU64,
/// Number of active timeouts
num: AtomicUsize,
/// Head of the "process" linked list.
process: entry::AtomicStack,
/// Unparks the timer thread.
unpark: Box<Unpark>,
}
/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
/// each, the timer is able to track time up to 2 years into the future with a
/// precision of 1 millisecond.
const NUM_LEVELS: usize = 6;
/// The maximum duration of a sleep
const MAX_DURATION: u64 = 1 << (6 * NUM_LEVELS);
/// Maximum number of timeouts the system can handle concurrently.
const MAX_TIMEOUTS: usize = usize::MAX >> 1;
// ===== impl Timer =====
impl<T> Timer<T>
where T: Park
{
/// Create a new `Timer` instance that uses `park` to block the current
/// thread.
///
/// Once the timer has been created, a handle can be obtained using
/// [`handle`]. The handle is used to create `Sleep` instances.
///
/// Use `default` when constructing a `Timer` using the default `park`
/// instance.
///
/// [`handle`]: #method.handle
pub fn new(park: T) -> Self {
Timer::new_with_now(park, SystemNow::new())
}
}
impl<T, N> Timer<T, N> {
/// Returns a reference to the underlying `Park` instance.
pub fn get_park(&self) -> &T {
&self.park
}
/// Returns a mutable reference to the underlying `Park` instance.
pub fn get_park_mut(&mut self) -> &mut T {
&mut self.park
}
}
impl<T, N> Timer<T, N>
where T: Park,
N: Now,
{
/// Create a new `Timer` instance that uses `park` to block the current
/// thread and `now` to get the current `Instant`.
///
/// Specifying the source of time is useful when testing.
pub fn new_with_now(park: T, mut now: N) -> Self {
let unpark = Box::new(park.unpark());
let levels = (0..NUM_LEVELS)
.map(Level::new)
.collect();
Timer {
inner: Arc::new(Inner::new(now.now(), unpark)),
elapsed: 0,
levels,
park,
now,
}
}
/// Returns a handle to the timer.
///
/// The `Handle` is how `Sleep` instances are created. The `Sleep` instances
/// can either be created directly or the `Handle` instance can be passed to
/// `with_default`, setting the timer as the default timer for the execution
/// context.
pub fn handle(&self) -> Handle {
Handle::new(Arc::downgrade(&self.inner))
}
/// Performs one iteration of the timer loop.
///
/// This function must be called repeatedly in order for the `Timer`
/// instance to make progress. This is where the work happens.
///
/// The `Timer` will use the `Park` instance that was specified in [`new`]
/// to block the current thread until the next `Sleep` instance elapses. One
/// call to `turn` results in at most one call to `park.park()`.
///
/// # Return
///
/// On success, `Ok(Turn)` is returned, where `Turn` is a placeholder type
/// that currently does nothing but may, in the future, have functions add
/// to provide information about the call to `turn`.
///
/// If the call to `park.park()` fails, then `Err` is returned with the
/// error.
///
/// [`new`]: #method.new
pub fn turn(&mut self, max_wait: Option<Duration>) -> Result<Turn, T::Error> {
match max_wait {
Some(timeout) => self.park_timeout(timeout)?,
None => self.park()?,
}
Ok(Turn(()))
}
/// Returns the instant at which the next timeout expires.
fn next_expiration(&self) -> Option<Expiration> {
// Check all levels
for level in 0..NUM_LEVELS {
if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
// There cannot be any expirations at a higher level that happen
// before this one.
debug_assert!({
let mut res = true;
for l2 in (level+1)..NUM_LEVELS {
if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
if e2.deadline < expiration.deadline {
res = false;
}
}
}
res
});
return Some(expiration);
}
}
None
}
/// Converts an `Expiration` to an `Instant`.
fn expiration_instant(&self, expiration: &Expiration) -> Instant {
self.inner.start + Duration::from_millis(expiration.deadline)
}
/// Run timer related logic
fn process(&mut self) {
let now = ms(self.now.now() - self.inner.start, Round::Down);
loop {
let expiration = match self.next_expiration() {
Some(expiration) => expiration,
None => break,
};
if expiration.deadline > now {
// This expiration should not fire on this tick
break;
}
// Prcess the slot, either moving it down a level or firing the
// timeout if currently at the final (boss) level.
self.process_expiration(&expiration);
self.set_elapsed(expiration.deadline);
}
self.set_elapsed(now);
}
fn set_elapsed(&mut self, when: u64) {
assert!(self.elapsed <= when, "elapsed={:?}; when={:?}", self.elapsed, when);
if when > self.elapsed {
self.elapsed = when;
self.inner.elapsed.store(when, SeqCst);
} else {
assert_eq!(self.elapsed, when);
}
}
fn process_expiration(&mut self, expiration: &Expiration) {
while let Some(entry) = self.pop_entry(expiration) {
if expiration.level == 0 {
let when = entry.when_internal()
.expect("invalid internal entry state");
debug_assert_eq!(when, expiration.deadline);
// Fire the entry
entry.fire(when);
// Track that the entry has been fired
entry.set_when_internal(None);
} else {
let when = entry.when_internal()
.expect("entry not tracked");
let next_level = expiration.level - 1;
self.levels[next_level]
.add_entry(entry, when);
}
}
}
fn pop_entry(&mut self, expiration: &Expiration) -> Option<Arc<Entry>> {
self.levels[expiration.level].pop_entry_slot(expiration.slot)
}
/// Process the entry queue
///
/// This handles adding and canceling timeouts.
fn process_queue(&mut self) {
for entry in self.inner.process.take() {
match (entry.when_internal(), entry.load_state()) {
(None, None) => {
// Nothing to do
}
(Some(when), None) => {
// Remove the entry
self.clear_entry(&entry, when);
}
(None, Some(when)) => {
// Queue the entry
self.add_entry(entry, when);
}
(Some(curr), Some(next)) => {
self.clear_entry(&entry, curr);
self.add_entry(entry, next);
}
}
}
}
fn clear_entry(&mut self, entry: &Arc<Entry>, when: u64) {
// Get the level at which the entry should be stored
let level = self.level_for(when);
self.levels[level].remove_entry(entry, when);
entry.set_when_internal(None);
}
/// Fire the entry if it needs to, otherwise queue it to be processed later.
///
/// Returns `None` if the entry was fired.
fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
if when <= self.elapsed {
// The entry's deadline has elapsed, so fire it and update the
// internal state accordingly.
entry.set_when_internal(None);
entry.fire(when);
return;
} else if when - self.elapsed > MAX_DURATION {
// The entry's deadline is invalid, so error it and update the
// internal state accordingly.
entry.set_when_internal(None);
entry.error();
return;
}
// Get the level at which the entry should be stored
let level = self.level_for(when);
entry.set_when_internal(Some(when));
self.levels[level].add_entry(entry, when);
debug_assert!({
self.levels[level].next_expiration(self.elapsed)
.map(|e| e.deadline >= self.elapsed)
.unwrap_or(true)
});
}
fn level_for(&self, when: u64) -> usize {
level_for(self.elapsed, when)
}
}
fn level_for(elapsed: u64, when: u64) -> usize {
let masked = elapsed ^ when;
assert!(masked != 0, "elapsed={}; when={}", elapsed, when);
let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;
significant / 6
}
impl Default for Timer<ParkThread, SystemNow> {
fn default() -> Self {
Timer::new(ParkThread::new())
}
}
impl<T, N> Park for Timer<T, N>
where T: Park,
N: Now,
{
type Unpark = T::Unpark;
type Error = T::Error;
fn unpark(&self) -> Self::Unpark {
self.park.unpark()
}
fn park(&mut self) -> Result<(), Self::Error> {
self.process_queue();
match self.next_expiration() {
Some(expiration) => {
let now = self.now.now();
let deadline = self.expiration_instant(&expiration);
if deadline > now {
self.park.park_timeout(deadline - now)?;
}
}
None => {
self.park.park()?;
}
}
self.process();
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.process_queue();
match self.next_expiration() {
Some(expiration) => {
let now = self.now.now();
let deadline = self.expiration_instant(&expiration);
if deadline > now {
self.park.park_timeout(cmp::min(deadline - now, duration))?;
}
}
None => {
self.park.park_timeout(duration)?;
}
}
self.process();
Ok(())
}
}
impl<T, N> Drop for Timer<T, N> {
fn drop(&mut self) {
// Shutdown the stack of entries to process, preventing any new entries
// from being pushed.
self.inner.process.shutdown();
}
}
// ===== impl Inner =====
impl Inner {
fn new(start: Instant, unpark: Box<Unpark>) -> Inner {
Inner {
num: AtomicUsize::new(0),
elapsed: AtomicU64::new(0),
process: entry::AtomicStack::new(),
start,
unpark,
}
}
fn elapsed(&self) -> u64 {
self.elapsed.load(SeqCst)
}
/// Increment the number of active timeouts
fn increment(&self) -> Result<(), Error> {
let mut curr = self.num.load(SeqCst);
loop {
if curr == MAX_TIMEOUTS {
return Err(Error::at_capacity());
}
let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst);
if curr == actual {
return Ok(());
}
curr = actual;
}
}
/// Decrement the number of active timeouts
fn decrement(&self) {
let prev = self.num.fetch_sub(1, SeqCst);
debug_assert!(prev <= MAX_TIMEOUTS);
}
fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> {
if self.process.push(entry)? {
// The timer is notified so that it can process the timeout
self.unpark.unpark();
}
Ok(())
}
fn normalize_deadline(&self, deadline: Instant) -> u64 {
if deadline < self.start {
return 0;
}
ms(deadline - self.start, Round::Up)
}
}
impl fmt::Debug for Inner {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Inner")
.finish()
}
}
enum Round {
Up,
Down,
}
/// Convert a `Duration` to milliseconds, rounding up and saturating at
/// `u64::MAX`.
///
/// The saturating is fine because `u64::MAX` milliseconds are still many
/// million years.
#[inline]
fn ms(duration: Duration, round: Round) -> u64 {
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;
// Round up.
let millis = match round {
Round::Up => (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI,
Round::Down => duration.subsec_nanos() / NANOS_PER_MILLI,
};
duration.as_secs().saturating_mul(MILLIS_PER_SEC).saturating_add(millis as u64)
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_level_for() {
for pos in 1..64 {
assert_eq!(0, level_for(0, pos), "level_for({}) -- binary = {:b}", pos, pos);
}
for level in 1..5 {
for pos in level..64 {
let a = pos * 64_usize.pow(level as u32);
assert_eq!(level, level_for(0, a as u64),
"level_for({}) -- binary = {:b}", a, a);
if pos > level {
let a = a - 1;
assert_eq!(level, level_for(0, a as u64),
"level_for({}) -- binary = {:b}", a, a);
}
if pos < 64 {
let a = a + 1;
assert_eq!(level, level_for(0, a as u64),
"level_for({}) -- binary = {:b}", a, a);
}
}
}
}
}

View File

@ -0,0 +1,27 @@
use std::time::Instant;
/// Returns `Instant` values representing the current instant in time.
///
/// This allows customizing the source of time which is especially useful for
/// testing.
pub trait Now {
/// Returns an instant corresponding to "now".
fn now(&mut self) -> Instant;
}
/// Returns the instant corresponding to now using a monotonic clock.
#[derive(Debug)]
pub struct SystemNow(());
impl SystemNow {
/// Create a new `SystemNow`.
pub fn new() -> SystemNow {
SystemNow(())
}
}
impl Now for SystemNow {
fn now(&mut self) -> Instant {
Instant::now()
}
}

View File

@ -0,0 +1,82 @@
use Error;
use timer::{Handle, Entry};
use futures::Poll;
use std::sync::Arc;
use std::time::Instant;
/// Registration with a timer.
///
/// The association between a `Sleep` instance and a timer is done lazily in
/// `poll`
#[derive(Debug)]
pub(crate) struct Registration {
entry: Arc<Entry>,
}
impl Registration {
pub fn new(deadline: Instant) -> Registration {
fn is_send<T: Send + Sync>() {}
is_send::<Registration>();
match Handle::try_current() {
Ok(handle) => Registration::new_with_handle(deadline, handle),
Err(_) => Registration::new_error(),
}
}
pub fn new_with_handle(deadline: Instant, handle: Handle) -> Registration {
let inner = match handle.inner() {
Some(inner) => inner,
None => return Registration::new_error(),
};
// Increment the number of active timeouts
if inner.increment().is_err() {
return Registration::new_error();
}
let when = inner.normalize_deadline(deadline);
if when <= inner.elapsed() {
// The deadline has already elapsed, ther eis no point creating the
// structures.
return Registration {
entry: Arc::new(Entry::new_elapsed(handle)),
};
}
let entry = Arc::new(Entry::new(when, handle));
if inner.queue(&entry).is_err() {
// The timer has shutdown, transition the entry to the error state.
entry.error();
}
Registration { entry }
}
pub fn reset(&self, deadline: Instant) {
Entry::reset(&self.entry, deadline);
}
fn new_error() -> Registration {
let entry = Arc::new(Entry::new_error());
Registration { entry }
}
pub fn is_elapsed(&self) -> bool {
self.entry.is_elapsed()
}
pub fn poll_elapsed(&self) -> Poll<(), Error> {
self.entry.poll_elapsed()
}
}
impl Drop for Registration {
fn drop(&mut self) {
Entry::cancel(&self.entry);
}
}

View File

@ -0,0 +1,105 @@
extern crate futures;
extern crate tokio_executor;
extern crate tokio_timer;
#[macro_use]
mod support;
use support::*;
use tokio_timer::*;
use futures::{future, Future};
use futures::sync::oneshot;
#[test]
fn simultaneous_deadline_future_completion() {
mocked(|_, time| {
// Create a future that is immediately ready
let fut = future::ok::<_, ()>(());
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now());
// Ready!
assert_ready!(fut);
});
}
#[test]
fn completed_future_past_deadline() {
mocked(|_, time| {
// Create a future that is immediately ready
let fut = future::ok::<_, ()>(());
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now() - ms(1000));
// Ready!
assert_ready!(fut);
});
}
#[test]
fn future_and_deadline_in_future() {
mocked(|timer, time| {
// Not yet complete
let (tx, rx) = oneshot::channel();
// Wrap it with a deadline
let mut fut = Deadline::new(rx, time.now() + ms(100));
// Ready!
assert_not_ready!(fut);
// Turn the timer, it runs for the elapsed time
advance(timer, ms(90));
assert_not_ready!(fut);
// Complete the future
tx.send(()).unwrap();
assert_ready!(fut);
});
}
#[test]
fn deadline_now_elapses() {
mocked(|_, time| {
let fut = future::empty::<(), ()>();
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now());
assert_elapsed!(fut);
});
}
#[test]
fn deadline_future_elapses() {
mocked(|timer, time| {
let fut = future::empty::<(), ()>();
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now() + ms(300));
assert_not_ready!(fut);
advance(timer, ms(300));
assert_elapsed!(fut);
});
}
#[test]
fn future_errors_first() {
mocked(|_, time| {
let fut = future::err::<(), ()>(());
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now() + ms(100));
// Ready!
assert!(fut.poll().unwrap_err().is_inner());
});
}

240
tokio-timer/tests/hammer.rs Normal file
View File

@ -0,0 +1,240 @@
extern crate futures;
extern crate rand;
extern crate tokio_executor;
extern crate tokio_timer;
use tokio_executor::park::{Park, Unpark, UnparkThread};
use tokio_timer::*;
use futures::{Future, Stream};
use futures::stream::FuturesUnordered;
use rand::Rng;
use std::cmp;
use std::sync::{Arc, Barrier};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::thread;
use std::time::{Duration, Instant};
struct Signal {
rem: AtomicUsize,
unpark: UnparkThread,
}
#[test]
fn hammer_complete() {
const ITERS: usize = 5;
const THREADS: usize = 4;
const PER_THREAD: usize = 40;
const MIN_DELAY: u64 = 1;
const MAX_DELAY: u64 = 5_000;
for _ in 0..ITERS {
let mut timer = Timer::default();
let handle = timer.handle();
let barrier = Arc::new(Barrier::new(THREADS));
let done = Arc::new(Signal {
rem: AtomicUsize::new(THREADS),
unpark: timer.get_park().unpark(),
});
for _ in 0..THREADS {
let handle = handle.clone();
let barrier = barrier.clone();
let done = done.clone();
thread::spawn(move || {
let mut exec = FuturesUnordered::new();
let mut rng = rand::thread_rng();
barrier.wait();
for _ in 0..PER_THREAD {
let deadline = Instant::now() + Duration::from_millis(
rng.gen_range(MIN_DELAY, MAX_DELAY));
exec.push({
handle.sleep(deadline)
.and_then(move |_| {
let now = Instant::now();
assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
Ok(())
})
});
}
// Run the logic
exec.for_each(|_| Ok(()))
.wait()
.unwrap();
if 1 == done.rem.fetch_sub(1, SeqCst) {
done.unpark.unpark();
}
});
}
while done.rem.load(SeqCst) > 0 {
timer.turn(None).unwrap();
}
}
}
#[test]
fn hammer_cancel() {
const ITERS: usize = 5;
const THREADS: usize = 4;
const PER_THREAD: usize = 40;
const MIN_DELAY: u64 = 1;
const MAX_DELAY: u64 = 5_000;
for _ in 0..ITERS {
let mut timer = Timer::default();
let handle = timer.handle();
let barrier = Arc::new(Barrier::new(THREADS));
let done = Arc::new(Signal {
rem: AtomicUsize::new(THREADS),
unpark: timer.get_park().unpark(),
});
for _ in 0..THREADS {
let handle = handle.clone();
let barrier = barrier.clone();
let done = done.clone();
thread::spawn(move || {
let mut exec = FuturesUnordered::new();
let mut rng = rand::thread_rng();
barrier.wait();
for _ in 0..PER_THREAD {
let deadline1 = Instant::now() + Duration::from_millis(
rng.gen_range(MIN_DELAY, MAX_DELAY));
let deadline2 = Instant::now() + Duration::from_millis(
rng.gen_range(MIN_DELAY, MAX_DELAY));
let deadline = cmp::min(deadline1, deadline2);
let sleep = handle.sleep(deadline1);
let join = handle.deadline(sleep, deadline2);
exec.push({
join
.and_then(move |_| {
let now = Instant::now();
assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
Ok(())
})
});
}
// Run the logic
exec
.or_else(|e| {
assert!(e.is_elapsed());
Ok::<_, ()>(())
})
.for_each(|_| Ok(()))
.wait()
.unwrap();
if 1 == done.rem.fetch_sub(1, SeqCst) {
done.unpark.unpark();
}
});
}
while done.rem.load(SeqCst) > 0 {
timer.turn(None).unwrap();
}
}
}
#[test]
fn hammer_reset() {
const ITERS: usize = 5;
const THREADS: usize = 4;
const PER_THREAD: usize = 40;
const MIN_DELAY: u64 = 1;
const MAX_DELAY: u64 = 250;
for _ in 0..ITERS {
let mut timer = Timer::default();
let handle = timer.handle();
let barrier = Arc::new(Barrier::new(THREADS));
let done = Arc::new(Signal {
rem: AtomicUsize::new(THREADS),
unpark: timer.get_park().unpark(),
});
for _ in 0..THREADS {
let handle = handle.clone();
let barrier = barrier.clone();
let done = done.clone();
thread::spawn(move || {
let mut exec = FuturesUnordered::new();
let mut rng = rand::thread_rng();
barrier.wait();
for _ in 0..PER_THREAD {
let deadline1 = Instant::now() + Duration::from_millis(
rng.gen_range(MIN_DELAY, MAX_DELAY));
let deadline2 = deadline1 + Duration::from_millis(
rng.gen_range(MIN_DELAY, MAX_DELAY));
let deadline3 = deadline2 + Duration::from_millis(
rng.gen_range(MIN_DELAY, MAX_DELAY));
exec.push({
handle.sleep(deadline1)
// Select over a second sleep
.select2(handle.sleep(deadline2))
.map_err(|e| panic!("boom; err={:?}", e))
.and_then(move |res| {
use futures::future::Either::*;
let now = Instant::now();
assert!(now >= deadline1, "deadline greater by {:?}", deadline1 - now);
let mut other = match res {
A((_, other)) => other,
B((_, other)) => other,
};
other.reset(deadline3);
other
})
.and_then(move |_| {
let now = Instant::now();
assert!(now >= deadline3, "deadline greater by {:?}", deadline3 - now);
Ok(())
})
});
}
// Run the logic
exec
.for_each(|_| Ok(()))
.wait()
.unwrap();
if 1 == done.rem.fetch_sub(1, SeqCst) {
done.unpark.unpark();
}
});
}
while done.rem.load(SeqCst) > 0 {
timer.turn(None).unwrap();
}
}
}

View File

@ -0,0 +1,46 @@
extern crate futures;
extern crate tokio_executor;
extern crate tokio_timer;
#[macro_use]
mod support;
use support::*;
use tokio_timer::*;
use futures::{Stream};
#[test]
#[should_panic]
fn interval_zero_duration() {
mocked(|_, time| {
let _ = Interval::new(time.now(), ms(0));
});
}
#[test]
fn usage() {
mocked(|timer, time| {
let start = time.now();
let mut int = Interval::new(start, ms(300));
assert_ready!(int, Some(start));
assert_not_ready!(int);
advance(timer, ms(100));
assert_not_ready!(int);
advance(timer, ms(200));
assert_ready!(int, Some(start + ms(300)));
assert_not_ready!(int);
advance(timer, ms(400));
assert_ready!(int, Some(start + ms(600)));
assert_not_ready!(int);
advance(timer, ms(500));
assert_ready!(int, Some(start + ms(900)));
assert_ready!(int, Some(start + ms(1200)));
assert_not_ready!(int);
});
}

488
tokio-timer/tests/sleep.rs Normal file
View File

@ -0,0 +1,488 @@
extern crate futures;
extern crate tokio_executor;
extern crate tokio_timer;
#[macro_use]
mod support;
use support::*;
use tokio_timer::*;
use futures::Future;
use std::time::{Duration, Instant};
#[test]
fn immediate_sleep() {
mocked(|timer, time| {
// Create `Sleep` that elapsed immediately.
let mut sleep = Sleep::new(time.now());
// Ready!
assert_ready!(sleep);
// Turn the timer, it runs for the elapsed time
turn(timer, ms(1000));
// The time has not advanced. The `turn` completed immediately.
assert_eq!(time.advanced(), ms(1000));
});
}
#[test]
fn delayed_sleep_level_0() {
for &i in &[1, 10, 60] {
mocked(|timer, time| {
// Create a `Sleep` that elapses in the future
let mut sleep = Sleep::new(time.now() + ms(i));
// The sleep has not elapsed.
assert_not_ready!(sleep);
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(i));
assert_ready!(sleep);
});
}
}
#[test]
fn sub_ms_delayed_sleep() {
mocked(|timer, time| {
for _ in 0..5 {
let deadline = time.now()
+ Duration::from_millis(1)
+ Duration::new(0, 1);
let mut sleep = Sleep::new(deadline);
assert_not_ready!(sleep);
turn(timer, None);
assert_ready!(sleep);
assert!(time.now() >= deadline);
time.advance(Duration::new(0, 1));
}
});
}
#[test]
fn delayed_sleep_wrapping_level_0() {
mocked(|timer, time| {
turn(timer, ms(5));
assert_eq!(time.advanced(), ms(5));
let mut sleep = Sleep::new(time.now() + ms(60));
assert_not_ready!(sleep);
turn(timer, None);
assert_eq!(time.advanced(), ms(64));
assert_not_ready!(sleep);
turn(timer, None);
assert_eq!(time.advanced(), ms(65));
assert_ready!(sleep);
});
}
#[test]
fn timer_wrapping_with_higher_levels() {
mocked(|timer, time| {
// Set sleep to hit level 1
let mut s1 = Sleep::new(time.now() + ms(64));
assert_not_ready!(s1);
// Turn a bit
turn(timer, ms(5));
// Set timeout such that it will hit level 0, but wrap
let mut s2 = Sleep::new(time.now() + ms(60));
assert_not_ready!(s2);
// This should result in s1 firing
turn(timer, None);
assert_eq!(time.advanced(), ms(64));
assert_ready!(s1);
assert_not_ready!(s2);
turn(timer, None);
assert_eq!(time.advanced(), ms(65));
assert_ready!(s2);
});
}
#[test]
fn sleep_with_deadline_in_past() {
mocked(|timer, time| {
// Create `Sleep` that elapsed immediately.
let mut sleep = Sleep::new(time.now() - ms(100));
// Even though the sleep expires in the past, it is not ready yet
// because the timer must observe it.
assert_ready!(sleep);
// Turn the timer, it runs for the elapsed time
turn(timer, ms(1000));
// The time has not advanced. The `turn` completed immediately.
assert_eq!(time.advanced(), ms(1000));
});
}
#[test]
fn delayed_sleep_level_1() {
mocked(|timer, time| {
// Create a `Sleep` that elapses in the future
let mut sleep = Sleep::new(time.now() + ms(234));
// The sleep has not elapsed.
assert_not_ready!(sleep);
// Turn the timer, this will wake up to cascade the timer down.
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(192));
// The sleep has not elapsed.
assert_not_ready!(sleep);
// Turn the timer again
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(234));
// The sleep has elapsed.
assert_ready!(sleep);
});
mocked(|timer, time| {
// Create a `Sleep` that elapses in the future
let mut sleep = Sleep::new(time.now() + ms(234));
// The sleep has not elapsed.
assert_not_ready!(sleep);
// Turn the timer with a smaller timeout than the cascade.
turn(timer, ms(100));
assert_eq!(time.advanced(), ms(100));
assert_not_ready!(sleep);
// Turn the timer, this will wake up to cascade the timer down.
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(192));
// The sleep has not elapsed.
assert_not_ready!(sleep);
// Turn the timer again
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(234));
// The sleep has elapsed.
assert_ready!(sleep);
});
}
#[test]
fn creating_sleep_outside_of_context() {
let now = Instant::now();
// This creates a sleep outside of the context of a mock timer. This tests
// that it will still expire.
let mut sleep = Sleep::new(now + ms(500));
mocked_with_now(now, |timer, time| {
// This registers the sleep with the timer
assert_not_ready!(sleep);
// Wait some time... the timer is cascading
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(448));
assert_not_ready!(sleep);
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(500));
// The sleep has elapsed
assert_ready!(sleep);
});
}
#[test]
fn concurrently_set_two_timers_second_one_shorter() {
mocked(|timer, time| {
let mut sleep1 = Sleep::new(time.now() + ms(500));
let mut sleep2 = Sleep::new(time.now() + ms(200));
// The sleep has not elapsed
assert_not_ready!(sleep1);
assert_not_ready!(sleep2);
// Sleep until a cascade
turn(timer, None);
assert_eq!(time.advanced(), ms(192));
// Sleep until the second timer.
turn(timer, None);
assert_eq!(time.advanced(), ms(200));
// The shorter sleep fires
assert_ready!(sleep2);
assert_not_ready!(sleep1);
turn(timer, None);
assert_eq!(time.advanced(), ms(448));
assert_not_ready!(sleep1);
// Turn again, this time the time will advance to the second sleep
turn(timer, None);
assert_eq!(time.advanced(), ms(500));
assert_ready!(sleep1);
})
}
#[test]
fn short_sleep() {
mocked(|timer, time| {
// Create a `Sleep` that elapses in the future
let mut sleep = Sleep::new(time.now() + ms(1));
// The sleep has not elapsed.
assert_not_ready!(sleep);
// Turn the timer, but not enough timee will go by.
turn(timer, None);
// The sleep has elapsed.
assert_ready!(sleep);
// The time has advanced to the point of the sleep elapsing.
assert_eq!(time.advanced(), ms(1));
})
}
#[test]
fn sorta_long_sleep() {
const MIN_5: u64 = 5 * 60 * 1000;
mocked(|timer, time| {
// Create a `Sleep` that elapses in the future
let mut sleep = Sleep::new(time.now() + ms(MIN_5));
// The sleep has not elapsed.
assert_not_ready!(sleep);
let cascades = &[
262_144,
262_144 + 9 * 4096,
262_144 + 9 * 4096 + 15 * 64,
];
for &elapsed in cascades {
turn(timer, None);
assert_eq!(time.advanced(), ms(elapsed));
assert_not_ready!(sleep);
}
turn(timer, None);
assert_eq!(time.advanced(), ms(MIN_5));
// The sleep has elapsed.
assert_ready!(sleep);
})
}
#[test]
fn very_long_sleep() {
const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000;
mocked(|timer, time| {
// Create a `Sleep` that elapses in the future
let mut sleep = Sleep::new(time.now() + ms(MO_5));
// The sleep has not elapsed.
assert_not_ready!(sleep);
let cascades = &[
12_884_901_888,
12_952_010_752,
12_959_875_072,
12_959_997_952,
];
for &elapsed in cascades {
turn(timer, None);
assert_eq!(time.advanced(), ms(elapsed));
assert_not_ready!(sleep);
}
// Turn the timer, but not enough time will go by.
turn(timer, None);
// The time has advanced to the point of the sleep elapsing.
assert_eq!(time.advanced(), ms(MO_5));
// The sleep has elapsed.
assert_ready!(sleep);
})
}
#[test]
fn greater_than_max() {
const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
mocked(|timer, time| {
// Create a `Sleep` that elapses in the future
let mut sleep = Sleep::new(time.now() + ms(YR_5));
assert_not_ready!(sleep);
turn(timer, ms(0));
assert!(sleep.poll().is_err());
})
}
#[test]
fn unpark_is_delayed() {
mocked(|timer, time| {
let mut sleep1 = Sleep::new(time.now() + ms(100));
let mut sleep2 = Sleep::new(time.now() + ms(101));
let mut sleep3 = Sleep::new(time.now() + ms(200));
assert_not_ready!(sleep1);
assert_not_ready!(sleep2);
assert_not_ready!(sleep3);
time.park_for(ms(500));
turn(timer, None);
assert_eq!(time.advanced(), ms(500));
assert_ready!(sleep1);
assert_ready!(sleep2);
assert_ready!(sleep3);
})
}
#[test]
fn set_timeout_at_deadline_greater_than_max_timer() {
const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
const YR_5: u64 = 5 * YR_1;
mocked(|timer, time| {
for _ in 0..5 {
turn(timer, ms(YR_1));
}
let mut sleep = Sleep::new(time.now() + ms(1));
assert_not_ready!(sleep);
turn(timer, ms(1000));
assert_eq!(time.advanced(), Duration::from_millis(YR_5) + ms(1));
assert_ready!(sleep);
});
}
#[test]
fn reset_future_sleep_before_fire() {
mocked(|timer, time| {
let mut sleep = Sleep::new(time.now() + ms(100));
assert_not_ready!(sleep);
sleep.reset(time.now() + ms(200));
turn(timer, None);
assert_eq!(time.advanced(), ms(192));
assert_not_ready!(sleep);
turn(timer, None);
assert_eq!(time.advanced(), ms(200));
assert_ready!(sleep);
});
}
#[test]
fn reset_past_sleep_before_turn() {
mocked(|timer, time| {
let mut sleep = Sleep::new(time.now() + ms(100));
assert_not_ready!(sleep);
sleep.reset(time.now() + ms(80));
turn(timer, None);
assert_eq!(time.advanced(), ms(64));
assert_not_ready!(sleep);
turn(timer, None);
assert_eq!(time.advanced(), ms(80));
assert_ready!(sleep);
});
}
#[test]
fn reset_past_sleep_before_fire() {
mocked(|timer, time| {
let mut sleep = Sleep::new(time.now() + ms(100));
assert_not_ready!(sleep);
turn(timer, ms(10));
assert_not_ready!(sleep);
sleep.reset(time.now() + ms(80));
turn(timer, None);
assert_eq!(time.advanced(), ms(64));
assert_not_ready!(sleep);
turn(timer, None);
assert_eq!(time.advanced(), ms(90));
assert_ready!(sleep);
});
}
#[test]
fn reset_future_sleep_after_fire() {
mocked(|timer, time| {
let mut sleep = Sleep::new(time.now() + ms(100));
assert_not_ready!(sleep);
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(64));
turn(timer, None);
assert_eq!(time.advanced(), ms(100));
assert_ready!(sleep);
sleep.reset(time.now() + ms(10));
assert_not_ready!(sleep);
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(110));
assert_ready!(sleep);
});
}

View File

@ -0,0 +1,234 @@
#![allow(unused_macros, unused_imports, dead_code)]
use tokio_executor::park::{Park, Unpark};
use tokio_timer::timer::{Timer, Now};
use futures::future::{lazy, Future};
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::{Instant, Duration};
macro_rules! assert_ready {
($f:expr) => {
assert!($f.poll().unwrap().is_ready());
};
($f:expr, $expect:expr) => {
assert_eq!($f.poll().unwrap(), ::futures::Async::Ready($expect));
};
}
macro_rules! assert_not_ready {
($f:expr) => {
assert!(!$f.poll().unwrap().is_ready());
}
}
macro_rules! assert_elapsed {
($f:expr) => {
assert!($f.poll().unwrap_err().is_elapsed());
}
}
#[derive(Debug)]
pub struct MockTime {
inner: Inner,
_p: PhantomData<Rc<()>>,
}
#[derive(Debug)]
pub struct MockNow {
inner: Inner,
_p: PhantomData<Rc<()>>,
}
#[derive(Debug)]
pub struct MockPark {
inner: Inner,
_p: PhantomData<Rc<()>>,
}
#[derive(Debug)]
pub struct MockUnpark {
inner: Inner,
}
type Inner = Arc<Mutex<State>>;
#[derive(Debug)]
struct State {
base: Instant,
advance: Duration,
unparked: bool,
park_for: Option<Duration>,
}
pub fn ms(num: u64) -> Duration {
Duration::from_millis(num)
}
pub trait IntoTimeout {
fn into_timeout(self) -> Option<Duration>;
}
impl IntoTimeout for Option<Duration> {
fn into_timeout(self) -> Self {
self
}
}
impl IntoTimeout for Duration {
fn into_timeout(self) -> Option<Duration> {
Some(self)
}
}
/// Turn the timer state once
pub fn turn<T: IntoTimeout>(timer: &mut Timer<MockPark, MockNow>, duration: T) {
timer.turn(duration.into_timeout()).unwrap();
}
/// Advance the timer the specified amount
pub fn advance(timer: &mut Timer<MockPark, MockNow>, duration: Duration) {
let inner = timer.get_park().inner.clone();
let deadline = inner.lock().unwrap().now() + duration;
while inner.lock().unwrap().now() < deadline {
let dur = deadline - inner.lock().unwrap().now();
turn(timer, dur);
}
}
pub fn mocked<F, R>(f: F) -> R
where F: FnOnce(&mut Timer<MockPark, MockNow>, &mut MockTime) -> R
{
mocked_with_now(Instant::now(), f)
}
pub fn mocked_with_now<F, R>(now: Instant, f: F) -> R
where F: FnOnce(&mut Timer<MockPark, MockNow>, &mut MockTime) -> R
{
let mut time = MockTime::new(now);
let park = time.mock_park();
let now = time.mock_now();
let mut timer = Timer::new_with_now(park, now);
let handle = timer.handle();
let mut enter = ::tokio_executor::enter().unwrap();
::tokio_timer::with_default(&handle, &mut enter, |_| {
lazy(|| {
Ok::<_, ()>(f(&mut timer, &mut time))
}).wait().unwrap()
})
}
impl MockTime {
pub fn new(now: Instant) -> MockTime {
let state = State {
base: now,
advance: Duration::default(),
unparked: false,
park_for: None,
};
MockTime {
inner: Arc::new(Mutex::new(state)),
_p: PhantomData,
}
}
pub fn mock_now(&self) -> MockNow {
let inner = self.inner.clone();
MockNow {
inner,
_p: PhantomData,
}
}
pub fn mock_park(&self) -> MockPark {
let inner = self.inner.clone();
MockPark {
inner,
_p: PhantomData,
}
}
pub fn now(&self) -> Instant {
self.inner.lock().unwrap().now()
}
/// Returns the total amount of time the time has been advanced.
pub fn advanced(&self) -> Duration {
self.inner.lock().unwrap().advance
}
pub fn advance(&self, duration: Duration) {
let mut inner = self.inner.lock().unwrap();
inner.advance(duration);
}
/// The next call to park_timeout will be for this duration, regardless of
/// the timeout passed to `park_timeout`.
pub fn park_for(&self, duration: Duration) {
self.inner.lock().unwrap().park_for = Some(duration);
}
}
impl Park for MockPark {
type Unpark = MockUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
let inner = self.inner.clone();
MockUnpark { inner }
}
fn park(&mut self) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().map_err(|_| ())?;
let duration = inner.park_for.take()
.expect("call park_for first");
inner.advance(duration);
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap();
if let Some(duration) = inner.park_for.take() {
inner.advance(duration);
} else {
inner.advance(duration);
}
Ok(())
}
}
impl Unpark for MockUnpark {
fn unpark(&self) {
if let Ok(mut inner) = self.inner.lock() {
inner.unparked = true;
}
}
}
impl Now for MockNow {
fn now(&mut self) -> Instant {
self.inner.lock().unwrap().now()
}
}
impl State {
fn now(&self) -> Instant {
self.base + self.advance
}
fn advance(&mut self, duration: Duration) {
self.advance += duration;
}
}