mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
Use a timer heap instead of a timer wheel
In general it's easier to implement and should have more predictable performance semantics for applications in general. More serious timer usage can go through `tokio-timer` which has properly configurable timer wheels and such. Closes #2 Closes #7
This commit is contained in:
parent
66cff8e84b
commit
1f0d2198ad
305
src/heap.rs
Normal file
305
src/heap.rs
Normal file
@ -0,0 +1,305 @@
|
||||
//! A simple binary heap with support for removal of arbitrary elements
|
||||
//!
|
||||
//! This heap is used to manage timer state in the event loop. All timeouts go
|
||||
//! into this heap and we also cancel timeouts from this heap. The crucial
|
||||
//! feature of this heap over the standard library's `BinaryHeap` is the ability
|
||||
//! to remove arbitrary elements. (e.g. when a timer is canceled)
|
||||
//!
|
||||
//! Note that this heap is not at all optimized right now, it should hopefully
|
||||
//! just work.
|
||||
|
||||
use std::mem;
|
||||
|
||||
use slab::Slab;
|
||||
|
||||
pub struct Heap<T> {
|
||||
// Binary heap of items, plus the slab index indicating what position in the
|
||||
// list they're in.
|
||||
items: Vec<(T, usize)>,
|
||||
|
||||
// A map from a slab index (assigned to an item above) to the actual index
|
||||
// in the array the item appears at.
|
||||
index: Slab<usize>,
|
||||
}
|
||||
|
||||
pub struct Slot {
|
||||
idx: usize,
|
||||
}
|
||||
|
||||
impl<T: Ord> Heap<T> {
|
||||
pub fn new() -> Heap<T> {
|
||||
Heap {
|
||||
items: Vec::new(),
|
||||
index: Slab::with_capacity(128),
|
||||
}
|
||||
}
|
||||
|
||||
/// Pushes an element onto this heap, returning a slot token indicating
|
||||
/// where it was pushed on to.
|
||||
///
|
||||
/// The slot can later get passed to `remove` to remove the element from the
|
||||
/// heap, but only if the element was previously not removed from the heap.
|
||||
pub fn push(&mut self, t: T) -> Slot {
|
||||
self.assert_consistent();
|
||||
let len = self.items.len();
|
||||
if self.index.available() == 0 {
|
||||
self.index.reserve_exact(len);
|
||||
}
|
||||
let slot_idx = self.index.insert(len).unwrap();
|
||||
self.items.push((t, slot_idx));
|
||||
self.percolate_up(len);
|
||||
self.assert_consistent();
|
||||
Slot { idx: slot_idx }
|
||||
}
|
||||
|
||||
pub fn peek(&self) -> Option<&T> {
|
||||
self.assert_consistent();
|
||||
self.items.get(0).map(|i| &i.0)
|
||||
}
|
||||
|
||||
pub fn pop(&mut self) -> Option<T> {
|
||||
self.assert_consistent();
|
||||
if self.items.len() == 0 {
|
||||
return None
|
||||
}
|
||||
let slot = Slot { idx: self.items[0].1 };
|
||||
Some(self.remove(slot))
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, slot: Slot) -> T {
|
||||
self.assert_consistent();
|
||||
let idx = self.index.remove(slot.idx).unwrap();
|
||||
let (item, slot_idx) = self.items.swap_remove(idx);
|
||||
debug_assert_eq!(slot.idx, slot_idx);
|
||||
if idx < self.items.len() {
|
||||
self.index[self.items[idx].1] = idx;
|
||||
if self.items[idx].0 < item {
|
||||
self.percolate_up(idx);
|
||||
} else {
|
||||
self.percolate_down(idx);
|
||||
}
|
||||
}
|
||||
self.assert_consistent();
|
||||
return item
|
||||
}
|
||||
|
||||
fn percolate_up(&mut self, mut idx: usize) -> usize {
|
||||
while idx > 0 {
|
||||
let parent = (idx - 1) / 2;
|
||||
if self.items[idx].0 >= self.items[parent].0 {
|
||||
break
|
||||
}
|
||||
let (a, b) = self.items.split_at_mut(idx);
|
||||
mem::swap(&mut a[parent], &mut b[0]);
|
||||
self.index[a[parent].1] = parent;
|
||||
self.index[b[0].1] = idx;
|
||||
idx = parent;
|
||||
}
|
||||
return idx
|
||||
}
|
||||
|
||||
fn percolate_down(&mut self, mut idx: usize) -> usize {
|
||||
loop {
|
||||
let left = 2 * idx + 1;
|
||||
let right = 2 * idx + 2;
|
||||
|
||||
let mut swap_left = true;
|
||||
match (self.items.get(left), self.items.get(right)) {
|
||||
(Some(left), None) => {
|
||||
if left.0 >= self.items[idx].0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
(Some(left), Some(right)) => {
|
||||
if left.0 < self.items[idx].0 {
|
||||
if right.0 < left.0 {
|
||||
swap_left = false;
|
||||
}
|
||||
} else if right.0 < self.items[idx].0 {
|
||||
swap_left = false;
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
(None, None) => break,
|
||||
(None, Some(_right)) => panic!("not possible"),
|
||||
}
|
||||
|
||||
let (a, b) = if swap_left {
|
||||
self.items.split_at_mut(left)
|
||||
} else {
|
||||
self.items.split_at_mut(right)
|
||||
};
|
||||
mem::swap(&mut a[idx], &mut b[0]);
|
||||
self.index[a[idx].1] = idx;
|
||||
self.index[b[0].1] = a.len();
|
||||
idx = a.len();
|
||||
}
|
||||
return idx
|
||||
}
|
||||
|
||||
fn assert_consistent(&self) {
|
||||
if cfg!(not(debug_assertions)) {
|
||||
return
|
||||
}
|
||||
|
||||
assert_eq!(self.items.len(), self.index.len());
|
||||
|
||||
for (i, &(_, j)) in self.items.iter().enumerate() {
|
||||
if self.index[j] != i {
|
||||
panic!("self.index[j] != i : i={} j={} self.index[j]={}",
|
||||
i, j, self.index[j]);
|
||||
}
|
||||
}
|
||||
|
||||
for (i, &(ref item, _)) in self.items.iter().enumerate() {
|
||||
if i > 0 {
|
||||
assert!(*item >= self.items[(i - 1) / 2].0, "bad at index: {}", i);
|
||||
}
|
||||
if let Some(left) = self.items.get(2 * i + 1) {
|
||||
assert!(*item <= left.0, "bad left at index: {}", i);
|
||||
}
|
||||
if let Some(right) = self.items.get(2 * i + 2) {
|
||||
assert!(*item <= right.0, "bad right at index: {}", i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Heap;
|
||||
|
||||
#[test]
|
||||
fn simple() {
|
||||
let mut h = Heap::new();
|
||||
h.push(1);
|
||||
h.push(2);
|
||||
h.push(8);
|
||||
h.push(4);
|
||||
assert_eq!(h.pop(), Some(1));
|
||||
assert_eq!(h.pop(), Some(2));
|
||||
assert_eq!(h.pop(), Some(4));
|
||||
assert_eq!(h.pop(), Some(8));
|
||||
assert_eq!(h.pop(), None);
|
||||
assert_eq!(h.pop(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn simple2() {
|
||||
let mut h = Heap::new();
|
||||
h.push(5);
|
||||
h.push(4);
|
||||
h.push(3);
|
||||
h.push(2);
|
||||
h.push(1);
|
||||
assert_eq!(h.pop(), Some(1));
|
||||
h.push(8);
|
||||
assert_eq!(h.pop(), Some(2));
|
||||
h.push(1);
|
||||
assert_eq!(h.pop(), Some(1));
|
||||
assert_eq!(h.pop(), Some(3));
|
||||
assert_eq!(h.pop(), Some(4));
|
||||
h.push(5);
|
||||
assert_eq!(h.pop(), Some(5));
|
||||
assert_eq!(h.pop(), Some(5));
|
||||
assert_eq!(h.pop(), Some(8));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove() {
|
||||
let mut h = Heap::new();
|
||||
h.push(5);
|
||||
h.push(4);
|
||||
h.push(3);
|
||||
let two = h.push(2);
|
||||
h.push(1);
|
||||
assert_eq!(h.pop(), Some(1));
|
||||
assert_eq!(h.remove(two), 2);
|
||||
h.push(1);
|
||||
assert_eq!(h.pop(), Some(1));
|
||||
assert_eq!(h.pop(), Some(3));
|
||||
}
|
||||
|
||||
fn vec2heap<T: Ord>(v: Vec<T>) -> Heap<T> {
|
||||
let mut h = Heap::new();
|
||||
for t in v {
|
||||
h.push(t);
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_peek_and_pop() {
|
||||
let data = vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1];
|
||||
let mut sorted = data.clone();
|
||||
sorted.sort();
|
||||
let mut heap = vec2heap(data);
|
||||
while heap.peek().is_some() {
|
||||
assert_eq!(heap.peek().unwrap(), sorted.first().unwrap());
|
||||
assert_eq!(heap.pop().unwrap(), sorted.remove(0));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push() {
|
||||
let mut heap = Heap::new();
|
||||
heap.push(-2);
|
||||
heap.push(-4);
|
||||
heap.push(-9);
|
||||
assert!(*heap.peek().unwrap() == -9);
|
||||
heap.push(-11);
|
||||
assert!(*heap.peek().unwrap() == -11);
|
||||
heap.push(-5);
|
||||
assert!(*heap.peek().unwrap() == -11);
|
||||
heap.push(-27);
|
||||
assert!(*heap.peek().unwrap() == -27);
|
||||
heap.push(-3);
|
||||
assert!(*heap.peek().unwrap() == -27);
|
||||
heap.push(-103);
|
||||
assert!(*heap.peek().unwrap() == -103);
|
||||
}
|
||||
|
||||
fn check_to_vec(mut data: Vec<i32>) {
|
||||
let mut heap = Heap::new();
|
||||
for data in data.iter() {
|
||||
heap.push(*data);
|
||||
}
|
||||
data.sort();
|
||||
let mut v = Vec::new();
|
||||
while let Some(i) = heap.pop() {
|
||||
v.push(i);
|
||||
}
|
||||
assert_eq!(v, data);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_vec() {
|
||||
check_to_vec(vec![]);
|
||||
check_to_vec(vec![5]);
|
||||
check_to_vec(vec![3, 2]);
|
||||
check_to_vec(vec![2, 3]);
|
||||
check_to_vec(vec![5, 1, 2]);
|
||||
check_to_vec(vec![1, 100, 2, 3]);
|
||||
check_to_vec(vec![1, 3, 5, 7, 9, 2, 4, 6, 8, 0]);
|
||||
check_to_vec(vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1]);
|
||||
check_to_vec(vec![9, 11, 9, 9, 9, 9, 11, 2, 3, 4, 11, 9, 0, 0, 0, 0]);
|
||||
check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
|
||||
check_to_vec(vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]);
|
||||
check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 1, 2]);
|
||||
check_to_vec(vec![5, 4, 3, 2, 1, 5, 4, 3, 2, 1, 5, 4, 3, 2, 1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_pop() {
|
||||
let mut heap = Heap::<i32>::new();
|
||||
assert!(heap.pop().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_peek() {
|
||||
let empty = Heap::<i32>::new();
|
||||
assert!(empty.peek().is_none());
|
||||
}
|
||||
}
|
@ -109,7 +109,7 @@ extern crate log;
|
||||
pub mod io;
|
||||
|
||||
mod mpsc_queue;
|
||||
mod timer_wheel;
|
||||
mod heap;
|
||||
pub mod channel;
|
||||
pub mod net;
|
||||
pub mod reactor;
|
||||
|
@ -17,7 +17,7 @@ use futures::task::{self, Unpark, Task, Spawn};
|
||||
use mio;
|
||||
use slab::Slab;
|
||||
|
||||
use timer_wheel::{TimerWheel, Timeout as WheelTimeout};
|
||||
use heap::{Heap, Slot};
|
||||
|
||||
mod channel;
|
||||
mod io_token;
|
||||
@ -68,8 +68,8 @@ struct Inner {
|
||||
// The slab below keeps track of the timeouts themselves as well as the
|
||||
// state of the timeout itself. The `TimeoutToken` type is an index into the
|
||||
// `timeouts` slab.
|
||||
timer_wheel: TimerWheel<usize>,
|
||||
timeouts: Slab<(WheelTimeout, TimeoutState)>,
|
||||
timer_heap: Heap<(Instant, usize)>,
|
||||
timeouts: Slab<(Slot, TimeoutState)>,
|
||||
}
|
||||
|
||||
/// Handle to an event loop, used to construct I/O objects, send messages, and
|
||||
@ -153,7 +153,7 @@ impl Core {
|
||||
io_dispatch: Slab::with_capacity(SLAB_CAPACITY),
|
||||
task_dispatch: Slab::with_capacity(SLAB_CAPACITY),
|
||||
timeouts: Slab::with_capacity(SLAB_CAPACITY),
|
||||
timer_wheel: TimerWheel::new(),
|
||||
timer_heap: Heap::new(),
|
||||
})),
|
||||
})
|
||||
}
|
||||
@ -242,11 +242,11 @@ impl Core {
|
||||
let start = Instant::now();
|
||||
loop {
|
||||
let inner = self.inner.borrow_mut();
|
||||
let timeout = inner.timer_wheel.next_timeout().map(|t| {
|
||||
if t < start {
|
||||
let timeout = inner.timer_heap.peek().map(|t| {
|
||||
if t.0 < start {
|
||||
Duration::new(0, 0)
|
||||
} else {
|
||||
t - start
|
||||
t.0 - start
|
||||
}
|
||||
});
|
||||
match inner.io.poll(&mut self.events, timeout) {
|
||||
@ -351,12 +351,15 @@ impl Core {
|
||||
fn consume_timeouts(&mut self, now: Instant) {
|
||||
loop {
|
||||
let mut inner = self.inner.borrow_mut();
|
||||
let idx = match inner.timer_wheel.poll(now) {
|
||||
Some(idx) => idx,
|
||||
match inner.timer_heap.peek() {
|
||||
Some(head) if head.0 <= now => {}
|
||||
Some(_) => break,
|
||||
None => break,
|
||||
};
|
||||
trace!("firing timeout: {}", idx);
|
||||
let handle = inner.timeouts[idx].1.fire();
|
||||
let (_, slab_idx) = inner.timer_heap.pop().unwrap();
|
||||
|
||||
trace!("firing timeout: {}", slab_idx);
|
||||
let handle = inner.timeouts[slab_idx].1.fire();
|
||||
drop(inner);
|
||||
if let Some(handle) = handle {
|
||||
self.notify_handle(handle);
|
||||
@ -453,11 +456,10 @@ impl Inner {
|
||||
self.timeouts.reserve_exact(len);
|
||||
}
|
||||
let entry = self.timeouts.vacant_entry().unwrap();
|
||||
let timeout = self.timer_wheel.insert(at, entry.index());
|
||||
let when = *timeout.when();
|
||||
let entry = entry.insert((timeout, TimeoutState::NotFired));
|
||||
let slot = self.timer_heap.push((at, entry.index()));
|
||||
let entry = entry.insert((slot, TimeoutState::NotFired));
|
||||
debug!("added a timeout: {}", entry.index());
|
||||
Ok((entry.index(), when))
|
||||
Ok((entry.index(), at))
|
||||
}
|
||||
|
||||
fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> {
|
||||
@ -468,8 +470,8 @@ impl Inner {
|
||||
fn cancel_timeout(&mut self, token: usize) {
|
||||
debug!("cancel a timeout: {}", token);
|
||||
let pair = self.timeouts.remove(token);
|
||||
if let Some((timeout, _state)) = pair {
|
||||
self.timer_wheel.cancel(&timeout);
|
||||
if let Some((slot, _state)) = pair {
|
||||
self.timer_heap.remove(slot);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,513 +0,0 @@
|
||||
//! A timer wheel implementation
|
||||
|
||||
use std::{mem, usize};
|
||||
use std::time::{Instant, Duration};
|
||||
|
||||
use slab::Slab;
|
||||
|
||||
/// An implementation of a timer wheel where data can be associated with each
|
||||
/// timer firing.
|
||||
///
|
||||
/// This structure implements a timer wheel data structure where each timeout
|
||||
/// has a piece of associated data, `T`. A timer wheel supports O(1) insertion
|
||||
/// and removal of timers, as well as quickly figuring out what needs to get
|
||||
/// fired.
|
||||
///
|
||||
/// Note, though, that the resolution of a timer wheel means that timeouts will
|
||||
/// not arrive promptly when they expire, but rather in certain increments of
|
||||
/// each time. The time delta between each slot of a time wheel is of a fixed
|
||||
/// length, meaning that if a timeout is scheduled between two slots it'll end
|
||||
/// up getting scheduled into the later slot.
|
||||
pub struct TimerWheel<T> {
|
||||
// Actual timer wheel itself.
|
||||
//
|
||||
// Each slot represents a fixed duration of time, and this wheel also
|
||||
// behaves like a ring buffer. All timeouts scheduled will correspond to one
|
||||
// slot and therefore each slot has a linked list of timeouts scheduled in
|
||||
// it. Right now linked lists are done through indices into the `slab`
|
||||
// below.
|
||||
//
|
||||
// Each slot also contains the next timeout associated with it (the minimum
|
||||
// of the entire linked list).
|
||||
wheel: Vec<Slot>,
|
||||
|
||||
// A slab containing all the timeout entries themselves. This is the memory
|
||||
// backing the "linked lists" in the wheel above. Each entry has a prev/next
|
||||
// pointer (indices in this array) along with the data associated with the
|
||||
// timeout and the time the timeout will fire.
|
||||
slab: Slab<Entry<T>, usize>,
|
||||
|
||||
// The instant that this timer was created, through which all other timeout
|
||||
// computations are relative to.
|
||||
start: Instant,
|
||||
|
||||
// State used during `poll`. The `cur_wheel_tick` field is the current tick
|
||||
// we've poll'd to. That is, all events from `cur_wheel_tick` to the
|
||||
// actual current tick in time still need to be processed.
|
||||
//
|
||||
// The `cur_slab_idx` variable is basically just an iterator over the linked
|
||||
// list associated with a wheel slot. This will get incremented as we move
|
||||
// forward in `poll`
|
||||
cur_wheel_tick: u64,
|
||||
cur_slab_idx: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Slot {
|
||||
head: usize,
|
||||
next_timeout: Option<Instant>,
|
||||
}
|
||||
|
||||
struct Entry<T> {
|
||||
data: T,
|
||||
when: Instant,
|
||||
wheel_idx: usize,
|
||||
prev: usize,
|
||||
next: usize,
|
||||
}
|
||||
|
||||
/// A timeout which has been scheduled with a timer wheel.
|
||||
///
|
||||
/// This can be used to later cancel a timeout, if necessary.
|
||||
pub struct Timeout {
|
||||
when: Instant,
|
||||
slab_idx: usize,
|
||||
}
|
||||
|
||||
const EMPTY: usize = usize::MAX;
|
||||
const LEN: usize = 256;
|
||||
const MASK: usize = LEN - 1;
|
||||
const TICK_MS: u64 = 100;
|
||||
|
||||
impl<T> TimerWheel<T> {
|
||||
/// Creates a new timer wheel configured with no timeouts and with the
|
||||
/// default parameters.
|
||||
///
|
||||
/// Currently this is a timer wheel of length 256 with a 100ms time
|
||||
/// resolution.
|
||||
pub fn new() -> TimerWheel<T> {
|
||||
TimerWheel {
|
||||
wheel: vec![Slot { head: EMPTY, next_timeout: None }; LEN],
|
||||
slab: Slab::with_capacity(256),
|
||||
start: Instant::now(),
|
||||
cur_wheel_tick: 0,
|
||||
cur_slab_idx: EMPTY,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new timeout to get fired at a particular point in the future.
|
||||
///
|
||||
/// The timeout will be associated with the specified `data`, and this data
|
||||
/// will be returned from `poll` when it's ready.
|
||||
///
|
||||
/// The returned `Timeout` can later get passesd to `cancel` to retrieve the
|
||||
/// data and ensure the timeout doesn't fire.
|
||||
///
|
||||
/// This method completes in O(1) time.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method will panic if `at` is before the time that this timer wheel
|
||||
/// was created.
|
||||
pub fn insert(&mut self, mut at: Instant, data: T) -> Timeout {
|
||||
// First up, figure out where we're gonna go in the wheel. Note that if
|
||||
// we're being scheduled on or before the current wheel tick we just
|
||||
// make sure to defer ourselves to the next tick.
|
||||
let mut tick = self.time_to_ticks(at);
|
||||
if tick <= self.cur_wheel_tick {
|
||||
debug!("moving {} to {}", tick, self.cur_wheel_tick + 1);
|
||||
tick = self.cur_wheel_tick + 1;
|
||||
}
|
||||
let wheel_idx = self.ticks_to_wheel_idx(tick);
|
||||
trace!("inserting timeout at {} for {}", wheel_idx, tick);
|
||||
|
||||
let actual_tick = self.start +
|
||||
Duration::from_millis(TICK_MS) * (tick as u32);
|
||||
trace!("actual_tick: {:?}", actual_tick);
|
||||
trace!("at: {:?}", at);
|
||||
at = actual_tick;
|
||||
|
||||
// Next, make sure there's enough space in the slab for the timeout.
|
||||
if self.slab.vacant_entry().is_none() {
|
||||
let amt = self.slab.len();
|
||||
self.slab.reserve_exact(amt);
|
||||
}
|
||||
|
||||
// Insert ourselves at the head of the linked list in the wheel.
|
||||
let slot = &mut self.wheel[wheel_idx];
|
||||
let prev_head;
|
||||
{
|
||||
let entry = self.slab.vacant_entry().unwrap();
|
||||
prev_head = mem::replace(&mut slot.head, entry.index());
|
||||
trace!("timer wheel slab idx: {}", entry.index());
|
||||
|
||||
entry.insert(Entry {
|
||||
data: data,
|
||||
when: at,
|
||||
wheel_idx: wheel_idx,
|
||||
prev: EMPTY,
|
||||
next: prev_head,
|
||||
});
|
||||
}
|
||||
if prev_head != EMPTY {
|
||||
self.slab[prev_head].prev = slot.head;
|
||||
}
|
||||
|
||||
// Update the wheel slot's next timeout field.
|
||||
if at <= slot.next_timeout.unwrap_or(at) {
|
||||
debug!("updating[{}] next timeout: {:?}", wheel_idx, at);
|
||||
debug!(" start: {:?}", self.start);
|
||||
slot.next_timeout = Some(at);
|
||||
}
|
||||
|
||||
Timeout {
|
||||
when: at,
|
||||
slab_idx: slot.head,
|
||||
}
|
||||
}
|
||||
|
||||
/// Queries this timer to see if any timeouts are ready to fire.
|
||||
///
|
||||
/// This function will advance the internal wheel to the time specified by
|
||||
/// `at`, returning any timeout which has happened up to that point. This
|
||||
/// method should be called in a loop until it returns `None` to ensure that
|
||||
/// all timeouts are processed.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method will panic if `at` is before the instant that this timer
|
||||
/// wheel was created.
|
||||
pub fn poll(&mut self, at: Instant) -> Option<T> {
|
||||
let wheel_tick = self.time_to_ticks(at);
|
||||
|
||||
trace!("polling {} => {}", self.cur_wheel_tick, wheel_tick);
|
||||
|
||||
// Advance forward in time to the `wheel_tick` specified.
|
||||
//
|
||||
// TODO: don't visit slots in the wheel more than once
|
||||
while self.cur_wheel_tick <= wheel_tick {
|
||||
let head = self.cur_slab_idx;
|
||||
let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick);
|
||||
trace!("next head[{} => {}]: {}",
|
||||
self.cur_wheel_tick, wheel_tick, head);
|
||||
|
||||
// If the current slot has no entries or we're done iterating go to
|
||||
// the next tick.
|
||||
if head == EMPTY {
|
||||
if head == self.wheel[idx].head {
|
||||
self.wheel[idx].next_timeout = None;
|
||||
}
|
||||
self.cur_wheel_tick += 1;
|
||||
let idx = self.ticks_to_wheel_idx(self.cur_wheel_tick);
|
||||
self.cur_slab_idx = self.wheel[idx].head;
|
||||
continue
|
||||
}
|
||||
|
||||
// If we're starting to iterate over a slot, clear its timeout as
|
||||
// we're probably going to remove entries. As we skip over each
|
||||
// element of this slot we'll restore the `next_timeout` field if
|
||||
// necessary.
|
||||
if head == self.wheel[idx].head {
|
||||
self.wheel[idx].next_timeout = None;
|
||||
}
|
||||
|
||||
// Otherwise, continue iterating over the linked list in the wheel
|
||||
// slot we're on and remove anything which has expired.
|
||||
self.cur_slab_idx = self.slab[head].next;
|
||||
let head_timeout = self.slab[head].when;
|
||||
if self.time_to_ticks(head_timeout) <= self.time_to_ticks(at) {
|
||||
return self.remove_slab(head).map(|e| e.data)
|
||||
} else {
|
||||
let next = self.wheel[idx].next_timeout.unwrap_or(head_timeout);
|
||||
if head_timeout <= next {
|
||||
self.wheel[idx].next_timeout = Some(head_timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Returns the instant in time that corresponds to the next timeout
|
||||
/// scheduled in this wheel.
|
||||
pub fn next_timeout(&self) -> Option<Instant> {
|
||||
// TODO: can this be optimized to not look at the whole array?
|
||||
let mut min = None;
|
||||
for a in self.wheel.iter().filter_map(|s| s.next_timeout.as_ref()) {
|
||||
if let Some(b) = min {
|
||||
if b < a {
|
||||
continue
|
||||
}
|
||||
}
|
||||
min = Some(a);
|
||||
}
|
||||
if let Some(min) = min {
|
||||
debug!("next timeout {:?}", min);
|
||||
debug!("now {:?}", Instant::now());
|
||||
} else {
|
||||
debug!("next timeout never");
|
||||
}
|
||||
min.map(|t| *t)
|
||||
}
|
||||
|
||||
/// Cancels the specified timeout.
|
||||
///
|
||||
/// For timeouts previously registered via `insert` they can be passed back
|
||||
/// to this method to cancel the associated timeout, retrieving the value
|
||||
/// inserted if the timeout has not already fired.
|
||||
///
|
||||
/// This method completes in O(1) time.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method may panic if `timeout` wasn't created by this timer wheel.
|
||||
pub fn cancel(&mut self, timeout: &Timeout) -> Option<T> {
|
||||
match self.slab.get(timeout.slab_idx) {
|
||||
Some(e) if e.when == timeout.when => {}
|
||||
_ => return None,
|
||||
}
|
||||
|
||||
self.remove_slab(timeout.slab_idx).map(|e| e.data)
|
||||
}
|
||||
|
||||
fn remove_slab(&mut self, slab_idx: usize) -> Option<Entry<T>> {
|
||||
debug!("removing timer slab {}", slab_idx);
|
||||
let entry = match self.slab.remove(slab_idx) {
|
||||
Some(e) => e,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
// Remove the node from the linked list
|
||||
if entry.prev == EMPTY {
|
||||
self.wheel[entry.wheel_idx].head = entry.next;
|
||||
} else {
|
||||
self.slab[entry.prev].next = entry.next;
|
||||
}
|
||||
if entry.next != EMPTY {
|
||||
self.slab[entry.next].prev = entry.prev;
|
||||
}
|
||||
|
||||
if self.cur_slab_idx == slab_idx {
|
||||
self.cur_slab_idx = entry.next;
|
||||
}
|
||||
|
||||
return Some(entry)
|
||||
}
|
||||
|
||||
fn time_to_ticks(&self, time: Instant) -> u64 {
|
||||
let dur = time - self.start;
|
||||
let ms = dur.subsec_nanos() as u64 / 1_000_000;
|
||||
let ms = dur.as_secs()
|
||||
.checked_mul(1_000)
|
||||
.and_then(|m| m.checked_add(ms))
|
||||
.expect("overflow scheduling timeout");
|
||||
ms / TICK_MS
|
||||
}
|
||||
|
||||
fn ticks_to_wheel_idx(&self, ticks: u64) -> usize {
|
||||
(ticks as usize) & MASK
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeout {
|
||||
pub fn when(&self) -> &Instant {
|
||||
&self.when
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
extern crate env_logger;
|
||||
|
||||
use std::time::{Instant, Duration};
|
||||
|
||||
use super::TimerWheel;
|
||||
|
||||
fn ms(amt: u64) -> Duration {
|
||||
Duration::from_millis(amt)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
assert!(timer.poll(now).is_none());
|
||||
assert!(timer.poll(now).is_none());
|
||||
|
||||
timer.insert(now + ms(200), 3);
|
||||
|
||||
assert!(timer.poll(now).is_none());
|
||||
assert!(timer.poll(now + ms(100)).is_none());
|
||||
let res = timer.poll(now + ms(200));
|
||||
assert!(res.is_some());
|
||||
assert_eq!(res.unwrap(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_past_done() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
timer.insert(now + ms(200), 3);
|
||||
let res = timer.poll(now + ms(300));
|
||||
assert!(res.is_some());
|
||||
assert_eq!(res.unwrap(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_ready() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
timer.insert(now + ms(200), 3);
|
||||
timer.insert(now + ms(201), 4);
|
||||
timer.insert(now + ms(202), 5);
|
||||
timer.insert(now + ms(300), 6);
|
||||
timer.insert(now + ms(301), 7);
|
||||
|
||||
let mut found = Vec::new();
|
||||
while let Some(i) = timer.poll(now + ms(400)) {
|
||||
found.push(i);
|
||||
}
|
||||
found.sort();
|
||||
assert_eq!(found, [3, 4, 5, 6, 7]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_now() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
timer.insert(now, 3);
|
||||
let res = timer.poll(now + ms(100));
|
||||
assert!(res.is_some());
|
||||
assert_eq!(res.unwrap(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cancel() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let timeout = timer.insert(now + ms(800), 3);
|
||||
assert!(timer.poll(now + ms(200)).is_none());
|
||||
assert!(timer.poll(now + ms(400)).is_none());
|
||||
assert_eq!(timer.cancel(&timeout), Some(3));
|
||||
assert!(timer.poll(now + ms(600)).is_none());
|
||||
assert!(timer.poll(now + ms(800)).is_none());
|
||||
assert!(timer.poll(now + ms(1000)).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn next_timeout() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = timer.start;
|
||||
|
||||
assert!(timer.next_timeout().is_none());
|
||||
timer.insert(now + ms(400), 3);
|
||||
let timeout = timer.next_timeout().expect("wanted a next_timeout");
|
||||
assert_eq!(timeout, now + ms(400));
|
||||
|
||||
timer.insert(now + ms(1000), 3);
|
||||
let timeout = timer.next_timeout().expect("wanted a next_timeout");
|
||||
assert_eq!(timeout, now + ms(400));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn around_the_boundary() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
timer.insert(now + ms(199), 3);
|
||||
timer.insert(now + ms(200), 4);
|
||||
timer.insert(now + ms(201), 5);
|
||||
timer.insert(now + ms(251), 6);
|
||||
timer.insert(now + ms(299), 7);
|
||||
timer.insert(now + ms(300), 8);
|
||||
timer.insert(now + ms(301), 9);
|
||||
|
||||
let mut found = Vec::new();
|
||||
while let Some(i) = timer.poll(now + ms(200)) {
|
||||
found.push(i);
|
||||
}
|
||||
found.sort();
|
||||
assert_eq!(found, [3, 4, 5, 6, 7]);
|
||||
|
||||
let mut found = Vec::new();
|
||||
while let Some(i) = timer.poll(now + ms(300)) {
|
||||
found.push(i);
|
||||
}
|
||||
found.sort();
|
||||
assert_eq!(found, [8, 9]);
|
||||
assert_eq!(timer.poll(now + ms(300)), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_clears_timeout() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = timer.start;
|
||||
|
||||
timer.insert(now + ms(100), 3);
|
||||
assert_eq!(timer.next_timeout(), Some(timer.start + ms(100)));
|
||||
assert_eq!(timer.poll(now + ms(200)), Some(3));
|
||||
assert_eq!(timer.next_timeout(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_then_poll() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let t = timer.insert(now + ms(1), 3);
|
||||
timer.cancel(&t).unwrap();
|
||||
assert_eq!(timer.poll(now + ms(200)), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_two_then_remove() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let t1 = timer.insert(now + ms(1), 1);
|
||||
timer.insert(now + ms(2), 2);
|
||||
assert_eq!(timer.poll(now + ms(200)), Some(2));
|
||||
timer.cancel(&t1).unwrap();
|
||||
assert_eq!(timer.poll(now + ms(200)), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_then_next_timeout() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = timer.start;
|
||||
|
||||
timer.insert(now + ms(200), 2);
|
||||
assert_eq!(timer.poll(now + ms(100)), None);
|
||||
assert_eq!(timer.next_timeout(), Some(now + ms(200)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_remove_next_timeout() {
|
||||
drop(env_logger::init());
|
||||
let mut timer = TimerWheel::<i32>::new();
|
||||
let now = Instant::now();
|
||||
|
||||
let t = timer.insert(now + ms(200), 2);
|
||||
assert_eq!(timer.cancel(&t), Some(2));
|
||||
if let Some(t) = timer.next_timeout() {
|
||||
assert_eq!(timer.poll(t + ms(100)), None);
|
||||
assert_eq!(timer.next_timeout(), None);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user