diff --git a/src/timer.rs b/src/timer.rs index 79fa790d6..04abf2e59 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -10,9 +10,9 @@ //! is initialized with a `Duration` and repeatedly yields each time the //! duration elapses. //! -//! * [`Deadline`][Deadline] wraps a future, requiring that it completes before -//! a specified `Instant` in time. If the future does not complete in time, -//! then it is canceled and an error is returned. +//! * [`Timeout`][Timeeout]: Wraps a future or stream, setting an upper bound to the +//! amount of time it is allowed to execute. If the future or stream does not +//! completee in time, then it is canceled and an error is returned. //! //! * [`DelayQueue`]: A queue where items are returned once the requested delay //! has expired. @@ -48,7 +48,7 @@ //! ``` //! //! Require that an operation takes no more than 300ms. Note that this uses the -//! [`deadline`][ext] function on the [`FutureExt`][ext] trait. This trait is +//! [`timeout`][ext] function on the [`FutureExt`][ext] trait. This trait is //! included in the prelude. //! //! ``` @@ -64,11 +64,9 @@ //! } //! //! # fn main() { -//! let when = Instant::now() + Duration::from_millis(300); -//! //! tokio::run({ //! long_op() -//! .deadline(when) +//! .timeout(Duration::from_millis(300)) //! .map_err(|e| { //! println!("operation timed out"); //! }) @@ -78,18 +76,27 @@ //! //! [runtime]: ../runtime/struct.Runtime.html //! [tokio-timer]: https://docs.rs/tokio-timer -//! [ext]: ../util/trait.FutureExt.html#method.deadline -//! [Deadline]: struct.Deadline.html +//! [ext]: ../util/trait.FutureExt.html#method.timeout +//! [Timeout]: struct.Timeout.html //! [Delay]: struct.Delay.html //! [Interval]: struct.Interval.html //! [`DelayQueue`]: struct.DelayQueue.html pub use tokio_timer::{ delay_queue, - Deadline, - DeadlineError, DelayQueue, Error, Interval, Delay, + Timeout, + timeout, }; + +#[deprecated(since = "0.1.8", note = "use Timeout instead")] +#[allow(deprecated)] +#[doc(hidden)] +pub type Deadline = ::tokio_timer::Deadline; +#[deprecated(since = "0.1.8", note = "use Timeout instead")] +#[allow(deprecated)] +#[doc(hidden)] +pub type DeadlineError = ::tokio_timer::DeadlineError; diff --git a/src/util/future.rs b/src/util/future.rs index d03ce7fad..27098e6b1 100644 --- a/src/util/future.rs +++ b/src/util/future.rs @@ -1,14 +1,16 @@ +#[allow(deprecated)] use tokio_timer::Deadline; +use tokio_timer::Timeout; use futures::Future; -use std::time::Instant; +use std::time::{Instant, Duration}; /// An extension trait for `Future` that provides a variety of convenient /// combinator functions. /// -/// Currently, there only is a [`deadline`] function, but this will increase +/// Currently, there only is a [`timeout`] function, but this will increase /// over time. /// /// Users are not expected to implement this trait. All types that implement @@ -17,18 +19,17 @@ use std::time::Instant; /// This trait can be imported directly or via the Tokio prelude: `use /// tokio::prelude::*`. /// -/// [`deadline`]: #method.deadline +/// [`timeout`]: #method.timeout pub trait FutureExt: Future { - /// Creates a new future which allows `self` until `deadline`. + /// Creates a new future which allows `self` until `timeout`. /// /// This combinator creates a new future which wraps the receiving future - /// with a deadline. The returned future is allowed to execute until it - /// completes or `deadline` is reached, whichever happens first. + /// with a timeout. The returned future is allowed to execute until it + /// completes or `timeout` has elapsed, whichever happens first. /// - /// If the future completes before `deadline` then the future will resolve - /// with that item. Otherwise the future will resolve to an error once - /// `deadline` is reached. + /// If the future completes before `timeout` then the future will resolve + /// with that item. Otherwise the future will resolve to an error. /// /// # Examples /// @@ -36,7 +37,7 @@ pub trait FutureExt: Future { /// # extern crate tokio; /// # extern crate futures; /// use tokio::prelude::*; - /// use std::time::{Duration, Instant}; + /// use std::time::Duration; /// # use futures::future::{self, FutureResult}; /// /// # fn long_future() -> FutureResult<(), ()> { @@ -45,12 +46,21 @@ pub trait FutureExt: Future { /// # /// # fn main() { /// let future = long_future() - /// .deadline(Instant::now() + Duration::from_secs(1)) + /// .timeout(Duration::from_secs(1)) /// .map_err(|e| println!("error = {:?}", e)); /// /// tokio::run(future); /// # } /// ``` + fn timeout(self, timeout: Duration) -> Timeout + where Self: Sized, + { + Timeout::new(self, timeout) + } + + #[deprecated(since = "0.1.8", note = "use `timeout` instead")] + #[allow(deprecated)] + #[doc(hidden)] fn deadline(self, deadline: Instant) -> Deadline where Self: Sized, { diff --git a/tokio-timer/src/deadline.rs b/tokio-timer/src/deadline.rs index 05ff827c5..4061533e6 100644 --- a/tokio-timer/src/deadline.rs +++ b/tokio-timer/src/deadline.rs @@ -1,3 +1,5 @@ +#![allow(deprecated)] + use Delay; use futures::{Future, Poll, Async}; @@ -6,21 +8,16 @@ use std::error; use std::fmt; use std::time::Instant; -/// Allows a given `Future` to execute until the specified deadline. -/// -/// If the inner future completes before the deadline is reached, then -/// `Deadline` completes with that value. Otherwise, `Deadline` completes with a -/// [`DeadlineError`]. -/// -/// [`DeadlineError`]: struct.DeadlineError.html -#[must_use = "futures do nothing unless polled"] +#[deprecated(since = "0.2.6", note = "use Timeout instead")] +#[doc(hidden)] #[derive(Debug)] pub struct Deadline { future: T, delay: Delay, } -/// Error returned by `Deadline` future. +#[deprecated(since = "0.2.6", note = "use Timeout instead")] +#[doc(hidden)] #[derive(Debug)] pub struct DeadlineError(Kind); diff --git a/tokio-timer/src/delay.rs b/tokio-timer/src/delay.rs index 5d84dcbd8..65ba0facc 100644 --- a/tokio-timer/src/delay.rs +++ b/tokio-timer/src/delay.rs @@ -3,7 +3,7 @@ use timer::{Registration, HandlePriv}; use futures::{Future, Poll}; -use std::time::Instant; +use std::time::{Instant, Duration}; /// A future that completes at a specified instant in time. /// @@ -13,6 +13,11 @@ use std::time::Instant; /// `Delay` has a resolution of one millisecond and should not be used for tasks /// that require high-resolution timers. /// +/// # Cancellation +/// +/// Canceling a `Delay` is done by dropping the value. No additional cleanup or +/// other work is required. +/// /// [`new`]: #method.new #[derive(Debug)] pub struct Delay { @@ -29,13 +34,18 @@ impl Delay { /// as to how the sub-millisecond portion of `deadline` will be handled. /// `Delay` should not be used for high-resolution timer use cases. pub fn new(deadline: Instant) -> Delay { - let registration = Registration::new(deadline); + let registration = Registration::new(deadline, Duration::from_millis(0)); Delay { registration } } + pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay { + let registration = Registration::new(deadline, duration); + Delay { registration } + } + pub(crate) fn new_with_handle(deadline: Instant, handle: HandlePriv) -> Delay { - let mut registration = Registration::new(deadline); + let mut registration = Registration::new(deadline, Duration::from_millis(0)); registration.register_with(handle); Delay { registration } @@ -64,6 +74,10 @@ impl Delay { self.registration.reset(deadline); } + pub(crate) fn reset_timeout(&mut self) { + self.registration.reset_timeout(); + } + /// Register the delay with the timer instance for the current execution /// context. fn register(&mut self) { diff --git a/tokio-timer/src/lib.rs b/tokio-timer/src/lib.rs index ee84073d2..47d219f94 100644 --- a/tokio-timer/src/lib.rs +++ b/tokio-timer/src/lib.rs @@ -1,4 +1,4 @@ -//! Utilities for scheduling work to happen after a period of time. +//! Utilities for tracking time. //! //! This crate provides a number of utilities for working with periods of time: //! @@ -6,18 +6,19 @@ //! //! * [`Interval`] A stream that yields at fixed time intervals. //! -//! * [`Deadline`]: Wraps a future, requiring it to complete before a specified -//! instant in time, erroring if the future takes too long. +//! * [`Timeout`]: Wraps a future or stream, setting an upper bound to the +//! amount of time it is allowed to execute. If the future or stream does not +//! completee in time, then it is canceled and an error is returned. //! //! * [`DelayQueue`]: A queue where items are returned once the requested delay //! has expired. //! //! These three types are backed by a [`Timer`] instance. In order for -//! [`Delay`], [`Interval`], and [`Deadline`] to function, the associated +//! [`Delay`], [`Interval`], and [`Timeout`] to function, the associated //! [`Timer`] instance must be running on some thread. //! //! [`Delay`]: struct.Delay.html -//! [`Deadline`]: struct.Deadline.html +//! [`Timeout`]: struct.Timeout.html //! [`Interval`]: struct.Interval.html //! [`Timer`]: timer/struct.Timer.html @@ -33,6 +34,7 @@ extern crate slab; pub mod clock; pub mod delay_queue; +pub mod timeout; pub mod timer; mod atomic; @@ -42,6 +44,9 @@ mod error; mod interval; mod wheel; +#[deprecated(since = "0.2.6", note = "use Timeout instead")] +#[doc(hidden)] +#[allow(deprecated)] pub use self::deadline::{Deadline, DeadlineError}; #[doc(inline)] pub use self::delay_queue::DelayQueue; @@ -49,6 +54,7 @@ pub use self::delay::Delay; pub use self::error::Error; pub use self::interval::Interval; #[doc(inline)] +pub use self::timeout::Timeout; pub use self::timer::{with_default, Timer}; use std::time::{Duration, Instant}; diff --git a/tokio-timer/src/timeout.rs b/tokio-timer/src/timeout.rs new file mode 100644 index 000000000..5e44969cc --- /dev/null +++ b/tokio-timer/src/timeout.rs @@ -0,0 +1,280 @@ +//! Allows a future or stream to execute for a maximum amount of time. +//! +//! See [`Timeout`] documentation for more details. +//! +//! [`Timeout`]: struct.Timeout.html + +use Delay; +use clock::now; + +use futures::{Future, Stream, Poll, Async}; + +use std::error; +use std::fmt; +use std::time::{Instant, Duration}; + +/// Allows a `Future` or `Stream` to execute for a limited amount of time. +/// +/// If thee future or stream completes before the timeout has expired, then +/// `Timeout` returns the completed value. Otherwise, `Timeout` returns an +/// [`Error`]. +/// +/// # Futures and Streams +/// +/// The exact behavor depends on if the inner value is a `Future` or a `Stream`. +/// In the case of a `Future`, `Timeout` will require the future to complete by +/// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item +/// to take the entire timeout before returning an error. +/// +/// In order to set an upper bound on the processing of the *entire* stream, +/// then a timeout should be set on the future that processes the stream. For +/// example: +/// +/// ```rust +/// # extern crate futures; +/// # extern crate tokio; +/// use tokio::timer::Timeout; +/// use futures::{Future, Stream}; +/// use futures::sync::mpsc; +/// use std::time::Duration; +/// +/// # fn main() { +/// let (tx, rx) = mpsc::unbounded(); +/// # tx.unbounded_send(()).unwrap(); +/// # drop(tx); +/// let process = rx.for_each(|item| { +/// // do something with `iteem` +/// # drop(item); +/// # Ok(()) +/// }); +/// +/// Timeout::new(process, Duration::from_secs(1)); +/// # } +/// ``` +/// +/// # Cancelation +/// +/// Cancelling a `Timeout` is done by dropping the value. No additional cleanup +/// or otheer work is required. +/// +/// The original future or stream may be obtained by calling [`into_inner`]. This +/// consumes the `Timeout`. +/// +/// [`Error`]: struct.Error.html +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Timeout { + value: T, + delay: Delay, +} + +/// Error returned by `Timeout`. +#[derive(Debug)] +pub struct Error(Kind); + +/// Timeout error variants +#[derive(Debug)] +enum Kind { + /// Inner value returned an error + Inner(T), + + /// The timeout elapsed. + Elapsed, + + /// Timer returned an error. + Timer(::Error), +} + +impl Timeout { + /// Create a new `Timeout` that allows `value` to execute for a duration of + /// at most `timeout`. + /// + /// The exact behavior depends on if `value` is a `Future` or a `Stream`. + /// + /// See [type] level documentation for more details. + /// + /// [type]: # + pub fn new(value: T, timeout: Duration) -> Timeout { + let delay = Delay::new_timeout(now() + timeout, timeout); + + Timeout { + value, + delay, + } + } + + /// Gets a reference to the underlying value in this timeout. + pub fn get_ref(&self) -> &T { + &self.value + } + + /// Gets a mutable reference to the underlying value in this timeout. + pub fn get_mut(&mut self) -> &mut T { + &mut self.value + } + + /// Consumes this timeout, returning the underlying value. + pub fn into_inner(self) -> T { + self.value + } +} + +impl Timeout { + /// Create a new `Timeout` that completes when `future` completes or when + /// `deadline` is reached. + /// + /// This function differs from `new` in that: + /// + /// * It only accepts `Future` arguments. + /// * It sets an explicit `Instant` at which the timeout expires. + pub fn new_at(future: T, deadline: Instant) -> Timeout { + let delay = Delay::new(deadline); + + Timeout { + value: future, + delay, + } + } +} + +impl Future for Timeout +where T: Future, +{ + type Item = T::Item; + type Error = Error; + + fn poll(&mut self) -> Poll { + // First, try polling the future + match self.value.poll() { + Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), + Ok(Async::NotReady) => {} + Err(e) => return Err(Error::inner(e)), + } + + // Now check the timer + match self.delay.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => { + Err(Error::elapsed()) + }, + Err(e) => Err(Error::timer(e)), + } + } +} + +impl Stream for Timeout +where T: Stream, +{ + type Item = T::Item; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // First, try polling the future + match self.value.poll() { + Ok(Async::Ready(v)) => { + if v.is_some() { + self.delay.reset_timeout(); + } + return Ok(Async::Ready(v)) + } + Ok(Async::NotReady) => {} + Err(e) => return Err(Error::inner(e)), + } + + // Now check the timer + match self.delay.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => { + Err(Error::elapsed()) + }, + Err(e) => Err(Error::timer(e)), + } + } +} + +// ===== impl Error ===== + +impl Error { + /// Create a new `Error` representing the inner value completing with `Err`. + pub fn inner(err: T) -> Error { + Error(Kind::Inner(err)) + } + + /// Returns `true` if the error was caused by the inner value completing + /// with `Err`. + pub fn is_inner(&self) -> bool { + match self.0 { + Kind::Inner(_) => true, + _ => false, + } + } + + /// Consumes `self`, returning the inner future error. + pub fn into_inner(self) -> Option { + match self.0 { + Kind::Inner(err) => Some(err), + _ => None, + } + } + + /// Create a new `Error` representing the inner value not completing before + /// the deadline is reached. + pub fn elapsed() -> Error { + Error(Kind::Elapsed) + } + + /// Returns `true` if the error was caused by the inner value not completing + /// before the deadline is reached. + pub fn is_elapsed(&self) -> bool { + match self.0 { + Kind::Elapsed => true, + _ => false, + } + } + + /// Creates a new `Error` representing an error encountered by the timer + /// implementation + pub fn timer(err: ::Error) -> Error { + Error(Kind::Timer(err)) + } + + /// Returns `true` if the error was caused by the timer. + pub fn is_timer(&self) -> bool { + match self.0 { + Kind::Timer(_) => true, + _ => false, + } + } + + /// Consumes `self`, returning the error raised by the timer implementation. + pub fn into_timer(self) -> Option<::Error> { + match self.0 { + Kind::Timer(err) => Some(err), + _ => None, + } + } +} + +impl error::Error for Error { + fn description(&self) -> &str { + use self::Kind::*; + + match self.0 { + Inner(ref e) => e.description(), + Elapsed => "deadline has elapsed", + Timer(ref e) => e.description(), + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use self::Kind::*; + + match self.0 { + Inner(ref e) => e.fmt(fmt), + Elapsed => "deadline has elapsed".fmt(fmt), + Timer(ref e) => e.fmt(fmt), + } + } +} diff --git a/tokio-timer/src/timer/entry.rs b/tokio-timer/src/timer/entry.rs index 7830b7776..e1a866a9f 100644 --- a/tokio-timer/src/timer/entry.rs +++ b/tokio-timer/src/timer/entry.rs @@ -11,7 +11,7 @@ use std::ptr; use std::sync::{Arc, Weak}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{SeqCst, Relaxed}; -use std::time::Instant; +use std::time::{Instant, Duration}; use std::u64; /// Internal state shared between a `Delay` instance and the timer. @@ -95,6 +95,7 @@ pub(crate) struct Entry { #[derive(Debug)] pub(crate) struct Time { pub(crate) deadline: Instant, + pub(crate) duration: Duration, } /// Flag indicating a timer entry has elapsed @@ -106,10 +107,11 @@ const ERROR: u64 = u64::MAX; // ===== impl Entry ===== impl Entry { - pub fn new(deadline: Instant) -> Entry { + pub fn new(deadline: Instant, duration: Duration) -> Entry { Entry { time: CachePadded::new(UnsafeCell::new(Time { deadline, + duration, })), inner: None, task: AtomicTask::new(), diff --git a/tokio-timer/src/timer/registration.rs b/tokio-timer/src/timer/registration.rs index 3a1414950..81cc3e511 100644 --- a/tokio-timer/src/timer/registration.rs +++ b/tokio-timer/src/timer/registration.rs @@ -1,10 +1,11 @@ use Error; +use clock::now; use timer::{HandlePriv, Entry}; use futures::Poll; use std::sync::Arc; -use std::time::Instant; +use std::time::{Instant, Duration}; /// Registration with a timer. /// @@ -16,11 +17,11 @@ pub(crate) struct Registration { } impl Registration { - pub fn new(deadline: Instant) -> Registration { + pub fn new(deadline: Instant, duration: Duration) -> Registration { fn is_send() {} is_send::(); - Registration { entry: Arc::new(Entry::new(deadline)) } + Registration { entry: Arc::new(Entry::new(deadline, duration)) } } pub fn deadline(&self) -> Instant { @@ -42,6 +43,12 @@ impl Registration { Entry::reset(&mut self.entry); } + pub fn reset_timeout(&mut self) { + let deadline = now() + self.entry.time_ref().duration; + self.entry.time_mut().deadline = deadline; + Entry::reset(&mut self.entry); + } + pub fn is_elapsed(&self) -> bool { self.entry.is_elapsed() } diff --git a/tokio-timer/tests/timeout.rs b/tokio-timer/tests/timeout.rs new file mode 100644 index 000000000..4cf9837ec --- /dev/null +++ b/tokio-timer/tests/timeout.rs @@ -0,0 +1,154 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_timer; + +#[macro_use] +mod support; +use support::*; + +use tokio_timer::*; + +use futures::{future, Future, Stream}; +use futures::sync::{oneshot, mpsc}; + +#[test] +fn simultaneous_deadline_future_completion() { + mocked(|_, time| { + // Create a future that is immediately ready + let fut = future::ok::<_, ()>(()); + + // Wrap it with a deadline + let mut fut = Timeout::new_at(fut, time.now()); + + // Ready! + assert_ready!(fut); + }); +} + +#[test] +fn completed_future_past_deadline() { + mocked(|_, time| { + // Create a future that is immediately ready + let fut = future::ok::<_, ()>(()); + + // Wrap it with a deadline + let mut fut = Timeout::new_at(fut, time.now() - ms(1000)); + + // Ready! + assert_ready!(fut); + }); +} + +#[test] +fn future_and_deadline_in_future() { + mocked(|timer, time| { + // Not yet complete + let (tx, rx) = oneshot::channel(); + + // Wrap it with a deadline + let mut fut = Timeout::new_at(rx, time.now() + ms(100)); + + // Ready! + assert_not_ready!(fut); + + // Turn the timer, it runs for the elapsed time + advance(timer, ms(90)); + + assert_not_ready!(fut); + + // Complete the future + tx.send(()).unwrap(); + + assert_ready!(fut); + }); +} + +#[test] +fn future_and_timeout_in_future() { + mocked(|timer, _time| { + // Not yet complete + let (tx, rx) = oneshot::channel(); + + // Wrap it with a deadline + let mut fut = Timeout::new(rx, ms(100)); + + // Ready! + assert_not_ready!(fut); + + // Turn the timer, it runs for the elapsed time + advance(timer, ms(90)); + + assert_not_ready!(fut); + + // Complete the future + tx.send(()).unwrap(); + + assert_ready!(fut); + }); +} + +#[test] +fn deadline_now_elapses() { + mocked(|_, time| { + let fut = future::empty::<(), ()>(); + + // Wrap it with a deadline + let mut fut = Timeout::new_at(fut, time.now()); + + assert_elapsed!(fut); + }); +} + +#[test] +fn deadline_future_elapses() { + mocked(|timer, time| { + let fut = future::empty::<(), ()>(); + + // Wrap it with a deadline + let mut fut = Timeout::new_at(fut, time.now() + ms(300)); + + assert_not_ready!(fut); + + advance(timer, ms(300)); + + assert_elapsed!(fut); + }); +} + +#[test] +fn future_errors_first() { + mocked(|_, time| { + let fut = future::err::<(), ()>(()); + + // Wrap it with a deadline + let mut fut = Timeout::new_at(fut, time.now() + ms(100)); + + // Ready! + assert!(fut.poll().unwrap_err().is_inner()); + }); +} + +#[test] +fn stream_and_timeout_in_future() { + mocked(|timer, _time| { + // Not yet complete + let (tx, rx) = mpsc::unbounded(); + + // Wrap it with a deadline + let mut stream = Timeout::new(rx, ms(100)); + + // Not ready + assert_not_ready!(stream); + + // Turn the timer, it runs for the elapsed time + advance(timer, ms(90)); + + assert_not_ready!(stream); + + // Complete the future + tx.unbounded_send(()).unwrap(); + + let item = assert_ready!(stream); + assert!(item.is_some()); + }); +}