From 13c96187f8b44e1f410842a032af8dc63be0a4e0 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 5 Feb 2019 00:37:58 +0200 Subject: [PATCH] tokio-timer: Fix multi reset DelayQueue bug (#871) Fixes #868 --- tokio-timer/src/delay_queue.rs | 40 ++++++-------- tokio-timer/tests/queue.rs | 96 ++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 24 deletions(-) diff --git a/tokio-timer/src/delay_queue.rs b/tokio-timer/src/delay_queue.rs index ee28020ad..39bfb268a 100644 --- a/tokio-timer/src/delay_queue.rs +++ b/tokio-timer/src/delay_queue.rs @@ -342,7 +342,7 @@ impl DelayQueue { let should_set_delay = if let Some(ref delay) = self.delay { let current_exp = self.normalize_deadline(delay.deadline()); current_exp > when - } else { false }; + } else { true }; if should_set_delay { self.delay = Some(self.handle.delay(self.start + Duration::from_millis(when))); @@ -507,24 +507,18 @@ impl DelayQueue { // Normalize the deadline. Values cannot be set to expire in the past. let when = self.normalize_deadline(when); - // This is needed only for the debug assertion inside the if-let. - let old = self.start + Duration::from_millis(self.slab[key.index].when); - self.slab[key.index].when = when; - - if let Some(ref mut delay) = self.delay { - debug_assert!(old >= delay.deadline()); - - let start = self.start; - let next_poll = self.wheel.poll_at() - .map(move |t| start + Duration::from_millis(t)); - - if next_poll != Some(delay.deadline()) { - delay.reset(self.start + Duration::from_millis(when)); - } - } - self.insert_idx(when, key.index); + + let next_deadline = self.next_deadline(); + if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) { + delay.reset(deadline); + } + } + + /// Returns the next time poll as determined by the wheel + fn next_deadline(&mut self) -> Option { + self.wheel.poll_at().map(|poll_at| self.start + Duration::from_millis(poll_at)) } /// Sets the delay of the item associated with `key` to expire after @@ -694,14 +688,12 @@ impl DelayQueue { return Ok(Some(idx).into()); } - let deadline = match self.wheel.poll_at() { - Some(poll_at) => { - self.start + Duration::from_millis(poll_at) - } - None => return Ok(None.into()), - }; + if let Some(deadline) = self.next_deadline() { + self.delay = Some(self.handle.delay(deadline)); + } else { + return Ok(None.into()) + } - self.delay = Some(self.handle.delay(deadline)); } } diff --git a/tokio-timer/tests/queue.rs b/tokio-timer/tests/queue.rs index 17d876070..3d49bb392 100644 --- a/tokio-timer/tests/queue.rs +++ b/tokio-timer/tests/queue.rs @@ -311,3 +311,99 @@ fn expires_before_last_insert() { }) } + +#[test] +fn multi_reset() { + mocked(|_, time| { + let mut queue = DelayQueue::new(); + let mut task = MockTask::new(); + + let epoch = time.now(); + + let foo = queue.insert_at("foo", epoch + ms(200)); + let bar = queue.insert_at("bar", epoch + ms(250)); + + task.enter(|| { + assert_not_ready!(queue); + }); + + queue.reset_at(&foo, epoch + ms(300)); + queue.reset_at(&bar, epoch + ms(350)); + queue.reset_at(&foo, epoch + ms(400)); + }) +} + +#[test] +fn expire_first_key_when_reset_to_expire_earlier() { + mocked(|timer, time| { + let mut queue = DelayQueue::new(); + let mut task = MockTask::new(); + + let epoch = time.now(); + + let foo = queue.insert_at("foo", epoch + ms(200)); + queue.insert_at("bar", epoch + ms(250)); + + task.enter(|| { + assert_not_ready!(queue); + }); + + queue.reset_at(&foo, epoch + ms(100)); + + advance(timer, ms(100)); + + assert!(task.is_notified()); + let entry = assert_ready!(queue).unwrap().into_inner(); + assert_eq!(entry, "foo"); + }) +} + +#[test] +fn expire_second_key_when_reset_to_expire_earlier() { + mocked(|timer, time| { + let mut queue = DelayQueue::new(); + let mut task = MockTask::new(); + + let epoch = time.now(); + + queue.insert_at("foo", epoch + ms(200)); + let bar = queue.insert_at("bar", epoch + ms(250)); + + task.enter(|| { + assert_not_ready!(queue); + }); + + queue.reset_at(&bar, epoch + ms(100)); + + advance(timer, ms(100)); + + assert!(task.is_notified()); + let entry = assert_ready!(queue).unwrap().into_inner(); + assert_eq!(entry, "bar"); + }) +} + + +#[test] +fn reset_first_expiring_item_to_expire_later() { + mocked(|timer, time| { + let mut queue = DelayQueue::new(); + let mut task = MockTask::new(); + + let epoch = time.now(); + + let foo = queue.insert_at("foo", epoch + ms(200)); + let bar = queue.insert_at("bar", epoch + ms(250)); + + task.enter(|| { + assert_not_ready!(queue); + }); + + queue.reset_at(&foo, epoch + ms(300)); + advance(timer, ms(250)); + + assert!(task.is_notified()); + let entry = assert_ready!(queue).unwrap().into_inner(); + assert_eq!(entry, "bar"); + }) +}