Introduce Timeout and deprecate Deadline. (#558)

This patch introduces `Timeout`. This new type allows setting a timeout
both using a duration and an instant. Given this overlap with
`Deadline`, `Deadline` is deprecated.

In addition to supporting future timeouts, the `Timeout` combinator is
able to provide timeout functionality to streams. It does this by
applying a duration based timeout to each item being yielded.

The main reason for introducing `Timeout` is that a deadline approach
does not work with streams. Since `Timeout` needed to be introduced
anyway, keeping `Deadline` around does not make sense.
This commit is contained in:
Carl Lerche 2018-08-22 20:39:46 -07:00 committed by GitHub
parent cf184eb326
commit 8bf2e9aeb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 521 additions and 44 deletions

View File

@ -10,9 +10,9 @@
//! is initialized with a `Duration` and repeatedly yields each time the
//! duration elapses.
//!
//! * [`Deadline`][Deadline] wraps a future, requiring that it completes before
//! a specified `Instant` in time. If the future does not complete in time,
//! then it is canceled and an error is returned.
//! * [`Timeout`][Timeeout]: Wraps a future or stream, setting an upper bound to the
//! amount of time it is allowed to execute. If the future or stream does not
//! completee in time, then it is canceled and an error is returned.
//!
//! * [`DelayQueue`]: A queue where items are returned once the requested delay
//! has expired.
@ -48,7 +48,7 @@
//! ```
//!
//! Require that an operation takes no more than 300ms. Note that this uses the
//! [`deadline`][ext] function on the [`FutureExt`][ext] trait. This trait is
//! [`timeout`][ext] function on the [`FutureExt`][ext] trait. This trait is
//! included in the prelude.
//!
//! ```
@ -64,11 +64,9 @@
//! }
//!
//! # fn main() {
//! let when = Instant::now() + Duration::from_millis(300);
//!
//! tokio::run({
//! long_op()
//! .deadline(when)
//! .timeout(Duration::from_millis(300))
//! .map_err(|e| {
//! println!("operation timed out");
//! })
@ -78,18 +76,27 @@
//!
//! [runtime]: ../runtime/struct.Runtime.html
//! [tokio-timer]: https://docs.rs/tokio-timer
//! [ext]: ../util/trait.FutureExt.html#method.deadline
//! [Deadline]: struct.Deadline.html
//! [ext]: ../util/trait.FutureExt.html#method.timeout
//! [Timeout]: struct.Timeout.html
//! [Delay]: struct.Delay.html
//! [Interval]: struct.Interval.html
//! [`DelayQueue`]: struct.DelayQueue.html
pub use tokio_timer::{
delay_queue,
Deadline,
DeadlineError,
DelayQueue,
Error,
Interval,
Delay,
Timeout,
timeout,
};
#[deprecated(since = "0.1.8", note = "use Timeout instead")]
#[allow(deprecated)]
#[doc(hidden)]
pub type Deadline<T> = ::tokio_timer::Deadline<T>;
#[deprecated(since = "0.1.8", note = "use Timeout instead")]
#[allow(deprecated)]
#[doc(hidden)]
pub type DeadlineError<T> = ::tokio_timer::DeadlineError<T>;

View File

@ -1,14 +1,16 @@
#[allow(deprecated)]
use tokio_timer::Deadline;
use tokio_timer::Timeout;
use futures::Future;
use std::time::Instant;
use std::time::{Instant, Duration};
/// An extension trait for `Future` that provides a variety of convenient
/// combinator functions.
///
/// Currently, there only is a [`deadline`] function, but this will increase
/// Currently, there only is a [`timeout`] function, but this will increase
/// over time.
///
/// Users are not expected to implement this trait. All types that implement
@ -17,18 +19,17 @@ use std::time::Instant;
/// This trait can be imported directly or via the Tokio prelude: `use
/// tokio::prelude::*`.
///
/// [`deadline`]: #method.deadline
/// [`timeout`]: #method.timeout
pub trait FutureExt: Future {
/// Creates a new future which allows `self` until `deadline`.
/// Creates a new future which allows `self` until `timeout`.
///
/// This combinator creates a new future which wraps the receiving future
/// with a deadline. The returned future is allowed to execute until it
/// completes or `deadline` is reached, whichever happens first.
/// with a timeout. The returned future is allowed to execute until it
/// completes or `timeout` has elapsed, whichever happens first.
///
/// If the future completes before `deadline` then the future will resolve
/// with that item. Otherwise the future will resolve to an error once
/// `deadline` is reached.
/// If the future completes before `timeout` then the future will resolve
/// with that item. Otherwise the future will resolve to an error.
///
/// # Examples
///
@ -36,7 +37,7 @@ pub trait FutureExt: Future {
/// # extern crate tokio;
/// # extern crate futures;
/// use tokio::prelude::*;
/// use std::time::{Duration, Instant};
/// use std::time::Duration;
/// # use futures::future::{self, FutureResult};
///
/// # fn long_future() -> FutureResult<(), ()> {
@ -45,12 +46,21 @@ pub trait FutureExt: Future {
/// #
/// # fn main() {
/// let future = long_future()
/// .deadline(Instant::now() + Duration::from_secs(1))
/// .timeout(Duration::from_secs(1))
/// .map_err(|e| println!("error = {:?}", e));
///
/// tokio::run(future);
/// # }
/// ```
fn timeout(self, timeout: Duration) -> Timeout<Self>
where Self: Sized,
{
Timeout::new(self, timeout)
}
#[deprecated(since = "0.1.8", note = "use `timeout` instead")]
#[allow(deprecated)]
#[doc(hidden)]
fn deadline(self, deadline: Instant) -> Deadline<Self>
where Self: Sized,
{

View File

@ -1,3 +1,5 @@
#![allow(deprecated)]
use Delay;
use futures::{Future, Poll, Async};
@ -6,21 +8,16 @@ use std::error;
use std::fmt;
use std::time::Instant;
/// Allows a given `Future` to execute until the specified deadline.
///
/// If the inner future completes before the deadline is reached, then
/// `Deadline` completes with that value. Otherwise, `Deadline` completes with a
/// [`DeadlineError`].
///
/// [`DeadlineError`]: struct.DeadlineError.html
#[must_use = "futures do nothing unless polled"]
#[deprecated(since = "0.2.6", note = "use Timeout instead")]
#[doc(hidden)]
#[derive(Debug)]
pub struct Deadline<T> {
future: T,
delay: Delay,
}
/// Error returned by `Deadline` future.
#[deprecated(since = "0.2.6", note = "use Timeout instead")]
#[doc(hidden)]
#[derive(Debug)]
pub struct DeadlineError<T>(Kind<T>);

View File

@ -3,7 +3,7 @@ use timer::{Registration, HandlePriv};
use futures::{Future, Poll};
use std::time::Instant;
use std::time::{Instant, Duration};
/// A future that completes at a specified instant in time.
///
@ -13,6 +13,11 @@ use std::time::Instant;
/// `Delay` has a resolution of one millisecond and should not be used for tasks
/// that require high-resolution timers.
///
/// # Cancellation
///
/// Canceling a `Delay` is done by dropping the value. No additional cleanup or
/// other work is required.
///
/// [`new`]: #method.new
#[derive(Debug)]
pub struct Delay {
@ -29,13 +34,18 @@ impl Delay {
/// as to how the sub-millisecond portion of `deadline` will be handled.
/// `Delay` should not be used for high-resolution timer use cases.
pub fn new(deadline: Instant) -> Delay {
let registration = Registration::new(deadline);
let registration = Registration::new(deadline, Duration::from_millis(0));
Delay { registration }
}
pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay {
let registration = Registration::new(deadline, duration);
Delay { registration }
}
pub(crate) fn new_with_handle(deadline: Instant, handle: HandlePriv) -> Delay {
let mut registration = Registration::new(deadline);
let mut registration = Registration::new(deadline, Duration::from_millis(0));
registration.register_with(handle);
Delay { registration }
@ -64,6 +74,10 @@ impl Delay {
self.registration.reset(deadline);
}
pub(crate) fn reset_timeout(&mut self) {
self.registration.reset_timeout();
}
/// Register the delay with the timer instance for the current execution
/// context.
fn register(&mut self) {

View File

@ -1,4 +1,4 @@
//! Utilities for scheduling work to happen after a period of time.
//! Utilities for tracking time.
//!
//! This crate provides a number of utilities for working with periods of time:
//!
@ -6,18 +6,19 @@
//!
//! * [`Interval`] A stream that yields at fixed time intervals.
//!
//! * [`Deadline`]: Wraps a future, requiring it to complete before a specified
//! instant in time, erroring if the future takes too long.
//! * [`Timeout`]: Wraps a future or stream, setting an upper bound to the
//! amount of time it is allowed to execute. If the future or stream does not
//! completee in time, then it is canceled and an error is returned.
//!
//! * [`DelayQueue`]: A queue where items are returned once the requested delay
//! has expired.
//!
//! These three types are backed by a [`Timer`] instance. In order for
//! [`Delay`], [`Interval`], and [`Deadline`] to function, the associated
//! [`Delay`], [`Interval`], and [`Timeout`] to function, the associated
//! [`Timer`] instance must be running on some thread.
//!
//! [`Delay`]: struct.Delay.html
//! [`Deadline`]: struct.Deadline.html
//! [`Timeout`]: struct.Timeout.html
//! [`Interval`]: struct.Interval.html
//! [`Timer`]: timer/struct.Timer.html
@ -33,6 +34,7 @@ extern crate slab;
pub mod clock;
pub mod delay_queue;
pub mod timeout;
pub mod timer;
mod atomic;
@ -42,6 +44,9 @@ mod error;
mod interval;
mod wheel;
#[deprecated(since = "0.2.6", note = "use Timeout instead")]
#[doc(hidden)]
#[allow(deprecated)]
pub use self::deadline::{Deadline, DeadlineError};
#[doc(inline)]
pub use self::delay_queue::DelayQueue;
@ -49,6 +54,7 @@ pub use self::delay::Delay;
pub use self::error::Error;
pub use self::interval::Interval;
#[doc(inline)]
pub use self::timeout::Timeout;
pub use self::timer::{with_default, Timer};
use std::time::{Duration, Instant};

280
tokio-timer/src/timeout.rs Normal file
View File

@ -0,0 +1,280 @@
//! Allows a future or stream to execute for a maximum amount of time.
//!
//! See [`Timeout`] documentation for more details.
//!
//! [`Timeout`]: struct.Timeout.html
use Delay;
use clock::now;
use futures::{Future, Stream, Poll, Async};
use std::error;
use std::fmt;
use std::time::{Instant, Duration};
/// Allows a `Future` or `Stream` to execute for a limited amount of time.
///
/// If thee future or stream completes before the timeout has expired, then
/// `Timeout` returns the completed value. Otherwise, `Timeout` returns an
/// [`Error`].
///
/// # Futures and Streams
///
/// The exact behavor depends on if the inner value is a `Future` or a `Stream`.
/// In the case of a `Future`, `Timeout` will require the future to complete by
/// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item
/// to take the entire timeout before returning an error.
///
/// In order to set an upper bound on the processing of the *entire* stream,
/// then a timeout should be set on the future that processes the stream. For
/// example:
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio;
/// use tokio::timer::Timeout;
/// use futures::{Future, Stream};
/// use futures::sync::mpsc;
/// use std::time::Duration;
///
/// # fn main() {
/// let (tx, rx) = mpsc::unbounded();
/// # tx.unbounded_send(()).unwrap();
/// # drop(tx);
/// let process = rx.for_each(|item| {
/// // do something with `iteem`
/// # drop(item);
/// # Ok(())
/// });
///
/// Timeout::new(process, Duration::from_secs(1));
/// # }
/// ```
///
/// # Cancelation
///
/// Cancelling a `Timeout` is done by dropping the value. No additional cleanup
/// or otheer work is required.
///
/// The original future or stream may be obtained by calling [`into_inner`]. This
/// consumes the `Timeout`.
///
/// [`Error`]: struct.Error.html
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Timeout<T> {
value: T,
delay: Delay,
}
/// Error returned by `Timeout`.
#[derive(Debug)]
pub struct Error<T>(Kind<T>);
/// Timeout error variants
#[derive(Debug)]
enum Kind<T> {
/// Inner value returned an error
Inner(T),
/// The timeout elapsed.
Elapsed,
/// Timer returned an error.
Timer(::Error),
}
impl<T> Timeout<T> {
/// Create a new `Timeout` that allows `value` to execute for a duration of
/// at most `timeout`.
///
/// The exact behavior depends on if `value` is a `Future` or a `Stream`.
///
/// See [type] level documentation for more details.
///
/// [type]: #
pub fn new(value: T, timeout: Duration) -> Timeout<T> {
let delay = Delay::new_timeout(now() + timeout, timeout);
Timeout {
value,
delay,
}
}
/// Gets a reference to the underlying value in this timeout.
pub fn get_ref(&self) -> &T {
&self.value
}
/// Gets a mutable reference to the underlying value in this timeout.
pub fn get_mut(&mut self) -> &mut T {
&mut self.value
}
/// Consumes this timeout, returning the underlying value.
pub fn into_inner(self) -> T {
self.value
}
}
impl<T: Future> Timeout<T> {
/// Create a new `Timeout` that completes when `future` completes or when
/// `deadline` is reached.
///
/// This function differs from `new` in that:
///
/// * It only accepts `Future` arguments.
/// * It sets an explicit `Instant` at which the timeout expires.
pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
let delay = Delay::new(deadline);
Timeout {
value: future,
delay,
}
}
}
impl<T> Future for Timeout<T>
where T: Future,
{
type Item = T::Item;
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// First, try polling the future
match self.value.poll() {
Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
Ok(Async::NotReady) => {}
Err(e) => return Err(Error::inner(e)),
}
// Now check the timer
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
Err(Error::elapsed())
},
Err(e) => Err(Error::timer(e)),
}
}
}
impl<T> Stream for Timeout<T>
where T: Stream,
{
type Item = T::Item;
type Error = Error<T::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// First, try polling the future
match self.value.poll() {
Ok(Async::Ready(v)) => {
if v.is_some() {
self.delay.reset_timeout();
}
return Ok(Async::Ready(v))
}
Ok(Async::NotReady) => {}
Err(e) => return Err(Error::inner(e)),
}
// Now check the timer
match self.delay.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => {
Err(Error::elapsed())
},
Err(e) => Err(Error::timer(e)),
}
}
}
// ===== impl Error =====
impl<T> Error<T> {
/// Create a new `Error` representing the inner value completing with `Err`.
pub fn inner(err: T) -> Error<T> {
Error(Kind::Inner(err))
}
/// Returns `true` if the error was caused by the inner value completing
/// with `Err`.
pub fn is_inner(&self) -> bool {
match self.0 {
Kind::Inner(_) => true,
_ => false,
}
}
/// Consumes `self`, returning the inner future error.
pub fn into_inner(self) -> Option<T> {
match self.0 {
Kind::Inner(err) => Some(err),
_ => None,
}
}
/// Create a new `Error` representing the inner value not completing before
/// the deadline is reached.
pub fn elapsed() -> Error<T> {
Error(Kind::Elapsed)
}
/// Returns `true` if the error was caused by the inner value not completing
/// before the deadline is reached.
pub fn is_elapsed(&self) -> bool {
match self.0 {
Kind::Elapsed => true,
_ => false,
}
}
/// Creates a new `Error` representing an error encountered by the timer
/// implementation
pub fn timer(err: ::Error) -> Error<T> {
Error(Kind::Timer(err))
}
/// Returns `true` if the error was caused by the timer.
pub fn is_timer(&self) -> bool {
match self.0 {
Kind::Timer(_) => true,
_ => false,
}
}
/// Consumes `self`, returning the error raised by the timer implementation.
pub fn into_timer(self) -> Option<::Error> {
match self.0 {
Kind::Timer(err) => Some(err),
_ => None,
}
}
}
impl<T: error::Error> error::Error for Error<T> {
fn description(&self) -> &str {
use self::Kind::*;
match self.0 {
Inner(ref e) => e.description(),
Elapsed => "deadline has elapsed",
Timer(ref e) => e.description(),
}
}
}
impl<T: fmt::Display> fmt::Display for Error<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::Kind::*;
match self.0 {
Inner(ref e) => e.fmt(fmt),
Elapsed => "deadline has elapsed".fmt(fmt),
Timer(ref e) => e.fmt(fmt),
}
}
}

View File

@ -11,7 +11,7 @@ use std::ptr;
use std::sync::{Arc, Weak};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{SeqCst, Relaxed};
use std::time::Instant;
use std::time::{Instant, Duration};
use std::u64;
/// Internal state shared between a `Delay` instance and the timer.
@ -95,6 +95,7 @@ pub(crate) struct Entry {
#[derive(Debug)]
pub(crate) struct Time {
pub(crate) deadline: Instant,
pub(crate) duration: Duration,
}
/// Flag indicating a timer entry has elapsed
@ -106,10 +107,11 @@ const ERROR: u64 = u64::MAX;
// ===== impl Entry =====
impl Entry {
pub fn new(deadline: Instant) -> Entry {
pub fn new(deadline: Instant, duration: Duration) -> Entry {
Entry {
time: CachePadded::new(UnsafeCell::new(Time {
deadline,
duration,
})),
inner: None,
task: AtomicTask::new(),

View File

@ -1,10 +1,11 @@
use Error;
use clock::now;
use timer::{HandlePriv, Entry};
use futures::Poll;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Instant, Duration};
/// Registration with a timer.
///
@ -16,11 +17,11 @@ pub(crate) struct Registration {
}
impl Registration {
pub fn new(deadline: Instant) -> Registration {
pub fn new(deadline: Instant, duration: Duration) -> Registration {
fn is_send<T: Send + Sync>() {}
is_send::<Registration>();
Registration { entry: Arc::new(Entry::new(deadline)) }
Registration { entry: Arc::new(Entry::new(deadline, duration)) }
}
pub fn deadline(&self) -> Instant {
@ -42,6 +43,12 @@ impl Registration {
Entry::reset(&mut self.entry);
}
pub fn reset_timeout(&mut self) {
let deadline = now() + self.entry.time_ref().duration;
self.entry.time_mut().deadline = deadline;
Entry::reset(&mut self.entry);
}
pub fn is_elapsed(&self) -> bool {
self.entry.is_elapsed()
}

View File

@ -0,0 +1,154 @@
extern crate futures;
extern crate tokio_executor;
extern crate tokio_timer;
#[macro_use]
mod support;
use support::*;
use tokio_timer::*;
use futures::{future, Future, Stream};
use futures::sync::{oneshot, mpsc};
#[test]
fn simultaneous_deadline_future_completion() {
mocked(|_, time| {
// Create a future that is immediately ready
let fut = future::ok::<_, ()>(());
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now());
// Ready!
assert_ready!(fut);
});
}
#[test]
fn completed_future_past_deadline() {
mocked(|_, time| {
// Create a future that is immediately ready
let fut = future::ok::<_, ()>(());
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now() - ms(1000));
// Ready!
assert_ready!(fut);
});
}
#[test]
fn future_and_deadline_in_future() {
mocked(|timer, time| {
// Not yet complete
let (tx, rx) = oneshot::channel();
// Wrap it with a deadline
let mut fut = Timeout::new_at(rx, time.now() + ms(100));
// Ready!
assert_not_ready!(fut);
// Turn the timer, it runs for the elapsed time
advance(timer, ms(90));
assert_not_ready!(fut);
// Complete the future
tx.send(()).unwrap();
assert_ready!(fut);
});
}
#[test]
fn future_and_timeout_in_future() {
mocked(|timer, _time| {
// Not yet complete
let (tx, rx) = oneshot::channel();
// Wrap it with a deadline
let mut fut = Timeout::new(rx, ms(100));
// Ready!
assert_not_ready!(fut);
// Turn the timer, it runs for the elapsed time
advance(timer, ms(90));
assert_not_ready!(fut);
// Complete the future
tx.send(()).unwrap();
assert_ready!(fut);
});
}
#[test]
fn deadline_now_elapses() {
mocked(|_, time| {
let fut = future::empty::<(), ()>();
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now());
assert_elapsed!(fut);
});
}
#[test]
fn deadline_future_elapses() {
mocked(|timer, time| {
let fut = future::empty::<(), ()>();
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now() + ms(300));
assert_not_ready!(fut);
advance(timer, ms(300));
assert_elapsed!(fut);
});
}
#[test]
fn future_errors_first() {
mocked(|_, time| {
let fut = future::err::<(), ()>(());
// Wrap it with a deadline
let mut fut = Timeout::new_at(fut, time.now() + ms(100));
// Ready!
assert!(fut.poll().unwrap_err().is_inner());
});
}
#[test]
fn stream_and_timeout_in_future() {
mocked(|timer, _time| {
// Not yet complete
let (tx, rx) = mpsc::unbounded();
// Wrap it with a deadline
let mut stream = Timeout::new(rx, ms(100));
// Not ready
assert_not_ready!(stream);
// Turn the timer, it runs for the elapsed time
advance(timer, ms(90));
assert_not_ready!(stream);
// Complete the future
tx.unbounded_send(()).unwrap();
let item = assert_ready!(stream);
assert!(item.is_some());
});
}