diff --git a/src/heap.rs b/src/heap.rs new file mode 100644 index 000000000..953ab87a7 --- /dev/null +++ b/src/heap.rs @@ -0,0 +1,305 @@ +//! A simple binary heap with support for removal of arbitrary elements +//! +//! This heap is used to manage timer state in the event loop. All timeouts go +//! into this heap and we also cancel timeouts from this heap. The crucial +//! feature of this heap over the standard library's `BinaryHeap` is the ability +//! to remove arbitrary elements. (e.g. when a timer is canceled) +//! +//! Note that this heap is not at all optimized right now, it should hopefully +//! just work. + +use std::mem; + +use slab::Slab; + +pub struct Heap { + // Binary heap of items, plus the slab index indicating what position in the + // list they're in. + items: Vec<(T, usize)>, + + // A map from a slab index (assigned to an item above) to the actual index + // in the array the item appears at. + index: Slab, +} + +pub struct Slot { + idx: usize, +} + +impl Heap { + pub fn new() -> Heap { + Heap { + items: Vec::new(), + index: Slab::with_capacity(128), + } + } + + /// Pushes an element onto this heap, returning a slot token indicating + /// where it was pushed on to. + /// + /// The slot can later get passed to `remove` to remove the element from the + /// heap, but only if the element was previously not removed from the heap. + pub fn push(&mut self, t: T) -> Slot { + self.assert_consistent(); + let len = self.items.len(); + if self.index.available() == 0 { + self.index.reserve_exact(len); + } + let slot_idx = self.index.insert(len).unwrap(); + self.items.push((t, slot_idx)); + self.percolate_up(len); + self.assert_consistent(); + Slot { idx: slot_idx } + } + + pub fn peek(&self) -> Option<&T> { + self.assert_consistent(); + self.items.get(0).map(|i| &i.0) + } + + pub fn pop(&mut self) -> Option { + self.assert_consistent(); + if self.items.len() == 0 { + return None + } + let slot = Slot { idx: self.items[0].1 }; + Some(self.remove(slot)) + } + + pub fn remove(&mut self, slot: Slot) -> T { + self.assert_consistent(); + let idx = self.index.remove(slot.idx).unwrap(); + let (item, slot_idx) = self.items.swap_remove(idx); + debug_assert_eq!(slot.idx, slot_idx); + if idx < self.items.len() { + self.index[self.items[idx].1] = idx; + if self.items[idx].0 < item { + self.percolate_up(idx); + } else { + self.percolate_down(idx); + } + } + self.assert_consistent(); + return item + } + + fn percolate_up(&mut self, mut idx: usize) -> usize { + while idx > 0 { + let parent = (idx - 1) / 2; + if self.items[idx].0 >= self.items[parent].0 { + break + } + let (a, b) = self.items.split_at_mut(idx); + mem::swap(&mut a[parent], &mut b[0]); + self.index[a[parent].1] = parent; + self.index[b[0].1] = idx; + idx = parent; + } + return idx + } + + fn percolate_down(&mut self, mut idx: usize) -> usize { + loop { + let left = 2 * idx + 1; + let right = 2 * idx + 2; + + let mut swap_left = true; + match (self.items.get(left), self.items.get(right)) { + (Some(left), None) => { + if left.0 >= self.items[idx].0 { + break + } + } + (Some(left), Some(right)) => { + if left.0 < self.items[idx].0 { + if right.0 < left.0 { + swap_left = false; + } + } else if right.0 < self.items[idx].0 { + swap_left = false; + } else { + break + } + } + + (None, None) => break, + (None, Some(_right)) => panic!("not possible"), + } + + let (a, b) = if swap_left { + self.items.split_at_mut(left) + } else { + self.items.split_at_mut(right) + }; + mem::swap(&mut a[idx], &mut b[0]); + self.index[a[idx].1] = idx; + self.index[b[0].1] = a.len(); + idx = a.len(); + } + return idx + } + + fn assert_consistent(&self) { + if cfg!(not(debug_assertions)) { + return + } + + assert_eq!(self.items.len(), self.index.len()); + + for (i, &(_, j)) in self.items.iter().enumerate() { + if self.index[j] != i { + panic!("self.index[j] != i : i={} j={} self.index[j]={}", + i, j, self.index[j]); + } + } + + for (i, &(ref item, _)) in self.items.iter().enumerate() { + if i > 0 { + assert!(*item >= self.items[(i - 1) / 2].0, "bad at index: {}", i); + } + if let Some(left) = self.items.get(2 * i + 1) { + assert!(*item <= left.0, "bad left at index: {}", i); + } + if let Some(right) = self.items.get(2 * i + 2) { + assert!(*item <= right.0, "bad right at index: {}", i); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::Heap; + + #[test] + fn simple() { + let mut h = Heap::new(); + h.push(1); + h.push(2); + h.push(8); + h.push(4); + assert_eq!(h.pop(), Some(1)); + assert_eq!(h.pop(), Some(2)); + assert_eq!(h.pop(), Some(4)); + assert_eq!(h.pop(), Some(8)); + assert_eq!(h.pop(), None); + assert_eq!(h.pop(), None); + } + + #[test] + fn simple2() { + let mut h = Heap::new(); + h.push(5); + h.push(4); + h.push(3); + h.push(2); + h.push(1); + assert_eq!(h.pop(), Some(1)); + h.push(8); + assert_eq!(h.pop(), Some(2)); + h.push(1); + assert_eq!(h.pop(), Some(1)); + assert_eq!(h.pop(), Some(3)); + assert_eq!(h.pop(), Some(4)); + h.push(5); + assert_eq!(h.pop(), Some(5)); + assert_eq!(h.pop(), Some(5)); + assert_eq!(h.pop(), Some(8)); + } + + #[test] + fn remove() { + let mut h = Heap::new(); + h.push(5); + h.push(4); + h.push(3); + let two = h.push(2); + h.push(1); + assert_eq!(h.pop(), Some(1)); + assert_eq!(h.remove(two), 2); + h.push(1); + assert_eq!(h.pop(), Some(1)); + assert_eq!(h.pop(), Some(3)); + } + + fn vec2heap(v: Vec) -> Heap { + let mut h = Heap::new(); + for t in v { + h.push(t); + } + return h + } + + #[test] + fn test_peek_and_pop() { + let data = vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1]; + let mut sorted = data.clone(); + sorted.sort(); + let mut heap = vec2heap(data); + while heap.peek().is_some() { + assert_eq!(heap.peek().unwrap(), sorted.first().unwrap()); + assert_eq!(heap.pop().unwrap(), sorted.remove(0)); + } + } + + #[test] + fn test_push() { + let mut heap = Heap::new(); + heap.push(-2); + heap.push(-4); + heap.push(-9); + assert!(*heap.peek().unwrap() == -9); + heap.push(-11); + assert!(*heap.peek().unwrap() == -11); + heap.push(-5); + assert!(*heap.peek().unwrap() == -11); + heap.push(-27); + assert!(*heap.peek().unwrap() == -27); + heap.push(-3); + assert!(*heap.peek().unwrap() == -27); + heap.push(-103); + assert!(*heap.peek().unwrap() == -103); + } + + fn check_to_vec(mut data: Vec) { + let mut heap = Heap::new(); + for data in data.iter() { + heap.push(*data); + } + data.sort(); + let mut v = Vec::new(); + while let Some(i) = heap.pop() { + v.push(i); + } + assert_eq!(v, data); + } + + #[test] + fn test_to_vec() { + check_to_vec(vec![]); + check_to_vec(vec![5]); + check_to_vec(vec![3, 2]); + check_to_vec(vec![2, 3]); + check_to_vec(vec![5, 1, 2]); + check_to_vec(vec![1, 100, 2, 3]); + check_to_vec(vec![1, 3, 5, 7, 9, 2, 4, 6, 8, 0]); + check_to_vec(vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1]); + check_to_vec(vec![9, 11, 9, 9, 9, 9, 11, 2, 3, 4, 11, 9, 0, 0, 0, 0]); + check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + check_to_vec(vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]); + check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 1, 2]); + check_to_vec(vec![5, 4, 3, 2, 1, 5, 4, 3, 2, 1, 5, 4, 3, 2, 1]); + } + + #[test] + fn test_empty_pop() { + let mut heap = Heap::::new(); + assert!(heap.pop().is_none()); + } + + #[test] + fn test_empty_peek() { + let empty = Heap::::new(); + assert!(empty.peek().is_none()); + } +} diff --git a/src/lib.rs b/src/lib.rs index f162e9e8c..409ec8c38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -109,7 +109,7 @@ extern crate log; pub mod io; mod mpsc_queue; -mod timer_wheel; +mod heap; pub mod channel; pub mod net; pub mod reactor; diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index c3e0a24ed..8065a5640 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -17,7 +17,7 @@ use futures::task::{self, Unpark, Task, Spawn}; use mio; use slab::Slab; -use timer_wheel::{TimerWheel, Timeout as WheelTimeout}; +use heap::{Heap, Slot}; mod channel; mod io_token; @@ -68,8 +68,8 @@ struct Inner { // The slab below keeps track of the timeouts themselves as well as the // state of the timeout itself. The `TimeoutToken` type is an index into the // `timeouts` slab. - timer_wheel: TimerWheel, - timeouts: Slab<(WheelTimeout, TimeoutState)>, + timer_heap: Heap<(Instant, usize)>, + timeouts: Slab<(Slot, TimeoutState)>, } /// Handle to an event loop, used to construct I/O objects, send messages, and @@ -153,7 +153,7 @@ impl Core { io_dispatch: Slab::with_capacity(SLAB_CAPACITY), task_dispatch: Slab::with_capacity(SLAB_CAPACITY), timeouts: Slab::with_capacity(SLAB_CAPACITY), - timer_wheel: TimerWheel::new(), + timer_heap: Heap::new(), })), }) } @@ -242,11 +242,11 @@ impl Core { let start = Instant::now(); loop { let inner = self.inner.borrow_mut(); - let timeout = inner.timer_wheel.next_timeout().map(|t| { - if t < start { + let timeout = inner.timer_heap.peek().map(|t| { + if t.0 < start { Duration::new(0, 0) } else { - t - start + t.0 - start } }); match inner.io.poll(&mut self.events, timeout) { @@ -351,12 +351,15 @@ impl Core { fn consume_timeouts(&mut self, now: Instant) { loop { let mut inner = self.inner.borrow_mut(); - let idx = match inner.timer_wheel.poll(now) { - Some(idx) => idx, + match inner.timer_heap.peek() { + Some(head) if head.0 <= now => {} + Some(_) => break, None => break, }; - trace!("firing timeout: {}", idx); - let handle = inner.timeouts[idx].1.fire(); + let (_, slab_idx) = inner.timer_heap.pop().unwrap(); + + trace!("firing timeout: {}", slab_idx); + let handle = inner.timeouts[slab_idx].1.fire(); drop(inner); if let Some(handle) = handle { self.notify_handle(handle); @@ -453,11 +456,10 @@ impl Inner { self.timeouts.reserve_exact(len); } let entry = self.timeouts.vacant_entry().unwrap(); - let timeout = self.timer_wheel.insert(at, entry.index()); - let when = *timeout.when(); - let entry = entry.insert((timeout, TimeoutState::NotFired)); + let slot = self.timer_heap.push((at, entry.index())); + let entry = entry.insert((slot, TimeoutState::NotFired)); debug!("added a timeout: {}", entry.index()); - Ok((entry.index(), when)) + Ok((entry.index(), at)) } fn update_timeout(&mut self, token: usize, handle: Task) -> Option { @@ -468,8 +470,8 @@ impl Inner { fn cancel_timeout(&mut self, token: usize) { debug!("cancel a timeout: {}", token); let pair = self.timeouts.remove(token); - if let Some((timeout, _state)) = pair { - self.timer_wheel.cancel(&timeout); + if let Some((slot, _state)) = pair { + self.timer_heap.remove(slot); } } diff --git a/src/timer_wheel.rs b/src/timer_wheel.rs deleted file mode 100644 index 4751b3144..000000000 --- a/src/timer_wheel.rs +++ /dev/null @@ -1,513 +0,0 @@ -//! A timer wheel implementation - -use std::{mem, usize}; -use std::time::{Instant, Duration}; - -use slab::Slab; - -/// An implementation of a timer wheel where data can be associated with each -/// timer firing. -/// -/// This structure implements a timer wheel data structure where each timeout -/// has a piece of associated data, `T`. A timer wheel supports O(1) insertion -/// and removal of timers, as well as quickly figuring out what needs to get -/// fired. -/// -/// Note, though, that the resolution of a timer wheel means that timeouts will -/// not arrive promptly when they expire, but rather in certain increments of -/// each time. The time delta between each slot of a time wheel is of a fixed -/// length, meaning that if a timeout is scheduled between two slots it'll end -/// up getting scheduled into the later slot. -pub struct TimerWheel { - // Actual timer wheel itself. - // - // Each slot represents a fixed duration of time, and this wheel also - // behaves like a ring buffer. All timeouts scheduled will correspond to one - // slot and therefore each slot has a linked list of timeouts scheduled in - // it. Right now linked lists are done through indices into the `slab` - // below. - // - // Each slot also contains the next timeout associated with it (the minimum - // of the entire linked list). - wheel: Vec, - - // A slab containing all the timeout entries themselves. This is the memory - // backing the "linked lists" in the wheel above. Each entry has a prev/next - // pointer (indices in this array) along with the data associated with the - // timeout and the time the timeout will fire. - slab: Slab, usize>, - - // The instant that this timer was created, through which all other timeout - // computations are relative to. - start: Instant, - - // State used during `poll`. The `cur_wheel_tick` field is the current tick - // we've poll'd to. That is, all events from `cur_wheel_tick` to the - // actual current tick in time still need to be processed. - // - // The `cur_slab_idx` variable is basically just an iterator over the linked - // list associated with a wheel slot. This will get incremented as we move - // forward in `poll` - cur_wheel_tick: u64, - cur_slab_idx: usize, -} - -#[derive(Clone)] -struct Slot { - head: usize, - next_timeout: Option, -} - -struct Entry { - data: T, - when: Instant, - wheel_idx: usize, - prev: usize, - next: usize, -} - -/// A timeout which has been scheduled with a timer wheel. -/// -/// This can be used to later cancel a timeout, if necessary. -pub struct Timeout { - when: Instant, - slab_idx: usize, -} - -const EMPTY: usize = usize::MAX; -const LEN: usize = 256; -const MASK: usize = LEN - 1; -const TICK_MS: u64 = 100; - -impl TimerWheel { - /// Creates a new timer wheel configured with no timeouts and with the - /// default parameters. - /// - /// Currently this is a timer wheel of length 256 with a 100ms time - /// resolution. - pub fn new() -> TimerWheel { - TimerWheel { - wheel: vec![Slot { head: EMPTY, next_timeout: None }; LEN], - slab: Slab::with_capacity(256), - start: Instant::now(), - cur_wheel_tick: 0, - cur_slab_idx: EMPTY, - } - } - - /// Creates a new timeout to get fired at a particular point in the future. - /// - /// The timeout will be associated with the specified `data`, and this data - /// will be returned from `poll` when it's ready. - /// - /// The returned `Timeout` can later get passesd to `cancel` to retrieve the - /// data and ensure the timeout doesn't fire. - /// - /// This method completes in O(1) time. - /// - /// # Panics - /// - /// This method will panic if `at` is before the time that this timer wheel - /// was created. - pub fn insert(&mut self, mut at: Instant, data: T) -> Timeout { - // First up, figure out where we're gonna go in the wheel. Note that if - // we're being scheduled on or before the current wheel tick we just - // make sure to defer ourselves to the next tick. - let mut tick = self.time_to_ticks(at); - if tick <= self.cur_wheel_tick { - debug!("moving {} to {}", tick, self.cur_wheel_tick + 1); - tick = self.cur_wheel_tick + 1; - } - let wheel_idx = self.ticks_to_wheel_idx(tick); - trace!("inserting timeout at {} for {}", wheel_idx, tick); - - let actual_tick = self.start + - Duration::from_millis(TICK_MS) * (tick as u32); - trace!("actual_tick: {:?}", actual_tick); - trace!("at: {:?}", at); - at = actual_tick; - - // Next, make sure there's enough space in the slab for the timeout. - if self.slab.vacant_entry().is_none() { - let amt = self.slab.len(); - self.slab.reserve_exact(amt); - } - - // Insert ourselves at the head of the linked list in the wheel. - let slot = &mut self.wheel[wheel_idx]; - let prev_head; - { - let entry = self.slab.vacant_entry().unwrap(); - prev_head = mem::replace(&mut slot.head, entry.index()); - trace!("timer wheel slab idx: {}", entry.index()); - - entry.insert(Entry { - data: data, - when: at, - wheel_idx: wheel_idx, - prev: EMPTY, - next: prev_head, - }); - } - if prev_head != EMPTY { - self.slab[prev_head].prev = slot.head; - } - - // Update the wheel slot's next timeout field. - if at <= slot.next_timeout.unwrap_or(at) { - debug!("updating[{}] next timeout: {:?}", wheel_idx, at); - debug!(" start: {:?}", self.start); - slot.next_timeout = Some(at); - } - - Timeout { - when: at, - slab_idx: slot.head, - } - } - - /// Queries this timer to see if any timeouts are ready to fire. - /// - /// This function will advance the internal wheel to the time specified by - /// `at`, returning any timeout which has happened up to that point. This - /// method should be called in a loop until it returns `None` to ensure that - /// all timeouts are processed. - /// - /// # Panics - /// - /// This method will panic if `at` is before the instant that this timer - /// wheel was created. - pub fn poll(&mut self, at: Instant) -> Option { - let wheel_tick = self.time_to_ticks(at); - - trace!("polling {} => {}", self.cur_wheel_tick, wheel_tick); - - // Advance forward in time to the `wheel_tick` specified. - // - // TODO: don't visit slots in the wheel more than once - while self.cur_wheel_tick <= wheel_tick { - let head = self.cur_slab_idx; - let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick); - trace!("next head[{} => {}]: {}", - self.cur_wheel_tick, wheel_tick, head); - - // If the current slot has no entries or we're done iterating go to - // the next tick. - if head == EMPTY { - if head == self.wheel[idx].head { - self.wheel[idx].next_timeout = None; - } - self.cur_wheel_tick += 1; - let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick); - self.cur_slab_idx = self.wheel[idx].head; - continue - } - - // If we're starting to iterate over a slot, clear its timeout as - // we're probably going to remove entries. As we skip over each - // element of this slot we'll restore the `next_timeout` field if - // necessary. - if head == self.wheel[idx].head { - self.wheel[idx].next_timeout = None; - } - - // Otherwise, continue iterating over the linked list in the wheel - // slot we're on and remove anything which has expired. - self.cur_slab_idx = self.slab[head].next; - let head_timeout = self.slab[head].when; - if self.time_to_ticks(head_timeout) <= self.time_to_ticks(at) { - return self.remove_slab(head).map(|e| e.data) - } else { - let next = self.wheel[idx].next_timeout.unwrap_or(head_timeout); - if head_timeout <= next { - self.wheel[idx].next_timeout = Some(head_timeout); - } - } - } - - None - } - - /// Returns the instant in time that corresponds to the next timeout - /// scheduled in this wheel. - pub fn next_timeout(&self) -> Option { - // TODO: can this be optimized to not look at the whole array? - let mut min = None; - for a in self.wheel.iter().filter_map(|s| s.next_timeout.as_ref()) { - if let Some(b) = min { - if b < a { - continue - } - } - min = Some(a); - } - if let Some(min) = min { - debug!("next timeout {:?}", min); - debug!("now {:?}", Instant::now()); - } else { - debug!("next timeout never"); - } - min.map(|t| *t) - } - - /// Cancels the specified timeout. - /// - /// For timeouts previously registered via `insert` they can be passed back - /// to this method to cancel the associated timeout, retrieving the value - /// inserted if the timeout has not already fired. - /// - /// This method completes in O(1) time. - /// - /// # Panics - /// - /// This method may panic if `timeout` wasn't created by this timer wheel. - pub fn cancel(&mut self, timeout: &Timeout) -> Option { - match self.slab.get(timeout.slab_idx) { - Some(e) if e.when == timeout.when => {} - _ => return None, - } - - self.remove_slab(timeout.slab_idx).map(|e| e.data) - } - - fn remove_slab(&mut self, slab_idx: usize) -> Option> { - debug!("removing timer slab {}", slab_idx); - let entry = match self.slab.remove(slab_idx) { - Some(e) => e, - None => return None, - }; - - // Remove the node from the linked list - if entry.prev == EMPTY { - self.wheel[entry.wheel_idx].head = entry.next; - } else { - self.slab[entry.prev].next = entry.next; - } - if entry.next != EMPTY { - self.slab[entry.next].prev = entry.prev; - } - - if self.cur_slab_idx == slab_idx { - self.cur_slab_idx = entry.next; - } - - return Some(entry) - } - - fn time_to_ticks(&self, time: Instant) -> u64 { - let dur = time - self.start; - let ms = dur.subsec_nanos() as u64 / 1_000_000; - let ms = dur.as_secs() - .checked_mul(1_000) - .and_then(|m| m.checked_add(ms)) - .expect("overflow scheduling timeout"); - ms / TICK_MS - } - - fn ticks_to_wheel_idx(&self, ticks: u64) -> usize { - (ticks as usize) & MASK - } -} - -impl Timeout { - pub fn when(&self) -> &Instant { - &self.when - } -} - -#[cfg(test)] -mod tests { - extern crate env_logger; - - use std::time::{Instant, Duration}; - - use super::TimerWheel; - - fn ms(amt: u64) -> Duration { - Duration::from_millis(amt) - } - - #[test] - fn smoke() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - assert!(timer.poll(now).is_none()); - assert!(timer.poll(now).is_none()); - - timer.insert(now + ms(200), 3); - - assert!(timer.poll(now).is_none()); - assert!(timer.poll(now + ms(100)).is_none()); - let res = timer.poll(now + ms(200)); - assert!(res.is_some()); - assert_eq!(res.unwrap(), 3); - } - - #[test] - fn poll_past_done() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - timer.insert(now + ms(200), 3); - let res = timer.poll(now + ms(300)); - assert!(res.is_some()); - assert_eq!(res.unwrap(), 3); - } - - #[test] - fn multiple_ready() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - timer.insert(now + ms(200), 3); - timer.insert(now + ms(201), 4); - timer.insert(now + ms(202), 5); - timer.insert(now + ms(300), 6); - timer.insert(now + ms(301), 7); - - let mut found = Vec::new(); - while let Some(i) = timer.poll(now + ms(400)) { - found.push(i); - } - found.sort(); - assert_eq!(found, [3, 4, 5, 6, 7]); - } - - #[test] - fn poll_now() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - timer.insert(now, 3); - let res = timer.poll(now + ms(100)); - assert!(res.is_some()); - assert_eq!(res.unwrap(), 3); - } - - #[test] - fn cancel() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - let timeout = timer.insert(now + ms(800), 3); - assert!(timer.poll(now + ms(200)).is_none()); - assert!(timer.poll(now + ms(400)).is_none()); - assert_eq!(timer.cancel(&timeout), Some(3)); - assert!(timer.poll(now + ms(600)).is_none()); - assert!(timer.poll(now + ms(800)).is_none()); - assert!(timer.poll(now + ms(1000)).is_none()); - } - - #[test] - fn next_timeout() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = timer.start; - - assert!(timer.next_timeout().is_none()); - timer.insert(now + ms(400), 3); - let timeout = timer.next_timeout().expect("wanted a next_timeout"); - assert_eq!(timeout, now + ms(400)); - - timer.insert(now + ms(1000), 3); - let timeout = timer.next_timeout().expect("wanted a next_timeout"); - assert_eq!(timeout, now + ms(400)); - } - - #[test] - fn around_the_boundary() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - timer.insert(now + ms(199), 3); - timer.insert(now + ms(200), 4); - timer.insert(now + ms(201), 5); - timer.insert(now + ms(251), 6); - timer.insert(now + ms(299), 7); - timer.insert(now + ms(300), 8); - timer.insert(now + ms(301), 9); - - let mut found = Vec::new(); - while let Some(i) = timer.poll(now + ms(200)) { - found.push(i); - } - found.sort(); - assert_eq!(found, [3, 4, 5, 6, 7]); - - let mut found = Vec::new(); - while let Some(i) = timer.poll(now + ms(300)) { - found.push(i); - } - found.sort(); - assert_eq!(found, [8, 9]); - assert_eq!(timer.poll(now + ms(300)), None); - } - - #[test] - fn remove_clears_timeout() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = timer.start; - - timer.insert(now + ms(100), 3); - assert_eq!(timer.next_timeout(), Some(timer.start + ms(100))); - assert_eq!(timer.poll(now + ms(200)), Some(3)); - assert_eq!(timer.next_timeout(), None); - } - - #[test] - fn remove_then_poll() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - let t = timer.insert(now + ms(1), 3); - timer.cancel(&t).unwrap(); - assert_eq!(timer.poll(now + ms(200)), None); - } - - #[test] - fn add_two_then_remove() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - let t1 = timer.insert(now + ms(1), 1); - timer.insert(now + ms(2), 2); - assert_eq!(timer.poll(now + ms(200)), Some(2)); - timer.cancel(&t1).unwrap(); - assert_eq!(timer.poll(now + ms(200)), None); - } - - #[test] - fn poll_then_next_timeout() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = timer.start; - - timer.insert(now + ms(200), 2); - assert_eq!(timer.poll(now + ms(100)), None); - assert_eq!(timer.next_timeout(), Some(now + ms(200))); - } - - #[test] - fn add_remove_next_timeout() { - drop(env_logger::init()); - let mut timer = TimerWheel::::new(); - let now = Instant::now(); - - let t = timer.insert(now + ms(200), 2); - assert_eq!(timer.cancel(&t), Some(2)); - if let Some(t) = timer.next_timeout() { - assert_eq!(timer.poll(t + ms(100)), None); - assert_eq!(timer.next_timeout(), None); - } - } -}