mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
402 lines
8.9 KiB
Rust
402 lines
8.9 KiB
Rust
#![deny(warnings, rust_2018_idioms)]
|
|
|
|
mod support;
|
|
use crate::support::*;
|
|
|
|
use futures::Stream;
|
|
use tokio_mock_task::MockTask;
|
|
use tokio_timer::*;
|
|
|
|
#[test]
|
|
fn single_immediate_delay() {
|
|
mocked(|_timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let _key = queue.insert_at("foo", time.now());
|
|
|
|
let entry = assert_ready!(queue).unwrap();
|
|
assert_eq!(*entry.get_ref(), "foo");
|
|
|
|
let entry = assert_ready!(queue);
|
|
assert!(entry.is_none())
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn multi_immediate_delays() {
|
|
mocked(|_timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
|
|
let _k = queue.insert_at("1", time.now());
|
|
let _k = queue.insert_at("2", time.now());
|
|
let _k = queue.insert_at("3", time.now());
|
|
|
|
let mut res = vec![];
|
|
|
|
while res.len() < 3 {
|
|
let entry = assert_ready!(queue).unwrap();
|
|
res.push(entry.into_inner());
|
|
}
|
|
|
|
let entry = assert_ready!(queue);
|
|
assert!(entry.is_none());
|
|
|
|
res.sort();
|
|
|
|
assert_eq!("1", res[0]);
|
|
assert_eq!("2", res[1]);
|
|
assert_eq!("3", res[2]);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn single_short_delay() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let _key = queue.insert_at("foo", time.now() + ms(5));
|
|
|
|
let mut task = MockTask::new();
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
turn(timer, ms(1));
|
|
|
|
assert!(!task.is_notified());
|
|
|
|
turn(timer, ms(5));
|
|
|
|
assert!(task.is_notified());
|
|
|
|
let entry = assert_ready!(queue).unwrap();
|
|
assert_eq!(*entry.get_ref(), "foo");
|
|
|
|
let entry = assert_ready!(queue);
|
|
assert!(entry.is_none());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn multi_delay_at_start() {
|
|
let long = 262_144 + 9 * 4096;
|
|
let delays = &[1000, 2, 234, long, 60, 10];
|
|
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
// Setup the delays
|
|
for &i in delays {
|
|
let _key = queue.insert_at(i, time.now() + ms(i));
|
|
}
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
assert!(!task.is_notified());
|
|
|
|
for elapsed in 0..1200 {
|
|
turn(timer, ms(1));
|
|
let elapsed = elapsed + 1;
|
|
|
|
if delays.contains(&elapsed) {
|
|
assert!(task.is_notified());
|
|
|
|
task.enter(|| {
|
|
assert_ready!(queue);
|
|
assert_not_ready!(queue);
|
|
});
|
|
} else {
|
|
if task.is_notified() {
|
|
let cascade = &[192, 960];
|
|
assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue, "elapsed={}", elapsed);
|
|
});
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn insert_in_past_fires_immediately() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
|
|
let now = time.now();
|
|
|
|
turn(timer, ms(10));
|
|
|
|
queue.insert_at("foo", now);
|
|
|
|
assert_ready!(queue);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn remove_entry() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let key = queue.insert_at("foo", time.now() + ms(5));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
let entry = queue.remove(&key);
|
|
assert_eq!(entry.into_inner(), "foo");
|
|
|
|
turn(timer, ms(10));
|
|
|
|
task.enter(|| {
|
|
let entry = assert_ready!(queue);
|
|
assert!(entry.is_none());
|
|
});
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn reset_entry() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let now = time.now();
|
|
let key = queue.insert_at("foo", now + ms(5));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
turn(timer, ms(1));
|
|
|
|
queue.reset_at(&key, now + ms(10));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
turn(timer, ms(7));
|
|
|
|
assert!(!task.is_notified());
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
turn(timer, ms(3));
|
|
|
|
assert!(task.is_notified());
|
|
|
|
let entry = assert_ready!(queue).unwrap();
|
|
assert_eq!(*entry.get_ref(), "foo");
|
|
|
|
let entry = assert_ready!(queue);
|
|
assert!(entry.is_none())
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn reset_much_later() {
|
|
// Reproduces tokio-rs/tokio#849.
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let epoch = time.now();
|
|
|
|
turn(timer, ms(1));
|
|
|
|
let key = queue.insert_at("foo", epoch + ms(200));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
turn(timer, ms(3));
|
|
|
|
queue.reset_at(&key, epoch + ms(5));
|
|
|
|
turn(timer, ms(20));
|
|
|
|
assert!(task.is_notified());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn reset_twice() {
|
|
// Reproduces tokio-rs/tokio#849.
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let epoch = time.now();
|
|
|
|
turn(timer, ms(1));
|
|
|
|
let key = queue.insert_at("foo", epoch + ms(200));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
turn(timer, ms(3));
|
|
|
|
queue.reset_at(&key, epoch + ms(50));
|
|
|
|
turn(timer, ms(20));
|
|
|
|
queue.reset_at(&key, epoch + ms(40));
|
|
|
|
turn(timer, ms(20));
|
|
|
|
assert!(task.is_notified());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn remove_expired_item() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
|
|
let now = time.now();
|
|
|
|
turn(timer, ms(10));
|
|
|
|
let key = queue.insert_at("foo", now);
|
|
|
|
let entry = queue.remove(&key);
|
|
assert_eq!(entry.into_inner(), "foo");
|
|
})
|
|
}
|
|
|
|
#[test]
|
|
fn expires_before_last_insert() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let epoch = time.now();
|
|
|
|
queue.insert_at("foo", epoch + ms(10_000));
|
|
|
|
// Delay should be set to 8.192s here.
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
// Delay should be set to the delay of the new item here
|
|
queue.insert_at("bar", epoch + ms(600));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
advance(timer, ms(600));
|
|
|
|
assert!(task.is_notified());
|
|
let entry = assert_ready!(queue).unwrap().into_inner();
|
|
assert_eq!(entry, "bar");
|
|
})
|
|
}
|
|
|
|
#[test]
|
|
fn multi_reset() {
|
|
mocked(|_, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let epoch = time.now();
|
|
|
|
let foo = queue.insert_at("foo", epoch + ms(200));
|
|
let bar = queue.insert_at("bar", epoch + ms(250));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
queue.reset_at(&foo, epoch + ms(300));
|
|
queue.reset_at(&bar, epoch + ms(350));
|
|
queue.reset_at(&foo, epoch + ms(400));
|
|
})
|
|
}
|
|
|
|
#[test]
|
|
fn expire_first_key_when_reset_to_expire_earlier() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let epoch = time.now();
|
|
|
|
let foo = queue.insert_at("foo", epoch + ms(200));
|
|
queue.insert_at("bar", epoch + ms(250));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
queue.reset_at(&foo, epoch + ms(100));
|
|
|
|
advance(timer, ms(100));
|
|
|
|
assert!(task.is_notified());
|
|
let entry = assert_ready!(queue).unwrap().into_inner();
|
|
assert_eq!(entry, "foo");
|
|
})
|
|
}
|
|
|
|
#[test]
|
|
fn expire_second_key_when_reset_to_expire_earlier() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let epoch = time.now();
|
|
|
|
queue.insert_at("foo", epoch + ms(200));
|
|
let bar = queue.insert_at("bar", epoch + ms(250));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
queue.reset_at(&bar, epoch + ms(100));
|
|
|
|
advance(timer, ms(100));
|
|
|
|
assert!(task.is_notified());
|
|
let entry = assert_ready!(queue).unwrap().into_inner();
|
|
assert_eq!(entry, "bar");
|
|
})
|
|
}
|
|
|
|
#[test]
|
|
fn reset_first_expiring_item_to_expire_later() {
|
|
mocked(|timer, time| {
|
|
let mut queue = DelayQueue::new();
|
|
let mut task = MockTask::new();
|
|
|
|
let epoch = time.now();
|
|
|
|
let foo = queue.insert_at("foo", epoch + ms(200));
|
|
let _bar = queue.insert_at("bar", epoch + ms(250));
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(queue);
|
|
});
|
|
|
|
queue.reset_at(&foo, epoch + ms(300));
|
|
advance(timer, ms(250));
|
|
|
|
assert!(task.is_notified());
|
|
let entry = assert_ready!(queue).unwrap().into_inner();
|
|
assert_eq!(entry, "bar");
|
|
})
|
|
}
|