timer: finish updating timer (#1222)

* timer: restructure feature flags
* update timer tests
* Add `async-traits` to CI

This also disables a buggy `threadpool` test. This test should be fixed in the future.

Refs #1225
This commit is contained in:
Carl Lerche 2019-06-30 08:48:53 -07:00 committed by GitHub
parent 8e7d8af588
commit b2c777846e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 786 additions and 755 deletions

View File

@ -19,7 +19,8 @@ jobs:
displayName: Test tokio
cross: true
crates:
- tokio
tokio:
- default
# Test crates that are platform specific
- template: ci/azure-test-stable.yml
@ -30,11 +31,14 @@ jobs:
rust: $(nightly)
crates:
# - tokio-fs
- tokio-reactor
tokio-reactor:
- default
# - tokio-signal
- tokio-tcp
tokio-tcp:
- default
# - tokio-tls
- tokio-udp
tokio-udp:
- default
# - tokio-uds
# Test crates that are NOT platform specific
@ -45,15 +49,24 @@ jobs:
rust: $(nightly)
crates:
# - tokio-buf
- tokio-codec
- tokio-current-thread
- tokio-executor
- tokio-io
- tokio-sync
- tokio-macros
tokio-codec:
- default
tokio-current-thread:
- default
tokio-executor:
- default
tokio-io:
- default
tokio-sync:
- default
tokio-macros:
- default
# - tokio-threadpool
- tokio-timer
- tokio-test
tokio-timer:
- default
- async-traits
tokio-test:
- default
# - template: ci/azure-cargo-check.yml
# parameters:

View File

@ -34,9 +34,12 @@ jobs:
- template: azure-patch-crates.yml
- ${{ each crate in parameters.crates }}:
- script: cargo test --lib && cargo test --tests && cargo test --examples
env:
LOOM_MAX_DURATION: 10
CI: 'True'
displayName: cargo test -p ${{ crate }} (PATCHED)
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
- ${{ each feature in crate.value }}:
- script: cargo test --tests --no-default-features --features ${{ feature }}
env:
LOOM_MAX_DURATION: 10
CI: 'True'
displayName: cargo test --tests --features ${{ feature }}
- script: cargo test --examples --no-default-features --features ${{ feature }}
displayName: cargo test --examples --features ${{ feature }}

View File

@ -51,6 +51,7 @@ fn readiness() {
}
#[test]
#[ignore]
fn lock() {
let mut lock = Lock::new(false);

View File

@ -24,5 +24,5 @@ publish = false
[dependencies]
assertive = { git = "http://github.com/carllerche/assertive" }
pin-convert = "0.1.0"
# tokio-timer = { version = "0.3.0", path = "../tokio-timer" }
tokio-timer = { version = "0.3.0", path = "../tokio-timer" }
tokio-executor = { version = "0.2.0", path = "../tokio-executor" }

View File

@ -20,14 +20,14 @@
//! });
//! ```
use futures::{future::lazy, Future};
use tokio_executor::park::{Park, Unpark};
use tokio_timer::clock::{Clock, Now};
use tokio_timer::Timer;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio_executor::park::{Park, Unpark};
use tokio_timer::clock::{Clock, Now};
use tokio_timer::Timer;
/// Run the provided closure with a `MockClock` that starts at the current time.
pub fn mock<F, R>(f: F) -> R
@ -123,17 +123,16 @@ impl MockClock {
where
F: FnOnce(&mut Handle) -> R,
{
let mut enter = ::tokio_executor::enter().unwrap();
::tokio_timer::clock::with_default(&self.clock, &mut enter, |enter| {
::tokio_timer::clock::with_default(&self.clock, || {
let park = self.time.mock_park();
let timer = Timer::new(park);
let handle = timer.handle();
let time = self.time.clone();
::tokio_timer::with_default(&handle, enter, |_| {
::tokio_timer::with_default(&handle, || {
let mut handle = Handle::new(timer, time);
lazy(|| Ok::<_, ()>(f(&mut handle))).wait().unwrap()
f(&mut handle)
// lazy(|| Ok::<_, ()>(f(&mut handle))).wait().unwrap()
})
})
}
@ -145,8 +144,13 @@ impl Handle {
}
/// Turn the internal timer and mock park for the provided duration.
pub fn turn(&mut self, duration: Option<Duration>) {
self.timer.turn(duration).unwrap();
pub fn turn(&mut self) {
self.timer.turn(None).unwrap();
}
/// Turn the internal timer and mock park for the provided duration.
pub fn turn_for(&mut self, duration: Duration) {
self.timer.turn(Some(duration)).unwrap();
}
/// Advance the `MockClock` by the provided duration.
@ -156,14 +160,26 @@ impl Handle {
while inner.lock().unwrap().now() < deadline {
let dur = deadline - inner.lock().unwrap().now();
self.turn(Some(dur));
self.turn_for(dur);
}
}
/// Returns the total amount of time the time has been advanced.
pub fn advanced(&self) -> Duration {
self.time.inner.lock().unwrap().advance
}
/// Get the currently mocked time
pub fn now(&mut self) -> Instant {
self.time.now()
}
/// Turn the internal timer once, but force "parking" for `duration` regardless of any pending
/// timeouts
pub fn park_for(&mut self, duration: Duration) {
self.time.inner.lock().unwrap().park_for = Some(duration);
self.turn()
}
}
impl MockTime {

View File

@ -20,7 +20,7 @@
//! assert_ready!(fut.poll());
//! ```
// pub mod clock;
pub mod clock;
mod macros;
pub mod task;

View File

@ -74,41 +74,16 @@ macro_rules! assert_pending {
}};
}
/*
/// Assert if a poll is ready and check for equality on the value
#[macro_export]
macro_rules! assert_ready_eq {
($e:expr, $expect:expr) => {
use $crate::codegen::futures::Async::Ready;
match $e {
Ok(e) => assert_eq!(e, Ready($expect)),
Err(e) => panic!("error = {:?}", e),
}
let val = $crate::assert_ready!($e);
assert_eq!(val, $expect)
};
($e:expr, $expect:expr, $($msg:tt),+) => {
use $crate::codegen::futures::Async::Ready;
match $e {
Ok(e) => assert_eq!(e, Ready($expect), $($msg)+),
Err(e) => {
let msg = format_args!($($msg),+);
panic!("error = {:?}; {}", e, msg)
}
}
let val = $crate::assert_ready!($e);
assert_eq!(val, $expect, $($msg),*)
};
}
*/
/*
/// Assert if the deadline has passed
#[macro_export]
macro_rules! assert_elapsed {
($e:expr) => {
assert!($e.unwrap_err().is_elapsed());
};
($e:expr, $($msg:expr),+) => {
assert!($e.unwrap_err().is_elapsed(), $msg);
};
}
*/

View File

@ -1,3 +1,5 @@
#![cfg(features = "broken")]
extern crate env_logger;
extern crate futures;
extern crate tokio_threadpool;

View File

@ -1,3 +1,5 @@
#![cfg(features = "broken")]
extern crate env_logger;
extern crate futures;
extern crate tokio_threadpool;

View File

@ -22,32 +22,22 @@ Timer facilities for Tokio
publish = false
[features]
# individual `Stream` impls if you so desire
delay-queue = ["futures-core-preview"]
interval = ["futures-core-preview"]
timeout-stream = ["futures-core-preview"]
throttle = ["futures-core-preview"]
# easily enable all `Stream` impls
streams = [
"delay-queue",
"interval",
"timeout-stream",
"throttle",
]
async-traits = ["futures-core-preview"]
[dependencies]
tokio-executor = { version = "0.2.0", path = "../tokio-executor" }
tokio-sync = { version = "0.2.0", path = "../tokio-sync" }
crossbeam-utils = "0.6.0"
async-util = { git = "https://github.com/tokio-rs/async" }
crossbeam-utils = "0.6.0"
# Backs `DelayQueue`
slab = "0.4.1"
# optionals
futures-core-preview = { version = "0.3.0-alpha.16", optional = true }
[dev-dependencies]
rand = "0.6"
tokio-mock-task = "0.1.0"
tokio = { version = "0.2.0", path = "../tokio" }
tokio-current-thread = { version = "0.2.0", path = "../tokio-current-thread" }
tokio-sync = { version = "0.2.0", path = "../tokio-sync", features = ["async-traits"] }
tokio-test = { version = "0.2.0", path = "../tokio-test" }

View File

@ -4,7 +4,6 @@ use std::cell::Cell;
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use tokio_executor::Enter;
/// A handle to a source of time.
///
@ -108,9 +107,9 @@ impl fmt::Debug for Clock {
/// # Panics
///
/// This function panics if there already is a default clock set.
pub fn with_default<F, R>(clock: &Clock, enter: &mut Enter, f: F) -> R
pub fn with_default<F, R>(clock: &Clock, f: F) -> R
where
F: FnOnce(&mut Enter) -> R,
F: FnOnce() -> R,
{
CLOCK.with(|cell| {
assert!(
@ -132,6 +131,6 @@ where
cell.set(Some(clock as *const Clock));
f(enter)
f()
})
}

View File

@ -74,7 +74,7 @@ impl Delay {
}
// Used by `Timeout<Stream>`
#[cfg(feature = "timeout-stream")]
#[cfg(feature = "async-traits")]
pub(crate) fn reset_timeout(&mut self) {
self.registration.reset_timeout();
}

View File

@ -8,7 +8,7 @@ use crate::clock::now;
use crate::timer::Handle;
use crate::wheel::{self, Wheel};
use crate::{Delay, Error};
use futures_core::Stream;
use slab::Slab;
use std::cmp;
use std::future::Future;
@ -347,6 +347,34 @@ impl<T> DelayQueue<T> {
Key::new(key)
}
/// TODO: Dox... also is the fn signature correct?
pub fn poll_next(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Expired<T>, Error>>> {
let item = ready!(self.poll_idx(cx));
Poll::Ready(item.map(|result| {
result.map(|idx| {
let data = self.slab.remove(idx);
debug_assert!(data.next.is_none());
debug_assert!(data.prev.is_none());
Expired {
key: Key::new(idx),
data: data.inner,
deadline: self.start + Duration::from_millis(data.when),
}
})
}))
}
/// TODO: Dox... also is the fn signature correct?
pub async fn next(&mut self) -> Option<Result<Expired<T>, Error>> {
use async_util::future::poll_fn;
poll_fn(|cx| self.poll_next(cx)).await
}
/// Insert `value` into the queue set to expire after the requested duration
/// elapses.
///
@ -696,26 +724,14 @@ impl<T> DelayQueue<T> {
// We never put `T` in a `Pin`...
impl<T> Unpin for DelayQueue<T> {}
impl<T> Stream for DelayQueue<T> {
#[cfg(feature = "async-traits")]
impl<T> futures_core::Stream for DelayQueue<T> {
// DelayQueue seems much more specific, where a user may care that it
// has reached capacity, so return those errors instead of panicking.
type Item = Result<Expired<T>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.poll_idx(cx));
Poll::Ready(item.map(|result| {
result.map(|idx| {
let data = self.slab.remove(idx);
debug_assert!(data.next.is_none());
debug_assert!(data.prev.is_none());
Expired {
key: Key::new(idx),
data: data.inner,
deadline: self.start + Duration::from_millis(data.when),
}
})
}))
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
DelayQueue::poll_next(self.get_mut(), cx)
}
}

View File

@ -1,6 +1,6 @@
use crate::clock;
use crate::Delay;
use futures_core::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
@ -52,12 +52,9 @@ impl Interval {
pub(crate) fn new_with_delay(delay: Delay, duration: Duration) -> Interval {
Interval { delay, duration }
}
}
impl Stream for Interval {
type Item = Instant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
/// TODO: dox
pub fn poll_next(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Instant>> {
// Wait for the delay to be done
ready!(Pin::new(&mut self.delay).poll(cx));
@ -72,4 +69,20 @@ impl Stream for Interval {
// Return the current instant
Poll::Ready(Some(now))
}
/// TODO: dox
pub async fn next(&mut self) -> Option<Instant> {
use async_util::future::poll_fn;
poll_fn(|cx| self.poll_next(cx)).await
}
}
#[cfg(feature = "async-traits")]
impl futures_core::Stream for Interval {
type Item = Instant;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
Interval::poll_next(self.get_mut(), cx)
}
}

View File

@ -2,6 +2,7 @@
#![deny(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![cfg_attr(test, deny(warnings))]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
#![feature(async_await)]
//! Utilities for tracking time.
//!
@ -41,9 +42,8 @@ macro_rules! ready {
}
pub mod clock;
#[cfg(feature = "delay-queue")]
pub mod delay_queue;
#[cfg(feature = "throttle")]
#[cfg(feature = "async-traits")]
pub mod throttle;
pub mod timeout;
pub mod timer;
@ -51,16 +51,13 @@ pub mod timer;
mod atomic;
mod delay;
mod error;
#[cfg(feature = "interval")]
mod interval;
mod wheel;
pub use delay::Delay;
#[cfg(feature = "delay-queue")]
#[doc(inline)]
pub use delay_queue::DelayQueue;
pub use error::Error;
#[cfg(feature = "interval")]
pub use interval::Interval;
#[doc(inline)]
pub use timeout::Timeout;

View File

@ -25,7 +25,7 @@ impl<T> Throttle<T> {
pub fn new(stream: T, duration: Duration) -> Self {
Self {
delay: Delay::new_timeout(clock::now() + duration, duration),
has_delayed: false,
has_delayed: true,
stream: stream,
}
}

View File

@ -6,8 +6,6 @@
use crate::clock::now;
use crate::Delay;
#[cfg(feature = "timeout-stream")]
use futures_core::Stream;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
@ -175,10 +173,10 @@ where
}
}
#[cfg(feature = "timeout-stream")]
impl<T> Stream for Timeout<T>
#[cfg(feature = "async-traits")]
impl<T> futures_core::Stream for Timeout<T>
where
T: Stream,
T: futures_core::Stream,
{
type Item = Result<T::Item, Elapsed>;
@ -202,8 +200,10 @@ where
}
// Now check the timer
ready!(self.map_unchecked_mut(|me| &mut me.delay).poll(cx));
ready!(self.as_mut().map_unchecked_mut(|me| &mut me.delay).poll(cx));
// if delay was ready, timeout elapsed!
self.as_mut().get_unchecked_mut().delay.reset_timeout();
Poll::Ready(Some(Err(Elapsed(()))))
}
}

View File

@ -4,7 +4,6 @@ use std::cell::RefCell;
use std::fmt;
use std::sync::{Arc, Weak};
use std::time::Instant;
use tokio_executor::Enter;
/// Handle to timer instance.
///
@ -58,9 +57,9 @@ thread_local! {
///
/// [`Delay`]: ../struct.Delay.html
/// [`Delay::new`]: ../struct.Delay.html#method.new
pub fn with_default<F, R>(handle: &Handle, enter: &mut Enter, f: F) -> R
pub fn with_default<F, R>(handle: &Handle, f: F) -> R
where
F: FnOnce(&mut Enter) -> R,
F: FnOnce() -> R,
{
// Ensure that the timer is removed from the thread-local context
// when leaving the scope. This handles cases that involve panicking.
@ -96,7 +95,7 @@ where
*current = Some(handle.clone());
}
f(enter)
f()
})
}

View File

@ -43,7 +43,7 @@ impl Registration {
}
// Used by `Timeout<Stream>`
#[cfg(feature = "timeout-stream")]
#[cfg(feature = "async-traits")]
pub fn reset_timeout(&mut self) {
let deadline = crate::clock::now() + self.entry.time_ref().duration;
self.entry.time_mut().deadline = deadline;

View File

@ -1,8 +1,6 @@
#![deny(warnings, rust_2018_idioms)]
#![cfg(feature = "broken")]
use std::time::Instant;
use tokio_executor;
use tokio_timer::clock;
use tokio_timer::clock::*;
@ -40,9 +38,7 @@ fn execution_context() {
let now = ConstNow(Instant::now());
let clock = Clock::new_with_now(now);
let mut enter = tokio_executor::enter().unwrap();
with_default(&clock, &mut enter, |_| {
with_default(&clock, || {
let a = Instant::now();
let b = clock::now();

View File

@ -1,103 +0,0 @@
#![deny(warnings, rust_2018_idioms)]
#![allow(deprecated)]
#![cfg(feature = "broken")]
mod support;
use crate::support::*;
use futures::sync::oneshot;
use futures::{future, Future};
use tokio_timer::*;
#[test]
fn simultaneous_deadline_future_completion() {
mocked(|_, time| {
// Create a future that is immediately ready
let fut = future::ok::<_, ()>(());
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now());
// Ready!
assert_ready!(fut);
});
}
#[test]
fn completed_future_past_deadline() {
mocked(|_, time| {
// Create a future that is immediately ready
let fut = future::ok::<_, ()>(());
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now() - ms(1000));
// Ready!
assert_ready!(fut);
});
}
#[test]
fn future_and_deadline_in_future() {
mocked(|timer, time| {
// Not yet complete
let (tx, rx) = oneshot::channel();
// Wrap it with a deadline
let mut fut = Deadline::new(rx, time.now() + ms(100));
// Ready!
assert_not_ready!(fut);
// Turn the timer, it runs for the elapsed time
advance(timer, ms(90));
assert_not_ready!(fut);
// Complete the future
tx.send(()).unwrap();
assert_ready!(fut);
});
}
#[test]
fn deadline_now_elapses() {
mocked(|_, time| {
let fut = future::empty::<(), ()>();
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now());
assert_elapsed!(fut);
});
}
#[test]
fn deadline_future_elapses() {
mocked(|timer, time| {
let fut = future::empty::<(), ()>();
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now() + ms(300));
assert_not_ready!(fut);
advance(timer, ms(300));
assert_elapsed!(fut);
});
}
#[test]
fn future_errors_first() {
mocked(|_, time| {
let fut = future::err::<(), ()>(());
// Wrap it with a deadline
let mut fut = Deadline::new(fut, time.now() + ms(100));
// Ready!
assert!(fut.poll().unwrap_err().is_inner());
});
}

View File

@ -1,186 +1,199 @@
#![deny(warnings, rust_2018_idioms)]
#![cfg(feature = "broken")]
#![feature(async_await)]
mod support;
use crate::support::*;
use futures::Future;
use std::time::{Duration, Instant};
use tokio_test::task::MockTask;
use tokio_test::{assert_pending, assert_ready, clock};
use tokio_timer::timer::Handle;
use tokio_timer::*;
use tokio_timer::Delay;
use std::time::{Duration, Instant};
#[test]
fn immediate_delay() {
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create `Delay` that elapsed immediately.
let mut delay = Delay::new(time.now());
let mut delay = Delay::new(clock.now());
// Ready!
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
// Turn the timer, it runs for the elapsed time
turn(timer, ms(1000));
clock.turn_for(ms(1000));
// The time has not advanced. The `turn` completed immediately.
assert_eq!(time.advanced(), ms(1000));
assert_eq!(clock.advanced(), ms(1000));
});
}
#[test]
fn delayed_delay_level_0() {
let mut task = MockTask::new();
for &i in &[1, 10, 60] {
mocked(|timer, time| {
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut delay = Delay::new(time.now() + ms(i));
let mut delay = Delay::new(clock.now() + ms(i));
// The delay has not elapsed.
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(i));
clock.turn();
assert_eq!(clock.advanced(), ms(i));
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
});
}
}
#[test]
fn sub_ms_delayed_delay() {
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
for _ in 0..5 {
let deadline = time.now() + Duration::from_millis(1) + Duration::new(0, 1);
let deadline = clock.now() + Duration::from_millis(1) + Duration::new(0, 1);
let mut delay = Delay::new(deadline);
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
turn(timer, None);
assert_ready!(delay);
clock.turn();
assert_ready!(task.poll(&mut delay));
assert!(time.now() >= deadline);
assert!(clock.now() >= deadline);
time.advance(Duration::new(0, 1));
clock.advance(Duration::new(0, 1));
}
});
}
#[test]
fn delayed_delay_wrapping_level_0() {
mocked(|timer, time| {
turn(timer, ms(5));
assert_eq!(time.advanced(), ms(5));
let mut task = MockTask::new();
let mut delay = Delay::new(time.now() + ms(60));
clock::mock(|clock| {
clock.turn_for(ms(5));
assert_eq!(clock.advanced(), ms(5));
assert_not_ready!(delay);
let mut delay = Delay::new(clock.now() + ms(60));
turn(timer, None);
assert_eq!(time.advanced(), ms(64));
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
turn(timer, None);
assert_eq!(time.advanced(), ms(65));
clock.turn();
assert_eq!(clock.advanced(), ms(64));
assert_pending!(task.poll(&mut delay));
assert_ready!(delay);
clock.turn();
assert_eq!(clock.advanced(), ms(65));
assert_ready!(task.poll(&mut delay));
});
}
#[test]
fn timer_wrapping_with_higher_levels() {
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
// Set delay to hit level 1
let mut s1 = Delay::new(time.now() + ms(64));
assert_not_ready!(s1);
let mut s1 = Delay::new(clock.now() + ms(64));
assert_pending!(task.poll(&mut s1));
// Turn a bit
turn(timer, ms(5));
clock.turn_for(ms(5));
// Set timeout such that it will hit level 0, but wrap
let mut s2 = Delay::new(time.now() + ms(60));
assert_not_ready!(s2);
let mut s2 = Delay::new(clock.now() + ms(60));
assert_pending!(task.poll(&mut s2));
// This should result in s1 firing
turn(timer, None);
assert_eq!(time.advanced(), ms(64));
clock.turn();
assert_eq!(clock.advanced(), ms(64));
assert_ready!(s1);
assert_not_ready!(s2);
assert_ready!(task.poll(&mut s1));
assert_pending!(task.poll(&mut s2));
turn(timer, None);
assert_eq!(time.advanced(), ms(65));
clock.turn();
assert_eq!(clock.advanced(), ms(65));
assert_ready!(s2);
assert_ready!(task.poll(&mut s2));
});
}
#[test]
fn delay_with_deadline_in_past() {
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create `Delay` that elapsed immediately.
let mut delay = Delay::new(time.now() - ms(100));
let mut delay = Delay::new(clock.now() - ms(100));
// Even though the delay expires in the past, it is not ready yet
// because the timer must observe it.
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
// Turn the timer, it runs for the elapsed time
turn(timer, ms(1000));
clock.turn_for(ms(1000));
// The time has not advanced. The `turn` completed immediately.
assert_eq!(time.advanced(), ms(1000));
assert_eq!(clock.advanced(), ms(1000));
});
}
#[test]
fn delayed_delay_level_1() {
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut delay = Delay::new(time.now() + ms(234));
let mut delay = Delay::new(clock.now() + ms(234));
// The delay has not elapsed.
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
// Turn the timer, this will wake up to cascade the timer down.
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(192));
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(192));
// The delay has not elapsed.
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
// Turn the timer again
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(234));
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(234));
// The delay has elapsed.
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
});
mocked(|timer, time| {
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut delay = Delay::new(time.now() + ms(234));
let mut delay = Delay::new(clock.now() + ms(234));
// The delay has not elapsed.
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
// Turn the timer with a smaller timeout than the cascade.
turn(timer, ms(100));
assert_eq!(time.advanced(), ms(100));
clock.turn_for(ms(100));
assert_eq!(clock.advanced(), ms(100));
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
// Turn the timer, this will wake up to cascade the timer down.
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(192));
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(192));
// The delay has not elapsed.
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
// Turn the timer again
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(234));
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(234));
// The delay has elapsed.
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
});
}
@ -191,77 +204,83 @@ fn creating_delay_outside_of_context() {
// This creates a delay outside of the context of a mock timer. This tests
// that it will still expire.
let mut delay = Delay::new(now + ms(500));
let mut task = MockTask::new();
mocked_with_now(now, |timer, time| {
clock::mock_at(now, |clock| {
// This registers the delay with the timer
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
// Wait some time... the timer is cascading
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(448));
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(448));
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(500));
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(500));
// The delay has elapsed
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
});
}
#[test]
fn concurrently_set_two_timers_second_one_shorter() {
mocked(|timer, time| {
let mut delay1 = Delay::new(time.now() + ms(500));
let mut delay2 = Delay::new(time.now() + ms(200));
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
clock::mock(|clock| {
let mut delay1 = Delay::new(clock.now() + ms(500));
let mut delay2 = Delay::new(clock.now() + ms(200));
// The delay has not elapsed
assert_not_ready!(delay1);
assert_not_ready!(delay2);
assert_pending!(t1.poll(&mut delay1));
assert_pending!(t2.poll(&mut delay2));
// Delay until a cascade
turn(timer, None);
assert_eq!(time.advanced(), ms(192));
clock.turn();
assert_eq!(clock.advanced(), ms(192));
// Delay until the second timer.
turn(timer, None);
assert_eq!(time.advanced(), ms(200));
clock.turn();
assert_eq!(clock.advanced(), ms(200));
// The shorter delay fires
assert_ready!(delay2);
assert_not_ready!(delay1);
assert_ready!(t2.poll(&mut delay2));
assert_pending!(t1.poll(&mut delay1));
turn(timer, None);
assert_eq!(time.advanced(), ms(448));
clock.turn();
assert_eq!(clock.advanced(), ms(448));
assert_not_ready!(delay1);
assert_pending!(t1.poll(&mut delay1));
// Turn again, this time the time will advance to the second delay
turn(timer, None);
assert_eq!(time.advanced(), ms(500));
clock.turn();
assert_eq!(clock.advanced(), ms(500));
assert_ready!(delay1);
assert_ready!(t1.poll(&mut delay1));
})
}
#[test]
fn short_delay() {
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut delay = Delay::new(time.now() + ms(1));
let mut delay = Delay::new(clock.now() + ms(1));
// The delay has not elapsed.
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
// Turn the timer, but not enough time will go by.
turn(timer, None);
clock.turn();
// The delay has elapsed.
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
// The time has advanced to the point of the delay elapsing.
assert_eq!(time.advanced(), ms(1));
assert_eq!(clock.advanced(), ms(1));
})
}
@ -269,27 +288,29 @@ fn short_delay() {
fn sorta_long_delay() {
const MIN_5: u64 = 5 * 60 * 1000;
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut delay = Delay::new(time.now() + ms(MIN_5));
let mut delay = Delay::new(clock.now() + ms(MIN_5));
// The delay has not elapsed.
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
for &elapsed in cascades {
turn(timer, None);
assert_eq!(time.advanced(), ms(elapsed));
clock.turn();
assert_eq!(clock.advanced(), ms(elapsed));
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
}
turn(timer, None);
assert_eq!(time.advanced(), ms(MIN_5));
clock.turn();
assert_eq!(clock.advanced(), ms(MIN_5));
// The delay has elapsed.
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
})
}
@ -297,12 +318,14 @@ fn sorta_long_delay() {
fn very_long_delay() {
const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000;
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut delay = Delay::new(time.now() + ms(MO_5));
let mut delay = Delay::new(clock.now() + ms(MO_5));
// The delay has not elapsed.
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
let cascades = &[
12_884_901_888,
@ -312,59 +335,65 @@ fn very_long_delay() {
];
for &elapsed in cascades {
turn(timer, None);
assert_eq!(time.advanced(), ms(elapsed));
clock.turn();
assert_eq!(clock.advanced(), ms(elapsed));
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
}
// Turn the timer, but not enough time will go by.
turn(timer, None);
clock.turn();
// The time has advanced to the point of the delay elapsing.
assert_eq!(time.advanced(), ms(MO_5));
assert_eq!(clock.advanced(), ms(MO_5));
// The delay has elapsed.
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
})
}
#[test]
#[should_panic]
fn greater_than_max() {
const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut delay = Delay::new(time.now() + ms(YR_5));
let mut delay = Delay::new(clock.now() + ms(YR_5));
assert_not_ready!(delay);
assert_pending!(task.poll(&mut delay));
turn(timer, ms(0));
clock.turn_for(ms(0));
assert!(delay.poll().is_err());
// boom
let _ = task.poll(&mut delay);
})
}
#[test]
fn unpark_is_delayed() {
mocked(|timer, time| {
let mut delay1 = Delay::new(time.now() + ms(100));
let mut delay2 = Delay::new(time.now() + ms(101));
let mut delay3 = Delay::new(time.now() + ms(200));
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let mut t3 = MockTask::new();
assert_not_ready!(delay1);
assert_not_ready!(delay2);
assert_not_ready!(delay3);
clock::mock(|clock| {
let mut delay1 = Delay::new(clock.now() + ms(100));
let mut delay2 = Delay::new(clock.now() + ms(101));
let mut delay3 = Delay::new(clock.now() + ms(200));
time.park_for(ms(500));
assert_pending!(t1.poll(&mut delay1));
assert_pending!(t2.poll(&mut delay2));
assert_pending!(t3.poll(&mut delay3));
turn(timer, None);
clock.park_for(ms(500));
assert_eq!(time.advanced(), ms(500));
assert_eq!(clock.advanced(), ms(500));
assert_ready!(delay1);
assert_ready!(delay2);
assert_ready!(delay3);
assert_ready!(t1.poll(&mut delay1));
assert_ready!(t2.poll(&mut delay2));
assert_ready!(t3.poll(&mut delay3));
})
}
@ -373,108 +402,118 @@ fn set_timeout_at_deadline_greater_than_max_timer() {
const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
const YR_5: u64 = 5 * YR_1;
mocked(|timer, time| {
let mut task = MockTask::new();
clock::mock(|clock| {
for _ in 0..5 {
turn(timer, ms(YR_1));
clock.turn_for(ms(YR_1));
}
let mut delay = Delay::new(time.now() + ms(1));
assert_not_ready!(delay);
let mut delay = Delay::new(clock.now() + ms(1));
assert_pending!(task.poll(&mut delay));
turn(timer, ms(1000));
assert_eq!(time.advanced(), Duration::from_millis(YR_5) + ms(1));
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(YR_5) + ms(1));
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
});
}
#[test]
fn reset_future_delay_before_fire() {
mocked(|timer, time| {
let mut delay = Delay::new(time.now() + ms(100));
let mut task = MockTask::new();
assert_not_ready!(delay);
clock::mock(|clock| {
let mut delay = Delay::new(clock.now() + ms(100));
delay.reset(time.now() + ms(200));
assert_pending!(task.poll(&mut delay));
turn(timer, None);
assert_eq!(time.advanced(), ms(192));
delay.reset(clock.now() + ms(200));
assert_not_ready!(delay);
clock.turn();
assert_eq!(clock.advanced(), ms(192));
turn(timer, None);
assert_eq!(time.advanced(), ms(200));
assert_pending!(task.poll(&mut delay));
assert_ready!(delay);
clock.turn();
assert_eq!(clock.advanced(), ms(200));
assert_ready!(task.poll(&mut delay));
});
}
#[test]
fn reset_past_delay_before_turn() {
mocked(|timer, time| {
let mut delay = Delay::new(time.now() + ms(100));
let mut task = MockTask::new();
assert_not_ready!(delay);
clock::mock(|clock| {
let mut delay = Delay::new(clock.now() + ms(100));
delay.reset(time.now() + ms(80));
assert_pending!(task.poll(&mut delay));
turn(timer, None);
assert_eq!(time.advanced(), ms(64));
delay.reset(clock.now() + ms(80));
assert_not_ready!(delay);
clock.turn();
assert_eq!(clock.advanced(), ms(64));
turn(timer, None);
assert_eq!(time.advanced(), ms(80));
assert_pending!(task.poll(&mut delay));
assert_ready!(delay);
clock.turn();
assert_eq!(clock.advanced(), ms(80));
assert_ready!(task.poll(&mut delay));
});
}
#[test]
fn reset_past_delay_before_fire() {
mocked(|timer, time| {
let mut delay = Delay::new(time.now() + ms(100));
let mut task = MockTask::new();
assert_not_ready!(delay);
turn(timer, ms(10));
clock::mock(|clock| {
let mut delay = Delay::new(clock.now() + ms(100));
assert_not_ready!(delay);
delay.reset(time.now() + ms(80));
assert_pending!(task.poll(&mut delay));
clock.turn_for(ms(10));
turn(timer, None);
assert_eq!(time.advanced(), ms(64));
assert_pending!(task.poll(&mut delay));
delay.reset(clock.now() + ms(80));
assert_not_ready!(delay);
clock.turn();
assert_eq!(clock.advanced(), ms(64));
turn(timer, None);
assert_eq!(time.advanced(), ms(90));
assert_pending!(task.poll(&mut delay));
assert_ready!(delay);
clock.turn();
assert_eq!(clock.advanced(), ms(90));
assert_ready!(task.poll(&mut delay));
});
}
#[test]
fn reset_future_delay_after_fire() {
mocked(|timer, time| {
let mut delay = Delay::new(time.now() + ms(100));
let mut task = MockTask::new();
assert_not_ready!(delay);
clock::mock(|clock| {
let mut delay = Delay::new(clock.now() + ms(100));
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(64));
assert_pending!(task.poll(&mut delay));
turn(timer, None);
assert_eq!(time.advanced(), ms(100));
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(64));
assert_ready!(delay);
clock.turn();
assert_eq!(clock.advanced(), ms(100));
delay.reset(time.now() + ms(10));
assert_not_ready!(delay);
assert_ready!(task.poll(&mut delay));
turn(timer, ms(1000));
assert_eq!(time.advanced(), ms(110));
delay.reset(clock.now() + ms(10));
assert_pending!(task.poll(&mut delay));
assert_ready!(delay);
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(110));
assert_ready!(task.poll(&mut delay));
});
}
@ -482,14 +521,19 @@ fn reset_future_delay_after_fire() {
fn delay_with_default_handle() {
let handle = Handle::default();
let now = Instant::now();
let mut task = MockTask::new();
let mut delay = handle.delay(now + ms(1));
mocked_with_now(now, |timer, _time| {
assert_not_ready!(delay);
clock::mock_at(now, |clock| {
assert_pending!(task.poll(&mut delay));
turn(timer, ms(1));
clock.turn_for(ms(1));
assert_ready!(delay);
assert_ready!(task.poll(&mut delay));
});
}
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

View File

@ -1,18 +1,21 @@
#![deny(warnings, rust_2018_idioms)]
#![cfg(feature = "broken")]
#![feature(async_await)]
use tokio_current_thread::CurrentThread;
use tokio_executor::park::{Park, Unpark, UnparkThread};
use tokio_timer::{Delay, Timer};
use futures::stream::FuturesUnordered;
use futures::{Future, Stream};
use rand;
use rand::Rng;
use std::cmp;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Barrier};
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};
use tokio_executor::park::{Park, Unpark, UnparkThread};
use tokio_timer::*;
struct Signal {
rem: AtomicUsize,
@ -43,7 +46,7 @@ fn hammer_complete() {
let done = done.clone();
thread::spawn(move || {
let mut exec = FuturesUnordered::new();
let mut exec = CurrentThread::new();
let mut rng = rand::thread_rng();
barrier.wait();
@ -51,18 +54,18 @@ fn hammer_complete() {
for _ in 0..PER_THREAD {
let deadline =
Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
let delay = handle.delay(deadline);
exec.push({
handle.delay(deadline).and_then(move |_| {
let now = Instant::now();
assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
Ok(())
})
exec.spawn(async move {
delay.await;
let now = Instant::now();
assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
});
}
// Run the logic
exec.for_each(|_| Ok(())).wait().unwrap();
exec.run().unwrap();
if 1 == done.rem.fetch_sub(1, SeqCst) {
done.unpark.unpark();
@ -100,7 +103,7 @@ fn hammer_cancel() {
let done = done.clone();
thread::spawn(move || {
let mut exec = FuturesUnordered::new();
let mut exec = CurrentThread::new();
let mut rng = rand::thread_rng();
barrier.wait();
@ -117,23 +120,16 @@ fn hammer_cancel() {
let delay = handle.delay(deadline1);
let join = handle.timeout(delay, deadline2);
exec.push({
join.and_then(move |_| {
let now = Instant::now();
assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
Ok(())
})
exec.spawn(async move {
let _ = join.await;
let now = Instant::now();
assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
});
}
// Run the logic
exec.or_else(|e| {
assert!(e.is_elapsed());
Ok::<_, ()>(())
})
.for_each(|_| Ok(()))
.wait()
.unwrap();
exec.run().unwrap();
if 1 == done.rem.fetch_sub(1, SeqCst) {
done.unpark.unpark();
@ -171,7 +167,7 @@ fn hammer_reset() {
let done = done.clone();
thread::spawn(move || {
let mut exec = FuturesUnordered::new();
let mut exec = CurrentThread::new();
let mut rng = rand::thread_rng();
barrier.wait();
@ -186,44 +182,56 @@ fn hammer_reset() {
let deadline3 =
deadline2 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
exec.push({
handle
.delay(deadline1)
// Select over a second delay
.select2(handle.delay(deadline2))
.map_err(|e| panic!("boom; err={:?}", e))
.and_then(move |res| {
use futures::future::Either::*;
struct Select {
a: Option<Delay>,
b: Option<Delay>,
}
let now = Instant::now();
assert!(
now >= deadline1,
"deadline greater by {:?}",
deadline1 - now
);
impl Future for Select {
type Output = Delay;
let mut other = match res {
A((_, other)) => other,
B((_, other)) => other,
};
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let res = Pin::new(self.a.as_mut().unwrap()).poll(cx);
other.reset(deadline3);
other
})
.and_then(move |_| {
let now = Instant::now();
assert!(
now >= deadline3,
"deadline greater by {:?}",
deadline3 - now
);
Ok(())
})
if res.is_ready() {
return Poll::Ready(self.a.take().unwrap());
}
let res = Pin::new(self.b.as_mut().unwrap()).poll(cx);
if res.is_ready() {
return Poll::Ready(self.b.take().unwrap());
}
Poll::Pending
}
}
let s = Select {
a: Some(handle.delay(deadline1)),
b: Some(handle.delay(deadline2)),
};
exec.spawn(async move {
let mut delay = s.await;
let now = Instant::now();
assert!(
now >= deadline1,
"deadline greater by {:?}",
deadline1 - now
);
delay.reset(deadline3);
delay.await;
});
}
// Run the logic
exec.for_each(|_| Ok(())).wait().unwrap();
exec.run().unwrap();
if 1 == done.rem.fetch_sub(1, SeqCst) {
done.unpark.unpark();

View File

@ -1,43 +1,55 @@
#![deny(warnings, rust_2018_idioms)]
#![cfg(feature = "broken")]
#![feature(async_await)]
mod support;
use crate::support::*;
use futures::Stream;
use tokio_test::task::MockTask;
use tokio_test::{assert_pending, assert_ready_eq, clock};
use tokio_timer::*;
use std::time::Duration;
#[test]
#[should_panic]
fn interval_zero_duration() {
mocked(|_, time| {
let _ = Interval::new(time.now(), ms(0));
clock::mock(|clock| {
let _ = Interval::new(clock.now(), ms(0));
});
}
#[test]
fn usage() {
mocked(|timer, time| {
let start = time.now();
let mut task = MockTask::new();
clock::mock(|clock| {
let start = clock.now();
let mut int = Interval::new(start, ms(300));
assert_ready_eq!(int, Some(start));
assert_not_ready!(int);
macro_rules! poll {
() => {
task.enter(|cx| int.poll_next(cx))
};
}
advance(timer, ms(100));
assert_not_ready!(int);
assert_ready_eq!(poll!(), Some(start));
assert_pending!(poll!());
advance(timer, ms(200));
assert_ready_eq!(int, Some(start + ms(300)));
assert_not_ready!(int);
clock.advance(ms(100));
assert_pending!(poll!());
advance(timer, ms(400));
assert_ready_eq!(int, Some(start + ms(600)));
assert_not_ready!(int);
clock.advance(ms(200));
assert_ready_eq!(poll!(), Some(start + ms(300)));
assert_pending!(poll!());
advance(timer, ms(500));
assert_ready_eq!(int, Some(start + ms(900)));
assert_ready_eq!(int, Some(start + ms(1200)));
assert_not_ready!(int);
clock.advance(ms(400));
assert_ready_eq!(poll!(), Some(start + ms(600)));
assert_pending!(poll!());
clock.advance(ms(500));
assert_ready_eq!(poll!(), Some(start + ms(900)));
assert_ready_eq!(poll!(), Some(start + ms(1200)));
assert_pending!(poll!());
});
}
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

View File

@ -1,44 +1,61 @@
#![deny(warnings, rust_2018_idioms)]
#![cfg(feature = "broken")]
mod support;
use crate::support::*;
use futures::Stream;
use tokio_mock_task::MockTask;
use tokio_test::task::MockTask;
use tokio_test::{assert_ok, assert_pending, assert_ready, clock};
use tokio_timer::*;
use std::time::Duration;
macro_rules! poll {
($task:ident, $queue:ident) => {
$task.enter(|cx| $queue.poll_next(cx))
};
}
macro_rules! assert_ready_ok {
($e:expr) => {{
assert_ok!(match assert_ready!($e) {
Some(v) => v,
None => panic!("None"),
})
}};
}
#[test]
fn single_immediate_delay() {
mocked(|_timer, time| {
let mut queue = DelayQueue::new();
let _key = queue.insert_at("foo", time.now());
let mut t = MockTask::new();
let entry = assert_ready!(queue).unwrap();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let _key = queue.insert_at("foo", clock.now());
let entry = assert_ready_ok!(poll!(t, queue));
assert_eq!(*entry.get_ref(), "foo");
let entry = assert_ready!(queue);
let entry = assert_ready!(poll!(t, queue));
assert!(entry.is_none())
});
}
#[test]
fn multi_immediate_delays() {
mocked(|_timer, time| {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let _k = queue.insert_at("1", time.now());
let _k = queue.insert_at("2", time.now());
let _k = queue.insert_at("3", time.now());
let _k = queue.insert_at("1", clock.now());
let _k = queue.insert_at("2", clock.now());
let _k = queue.insert_at("3", clock.now());
let mut res = vec![];
while res.len() < 3 {
let entry = assert_ready!(queue).unwrap();
let entry = assert_ready_ok!(poll!(t, queue));
res.push(entry.into_inner());
}
let entry = assert_ready!(queue);
let entry = assert_ready!(poll!(t, queue));
assert!(entry.is_none());
res.sort();
@ -51,28 +68,26 @@ fn multi_immediate_delays() {
#[test]
fn single_short_delay() {
mocked(|timer, time| {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let _key = queue.insert_at("foo", time.now() + ms(5));
let _key = queue.insert_at("foo", clock.now() + ms(5));
let mut task = MockTask::new();
assert_pending!(poll!(t, queue));
task.enter(|| {
assert_not_ready!(queue);
});
clock.turn_for(ms(1));
turn(timer, ms(1));
assert!(!t.is_woken());
assert!(!task.is_notified());
clock.turn_for(ms(5));
turn(timer, ms(5));
assert!(t.is_woken());
assert!(task.is_notified());
let entry = assert_ready!(queue).unwrap();
let entry = assert_ready_ok!(poll!(t, queue));
assert_eq!(*entry.get_ref(), "foo");
let entry = assert_ready!(queue);
let entry = assert_ready!(poll!(t, queue));
assert!(entry.is_none());
});
}
@ -82,40 +97,33 @@ fn multi_delay_at_start() {
let long = 262_144 + 9 * 4096;
let delays = &[1000, 2, 234, long, 60, 10];
mocked(|timer, time| {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
// Setup the delays
for &i in delays {
let _key = queue.insert_at(i, time.now() + ms(i));
let _key = queue.insert_at(i, clock.now() + ms(i));
}
task.enter(|| {
assert_not_ready!(queue);
});
assert!(!task.is_notified());
assert_pending!(poll!(t, queue));
assert!(!t.is_woken());
for elapsed in 0..1200 {
turn(timer, ms(1));
clock.turn_for(ms(1));
let elapsed = elapsed + 1;
if delays.contains(&elapsed) {
assert!(task.is_notified());
task.enter(|| {
assert_ready!(queue);
assert_not_ready!(queue);
});
assert!(t.is_woken());
assert_ready!(poll!(t, queue));
assert_pending!(poll!(t, queue));
} else {
if task.is_notified() {
if t.is_woken() {
let cascade = &[192, 960];
assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
task.enter(|| {
assert_not_ready!(queue, "elapsed={}", elapsed);
});
assert_pending!(poll!(t, queue));
}
}
}
@ -124,150 +132,141 @@ fn multi_delay_at_start() {
#[test]
fn insert_in_past_fires_immediately() {
mocked(|timer, time| {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let now = time.now();
let now = clock.now();
turn(timer, ms(10));
clock.turn_for(ms(10));
queue.insert_at("foo", now);
assert_ready!(queue);
assert_ready!(poll!(t, queue));
});
}
#[test]
fn remove_entry() {
mocked(|timer, time| {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let key = queue.insert_at("foo", time.now() + ms(5));
let key = queue.insert_at("foo", clock.now() + ms(5));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
let entry = queue.remove(&key);
assert_eq!(entry.into_inner(), "foo");
turn(timer, ms(10));
clock.turn_for(ms(10));
task.enter(|| {
let entry = assert_ready!(queue);
assert!(entry.is_none());
});
let entry = assert_ready!(poll!(t, queue));
assert!(entry.is_none());
});
}
#[test]
fn reset_entry() {
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let mut t = MockTask::new();
let now = time.now();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let now = clock.now();
let key = queue.insert_at("foo", now + ms(5));
task.enter(|| {
assert_not_ready!(queue);
});
turn(timer, ms(1));
assert_pending!(poll!(t, queue));
clock.turn_for(ms(1));
queue.reset_at(&key, now + ms(10));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
turn(timer, ms(7));
clock.turn_for(ms(7));
assert!(!task.is_notified());
assert!(!t.is_woken());
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
turn(timer, ms(3));
clock.turn_for(ms(3));
assert!(task.is_notified());
assert!(t.is_woken());
let entry = assert_ready!(queue).unwrap();
let entry = assert_ready_ok!(poll!(t, queue));
assert_eq!(*entry.get_ref(), "foo");
let entry = assert_ready!(queue);
let entry = assert_ready!(poll!(t, queue));
assert!(entry.is_none())
});
}
#[test]
fn reset_much_later() {
let mut t = MockTask::new();
// Reproduces tokio-rs/tokio#849.
mocked(|timer, time| {
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let epoch = time.now();
let epoch = clock.now();
turn(timer, ms(1));
clock.turn_for(ms(1));
let key = queue.insert_at("foo", epoch + ms(200));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
turn(timer, ms(3));
clock.turn_for(ms(3));
queue.reset_at(&key, epoch + ms(5));
turn(timer, ms(20));
clock.turn_for(ms(20));
assert!(task.is_notified());
assert!(t.is_woken());
});
}
#[test]
fn reset_twice() {
let mut t = MockTask::new();
// Reproduces tokio-rs/tokio#849.
mocked(|timer, time| {
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let epoch = time.now();
let epoch = clock.now();
turn(timer, ms(1));
clock.turn_for(ms(1));
let key = queue.insert_at("foo", epoch + ms(200));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
turn(timer, ms(3));
clock.turn_for(ms(3));
queue.reset_at(&key, epoch + ms(50));
turn(timer, ms(20));
clock.turn_for(ms(20));
queue.reset_at(&key, epoch + ms(40));
turn(timer, ms(20));
clock.turn_for(ms(20));
assert!(task.is_notified());
assert!(t.is_woken());
});
}
#[test]
fn remove_expired_item() {
mocked(|timer, time| {
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let now = time.now();
let now = clock.now();
turn(timer, ms(10));
clock.turn_for(ms(10));
let key = queue.insert_at("foo", now);
@ -278,48 +277,45 @@ fn remove_expired_item() {
#[test]
fn expires_before_last_insert() {
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let mut t = MockTask::new();
let epoch = time.now();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let epoch = clock.now();
queue.insert_at("foo", epoch + ms(10_000));
// Delay should be set to 8.192s here.
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
// Delay should be set to the delay of the new item here
queue.insert_at("bar", epoch + ms(600));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
advance(timer, ms(600));
clock.advance(ms(600));
assert!(task.is_notified());
let entry = assert_ready!(queue).unwrap().into_inner();
assert!(t.is_woken());
let entry = assert_ready_ok!(poll!(t, queue)).into_inner();
assert_eq!(entry, "bar");
})
}
#[test]
fn multi_reset() {
mocked(|_, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let mut t = MockTask::new();
let epoch = time.now();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let epoch = clock.now();
let foo = queue.insert_at("foo", epoch + ms(200));
let bar = queue.insert_at("bar", epoch + ms(250));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
queue.reset_at(&foo, epoch + ms(300));
queue.reset_at(&bar, epoch + ms(350));
@ -329,74 +325,77 @@ fn multi_reset() {
#[test]
fn expire_first_key_when_reset_to_expire_earlier() {
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let mut t = MockTask::new();
let epoch = time.now();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let epoch = clock.now();
let foo = queue.insert_at("foo", epoch + ms(200));
queue.insert_at("bar", epoch + ms(250));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
queue.reset_at(&foo, epoch + ms(100));
advance(timer, ms(100));
clock.advance(ms(100));
assert!(task.is_notified());
let entry = assert_ready!(queue).unwrap().into_inner();
assert!(t.is_woken());
let entry = assert_ready_ok!(poll!(t, queue)).into_inner();
assert_eq!(entry, "foo");
})
}
#[test]
fn expire_second_key_when_reset_to_expire_earlier() {
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let mut t = MockTask::new();
let epoch = time.now();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let epoch = clock.now();
queue.insert_at("foo", epoch + ms(200));
let bar = queue.insert_at("bar", epoch + ms(250));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
queue.reset_at(&bar, epoch + ms(100));
advance(timer, ms(100));
clock.advance(ms(100));
assert!(task.is_notified());
let entry = assert_ready!(queue).unwrap().into_inner();
assert!(t.is_woken());
let entry = assert_ready_ok!(poll!(t, queue)).into_inner();
assert_eq!(entry, "bar");
})
}
#[test]
fn reset_first_expiring_item_to_expire_later() {
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let mut t = MockTask::new();
let epoch = time.now();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let epoch = clock.now();
let foo = queue.insert_at("foo", epoch + ms(200));
let _bar = queue.insert_at("bar", epoch + ms(250));
task.enter(|| {
assert_not_ready!(queue);
});
assert_pending!(poll!(t, queue));
queue.reset_at(&foo, epoch + ms(300));
advance(timer, ms(250));
clock.advance(ms(250));
assert!(task.is_notified());
let entry = assert_ready!(queue).unwrap().into_inner();
assert!(t.is_woken());
let entry = assert_ready_ok!(poll!(t, queue)).into_inner();
assert_eq!(entry, "bar");
})
}
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

View File

@ -1,49 +1,68 @@
#![deny(warnings, rust_2018_idioms)]
#![cfg(feature = "broken")]
#![cfg(feature = "async-traits")]
mod support;
use crate::support::*;
use futures::{prelude::*, sync::mpsc};
use tokio_sync::mpsc;
use tokio_test::task::MockTask;
use tokio_test::{assert_pending, assert_ready_eq, clock};
use tokio_timer::throttle::Throttle;
use futures_core::Stream;
use std::time::Duration;
macro_rules! poll {
($task:ident, $stream:ident) => {{
use std::pin::Pin;
$task.enter(|cx| Pin::new(&mut $stream).poll_next(cx))
}};
}
#[test]
fn throttle() {
mocked(|timer, _| {
let (tx, rx) = mpsc::unbounded();
let mut t = MockTask::new();
clock::mock(|clock| {
let (mut tx, rx) = mpsc::unbounded_channel();
let mut stream = Throttle::new(rx, ms(1));
assert_not_ready!(stream);
assert_pending!(poll!(t, stream));
for i in 0..3 {
tx.unbounded_send(i).unwrap();
tx.try_send(i).unwrap();
}
for i in 0..3 {
assert_ready_eq!(stream, Some(i));
assert_not_ready!(stream);
assert_ready_eq!(poll!(t, stream), Some(i));
assert_pending!(poll!(t, stream));
advance(timer, ms(1));
clock.advance(ms(1));
}
assert_not_ready!(stream);
assert_pending!(poll!(t, stream));
});
}
#[test]
fn throttle_dur_0() {
mocked(|_, _| {
let (tx, rx) = mpsc::unbounded();
let mut t = MockTask::new();
clock::mock(|_| {
let (mut tx, rx) = mpsc::unbounded_channel();
let mut stream = Throttle::new(rx, ms(0));
assert_not_ready!(stream);
assert_pending!(poll!(t, stream));
for i in 0..3 {
tx.unbounded_send(i).unwrap();
}
for i in 0..3 {
assert_ready_eq!(stream, Some(i));
tx.try_send(i).unwrap();
}
assert_not_ready!(stream);
for i in 0..3 {
assert_ready_eq!(poll!(t, stream), Some(i));
}
assert_pending!(poll!(t, stream));
});
}
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

View File

@ -1,68 +1,72 @@
#![deny(warnings, rust_2018_idioms)]
#![cfg(feature = "broken")]
#![feature(async_await)]
mod support;
use crate::support::*;
use futures::sync::{mpsc, oneshot};
use futures::{future, Future, Stream};
use tokio_sync::oneshot;
use tokio_test::task::MockTask;
use tokio_test::{
assert_err, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, clock,
};
use tokio_timer::*;
use std::time::Duration;
#[test]
fn simultaneous_deadline_future_completion() {
mocked(|_, time| {
// Create a future that is immediately ready
let fut = future::ok::<_, ()>(());
let mut t = MockTask::new();
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now());
clock::mock(|clock| {
// Create a future that is immediately ready
let fut = Box::pin(Timeout::new_at(async {}, clock.now()));
// Ready!
assert_ready!(fut);
assert_ready_ok!(t.poll(fut));
});
}
#[test]
fn completed_future_past_deadline() {
mocked(|_, time| {
// Create a future that is immediately ready
let fut = future::ok::<_, ()>(());
let mut t = MockTask::new();
clock::mock(|clock| {
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now() - ms(1000));
let fut = Timeout::new_at(async {}, clock.now() - ms(1000));
let fut = Box::pin(fut);
// Ready!
assert_ready!(fut);
assert_ready_ok!(t.poll(fut));
});
}
#[test]
fn future_and_deadline_in_future() {
mocked(|timer, time| {
let mut t = MockTask::new();
clock::mock(|clock| {
// Not yet complete
let (tx, rx) = oneshot::channel();
// Wrap it with a deadline
let mut fut = Timeout::new_at(rx, time.now() + ms(100));
let mut fut = Timeout::new_at(rx, clock.now() + ms(100));
// Ready!
assert_not_ready!(fut);
assert_pending!(t.poll(&mut fut));
// Turn the timer, it runs for the elapsed time
advance(timer, ms(90));
clock.advance(ms(90));
assert_not_ready!(fut);
assert_pending!(t.poll(&mut fut));
// Complete the future
tx.send(()).unwrap();
assert_ready!(fut);
assert_ready_ok!(t.poll(&mut fut)).unwrap();
});
}
#[test]
fn future_and_timeout_in_future() {
mocked(|timer, _time| {
let mut t = MockTask::new();
clock::mock(|clock| {
// Not yet complete
let (tx, rx) = oneshot::channel();
@ -70,107 +74,133 @@ fn future_and_timeout_in_future() {
let mut fut = Timeout::new(rx, ms(100));
// Ready!
assert_not_ready!(fut);
assert_pending!(t.poll(&mut fut));
// Turn the timer, it runs for the elapsed time
advance(timer, ms(90));
clock.advance(ms(90));
assert_not_ready!(fut);
assert_pending!(t.poll(&mut fut));
// Complete the future
tx.send(()).unwrap();
assert_ready!(fut);
assert_ready_ok!(t.poll(&mut fut)).unwrap();
});
}
struct Empty;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
impl Future for Empty {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
}
}
#[test]
fn deadline_now_elapses() {
mocked(|_, time| {
let fut = future::empty::<(), ()>();
let mut t = MockTask::new();
clock::mock(|clock| {
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now());
let mut fut = Timeout::new_at(Empty, clock.now());
assert_elapsed!(fut);
assert_ready_err!(t.poll(&mut fut));
});
}
#[test]
fn deadline_future_elapses() {
mocked(|timer, time| {
let fut = future::empty::<(), ()>();
let mut t = MockTask::new();
clock::mock(|clock| {
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now() + ms(300));
let mut fut = Timeout::new_at(Empty, clock.now() + ms(300));
assert_not_ready!(fut);
assert_pending!(t.poll(&mut fut));
advance(timer, ms(300));
clock.advance(ms(300));
assert_elapsed!(fut);
assert_ready_err!(t.poll(&mut fut));
});
}
#[test]
fn future_errors_first() {
mocked(|_, time| {
let fut = future::err::<(), ()>(());
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now() + ms(100));
// Ready!
assert!(fut.poll().unwrap_err().is_inner());
});
#[cfg(feature = "async-traits")]
macro_rules! poll {
($task:ident, $stream:ident) => {{
use futures_core::Stream;
$task.enter(|cx| Pin::new(&mut $stream).poll_next(cx))
}};
}
#[test]
#[cfg(feature = "async-traits")]
fn stream_and_timeout_in_future() {
mocked(|timer, _time| {
use tokio_sync::mpsc;
let mut t = MockTask::new();
clock::mock(|clock| {
// Not yet complete
let (tx, rx) = mpsc::unbounded();
let (mut tx, rx) = mpsc::unbounded_channel();
// Wrap it with a deadline
let mut stream = Timeout::new(rx, ms(100));
// Not ready
assert_not_ready!(stream);
assert_pending!(poll!(t, stream));
// Turn the timer, it runs for the elapsed time
advance(timer, ms(90));
clock.advance(ms(90));
assert_not_ready!(stream);
assert_pending!(poll!(t, stream));
// Complete the future
tx.unbounded_send(()).unwrap();
tx.try_send(()).unwrap();
let item = assert_ready!(stream);
let item = assert_ready!(poll!(t, stream));
assert!(item.is_some());
});
}
#[test]
#[cfg(feature = "async-traits")]
fn idle_stream_timesout_periodically() {
mocked(|timer, _time| {
use tokio_sync::mpsc;
let mut t = MockTask::new();
clock::mock(|clock| {
// Not yet complete
let (_tx, rx) = mpsc::unbounded::<()>();
let (_tx, rx) = mpsc::unbounded_channel::<()>();
// Wrap it with a deadline
let mut stream = Timeout::new(rx, ms(100));
// Not ready
assert_not_ready!(stream);
assert_pending!(poll!(t, stream));
// Turn the timer, it runs for the elapsed time
advance(timer, ms(100));
clock.advance(ms(100));
let v = assert_ready!(poll!(t, stream)).unwrap();
assert_err!(v);
assert_elapsed!(stream);
// Stream's timeout should reset
assert_not_ready!(stream);
assert_pending!(poll!(t, stream));
// Turn the timer, it runs for the elapsed time
advance(timer, ms(100));
assert_elapsed!(stream);
clock.advance(ms(100));
let v = assert_ready!(poll!(t, stream)).unwrap();
assert_err!(v)
});
}
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}