diff --git a/src/reactor/interval.rs b/src/reactor/interval.rs new file mode 100644 index 000000000..a66258fc4 --- /dev/null +++ b/src/reactor/interval.rs @@ -0,0 +1,171 @@ +//! Support for creating futures that represent intervals. +//! +//! This module contains the `Interval` type which is a stream that will +//! resolve at a fixed intervals in future + +use std::io; +use std::time::{Duration, Instant}; + +use futures::{Poll, Async}; +use futures::stream::{Stream}; + +use reactor::{Remote, Handle}; +use reactor::timeout_token::TimeoutToken; + +/// A stream representing notifications at fixed interval +/// +/// Intervals are created through the `Interval::new` or +/// `Interval::new_at` methods indicating when a first notification +/// should be triggered and when it will be repeated. +/// +/// Note that timeouts are not intended for high resolution timers, but rather +/// they will likely fire some granularity after the exact instant that they're +/// otherwise indicated to fire at. +pub struct Interval { + token: TimeoutToken, + next: Instant, + interval: Duration, + handle: Remote, +} + +impl Interval { + /// Creates a new interval which will fire at `dur` time into the future, + /// and will repeat every `dur` interval after + /// + /// This function will return a future that will resolve to the actual + /// interval object. The interval object itself is then a stream which will + /// be set to fire at the specified intervals + pub fn new(dur: Duration, handle: &Handle) -> io::Result { + Interval::new_at(Instant::now() + dur, dur, handle) + } + + /// Creates a new interval which will fire at the time specified by `at`, + /// and then will repeat every `dur` interval after + /// + /// This function will return a future that will resolve to the actual + /// timeout object. The timeout object itself is then a future which will be + /// set to fire at the specified point in the future. + pub fn new_at(at: Instant, dur: Duration, handle: &Handle) + -> io::Result + { + Ok(Interval { + token: try!(TimeoutToken::new(at, &handle)), + next: at, + interval: dur, + handle: handle.remote().clone(), + }) + } +} + +impl Stream for Interval { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { + // TODO: is this fast enough? + let now = Instant::now(); + if self.next <= now { + self.next = next_interval(self.next, now, self.interval); + self.token.reset_timeout(self.next, &self.handle); + Ok(Async::Ready(Some(()))) + } else { + self.token.update_timeout(&self.handle); + Ok(Async::NotReady) + } + } +} + +impl Drop for Interval { + fn drop(&mut self) { + self.token.cancel_timeout(&self.handle); + } +} + +/// Converts Duration object to raw nanoseconds if possible +/// +/// This is useful to divide intervals. +/// +/// While technically for large duration it's impossible to represent any +/// duration as nanoseconds, the largest duration we can represent is about +/// 427_000 years. Large enough for any interval we would use or calculate in +/// tokio. +fn duration_to_nanos(dur: Duration) -> Option { + dur.as_secs() + .checked_mul(1_000_000_000) + .and_then(|v| v.checked_add(dur.subsec_nanos() as u64)) +} + +fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant { + let new = prev + interval; + if new > now { + return new; + } else { + let spent_ns = duration_to_nanos(now.duration_since(prev)) + .expect("interval should be expired"); + let interval_ns = duration_to_nanos(interval) + .expect("interval is less that 427 thousand years"); + let mult = spent_ns/interval_ns + 1; + assert!(mult < (1 << 32), + "can't skip more than 4 billion intervals of {:?} \ + (trying to skip {})", interval, mult); + return prev + interval * (mult as u32); + } +} + +#[cfg(test)] +mod test { + use std::time::{Instant, Duration}; + use super::next_interval; + + struct Timeline(Instant); + + impl Timeline { + fn new() -> Timeline { + Timeline(Instant::now()) + } + fn at(&self, millis: u64) -> Instant { + self.0 + Duration::from_millis(millis) + } + fn at_ns(&self, sec: u64, nanos: u32) -> Instant { + self.0 + Duration::new(sec, nanos) + } + } + + fn dur(millis: u64) -> Duration { + Duration::from_millis(millis) + } + + #[test] + fn norm_next() { + let tm = Timeline::new(); + assert_eq!(next_interval(tm.at(1), tm.at(2), dur(10)), tm.at(11)); + assert_eq!(next_interval(tm.at(7777), tm.at(7788), dur(100)), + tm.at(7877)); + assert_eq!(next_interval(tm.at(1), tm.at(1000), dur(2100)), + tm.at(2101)); + } + + #[test] + fn fast_forward() { + let tm = Timeline::new(); + assert_eq!(next_interval(tm.at(1), tm.at(1000), dur(10)), + tm.at(1001)); + assert_eq!(next_interval(tm.at(7777), tm.at(8888), dur(100)), + tm.at(8977)); + assert_eq!(next_interval(tm.at(1), tm.at(10000), dur(2100)), + tm.at(10501)); + } + + /// TODO: this test actually should be successful, but since we can't + /// multiply Duration on anything larger than u32 easily we decided + /// to allow thit to fail for now + #[test] + #[should_panic(expected = "can't skip more than 4 billion intervals")] + fn large_skip() { + let tm = Timeline::new(); + assert_eq!(next_interval( + tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)), + tm.at_ns(25, 1)); + } + +} diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index b86178311..eab8cfe2f 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -26,8 +26,10 @@ use self::channel::{Sender, Receiver, channel}; mod poll_evented; mod timeout; +mod interval; pub use self::poll_evented::PollEvented; pub use self::timeout::Timeout; +pub use self::interval::Interval; static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT; scoped_thread_local!(static CURRENT_LOOP: Core); @@ -118,6 +120,7 @@ enum Message { DropSource(usize), Schedule(usize, Task, Direction), UpdateTimeout(usize, Task), + ResetTimeout(usize, Instant), CancelTimeout(usize), Run(Box), } @@ -400,6 +403,9 @@ impl Core { self.notify_handle(task); } } + Message::ResetTimeout(t, at) => { + self.inner.borrow_mut().reset_timeout(t, at); + } Message::CancelTimeout(t) => { self.inner.borrow_mut().cancel_timeout(t) } @@ -451,7 +457,7 @@ impl Inner { } } - fn add_timeout(&mut self, at: Instant) -> io::Result<(usize, Instant)> { + fn add_timeout(&mut self, at: Instant) -> usize { if self.timeouts.vacant_entry().is_none() { let len = self.timeouts.len(); self.timeouts.reserve_exact(len); @@ -460,7 +466,7 @@ impl Inner { let slot = self.timer_heap.push((at, entry.index())); let entry = entry.insert((Some(slot), TimeoutState::NotFired)); debug!("added a timeout: {}", entry.index()); - Ok((entry.index(), at)) + return entry.index(); } fn update_timeout(&mut self, token: usize, handle: Task) -> Option { @@ -468,6 +474,19 @@ impl Inner { self.timeouts[token].1.block(handle) } + fn reset_timeout(&mut self, token: usize, at: Instant) { + let pair = &mut self.timeouts[token]; + // TODO: avoid remove + push and instead just do one sift of the heap? + // In theory we could update it in place and then do the percolation + // as necessary + if let Some(slot) = pair.0.take() { + self.timer_heap.remove(slot); + } + let slot = self.timer_heap.push((at, token)); + *pair = (Some(slot), TimeoutState::NotFired); + debug!("set a timeout: {}", token); + } + fn cancel_timeout(&mut self, token: usize) { debug!("cancel a timeout: {}", token); let pair = self.timeouts.remove(token); diff --git a/src/reactor/timeout.rs b/src/reactor/timeout.rs index 7efbf54b1..76fba9fd4 100644 --- a/src/reactor/timeout.rs +++ b/src/reactor/timeout.rs @@ -13,13 +13,14 @@ use reactor::timeout_token::TimeoutToken; /// A future representing the notification that a timeout has occurred. /// -/// Timeouts are created through the `LoopHandle::timeout` or -/// `LoopHandle::timeout_at` methods indicating when a timeout should fire at. +/// Timeouts are created through the `Timeout::new` or +/// `Timeout::new_at` methods indicating when a timeout should fire at. /// Note that timeouts are not intended for high resolution timers, but rather /// they will likely fire some granularity after the exact instant that they're /// otherwise indicated to fire at. pub struct Timeout { token: TimeoutToken, + when: Instant, handle: Remote, } @@ -41,6 +42,7 @@ impl Timeout { pub fn new_at(at: Instant, handle: &Handle) -> io::Result { Ok(Timeout { token: try!(TimeoutToken::new(at, &handle)), + when: at, handle: handle.remote().clone(), }) } @@ -53,7 +55,7 @@ impl Future for Timeout { fn poll(&mut self) -> Poll<(), io::Error> { // TODO: is this fast enough? let now = Instant::now(); - if *self.token.when() <= now { + if self.when <= now { Ok(Async::Ready(())) } else { self.token.update_timeout(&self.handle); diff --git a/src/reactor/timeout_token.rs b/src/reactor/timeout_token.rs index 98b38716b..ca82bde2a 100644 --- a/src/reactor/timeout_token.rs +++ b/src/reactor/timeout_token.rs @@ -8,7 +8,6 @@ use reactor::{Message, Handle, Remote}; /// A token that identifies an active timeout. pub struct TimeoutToken { token: usize, - when: Instant, } impl TimeoutToken { @@ -17,23 +16,13 @@ impl TimeoutToken { pub fn new(at: Instant, handle: &Handle) -> io::Result { match handle.inner.upgrade() { Some(inner) => { - let (token, when) = try!(inner.borrow_mut().add_timeout(at)); - Ok(TimeoutToken { token: token, when: when }) + let token = inner.borrow_mut().add_timeout(at); + Ok(TimeoutToken { token: token }) } None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")), } } - /// Returns the instant in time when this timeout token will "fire". - /// - /// Note that this instant may *not* be the instant that was passed in when - /// the timeout was created. The event loop does not support high resolution - /// timers, so the exact resolution of when a timeout may fire may be - /// slightly fudged. - pub fn when(&self) -> &Instant { - &self.when - } - /// Updates a previously added timeout to notify a new task instead. /// /// # Panics @@ -44,6 +33,16 @@ impl TimeoutToken { handle.send(Message::UpdateTimeout(self.token, task::park())) } + /// Resets previously added (or fired) timeout to an new timeout + /// + /// # Panics + /// + /// This method will panic if the timeout specified was not created by this + /// loop handle's `add_timeout` method. + pub fn reset_timeout(&mut self, at: Instant, handle: &Remote) { + handle.send(Message::ResetTimeout(self.token, at)); + } + /// Cancel a previously added timeout. /// /// # Panics diff --git a/tests/interval.rs b/tests/interval.rs new file mode 100644 index 000000000..90ece77bf --- /dev/null +++ b/tests/interval.rs @@ -0,0 +1,38 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio_core; + +use std::time::{Instant, Duration}; + +use futures::stream::{Stream}; +use tokio_core::reactor::{Core, Interval}; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn single() { + drop(env_logger::init()); + let mut l = t!(Core::new()); + let dur = Duration::from_millis(10); + let interval = t!(Interval::new(dur, &l.handle())); + let start = Instant::now(); + t!(l.run(interval.take(1).collect())); + assert!(start.elapsed() >= dur); +} + +#[test] +fn two_times() { + drop(env_logger::init()); + let mut l = t!(Core::new()); + let dur = Duration::from_millis(10); + let interval = t!(Interval::new(dur, &l.handle())); + let start = Instant::now(); + let result = t!(l.run(interval.take(2).collect())); + assert!(start.elapsed() >= dur*2); + assert_eq!(result, vec![(), ()]); +}