tokio-timer: Fix multi reset DelayQueue bug (#871)

Fixes #868
This commit is contained in:
Zahari Dichev 2019-02-05 00:37:58 +02:00 committed by Carl Lerche
parent 61d4aa98e4
commit 13c96187f8
2 changed files with 112 additions and 24 deletions

View File

@ -342,7 +342,7 @@ impl<T> DelayQueue<T> {
let should_set_delay = if let Some(ref delay) = self.delay {
let current_exp = self.normalize_deadline(delay.deadline());
current_exp > when
} else { false };
} else { true };
if should_set_delay {
self.delay = Some(self.handle.delay(self.start + Duration::from_millis(when)));
@ -507,24 +507,18 @@ impl<T> DelayQueue<T> {
// Normalize the deadline. Values cannot be set to expire in the past.
let when = self.normalize_deadline(when);
// This is needed only for the debug assertion inside the if-let.
let old = self.start + Duration::from_millis(self.slab[key.index].when);
self.slab[key.index].when = when;
if let Some(ref mut delay) = self.delay {
debug_assert!(old >= delay.deadline());
let start = self.start;
let next_poll = self.wheel.poll_at()
.map(move |t| start + Duration::from_millis(t));
if next_poll != Some(delay.deadline()) {
delay.reset(self.start + Duration::from_millis(when));
}
}
self.insert_idx(when, key.index);
let next_deadline = self.next_deadline();
if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
delay.reset(deadline);
}
}
/// Returns the next time poll as determined by the wheel
fn next_deadline(&mut self) -> Option<Instant> {
self.wheel.poll_at().map(|poll_at| self.start + Duration::from_millis(poll_at))
}
/// Sets the delay of the item associated with `key` to expire after
@ -694,14 +688,12 @@ impl<T> DelayQueue<T> {
return Ok(Some(idx).into());
}
let deadline = match self.wheel.poll_at() {
Some(poll_at) => {
self.start + Duration::from_millis(poll_at)
}
None => return Ok(None.into()),
};
if let Some(deadline) = self.next_deadline() {
self.delay = Some(self.handle.delay(deadline));
} else {
return Ok(None.into())
}
self.delay = Some(self.handle.delay(deadline));
}
}

View File

@ -311,3 +311,99 @@ fn expires_before_last_insert() {
})
}
#[test]
fn multi_reset() {
mocked(|_, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let epoch = time.now();
let foo = queue.insert_at("foo", epoch + ms(200));
let bar = queue.insert_at("bar", epoch + ms(250));
task.enter(|| {
assert_not_ready!(queue);
});
queue.reset_at(&foo, epoch + ms(300));
queue.reset_at(&bar, epoch + ms(350));
queue.reset_at(&foo, epoch + ms(400));
})
}
#[test]
fn expire_first_key_when_reset_to_expire_earlier() {
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let epoch = time.now();
let foo = queue.insert_at("foo", epoch + ms(200));
queue.insert_at("bar", epoch + ms(250));
task.enter(|| {
assert_not_ready!(queue);
});
queue.reset_at(&foo, epoch + ms(100));
advance(timer, ms(100));
assert!(task.is_notified());
let entry = assert_ready!(queue).unwrap().into_inner();
assert_eq!(entry, "foo");
})
}
#[test]
fn expire_second_key_when_reset_to_expire_earlier() {
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let epoch = time.now();
queue.insert_at("foo", epoch + ms(200));
let bar = queue.insert_at("bar", epoch + ms(250));
task.enter(|| {
assert_not_ready!(queue);
});
queue.reset_at(&bar, epoch + ms(100));
advance(timer, ms(100));
assert!(task.is_notified());
let entry = assert_ready!(queue).unwrap().into_inner();
assert_eq!(entry, "bar");
})
}
#[test]
fn reset_first_expiring_item_to_expire_later() {
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();
let epoch = time.now();
let foo = queue.insert_at("foo", epoch + ms(200));
let bar = queue.insert_at("bar", epoch + ms(250));
task.enter(|| {
assert_not_ready!(queue);
});
queue.reset_at(&foo, epoch + ms(300));
advance(timer, ms(250));
assert!(task.is_notified());
let entry = assert_ready!(queue).unwrap().into_inner();
assert_eq!(entry, "bar");
})
}