mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
Fix a few more timer wheel bugs
This commit is contained in:
parent
681316dfa1
commit
7c4bac5166
@ -134,6 +134,7 @@ impl<T> TimerWheel<T> {
|
|||||||
{
|
{
|
||||||
let entry = self.slab.vacant_entry().unwrap();
|
let entry = self.slab.vacant_entry().unwrap();
|
||||||
prev_head = mem::replace(&mut slot.head, entry.index());
|
prev_head = mem::replace(&mut slot.head, entry.index());
|
||||||
|
trace!("timer wheel slab idx: {}", entry.index());
|
||||||
|
|
||||||
entry.insert(Entry {
|
entry.insert(Entry {
|
||||||
data: data,
|
data: data,
|
||||||
@ -151,7 +152,10 @@ impl<T> TimerWheel<T> {
|
|||||||
if at <= slot.next_timeout.unwrap_or(at) {
|
if at <= slot.next_timeout.unwrap_or(at) {
|
||||||
let tick = tick as u32;
|
let tick = tick as u32;
|
||||||
let actual_tick = self.start + Duration::from_millis(TICK_MS) * tick;
|
let actual_tick = self.start + Duration::from_millis(TICK_MS) * tick;
|
||||||
|
trace!("actual_tick: {:?}", actual_tick);
|
||||||
|
trace!("at: {:?}", at);
|
||||||
let at = cmp::max(actual_tick, at);
|
let at = cmp::max(actual_tick, at);
|
||||||
|
debug!("updating[{}] next timeout: {:?}", wheel_idx, at);
|
||||||
slot.next_timeout = Some(at);
|
slot.next_timeout = Some(at);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,16 +186,19 @@ impl<T> TimerWheel<T> {
|
|||||||
// TODO: don't visit slots in the wheel more than once
|
// TODO: don't visit slots in the wheel more than once
|
||||||
while self.cur_wheel_tick <= wheel_tick {
|
while self.cur_wheel_tick <= wheel_tick {
|
||||||
let head = self.cur_slab_idx;
|
let head = self.cur_slab_idx;
|
||||||
|
let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick);
|
||||||
trace!("next head[{} => {}]: {}",
|
trace!("next head[{} => {}]: {}",
|
||||||
self.cur_wheel_tick, wheel_tick, head);
|
self.cur_wheel_tick, wheel_tick, head);
|
||||||
|
|
||||||
// If the current slot has no entries or we're done iterating go to
|
// If the current slot has no entries or we're done iterating go to
|
||||||
// the next tick.
|
// the next tick.
|
||||||
if head == EMPTY {
|
if head == EMPTY {
|
||||||
|
if head == self.wheel[idx].head {
|
||||||
|
self.wheel[idx].next_timeout = None;
|
||||||
|
}
|
||||||
self.cur_wheel_tick += 1;
|
self.cur_wheel_tick += 1;
|
||||||
let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick);
|
let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick);
|
||||||
self.cur_slab_idx = self.wheel[idx].head;
|
self.cur_slab_idx = self.wheel[idx].head;
|
||||||
self.wheel[idx].next_timeout = None;
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -199,7 +206,9 @@ impl<T> TimerWheel<T> {
|
|||||||
// we're probably going to remove entries. As we skip over each
|
// we're probably going to remove entries. As we skip over each
|
||||||
// element of this slot we'll restore the `next_timeout` field if
|
// element of this slot we'll restore the `next_timeout` field if
|
||||||
// necessary.
|
// necessary.
|
||||||
let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick);
|
if head == self.wheel[idx].head {
|
||||||
|
self.wheel[idx].next_timeout = None;
|
||||||
|
}
|
||||||
|
|
||||||
// Otherwise, continue iterating over the linked list in the wheel
|
// Otherwise, continue iterating over the linked list in the wheel
|
||||||
// slot we're on and remove anything which has expired.
|
// slot we're on and remove anything which has expired.
|
||||||
@ -233,6 +242,8 @@ impl<T> TimerWheel<T> {
|
|||||||
if let Some(min) = min {
|
if let Some(min) = min {
|
||||||
debug!("next timeout {:?}", min);
|
debug!("next timeout {:?}", min);
|
||||||
debug!("now {:?}", Instant::now());
|
debug!("now {:?}", Instant::now());
|
||||||
|
} else {
|
||||||
|
debug!("next timeout never");
|
||||||
}
|
}
|
||||||
return min
|
return min
|
||||||
}
|
}
|
||||||
@ -458,4 +469,29 @@ mod tests {
|
|||||||
timer.cancel(&t1).unwrap();
|
timer.cancel(&t1).unwrap();
|
||||||
assert_eq!(timer.poll(now + ms(200)), None);
|
assert_eq!(timer.poll(now + ms(200)), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_then_next_timeout() {
|
||||||
|
drop(env_logger::init());
|
||||||
|
let mut timer = TimerWheel::<i32>::new();
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
timer.insert(now + ms(200), 2);
|
||||||
|
assert_eq!(timer.poll(now + ms(100)), None);
|
||||||
|
assert_eq!(timer.next_timeout(), Some(now + ms(200)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn add_remove_next_timeout() {
|
||||||
|
drop(env_logger::init());
|
||||||
|
let mut timer = TimerWheel::<i32>::new();
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
let t = timer.insert(now + ms(200), 2);
|
||||||
|
assert_eq!(timer.cancel(&t), Some(2));
|
||||||
|
if let Some(t) = timer.next_timeout() {
|
||||||
|
assert_eq!(timer.poll(t + ms(100)), None);
|
||||||
|
assert_eq!(timer.next_timeout(), None);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user