test: unify MockTask and task::spawn (#1728)

Delete `MockTask` in favor of `task::spawn`. Both are functionally
equivalent.
This commit is contained in:
Carl Lerche 2019-11-03 14:10:14 -08:00 committed by GitHub
parent 3948e16292
commit 966ccd5d53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 500 additions and 667 deletions

View File

@ -24,7 +24,6 @@ tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
bytes = "0.4"
futures-core-preview = "=0.3.0-alpha.19"
pin-convert = "0.1.0"
[dev-dependencies]
futures-util-preview = "=0.3.0-alpha.19"

View File

@ -13,16 +13,11 @@
/// # Examples
///
/// ```
/// use std::future::Future;
/// use futures_util::{future, pin_mut};
/// use futures_util::future;
/// use tokio_test::{assert_ready, task};
///
/// task::mock(|cx| {
/// let fut = future::ready(());
///
/// pin_mut!(fut);
/// assert_ready!(fut.poll(cx));
/// })
/// let mut fut = task::spawn(future::ready(()));
/// assert_ready!(fut.poll());
/// ```
#[macro_export]
macro_rules! assert_ready {
@ -57,16 +52,11 @@ macro_rules! assert_ready {
/// # Examples
///
/// ```
/// use std::future::Future;
/// use futures_util::{future, pin_mut};
/// use futures_util::future;
/// use tokio_test::{assert_ready_ok, task};
///
/// task::mock(|cx| {
/// let fut = future::ok::<_, ()>(());
///
/// pin_mut!(fut);
/// assert_ready_ok!(fut.poll(cx));
/// })
/// let mut fut = task::spawn(future::ok::<_, ()>(()));
/// assert_ready_ok!(fut.poll());
/// ```
#[macro_export]
macro_rules! assert_ready_ok {
@ -95,16 +85,11 @@ macro_rules! assert_ready_ok {
/// # Examples
///
/// ```
/// use std::future::Future;
/// use futures_util::{future, pin_mut};
/// use futures_util::future;
/// use tokio_test::{assert_ready_err, task};
///
/// task::mock(|cx| {
/// let fut = future::err::<(), _>(());
///
/// pin_mut!(fut);
/// assert_ready_err!(fut.poll(cx));
/// })
/// let mut fut = task::spawn(future::err::<(), _>(()));
/// assert_ready_err!(fut.poll());
/// ```
#[macro_export]
macro_rules! assert_ready_err {
@ -133,16 +118,11 @@ macro_rules! assert_ready_err {
/// # Examples
///
/// ```
/// use std::future::Future;
/// use futures_util::{future, pin_mut};
/// use futures_util::future;
/// use tokio_test::{assert_pending, task};
///
/// task::mock(|cx| {
/// let fut = future::pending::<()>();
///
/// pin_mut!(fut);
/// assert_pending!(fut.poll(cx));
/// })
/// let mut fut = task::spawn(future::pending::<()>());
/// assert_pending!(fut.poll());
/// ```
#[macro_export]
macro_rules! assert_pending {
@ -177,16 +157,11 @@ macro_rules! assert_pending {
/// # Examples
///
/// ```
/// use std::future::Future;
/// use futures_util::{future, pin_mut};
/// use futures_util::future;
/// use tokio_test::{assert_ready_eq, task};
///
/// task::mock(|cx| {
/// let fut = future::ready(42);
///
/// pin_mut!(fut);
/// assert_ready_eq!(fut.poll(cx), 42);
/// })
/// let mut fut = task::spawn(future::ready(42));
/// assert_ready_eq!(fut.poll(), 42);
/// ```
#[macro_export]
macro_rules! assert_ready_eq {

View File

@ -1,44 +1,19 @@
//! Futures task based helpers
use tokio::executor::enter;
use pin_convert::AsPinMut;
use futures_core::Stream;
use std::future::Future;
use std::mem;
use std::ops;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
/// Run the provided closure in a `MockTask` context.
///
/// # Examples
///
/// ```
/// use std::future::Future;
/// use futures_util::{future, pin_mut};
/// use tokio_test::task;
///
/// task::mock(|cx| {
/// let fut = future::ready(());
///
/// pin_mut!(fut);
/// assert!(fut.poll(cx).is_ready());
/// })
/// ```
pub fn mock<F, R>(f: F) -> R
where
F: Fn(&mut Context<'_>) -> R,
{
let mut task = MockTask::new();
task.enter(|cx| f(cx))
}
/// Mock task
///
/// A mock task is able to intercept and track wake notifications.
#[derive(Debug, Clone)]
pub struct MockTask {
waker: Arc<ThreadWaker>,
/// TOOD: dox
pub fn spawn<T>(task: T) -> Spawn<T> {
Spawn {
task: MockTask::new(),
future: Box::pin(task),
}
}
/// Future spawned on a mock task
@ -48,12 +23,12 @@ pub struct Spawn<T> {
future: Pin<Box<T>>,
}
/// TOOD: dox
pub fn spawn<T>(task: T) -> Spawn<T> {
Spawn {
task: MockTask::new(),
future: Box::pin(task),
}
/// Mock task
///
/// A mock task is able to intercept and track wake notifications.
#[derive(Debug, Clone)]
struct MockTask {
waker: Arc<ThreadWaker>,
}
#[derive(Debug)]
@ -66,11 +41,23 @@ const IDLE: usize = 0;
const WAKE: usize = 1;
const SLEEP: usize = 2;
impl<T: Future> Spawn<T> {
/// Poll a future
pub fn poll(&mut self) -> Poll<T::Output> {
let fut = self.future.as_mut();
self.task.enter(|cx| fut.poll(cx))
impl<T> Spawn<T> {
/// Consume `self` returning the inner value
pub fn into_inner(mut self) -> T
where
T: Unpin,
{
drop(self.task);
// Pin::into_inner is unstable, so we work around it
//
// Safety: `T` is bound by `Unpin`.
unsafe {
let ptr = Pin::get_mut(self.future.as_mut()) as *mut T;
let future = Box::from_raw(ptr);
mem::forget(self.future);
*future
}
}
/// Returns `true` if the inner future has received a wake notification
@ -85,35 +72,63 @@ impl<T: Future> Spawn<T> {
pub fn waker_ref_count(&self) -> usize {
self.task.waker_ref_count()
}
/// Enter the task context
pub fn enter<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Context<'_>, Pin<&mut T>) -> R,
{
let fut = self.future.as_mut();
self.task.enter(|cx| f(cx, fut))
}
}
impl<T: Unpin> ops::Deref for Spawn<T> {
type Target = T;
fn deref(&self) -> &T {
&self.future
}
}
impl<T: Unpin> ops::DerefMut for Spawn<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.future
}
}
impl<T: Future> Spawn<T> {
/// Poll a future
pub fn poll(&mut self) -> Poll<T::Output> {
let fut = self.future.as_mut();
self.task.enter(|cx| fut.poll(cx))
}
}
impl<T: Stream> Spawn<T> {
/// Poll a stream
pub fn poll_next(&mut self) -> Poll<Option<T::Item>> {
let stream = self.future.as_mut();
self.task.enter(|cx| stream.poll_next(cx))
}
}
impl MockTask {
/// Create a new mock task
pub fn new() -> Self {
fn new() -> Self {
MockTask {
waker: Arc::new(ThreadWaker::new()),
}
}
/// Poll a future
pub fn poll<T, F>(&mut self, mut fut: T) -> Poll<F::Output>
where
T: AsPinMut<F>,
F: Future,
{
self.enter(|cx| fut.as_pin_mut().poll(cx))
}
/// Run a closure from the context of the task.
///
/// Any wake notifications resulting from the execution of the closure are
/// tracked.
pub fn enter<F, R>(&mut self, f: F) -> R
fn enter<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Context<'_>) -> R,
{
let _enter = enter().unwrap();
self.waker.clear();
let waker = self.waker();
let mut cx = Context::from_waker(&waker);
@ -123,14 +138,14 @@ impl MockTask {
/// Returns `true` if the inner future has received a wake notification
/// since the last call to `enter`.
pub fn is_woken(&self) -> bool {
fn is_woken(&self) -> bool {
self.waker.is_woken()
}
/// Returns the number of references to the task waker
///
/// The task itself holds a reference. The return value will never be zero.
pub fn waker_ref_count(&self) -> usize {
fn waker_ref_count(&self) -> usize {
Arc::strong_count(&self.waker)
}
@ -193,7 +208,7 @@ impl ThreadWaker {
}
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);
unsafe fn to_raw(waker: Arc<ThreadWaker>) -> RawWaker {
RawWaker::new(Arc::into_raw(waker) as *const (), &VTABLE)
@ -225,6 +240,6 @@ unsafe fn wake_by_ref(raw: *const ()) {
mem::forget(waker);
}
unsafe fn drop(raw: *const ()) {
unsafe fn drop_waker(raw: *const ()) {
let _ = from_raw(raw);
}

View File

@ -2,7 +2,7 @@
use tokio::prelude::*;
use tokio_test::assert_ready;
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_util::codec::{Decoder, FramedRead};
use bytes::{Buf, BytesMut, IntoBuf};
@ -51,13 +51,13 @@ impl Decoder for U32Decoder {
#[test]
fn read_multi_frame_in_packet() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
};
let mut framed = FramedRead::new(mock, U32Decoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
assert_read!(pin!(framed).poll_next(cx), 2);
@ -67,7 +67,7 @@ fn read_multi_frame_in_packet() {
#[test]
fn read_multi_frame_across_packets() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00\x00\x00".to_vec()),
Ok(b"\x00\x00\x00\x01".to_vec()),
@ -75,7 +75,7 @@ fn read_multi_frame_across_packets() {
};
let mut framed = FramedRead::new(mock, U32Decoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
assert_read!(pin!(framed).poll_next(cx), 2);
@ -85,7 +85,7 @@ fn read_multi_frame_across_packets() {
#[test]
fn read_not_ready() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let mock = mock! {
Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
Ok(b"\x00\x00\x00\x00".to_vec()),
@ -93,7 +93,7 @@ fn read_not_ready() {
};
let mut framed = FramedRead::new(mock, U32Decoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert!(pin!(framed).poll_next(cx).is_pending());
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
@ -103,7 +103,7 @@ fn read_not_ready() {
#[test]
fn read_partial_then_not_ready() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00".to_vec()),
Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
@ -111,7 +111,7 @@ fn read_partial_then_not_ready() {
};
let mut framed = FramedRead::new(mock, U32Decoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert!(pin!(framed).poll_next(cx).is_pending());
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
@ -122,13 +122,13 @@ fn read_partial_then_not_ready() {
#[test]
fn read_err() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let mock = mock! {
Err(io::Error::new(io::ErrorKind::Other, "")),
};
let mut framed = FramedRead::new(mock, U32Decoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert_eq!(
io::ErrorKind::Other,
assert_ready!(pin!(framed).poll_next(cx))
@ -141,14 +141,14 @@ fn read_err() {
#[test]
fn read_partial_then_err() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00".to_vec()),
Err(io::Error::new(io::ErrorKind::Other, "")),
};
let mut framed = FramedRead::new(mock, U32Decoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert_eq!(
io::ErrorKind::Other,
assert_ready!(pin!(framed).poll_next(cx))
@ -161,7 +161,7 @@ fn read_partial_then_err() {
#[test]
fn read_partial_would_block_then_err() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00".to_vec()),
Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
@ -169,7 +169,7 @@ fn read_partial_would_block_then_err() {
};
let mut framed = FramedRead::new(mock, U32Decoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert!(pin!(framed).poll_next(cx).is_pending());
assert_eq!(
io::ErrorKind::Other,
@ -183,11 +183,11 @@ fn read_partial_would_block_then_err() {
#[test]
fn huge_size() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let data = [0; 32 * 1024];
let mut framed = FramedRead::new(Slice(&data[..]), BigDecoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
});
@ -210,11 +210,11 @@ fn huge_size() {
#[test]
fn data_remaining_is_error() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let slice = Slice(&[0; 5]);
let mut framed = FramedRead::new(slice, U32Decoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
});
@ -222,7 +222,7 @@ fn data_remaining_is_error() {
#[test]
fn multi_frames_on_eof() {
let mut task = MockTask::new();
let mut task = task::spawn(());
struct MyDecoder(Vec<u32>);
impl Decoder for MyDecoder {
@ -244,7 +244,7 @@ fn multi_frames_on_eof() {
let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
task.enter(|cx| {
task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 0);
assert_read!(pin!(framed).poll_next(cx), 1);
assert_read!(pin!(framed).poll_next(cx), 2);

View File

@ -1,8 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::io::AsyncWrite;
use tokio_test::assert_ready;
use tokio_test::task::MockTask;
use tokio_test::{assert_ready, task};
use tokio_util::codec::{Encoder, FramedWrite};
use bytes::{BufMut, BytesMut};
@ -43,13 +42,13 @@ impl Encoder for U32Encoder {
#[test]
fn write_multi_frame_in_packet() {
let mut task = MockTask::new();
let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
};
let mut framed = FramedWrite::new(mock, U32Encoder);
task.enter(|cx| {
task.enter(|cx, _| {
assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
assert!(pin!(framed).start_send(0).is_ok());
assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
@ -99,9 +98,9 @@ fn write_hits_backpressure() {
// 1 'wouldblock', 4 * 2KB buffers, 1 b-byte buffer
assert_eq!(mock.calls.len(), 6);
let mut task = MockTask::new();
let mut task = task::spawn(());
let mut framed = FramedWrite::new(mock, U32Encoder);
task.enter(|cx| {
task.enter(|cx, _| {
// Send 8KB. This fills up FramedWrite2 buffer
for i in 0..ITER {
assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());

View File

@ -2,7 +2,7 @@
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::*;
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
@ -26,7 +26,7 @@ macro_rules! mock {
macro_rules! assert_next_eq {
($io:ident, $expect:expr) => {{
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
let res = assert_ready!($io.as_mut().poll_next(cx));
match res {
Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
@ -39,7 +39,7 @@ macro_rules! assert_next_eq {
macro_rules! assert_next_pending {
($io:ident) => {{
MockTask::new().enter(|cx| match $io.as_mut().poll_next(cx) {
task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
Ready(Some(Ok(v))) => panic!("value = {:?}", v),
Ready(Some(Err(e))) => panic!("error = {:?}", e),
Ready(None) => panic!("done"),
@ -50,7 +50,7 @@ macro_rules! assert_next_pending {
macro_rules! assert_next_err {
($io:ident) => {{
MockTask::new().enter(|cx| match $io.as_mut().poll_next(cx) {
task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
Ready(Some(Ok(v))) => panic!("value = {:?}", v),
Ready(Some(Err(_))) => {}
Ready(None) => panic!("done"),
@ -61,7 +61,7 @@ macro_rules! assert_next_err {
macro_rules! assert_done {
($io:ident) => {{
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
let res = assert_ready!($io.as_mut().poll_next(cx));
match res {
Some(Ok(v)) => panic!("value = {:?}", v),
@ -405,7 +405,7 @@ fn write_single_frame_length_adjusted() {
});
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
assert_ready_ok!(io.as_mut().poll_flush(cx));
@ -418,7 +418,7 @@ fn write_nothing_yields_nothing() {
let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.poll_flush(cx));
});
}
@ -435,7 +435,7 @@ fn write_single_frame_one_packet() {
);
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
assert_ready_ok!(io.as_mut().poll_flush(cx));
@ -459,7 +459,7 @@ fn write_single_multi_frame_one_packet() {
);
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@ -492,7 +492,7 @@ fn write_single_multi_frame_multi_packet() {
);
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@ -526,7 +526,7 @@ fn write_single_frame_would_block() {
);
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@ -549,7 +549,7 @@ fn write_single_frame_little_endian() {
});
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@ -569,7 +569,7 @@ fn write_single_frame_with_short_length_field() {
});
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
@ -586,7 +586,7 @@ fn write_max_frame_len() {
.new_write(mock! {});
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
@ -603,7 +603,7 @@ fn write_update_max_frame_len_at_rest() {
});
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
@ -628,7 +628,7 @@ fn write_update_max_frame_len_in_flight() {
});
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
@ -648,7 +648,7 @@ fn write_zero() {
let io = length_delimited::Builder::new().new_write(mock! {});
pin_mut!(io);
MockTask::new().enter(|cx| {
task::spawn(()).enter(|cx, _| {
assert_ready_ok!(io.as_mut().poll_ready(cx));
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));

View File

@ -1,9 +1,8 @@
use tokio::io::AsyncRead;
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_test::{assert_ready_err, assert_ready_ok};
use bytes::{BufMut, BytesMut};
use futures_util::pin_mut;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -30,12 +29,8 @@ fn read_buf_success() {
}
let mut buf = BytesMut::with_capacity(65);
let mut task = MockTask::new();
task.enter(|cx| {
let rd = Rd;
pin_mut!(rd);
task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(11, n);
@ -59,12 +54,8 @@ fn read_buf_error() {
}
let mut buf = BytesMut::with_capacity(65);
let mut task = MockTask::new();
task.enter(|cx| {
let rd = Rd;
pin_mut!(rd);
task::spawn(Rd).enter(|cx, rd| {
let err = assert_ready_err!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(err.kind(), io::ErrorKind::Other);
});
@ -86,14 +77,9 @@ fn read_buf_no_capacity() {
// Can't create BytesMut w/ zero capacity, so fill it up
let mut buf = BytesMut::with_capacity(64);
let mut task = MockTask::new();
buf.put(&[0; 64][..]);
task.enter(|cx| {
let rd = Rd;
pin_mut!(rd);
task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(0, n);
});
@ -118,12 +104,8 @@ fn read_buf_no_uninitialized() {
}
let mut buf = BytesMut::with_capacity(64);
let mut task = MockTask::new();
task.enter(|cx| {
let rd = Rd;
pin_mut!(rd);
task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(0, n);
});
@ -150,16 +132,12 @@ fn read_buf_uninitialized_ok() {
// Can't create BytesMut w/ zero capacity, so fill it up
let mut buf = BytesMut::with_capacity(64);
let mut task = MockTask::new();
unsafe {
buf.bytes_mut()[0..11].copy_from_slice(b"hello world");
}
task.enter(|cx| {
let rd = Rd;
pin_mut!(rd);
task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(0, n);
});

View File

@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::sync::AtomicWaker;
use tokio_test::task::MockTask;
use tokio_test::task;
use std::task::Waker;
@ -16,23 +16,21 @@ impl AssertSync for Waker {}
#[test]
fn basic_usage() {
let waker = AtomicWaker::new();
let mut task = MockTask::new();
let mut waker = task::spawn(AtomicWaker::new());
task.enter(|cx| waker.register_by_ref(cx.waker()));
waker.enter(|cx, waker| waker.register_by_ref(cx.waker()));
waker.wake();
assert!(task.is_woken());
assert!(waker.is_woken());
}
#[test]
fn wake_without_register() {
let waker = AtomicWaker::new();
let mut waker = task::spawn(AtomicWaker::new());
waker.wake();
// Registering should not result in a notification
let mut task = MockTask::new();
task.enter(|cx| waker.register_by_ref(cx.waker()));
waker.enter(|cx, waker| waker.register_by_ref(cx.waker()));
assert!(!task.is_woken());
assert!(!waker.is_woken());
}

View File

@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::sync::mpsc;
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
@ -14,13 +14,12 @@ impl AssertSend for mpsc::Receiver<i32> {}
#[test]
fn send_recv_with_buffer() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let (mut tx, mut rx) = mpsc::channel::<i32>(16);
let (tx, rx) = mpsc::channel::<i32>(16);
let mut tx = task::spawn(tx);
let mut rx = task::spawn(rx);
// Using poll_ready / try_send
assert_ready_ok!(t1.enter(|cx| tx.poll_ready(cx)));
assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx)));
tx.try_send(1).unwrap();
// Without poll_ready
@ -28,13 +27,13 @@ fn send_recv_with_buffer() {
drop(tx);
let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
let val = assert_ready!(t2.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert!(val.is_none());
}
@ -56,15 +55,10 @@ async fn async_send_recv_with_buffer() {
fn send_sink_recv_with_buffer() {
use futures_core::Stream;
use futures_sink::Sink;
use futures_util::pin_mut;
let mut t1 = MockTask::new();
let (tx, rx) = mpsc::channel::<i32>(16);
t1.enter(|cx| {
pin_mut!(tx);
task::spawn(tx).enter(|cx, mut tx| {
assert_ready_ok!(tx.as_mut().poll_ready(cx));
assert_ok!(tx.as_mut().start_send(1));
@ -75,9 +69,7 @@ fn send_sink_recv_with_buffer() {
assert_ready_ok!(tx.as_mut().poll_close(cx));
});
t1.enter(|cx| {
pin_mut!(rx);
task::spawn(rx).enter(|cx, mut rx| {
let val = assert_ready!(rx.as_mut().poll_next(cx));
assert_eq!(val, Some(1));
@ -91,26 +83,26 @@ fn send_sink_recv_with_buffer() {
#[test]
fn start_send_past_cap() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let mut t3 = MockTask::new();
let mut t1 = task::spawn(());
let mut t2 = task::spawn(());
let mut t3 = task::spawn(());
let (mut tx1, mut rx) = mpsc::channel(1);
let mut tx2 = tx1.clone();
assert_ok!(tx1.try_send(()));
t1.enter(|cx| {
t1.enter(|cx, _| {
assert_pending!(tx1.poll_ready(cx));
});
t2.enter(|cx| {
t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
drop(tx1);
let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_some());
assert!(t2.is_woken());
@ -118,7 +110,7 @@ fn start_send_past_cap() {
drop(tx2);
let val = t3.enter(|cx| assert_ready!(rx.poll_recv(cx)));
let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_none());
}
@ -130,7 +122,7 @@ fn buffer_gteq_one() {
#[test]
fn send_recv_unbounded() {
let mut t1 = MockTask::new();
let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
@ -138,15 +130,15 @@ fn send_recv_unbounded() {
assert_ok!(tx.try_send(1));
assert_ok!(tx.try_send(2));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
drop(tx);
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_none());
}
@ -170,11 +162,11 @@ fn sink_send_recv_unbounded() {
use futures_sink::Sink;
use futures_util::pin_mut;
let mut t1 = MockTask::new();
let mut t1 = task::spawn(());
let (tx, rx) = mpsc::unbounded_channel::<i32>();
t1.enter(|cx| {
t1.enter(|cx, _| {
pin_mut!(tx);
assert_ready_ok!(tx.as_mut().poll_ready(cx));
@ -187,7 +179,7 @@ fn sink_send_recv_unbounded() {
assert_ready_ok!(tx.as_mut().poll_close(cx));
});
t1.enter(|cx| {
t1.enter(|cx, _| {
pin_mut!(rx);
let val = assert_ready!(rx.as_mut().poll_next(cx));
@ -205,7 +197,7 @@ fn sink_send_recv_unbounded() {
fn no_t_bounds_buffer() {
struct NoImpls;
let mut t1 = MockTask::new();
let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::channel(100);
// sender should be Debug even though T isn't Debug
@ -215,7 +207,7 @@ fn no_t_bounds_buffer() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
}
@ -223,7 +215,7 @@ fn no_t_bounds_buffer() {
fn no_t_bounds_unbounded() {
struct NoImpls;
let mut t1 = MockTask::new();
let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::unbounded_channel();
// sender should be Debug even though T isn't Debug
@ -233,19 +225,19 @@ fn no_t_bounds_unbounded() {
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
}
#[test]
fn send_recv_buffer_limited() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let mut t1 = task::spawn(());
let mut t2 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel::<i32>(1);
// Run on a task context
t1.enter(|cx| {
t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
// Send first message
@ -258,7 +250,7 @@ fn send_recv_buffer_limited() {
assert_err!(tx.try_send(1337));
});
t2.enter(|cx| {
t2.enter(|cx, _| {
// Take the value
let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(1), val);
@ -266,7 +258,7 @@ fn send_recv_buffer_limited() {
assert!(t1.is_woken());
t1.enter(|cx| {
t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
assert_ok!(tx.try_send(2));
@ -275,26 +267,26 @@ fn send_recv_buffer_limited() {
assert_pending!(tx.poll_ready(cx));
});
t2.enter(|cx| {
t2.enter(|cx, _| {
// Take the value
let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(2), val);
});
t1.enter(|cx| {
t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
});
}
#[test]
fn recv_close_gets_none_idle() {
let mut t1 = MockTask::new();
let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel::<i32>(10);
rx.close();
t1.enter(|cx| {
t1.enter(|cx, _| {
let val = assert_ready!(rx.poll_recv(cx));
assert!(val.is_none());
assert_ready_err!(tx.poll_ready(cx));
@ -303,16 +295,16 @@ fn recv_close_gets_none_idle() {
#[test]
fn recv_close_gets_none_reserved() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let mut t3 = MockTask::new();
let mut t1 = task::spawn(());
let mut t2 = task::spawn(());
let mut t3 = task::spawn(());
let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx)));
assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
t2.enter(|cx| {
t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
@ -320,11 +312,11 @@ fn recv_close_gets_none_reserved() {
assert!(t2.is_woken());
t2.enter(|cx| {
t2.enter(|cx, _| {
assert_ready_err!(tx2.poll_ready(cx));
});
t3.enter(|cx| assert_pending!(rx.poll_recv(cx)));
t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx)));
assert!(!t1.is_woken());
assert!(!t2.is_woken());
@ -333,7 +325,7 @@ fn recv_close_gets_none_reserved() {
assert!(t3.is_woken());
t3.enter(|cx| {
t3.enter(|cx, _| {
let v = assert_ready!(rx.poll_recv(cx));
assert_eq!(v, Some(123));
@ -344,12 +336,12 @@ fn recv_close_gets_none_reserved() {
#[test]
fn tx_close_gets_none() {
let mut t1 = MockTask::new();
let mut t1 = task::spawn(());
let (_, mut rx) = mpsc::channel::<i32>(10);
// Run on a task context
t1.enter(|cx| {
t1.enter(|cx, _| {
let v = assert_ready!(rx.poll_recv(cx));
assert!(v.is_none());
});
@ -357,7 +349,7 @@ fn tx_close_gets_none() {
#[test]
fn try_send_fail() {
let mut t1 = MockTask::new();
let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel(1);
@ -367,32 +359,32 @@ fn try_send_fail() {
let err = assert_err!(tx.try_send("fail"));
assert!(err.is_full());
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("hello"));
assert_ok!(tx.try_send("goodbye"));
drop(tx);
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("goodbye"));
let val = assert_ready!(t1.enter(|cx| rx.poll_recv(cx)));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_none());
}
#[test]
fn drop_tx_with_permit_releases_permit() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let mut t1 = task::spawn(());
let mut t2 = task::spawn(());
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
let (mut tx1, _rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx)));
assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
t2.enter(|cx| {
t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
@ -400,12 +392,12 @@ fn drop_tx_with_permit_releases_permit() {
assert!(t2.is_woken());
assert_ready_ok!(t2.enter(|cx| tx2.poll_ready(cx)));
assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx)));
}
#[test]
fn dropping_rx_closes_channel() {
let mut t1 = MockTask::new();
let mut t1 = task::spawn(());
let (mut tx, rx) = mpsc::channel(100);
@ -413,7 +405,7 @@ fn dropping_rx_closes_channel() {
assert_ok!(tx.try_send(msg.clone()));
drop(rx);
assert_ready_err!(t1.enter(|cx| tx.poll_ready(cx)));
assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx)));
assert_eq!(1, Arc::strong_count(&msg));
}

View File

@ -1,25 +1,27 @@
#![warn(rust_2018_idioms)]
use tokio::sync::oneshot;
use tokio_test::task::MockTask;
use tokio_test::*;
use std::future::Future;
use std::pin::Pin;
trait AssertSend: Send {}
impl AssertSend for oneshot::Sender<i32> {}
impl AssertSend for oneshot::Receiver<i32> {}
#[test]
fn send_recv() {
let (tx, mut rx) = oneshot::channel();
let mut task = MockTask::new();
let (tx, rx) = oneshot::channel();
let mut rx = task::spawn(rx);
assert_pending!(task.poll(&mut rx));
assert_pending!(rx.poll());
assert_ok!(tx.send(1));
assert!(task.is_woken());
assert!(rx.is_woken());
let val = assert_ready_ok!(task.poll(&mut rx));
let val = assert_ready_ok!(rx.poll());
assert_eq!(val, 1);
}
@ -33,15 +35,15 @@ async fn async_send_recv() {
#[test]
fn close_tx() {
let (tx, mut rx) = oneshot::channel::<i32>();
let mut task = MockTask::new();
let (tx, rx) = oneshot::channel::<i32>();
let mut rx = task::spawn(rx);
assert_pending!(task.poll(&mut rx));
assert_pending!(rx.poll());
drop(tx);
assert!(task.is_woken());
assert_ready_err!(task.poll(&mut rx));
assert!(rx.is_woken());
assert_ready_err!(rx.poll());
}
#[test]
@ -54,18 +56,18 @@ fn close_rx() {
// Second, via poll_closed();
let (mut tx, rx) = oneshot::channel();
let mut task = MockTask::new();
let (tx, rx) = oneshot::channel();
let mut tx = task::spawn(tx);
assert_pending!(task.enter(|cx| tx.poll_closed(cx)));
assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
drop(rx);
assert!(task.is_woken());
assert!(tx.is_woken());
assert!(tx.is_closed());
assert_ready!(task.enter(|cx| tx.poll_closed(cx)));
assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
assert_err!(tx.send(1));
assert_err!(tx.into_inner().send(1));
}
#[tokio::test]
@ -82,43 +84,46 @@ async fn async_rx_closed() {
#[test]
fn explicit_close_poll() {
// First, with message sent
let (tx, mut rx) = oneshot::channel();
let mut task = MockTask::new();
let (tx, rx) = oneshot::channel();
let mut rx = task::spawn(rx);
assert_ok!(tx.send(1));
rx.close();
let value = assert_ready_ok!(task.poll(&mut rx));
let value = assert_ready_ok!(rx.poll());
assert_eq!(value, 1);
// Second, without the message sent
let (mut tx, mut rx) = oneshot::channel::<i32>();
let (tx, rx) = oneshot::channel::<i32>();
let mut tx = task::spawn(tx);
let mut rx = task::spawn(rx);
assert_pending!(task.enter(|cx| tx.poll_closed(cx)));
assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
rx.close();
assert!(task.is_woken());
assert!(tx.is_woken());
assert!(tx.is_closed());
assert_ready!(task.enter(|cx| tx.poll_closed(cx)));
assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
assert_err!(tx.send(1));
assert_ready_err!(task.poll(&mut rx));
assert_err!(tx.into_inner().send(1));
assert_ready_err!(rx.poll());
// Again, but without sending the value this time
let (mut tx, mut rx) = oneshot::channel::<i32>();
let mut task = MockTask::new();
let (tx, rx) = oneshot::channel::<i32>();
let mut tx = task::spawn(tx);
let mut rx = task::spawn(rx);
assert_pending!(task.enter(|cx| tx.poll_closed(cx)));
assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
rx.close();
assert!(task.is_woken());
assert!(tx.is_woken());
assert!(tx.is_closed());
assert_ready!(task.enter(|cx| tx.poll_closed(cx)));
assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
assert_ready_err!(task.poll(&mut rx));
assert_ready_err!(rx.poll());
}
#[test]
@ -134,16 +139,16 @@ fn explicit_close_try_recv() {
assert_eq!(1, val);
// Second, without the message sent
let (mut tx, mut rx) = oneshot::channel::<i32>();
let mut task = MockTask::new();
let (tx, mut rx) = oneshot::channel::<i32>();
let mut tx = task::spawn(tx);
assert_pending!(task.enter(|cx| tx.poll_closed(cx)));
assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
rx.close();
assert!(task.is_woken());
assert!(tx.is_woken());
assert!(tx.is_closed());
assert_ready!(task.enter(|cx| tx.poll_closed(cx)));
assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
assert_err!(rx.try_recv());
}
@ -151,24 +156,24 @@ fn explicit_close_try_recv() {
#[test]
#[should_panic]
fn close_try_recv_poll() {
let (_tx, mut rx) = oneshot::channel::<i32>();
let mut task = MockTask::new();
let (_tx, rx) = oneshot::channel::<i32>();
let mut rx = task::spawn(rx);
rx.close();
assert_err!(rx.try_recv());
let _ = task.poll(&mut rx);
let _ = rx.poll();
}
#[test]
fn drops_tasks() {
let (mut tx, mut rx) = oneshot::channel::<i32>();
let mut tx_task = MockTask::new();
let mut rx_task = MockTask::new();
let mut tx_task = task::spawn(());
let mut rx_task = task::spawn(());
assert_pending!(tx_task.enter(|cx| tx.poll_closed(cx)));
assert_pending!(rx_task.poll(&mut rx));
assert_pending!(tx_task.enter(|cx, _| tx.poll_closed(cx)));
assert_pending!(rx_task.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
drop(tx);
drop(rx);
@ -181,15 +186,15 @@ fn drops_tasks() {
fn receiver_changes_task() {
let (tx, mut rx) = oneshot::channel();
let mut task1 = MockTask::new();
let mut task2 = MockTask::new();
let mut task1 = task::spawn(());
let mut task2 = task::spawn(());
assert_pending!(task1.poll(&mut rx));
assert_pending!(task1.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
assert_eq!(2, task1.waker_ref_count());
assert_eq!(1, task2.waker_ref_count());
assert_pending!(task2.poll(&mut rx));
assert_pending!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
assert_eq!(1, task1.waker_ref_count());
assert_eq!(2, task2.waker_ref_count());
@ -199,22 +204,22 @@ fn receiver_changes_task() {
assert!(!task1.is_woken());
assert!(task2.is_woken());
assert_ready_ok!(task2.poll(&mut rx));
assert_ready_ok!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
}
#[test]
fn sender_changes_task() {
let (mut tx, rx) = oneshot::channel::<i32>();
let mut task1 = MockTask::new();
let mut task2 = MockTask::new();
let mut task1 = task::spawn(());
let mut task2 = task::spawn(());
assert_pending!(task1.enter(|cx| tx.poll_closed(cx)));
assert_pending!(task1.enter(|cx, _| tx.poll_closed(cx)));
assert_eq!(2, task1.waker_ref_count());
assert_eq!(1, task2.waker_ref_count());
assert_pending!(task2.enter(|cx| tx.poll_closed(cx)));
assert_pending!(task2.enter(|cx, _| tx.poll_closed(cx)));
assert_eq!(1, task1.waker_ref_count());
assert_eq!(2, task2.waker_ref_count());
@ -224,5 +229,5 @@ fn sender_changes_task() {
assert!(!task1.is_woken());
assert!(task2.is_woken());
assert_ready!(task2.enter(|cx| tx.poll_closed(cx)));
assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
}

View File

@ -1,53 +1,49 @@
#![warn(rust_2018_idioms)]
use tokio::sync::semaphore::{Permit, Semaphore};
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};
#[test]
fn available_permits() {
let mut t1 = MockTask::new();
let s = Semaphore::new(100);
assert_eq!(s.available_permits(), 100);
// Polling for a permit succeeds immediately
let mut permit = Permit::new();
let mut permit = task::spawn(Permit::new());
assert!(!permit.is_acquired());
assert_ready_ok!(t1.enter(|cx| permit.poll_acquire(cx, &s)));
assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_eq!(s.available_permits(), 99);
assert!(permit.is_acquired());
// Polling again on the same waiter does not claim a new permit
assert_ready_ok!(t1.enter(|cx| permit.poll_acquire(cx, &s)));
assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_eq!(s.available_permits(), 99);
assert!(permit.is_acquired());
}
#[test]
fn unavailable_permits() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let s = Semaphore::new(1);
let mut permit_1 = Permit::new();
let mut permit_2 = Permit::new();
let mut permit_1 = task::spawn(Permit::new());
let mut permit_2 = task::spawn(Permit::new());
// Acquire the first permit
assert_ready_ok!(t1.enter(|cx| permit_1.poll_acquire(cx, &s)));
assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_eq!(s.available_permits(), 0);
t2.enter(|cx| {
permit_2.enter(|cx, mut p| {
// Try to acquire the second permit
assert_pending!(permit_2.poll_acquire(cx, &s));
assert_pending!(p.poll_acquire(cx, &s));
});
permit_1.release(&s);
assert_eq!(s.available_permits(), 0);
assert!(t2.is_woken());
assert_ready_ok!(t2.enter(|cx| permit_2.poll_acquire(cx, &s)));
assert!(permit_2.is_woken());
assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, &s)));
permit_2.release(&s);
assert_eq!(s.available_permits(), 1);
@ -55,22 +51,20 @@ fn unavailable_permits() {
#[test]
fn zero_permits() {
let mut t1 = MockTask::new();
let s = Semaphore::new(0);
assert_eq!(s.available_permits(), 0);
let mut permit = Permit::new();
let mut permit = task::spawn(Permit::new());
// Try to acquire the permit
t1.enter(|cx| {
assert_pending!(permit.poll_acquire(cx, &s));
permit.enter(|cx, mut p| {
assert_pending!(p.poll_acquire(cx, &s));
});
s.add_permits(1);
assert!(t1.is_woken());
assert_ready_ok!(t1.enter(|cx| permit.poll_acquire(cx, &s)));
assert!(permit.is_woken());
assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
}
#[test]
@ -82,62 +76,53 @@ fn validates_max_permits() {
#[test]
fn close_semaphore_prevents_acquire() {
let mut t1 = MockTask::new();
let s = Semaphore::new(1);
s.close();
assert_eq!(1, s.available_permits());
let mut permit = Permit::new();
let mut permit = task::spawn(Permit::new());
assert_ready_err!(t1.enter(|cx| permit.poll_acquire(cx, &s)));
assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_eq!(1, s.available_permits());
}
#[test]
fn close_semaphore_notifies_permit1() {
let mut t1 = MockTask::new();
let s = Semaphore::new(0);
let mut permit = Permit::new();
let mut permit = task::spawn(Permit::new());
assert_pending!(t1.enter(|cx| permit.poll_acquire(cx, &s)));
assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
s.close();
assert!(t1.is_woken());
assert_ready_err!(t1.enter(|cx| permit.poll_acquire(cx, &s)));
assert!(permit.is_woken());
assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, &s)));
}
#[test]
fn close_semaphore_notifies_permit2() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let mut t3 = MockTask::new();
let mut t4 = MockTask::new();
let s = Semaphore::new(2);
let mut permit1 = Permit::new();
let mut permit2 = Permit::new();
let mut permit3 = Permit::new();
let mut permit4 = Permit::new();
let mut permit1 = task::spawn(Permit::new());
let mut permit2 = task::spawn(Permit::new());
let mut permit3 = task::spawn(Permit::new());
let mut permit4 = task::spawn(Permit::new());
// Acquire a couple of permits
assert_ready_ok!(t1.enter(|cx| permit1.poll_acquire(cx, &s)));
assert_ready_ok!(t2.enter(|cx| permit2.poll_acquire(cx, &s)));
assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_pending!(t3.enter(|cx| permit3.poll_acquire(cx, &s)));
assert_pending!(t4.enter(|cx| permit4.poll_acquire(cx, &s)));
assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s)));
s.close();
assert!(t3.is_woken());
assert!(t4.is_woken());
assert!(permit3.is_woken());
assert!(permit4.is_woken());
assert_ready_err!(t3.enter(|cx| permit3.poll_acquire(cx, &s)));
assert_ready_err!(t4.enter(|cx| permit4.poll_acquire(cx, &s)));
assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, &s)));
assert_eq!(0, s.available_permits());
@ -145,7 +130,7 @@ fn close_semaphore_notifies_permit2() {
assert_eq!(1, s.available_permits());
assert_ready_err!(t1.enter(|cx| permit1.poll_acquire(cx, &s)));
assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, &s)));
permit2.release(&s);

View File

@ -2,21 +2,19 @@
use tokio::timer::delay;
use tokio::timer::timer::Handle;
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_test::{assert_pending, assert_ready, clock};
use std::time::{Duration, Instant};
#[test]
fn immediate_delay() {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create `Delay` that elapsed immediately.
let mut fut = delay(clock.now());
let mut fut = task::spawn(delay(clock.now()));
// Ready!
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
// Turn the timer, it runs for the elapsed time
clock.turn_for(ms(1000));
@ -28,38 +26,34 @@ fn immediate_delay() {
#[test]
fn delayed_delay_level_0() {
let mut task = MockTask::new();
for &i in &[1, 10, 60] {
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut fut = delay(clock.now() + ms(i));
let mut fut = task::spawn(delay(clock.now() + ms(i)));
// The delay has not elapsed.
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn();
assert_eq!(clock.advanced(), ms(i));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
}
#[test]
fn sub_ms_delayed_delay() {
let mut task = MockTask::new();
clock::mock(|clock| {
for _ in 0..5 {
let deadline = clock.now() + Duration::from_millis(1) + Duration::new(0, 1);
let mut fut = delay(deadline);
let mut fut = task::spawn(delay(deadline));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn();
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
assert!(clock.now() >= deadline);
@ -70,68 +64,62 @@ fn sub_ms_delayed_delay() {
#[test]
fn delayed_delay_wrapping_level_0() {
let mut task = MockTask::new();
clock::mock(|clock| {
clock.turn_for(ms(5));
assert_eq!(clock.advanced(), ms(5));
let mut fut = delay(clock.now() + ms(60));
let mut fut = task::spawn(delay(clock.now() + ms(60)));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn();
assert_eq!(clock.advanced(), ms(64));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn();
assert_eq!(clock.advanced(), ms(65));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
#[test]
fn timer_wrapping_with_higher_levels() {
let mut task = MockTask::new();
clock::mock(|clock| {
// Set delay to hit level 1
let mut s1 = delay(clock.now() + ms(64));
assert_pending!(task.poll(&mut s1));
let mut s1 = task::spawn(delay(clock.now() + ms(64)));
assert_pending!(s1.poll());
// Turn a bit
clock.turn_for(ms(5));
// Set timeout such that it will hit level 0, but wrap
let mut s2 = delay(clock.now() + ms(60));
assert_pending!(task.poll(&mut s2));
let mut s2 = task::spawn(delay(clock.now() + ms(60)));
assert_pending!(s2.poll());
// This should result in s1 firing
clock.turn();
assert_eq!(clock.advanced(), ms(64));
assert_ready!(task.poll(&mut s1));
assert_pending!(task.poll(&mut s2));
assert_ready!(s1.poll());
assert_pending!(s2.poll());
clock.turn();
assert_eq!(clock.advanced(), ms(65));
assert_ready!(task.poll(&mut s2));
assert_ready!(s2.poll());
});
}
#[test]
fn delay_with_deadline_in_past() {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create `Delay` that elapsed immediately.
let mut fut = delay(clock.now() - ms(100));
let mut fut = task::spawn(delay(clock.now() - ms(100)));
// Even though the delay expires in the past, it is not ready yet
// because the timer must observe it.
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
// Turn the timer, it runs for the elapsed time
clock.turn_for(ms(1000));
@ -143,56 +131,54 @@ fn delay_with_deadline_in_past() {
#[test]
fn delayed_delay_level_1() {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut fut = delay(clock.now() + ms(234));
let mut fut = task::spawn(delay(clock.now() + ms(234)));
// The delay has not elapsed.
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
// Turn the timer, this will wake up to cascade the timer down.
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(192));
// The delay has not elapsed.
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
// Turn the timer again
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(234));
// The delay has elapsed.
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut fut = delay(clock.now() + ms(234));
let mut fut = task::spawn(delay(clock.now() + ms(234)));
// The delay has not elapsed.
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
// Turn the timer with a smaller timeout than the cascade.
clock.turn_for(ms(100));
assert_eq!(clock.advanced(), ms(100));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
// Turn the timer, this will wake up to cascade the timer down.
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(192));
// The delay has not elapsed.
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
// Turn the timer again
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(234));
// The delay has elapsed.
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
@ -202,39 +188,35 @@ fn creating_delay_outside_of_context() {
// This creates a delay outside of the context of a mock timer. This tests
// that it will still expire.
let mut fut = delay(now + ms(500));
let mut task = MockTask::new();
let mut fut = task::spawn(delay(now + ms(500)));
clock::mock_at(now, |clock| {
// This registers the delay with the timer
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
// Wait some time... the timer is cascading
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(448));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(500));
// The delay has elapsed
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
#[test]
fn concurrently_set_two_timers_second_one_shorter() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
clock::mock(|clock| {
let mut fut1 = delay(clock.now() + ms(500));
let mut fut2 = delay(clock.now() + ms(200));
let mut fut1 = task::spawn(delay(clock.now() + ms(500)));
let mut fut2 = task::spawn(delay(clock.now() + ms(200)));
// The delay has not elapsed
assert_pending!(t1.poll(&mut fut1));
assert_pending!(t2.poll(&mut fut2));
assert_pending!(fut1.poll());
assert_pending!(fut2.poll());
// Delay until a cascade
clock.turn();
@ -245,38 +227,36 @@ fn concurrently_set_two_timers_second_one_shorter() {
assert_eq!(clock.advanced(), ms(200));
// The shorter delay fires
assert_ready!(t2.poll(&mut fut2));
assert_pending!(t1.poll(&mut fut1));
assert_ready!(fut2.poll());
assert_pending!(fut1.poll());
clock.turn();
assert_eq!(clock.advanced(), ms(448));
assert_pending!(t1.poll(&mut fut1));
assert_pending!(fut1.poll());
// Turn again, this time the time will advance to the second delay
clock.turn();
assert_eq!(clock.advanced(), ms(500));
assert_ready!(t1.poll(&mut fut1));
assert_ready!(fut1.poll());
})
}
#[test]
fn short_delay() {
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut fut = delay(clock.now() + ms(1));
let mut fut = task::spawn(delay(clock.now() + ms(1)));
// The delay has not elapsed.
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
// Turn the timer, but not enough time will go by.
clock.turn();
// The delay has elapsed.
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
// The time has advanced to the point of the delay elapsing.
assert_eq!(clock.advanced(), ms(1));
@ -287,14 +267,12 @@ fn short_delay() {
fn sorta_long_delay() {
const MIN_5: u64 = 5 * 60 * 1000;
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut fut = delay(clock.now() + ms(MIN_5));
let mut fut = task::spawn(delay(clock.now() + ms(MIN_5)));
// The delay has not elapsed.
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
@ -302,14 +280,14 @@ fn sorta_long_delay() {
clock.turn();
assert_eq!(clock.advanced(), ms(elapsed));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
}
clock.turn();
assert_eq!(clock.advanced(), ms(MIN_5));
// The delay has elapsed.
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
})
}
@ -317,14 +295,12 @@ fn sorta_long_delay() {
fn very_long_delay() {
const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000;
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut fut = delay(clock.now() + ms(MO_5));
let mut fut = task::spawn(delay(clock.now() + ms(MO_5)));
// The delay has not elapsed.
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
let cascades = &[
12_884_901_888,
@ -337,7 +313,7 @@ fn very_long_delay() {
clock.turn();
assert_eq!(clock.advanced(), ms(elapsed));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
}
// Turn the timer, but not enough time will go by.
@ -347,7 +323,7 @@ fn very_long_delay() {
assert_eq!(clock.advanced(), ms(MO_5));
// The delay has elapsed.
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
})
}
@ -356,43 +332,37 @@ fn very_long_delay() {
fn greater_than_max() {
const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
let mut task = MockTask::new();
clock::mock(|clock| {
// Create a `Delay` that elapses in the future
let mut fut = delay(clock.now() + ms(YR_5));
let mut fut = task::spawn(delay(clock.now() + ms(YR_5)));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn_for(ms(0));
// boom
let _ = task.poll(&mut fut);
let _ = fut.poll();
})
}
#[test]
fn unpark_is_delayed() {
let mut t1 = MockTask::new();
let mut t2 = MockTask::new();
let mut t3 = MockTask::new();
clock::mock(|clock| {
let mut fut1 = delay(clock.now() + ms(100));
let mut fut2 = delay(clock.now() + ms(101));
let mut fut3 = delay(clock.now() + ms(200));
let mut fut1 = task::spawn(delay(clock.now() + ms(100)));
let mut fut2 = task::spawn(delay(clock.now() + ms(101)));
let mut fut3 = task::spawn(delay(clock.now() + ms(200)));
assert_pending!(t1.poll(&mut fut1));
assert_pending!(t2.poll(&mut fut2));
assert_pending!(t3.poll(&mut fut3));
assert_pending!(fut1.poll());
assert_pending!(fut2.poll());
assert_pending!(fut3.poll());
clock.park_for(ms(500));
assert_eq!(clock.advanced(), ms(500));
assert_ready!(t1.poll(&mut fut1));
assert_ready!(t2.poll(&mut fut2));
assert_ready!(t3.poll(&mut fut3));
assert_ready!(fut1.poll());
assert_ready!(fut2.poll());
assert_ready!(fut3.poll());
})
}
@ -401,102 +371,92 @@ fn set_timeout_at_deadline_greater_than_max_timer() {
const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
const YR_5: u64 = 5 * YR_1;
let mut task = MockTask::new();
clock::mock(|clock| {
for _ in 0..5 {
clock.turn_for(ms(YR_1));
}
let mut fut = delay(clock.now() + ms(1));
assert_pending!(task.poll(&mut fut));
let mut fut = task::spawn(delay(clock.now() + ms(1)));
assert_pending!(fut.poll());
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(YR_5) + ms(1));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
#[test]
fn reset_future_delay_before_fire() {
let mut task = MockTask::new();
clock::mock(|clock| {
let mut fut = delay(clock.now() + ms(100));
let mut fut = task::spawn(delay(clock.now() + ms(100)));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
fut.reset(clock.now() + ms(200));
clock.turn();
assert_eq!(clock.advanced(), ms(192));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn();
assert_eq!(clock.advanced(), ms(200));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
#[test]
fn reset_past_delay_before_turn() {
let mut task = MockTask::new();
clock::mock(|clock| {
let mut fut = delay(clock.now() + ms(100));
let mut fut = task::spawn(delay(clock.now() + ms(100)));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
fut.reset(clock.now() + ms(80));
clock.turn();
assert_eq!(clock.advanced(), ms(64));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn();
assert_eq!(clock.advanced(), ms(80));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
#[test]
fn reset_past_delay_before_fire() {
let mut task = MockTask::new();
clock::mock(|clock| {
let mut fut = delay(clock.now() + ms(100));
let mut fut = task::spawn(delay(clock.now() + ms(100)));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn_for(ms(10));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
fut.reset(clock.now() + ms(80));
clock.turn();
assert_eq!(clock.advanced(), ms(64));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn();
assert_eq!(clock.advanced(), ms(90));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
#[test]
fn reset_future_delay_after_fire() {
let mut task = MockTask::new();
clock::mock(|clock| {
let mut fut = delay(clock.now() + ms(100));
let mut fut = task::spawn(delay(clock.now() + ms(100)));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(64));
@ -504,15 +464,15 @@ fn reset_future_delay_after_fire() {
clock.turn();
assert_eq!(clock.advanced(), ms(100));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
fut.reset(clock.now() + ms(10));
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn_for(ms(1000));
assert_eq!(clock.advanced(), ms(110));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}
@ -520,16 +480,15 @@ fn reset_future_delay_after_fire() {
fn delay_with_default_handle() {
let handle = Handle::default();
let now = Instant::now();
let mut task = MockTask::new();
let mut fut = handle.delay(now + ms(1));
let mut fut = task::spawn(handle.delay(now + ms(1)));
clock::mock_at(now, |clock| {
assert_pending!(task.poll(&mut fut));
assert_pending!(fut.poll());
clock.turn_for(ms(1));
assert_ready!(task.poll(&mut fut));
assert_ready!(fut.poll());
});
}

View File

@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::timer::*;
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_test::{assert_pending, assert_ready_eq, clock};
use std::time::Duration;
@ -16,36 +16,28 @@ fn interval_zero_duration() {
#[test]
fn usage() {
let mut task = MockTask::new();
clock::mock(|clock| {
let start = clock.now();
let mut int = Interval::new(start, ms(300));
let mut int = task::spawn(Interval::new(start, ms(300)));
macro_rules! poll {
() => {
task.enter(|cx| int.poll_next(cx))
};
}
assert_ready_eq!(poll!(), Some(start));
assert_pending!(poll!());
assert_ready_eq!(int.poll_next(), Some(start));
assert_pending!(int.poll_next());
clock.advance(ms(100));
assert_pending!(poll!());
assert_pending!(int.poll_next());
clock.advance(ms(200));
assert_ready_eq!(poll!(), Some(start + ms(300)));
assert_pending!(poll!());
assert_ready_eq!(int.poll_next(), Some(start + ms(300)));
assert_pending!(int.poll_next());
clock.advance(ms(400));
assert_ready_eq!(poll!(), Some(start + ms(600)));
assert_pending!(poll!());
assert_ready_eq!(int.poll_next(), Some(start + ms(600)));
assert_pending!(int.poll_next());
clock.advance(ms(500));
assert_ready_eq!(poll!(), Some(start + ms(900)));
assert_ready_eq!(poll!(), Some(start + ms(1200)));
assert_pending!(poll!());
assert_ready_eq!(int.poll_next(), Some(start + ms(900)));
assert_ready_eq!(int.poll_next(), Some(start + ms(1200)));
assert_pending!(int.poll_next());
});
}

View File

@ -1,14 +1,14 @@
#![warn(rust_2018_idioms)]
use tokio::timer::*;
use tokio_test::task::MockTask;
use tokio_test::{assert_ok, assert_pending, assert_ready, clock};
use tokio_test::{assert_ok, assert_pending, assert_ready};
use tokio_test::{clock, task};
use std::time::Duration;
macro_rules! poll {
($task:ident, $queue:ident) => {
$task.enter(|cx| $queue.poll_next(cx))
($queue:ident) => {
$queue.enter(|cx, mut queue| queue.poll_next(cx))
};
}
@ -23,26 +23,22 @@ macro_rules! assert_ready_ok {
#[test]
fn single_immediate_delay() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let _key = queue.insert_at("foo", clock.now());
let entry = assert_ready_ok!(poll!(t, queue));
let entry = assert_ready_ok!(poll!(queue));
assert_eq!(*entry.get_ref(), "foo");
let entry = assert_ready!(poll!(t, queue));
let entry = assert_ready!(poll!(queue));
assert!(entry.is_none())
});
}
#[test]
fn multi_immediate_delays() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let _k = queue.insert_at("1", clock.now());
let _k = queue.insert_at("2", clock.now());
@ -51,11 +47,11 @@ fn multi_immediate_delays() {
let mut res = vec![];
while res.len() < 3 {
let entry = assert_ready_ok!(poll!(t, queue));
let entry = assert_ready_ok!(poll!(queue));
res.push(entry.into_inner());
}
let entry = assert_ready!(poll!(t, queue));
let entry = assert_ready!(poll!(queue));
assert!(entry.is_none());
res.sort();
@ -68,26 +64,24 @@ fn multi_immediate_delays() {
#[test]
fn single_short_delay() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let _key = queue.insert_at("foo", clock.now() + ms(5));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
clock.turn_for(ms(1));
assert!(!t.is_woken());
assert!(!queue.is_woken());
clock.turn_for(ms(5));
assert!(t.is_woken());
assert!(queue.is_woken());
let entry = assert_ready_ok!(poll!(t, queue));
let entry = assert_ready_ok!(poll!(queue));
assert_eq!(*entry.get_ref(), "foo");
let entry = assert_ready!(poll!(t, queue));
let entry = assert_ready!(poll!(queue));
assert!(entry.is_none());
});
}
@ -97,33 +91,31 @@ fn multi_delay_at_start() {
let long = 262_144 + 9 * 4096;
let delays = &[1000, 2, 234, long, 60, 10];
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
// Setup the delays
for &i in delays {
let _key = queue.insert_at(i, clock.now() + ms(i));
}
assert_pending!(poll!(t, queue));
assert!(!t.is_woken());
assert_pending!(poll!(queue));
assert!(!queue.is_woken());
for elapsed in 0..1200 {
clock.turn_for(ms(1));
let elapsed = elapsed + 1;
if delays.contains(&elapsed) {
assert!(t.is_woken());
assert_ready!(poll!(t, queue));
assert_pending!(poll!(t, queue));
assert!(queue.is_woken());
assert_ready!(poll!(queue));
assert_pending!(poll!(queue));
} else {
if t.is_woken() {
if queue.is_woken() {
let cascade = &[192, 960];
assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
}
}
}
@ -132,10 +124,8 @@ fn multi_delay_at_start() {
#[test]
fn insert_in_past_fires_immediately() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let now = clock.now();
@ -143,73 +133,67 @@ fn insert_in_past_fires_immediately() {
queue.insert_at("foo", now);
assert_ready!(poll!(t, queue));
assert_ready!(poll!(queue));
});
}
#[test]
fn remove_entry() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let key = queue.insert_at("foo", clock.now() + ms(5));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
let entry = queue.remove(&key);
assert_eq!(entry.into_inner(), "foo");
clock.turn_for(ms(10));
let entry = assert_ready!(poll!(t, queue));
let entry = assert_ready!(poll!(queue));
assert!(entry.is_none());
});
}
#[test]
fn reset_entry() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let now = clock.now();
let key = queue.insert_at("foo", now + ms(5));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
clock.turn_for(ms(1));
queue.reset_at(&key, now + ms(10));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
clock.turn_for(ms(7));
assert!(!t.is_woken());
assert!(!queue.is_woken());
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
clock.turn_for(ms(3));
assert!(t.is_woken());
assert!(queue.is_woken());
let entry = assert_ready_ok!(poll!(t, queue));
let entry = assert_ready_ok!(poll!(queue));
assert_eq!(*entry.get_ref(), "foo");
let entry = assert_ready!(poll!(t, queue));
let entry = assert_ready!(poll!(queue));
assert!(entry.is_none())
});
}
#[test]
fn reset_much_later() {
let mut t = MockTask::new();
// Reproduces tokio-rs/tokio#849.
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let epoch = clock.now();
@ -217,7 +201,7 @@ fn reset_much_later() {
let key = queue.insert_at("foo", epoch + ms(200));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
clock.turn_for(ms(3));
@ -225,17 +209,15 @@ fn reset_much_later() {
clock.turn_for(ms(20));
assert!(t.is_woken());
assert!(queue.is_woken());
});
}
#[test]
fn reset_twice() {
let mut t = MockTask::new();
// Reproduces tokio-rs/tokio#849.
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let epoch = clock.now();
@ -243,7 +225,7 @@ fn reset_twice() {
let key = queue.insert_at("foo", epoch + ms(200));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
clock.turn_for(ms(3));
@ -255,7 +237,7 @@ fn reset_twice() {
clock.turn_for(ms(20));
assert!(t.is_woken());
assert!(queue.is_woken());
});
}
@ -277,45 +259,41 @@ fn remove_expired_item() {
#[test]
fn expires_before_last_insert() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let epoch = clock.now();
queue.insert_at("foo", epoch + ms(10_000));
// Delay should be set to 8.192s here.
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
// Delay should be set to the delay of the new item here
queue.insert_at("bar", epoch + ms(600));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
clock.advance(ms(600));
assert!(t.is_woken());
assert!(queue.is_woken());
let entry = assert_ready_ok!(poll!(t, queue)).into_inner();
let entry = assert_ready_ok!(poll!(queue)).into_inner();
assert_eq!(entry, "bar");
})
}
#[test]
fn multi_reset() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let epoch = clock.now();
let foo = queue.insert_at("foo", epoch + ms(200));
let bar = queue.insert_at("bar", epoch + ms(250));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
queue.reset_at(&foo, epoch + ms(300));
queue.reset_at(&bar, epoch + ms(350));
@ -325,73 +303,67 @@ fn multi_reset() {
#[test]
fn expire_first_key_when_reset_to_expire_earlier() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let epoch = clock.now();
let foo = queue.insert_at("foo", epoch + ms(200));
queue.insert_at("bar", epoch + ms(250));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
queue.reset_at(&foo, epoch + ms(100));
clock.advance(ms(100));
assert!(t.is_woken());
assert!(queue.is_woken());
let entry = assert_ready_ok!(poll!(t, queue)).into_inner();
let entry = assert_ready_ok!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");
})
}
#[test]
fn expire_second_key_when_reset_to_expire_earlier() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let epoch = clock.now();
queue.insert_at("foo", epoch + ms(200));
let bar = queue.insert_at("bar", epoch + ms(250));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
queue.reset_at(&bar, epoch + ms(100));
clock.advance(ms(100));
assert!(t.is_woken());
let entry = assert_ready_ok!(poll!(t, queue)).into_inner();
assert!(queue.is_woken());
let entry = assert_ready_ok!(poll!(queue)).into_inner();
assert_eq!(entry, "bar");
})
}
#[test]
fn reset_first_expiring_item_to_expire_later() {
let mut t = MockTask::new();
clock::mock(|clock| {
let mut queue = DelayQueue::new();
let mut queue = task::spawn(DelayQueue::new());
let epoch = clock.now();
let foo = queue.insert_at("foo", epoch + ms(200));
let _bar = queue.insert_at("bar", epoch + ms(250));
assert_pending!(poll!(t, queue));
assert_pending!(poll!(queue));
queue.reset_at(&foo, epoch + ms(300));
clock.advance(ms(250));
assert!(t.is_woken());
assert!(queue.is_woken());
let entry = assert_ready_ok!(poll!(t, queue)).into_inner();
let entry = assert_ready_ok!(poll!(queue)).into_inner();
assert_eq!(entry, "bar");
})
}

View File

@ -2,63 +2,51 @@
use tokio::sync::mpsc;
use tokio::timer::throttle::Throttle;
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_test::{assert_pending, assert_ready_eq, clock};
use futures_core::Stream;
use std::time::Duration;
macro_rules! poll {
($task:ident, $stream:ident) => {{
use std::pin::Pin;
$task.enter(|cx| Pin::new(&mut $stream).poll_next(cx))
}};
}
#[test]
fn throttle() {
let mut t = MockTask::new();
clock::mock(|clock| {
let (mut tx, rx) = mpsc::unbounded_channel();
let mut stream = Throttle::new(rx, ms(1));
let mut stream = task::spawn(Throttle::new(rx, ms(1)));
assert_pending!(poll!(t, stream));
assert_pending!(stream.poll_next());
for i in 0..3 {
tx.try_send(i).unwrap();
}
for i in 0..3 {
assert_ready_eq!(poll!(t, stream), Some(i));
assert_pending!(poll!(t, stream));
assert_ready_eq!(stream.poll_next(), Some(i));
assert_pending!(stream.poll_next());
clock.advance(ms(1));
}
assert_pending!(poll!(t, stream));
assert_pending!(stream.poll_next());
});
}
#[test]
fn throttle_dur_0() {
let mut t = MockTask::new();
clock::mock(|_| {
let (mut tx, rx) = mpsc::unbounded_channel();
let mut stream = Throttle::new(rx, ms(0));
let mut stream = task::spawn(Throttle::new(rx, ms(0)));
assert_pending!(poll!(t, stream));
assert_pending!(stream.poll_next());
for i in 0..3 {
tx.try_send(i).unwrap();
}
for i in 0..3 {
assert_ready_eq!(poll!(t, stream), Some(i));
assert_ready_eq!(stream.poll_next(), Some(i));
}
assert_pending!(poll!(t, stream));
assert_pending!(stream.poll_next());
});
}

View File

@ -2,7 +2,7 @@
use tokio::sync::oneshot;
use tokio::timer::*;
use tokio_test::task::MockTask;
use tokio_test::task;
use tokio_test::{
assert_err, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, clock,
};
@ -11,79 +11,70 @@ use std::time::Duration;
#[test]
fn simultaneous_deadline_future_completion() {
let mut t = MockTask::new();
clock::mock(|clock| {
// Create a future that is immediately ready
let fut = Box::pin(Timeout::new_at(async {}, clock.now()));
let mut fut = task::spawn(Timeout::new_at(async {}, clock.now()));
// Ready!
assert_ready_ok!(t.poll(fut));
assert_ready_ok!(fut.poll());
});
}
#[test]
fn completed_future_past_deadline() {
let mut t = MockTask::new();
clock::mock(|clock| {
// Wrap it with a deadline
let fut = Timeout::new_at(async {}, clock.now() - ms(1000));
let fut = Box::pin(fut);
let mut fut = task::spawn(Timeout::new_at(async {}, clock.now() - ms(1000)));
// Ready!
assert_ready_ok!(t.poll(fut));
assert_ready_ok!(fut.poll());
});
}
#[test]
fn future_and_deadline_in_future() {
let mut t = MockTask::new();
clock::mock(|clock| {
// Not yet complete
let (tx, rx) = oneshot::channel();
// Wrap it with a deadline
let mut fut = Timeout::new_at(rx, clock.now() + ms(100));
let mut fut = task::spawn(Timeout::new_at(rx, clock.now() + ms(100)));
assert_pending!(t.poll(&mut fut));
assert_pending!(fut.poll());
// Turn the timer, it runs for the elapsed time
clock.advance(ms(90));
assert_pending!(t.poll(&mut fut));
assert_pending!(fut.poll());
// Complete the future
tx.send(()).unwrap();
assert_ready_ok!(t.poll(&mut fut)).unwrap();
assert_ready_ok!(fut.poll()).unwrap();
});
}
#[test]
fn future_and_timeout_in_future() {
let mut t = MockTask::new();
clock::mock(|clock| {
// Not yet complete
let (tx, rx) = oneshot::channel();
// Wrap it with a deadline
let mut fut = Timeout::new(rx, ms(100));
let mut fut = task::spawn(Timeout::new(rx, ms(100)));
// Ready!
assert_pending!(t.poll(&mut fut));
assert_pending!(fut.poll());
// Turn the timer, it runs for the elapsed time
clock.advance(ms(90));
assert_pending!(t.poll(&mut fut));
assert_pending!(fut.poll());
// Complete the future
tx.send(()).unwrap();
assert_ready_ok!(t.poll(&mut fut)).unwrap();
assert_ready_ok!(fut.poll()).unwrap();
});
}
@ -103,64 +94,51 @@ impl Future for Empty {
#[test]
fn deadline_now_elapses() {
let mut t = MockTask::new();
clock::mock(|clock| {
// Wrap it with a deadline
let mut fut = Timeout::new_at(Empty, clock.now());
let mut fut = task::spawn(Timeout::new_at(Empty, clock.now()));
assert_ready_err!(t.poll(&mut fut));
assert_ready_err!(fut.poll());
});
}
#[test]
fn deadline_future_elapses() {
let mut t = MockTask::new();
clock::mock(|clock| {
// Wrap it with a deadline
let mut fut = Timeout::new_at(Empty, clock.now() + ms(300));
let mut fut = task::spawn(Timeout::new_at(Empty, clock.now() + ms(300)));
assert_pending!(t.poll(&mut fut));
assert_pending!(fut.poll());
clock.advance(ms(300));
assert_ready_err!(t.poll(&mut fut));
assert_ready_err!(fut.poll());
});
}
macro_rules! poll {
($task:ident, $stream:ident) => {{
use futures_core::Stream;
$task.enter(|cx| Pin::new(&mut $stream).poll_next(cx))
}};
}
#[test]
fn stream_and_timeout_in_future() {
use tokio::sync::mpsc;
let mut t = MockTask::new();
clock::mock(|clock| {
// Not yet complete
let (mut tx, rx) = mpsc::unbounded_channel();
// Wrap it with a deadline
let mut stream = Timeout::new(rx, ms(100));
let mut stream = task::spawn(Timeout::new(rx, ms(100)));
// Not ready
assert_pending!(poll!(t, stream));
assert_pending!(stream.poll_next());
// Turn the timer, it runs for the elapsed time
clock.advance(ms(90));
assert_pending!(poll!(t, stream));
assert_pending!(stream.poll_next());
// Complete the future
tx.try_send(()).unwrap();
let item = assert_ready!(poll!(t, stream));
let item = assert_ready!(stream.poll_next());
assert!(item.is_some());
});
}
@ -169,30 +147,28 @@ fn stream_and_timeout_in_future() {
fn idle_stream_timesout_periodically() {
use tokio::sync::mpsc;
let mut t = MockTask::new();
clock::mock(|clock| {
// Not yet complete
let (_tx, rx) = mpsc::unbounded_channel::<()>();
// Wrap it with a deadline
let mut stream = Timeout::new(rx, ms(100));
let mut stream = task::spawn(Timeout::new(rx, ms(100)));
// Not ready
assert_pending!(poll!(t, stream));
assert_pending!(stream.poll_next());
// Turn the timer, it runs for the elapsed time
clock.advance(ms(100));
let v = assert_ready!(poll!(t, stream)).unwrap();
let v = assert_ready!(stream.poll_next()).unwrap();
assert_err!(v);
// Stream's timeout should reset
assert_pending!(poll!(t, stream));
assert_pending!(stream.poll_next());
// Turn the timer, it runs for the elapsed time
clock.advance(ms(100));
let v = assert_ready!(poll!(t, stream)).unwrap();
let v = assert_ready!(stream.poll_next()).unwrap();
assert_err!(v)
});
}