From b9cc032d3bcf7686f0130163c13d3d660d30ae2c Mon Sep 17 00:00:00 2001 From: Jake Date: Tue, 25 Feb 2020 09:06:02 -0800 Subject: [PATCH] mpsc: add `Sender::send_timeout` (#2227) --- tokio/src/sync/mpsc/bounded.rs | 79 ++++++++++++++++++++++++++++++++++ tokio/src/sync/mpsc/error.rs | 33 +++++++++++++- 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index b95611d88..ba89fdd72 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -2,6 +2,11 @@ use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; use crate::sync::semaphore_ll as semaphore; +cfg_time! { + use crate::sync::mpsc::error::SendTimeoutError; + use crate::time::Duration; +} + use std::fmt; use std::task::{Context, Poll}; @@ -322,3 +327,77 @@ impl Sender { } } } + +cfg_time! { + impl Sender { + /// Sends a value, waiting until there is capacity, but only for a limited time. + /// + /// Shares the same success and error conditions as [`send`], adding one more + /// condition for an unsuccessful send, which is when the provided timeout has + /// elapsed, and there is no capacity available. + /// + /// [`send`]: Sender::send + /// + /// # Errors + /// + /// If the receive half of the channel is closed, either due to [`close`] being + /// called or the [`Receiver`] handle dropping, or if the timeout specified + /// elapses before the capacity is available the function returns an error. + /// The error includes the value passed to `send_timeout`. + /// + /// [`close`]: Receiver::close + /// [`Receiver`]: Receiver + /// + /// # Examples + /// + /// In the following example, each call to `send_timeout` will block until the + /// previously sent value was received, unless the timeout has elapsed. + /// + /// ```rust + /// use tokio::sync::mpsc; + /// use tokio::time::{delay_for, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await { + /// println!("send error: #{:?}", e); + /// return; + /// } + /// } + /// }); + /// + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// delay_for(Duration::from_millis(200)).await; + /// } + /// } + /// ``` + pub async fn send_timeout( + &mut self, + value: T, + timeout: Duration, + ) -> Result<(), SendTimeoutError> { + use crate::future::poll_fn; + + match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await { + Err(_) => { + return Err(SendTimeoutError::Timeout(value)); + } + Ok(Err(_)) => { + return Err(SendTimeoutError::Closed(value)); + } + Ok(_) => {} + } + + match self.try_send(value) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => unreachable!(), + Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)), + } + } + } +} diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs index 802bc4884..991935631 100644 --- a/tokio/src/sync/mpsc/error.rs +++ b/tokio/src/sync/mpsc/error.rs @@ -96,7 +96,7 @@ impl Error for TryRecvError {} // ===== ClosedError ===== -/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)]. +/// Error returned by [`Sender::poll_ready`](super::Sender::poll_ready)]. #[derive(Debug)] pub struct ClosedError(()); @@ -113,3 +113,34 @@ impl fmt::Display for ClosedError { } impl Error for ClosedError {} + +cfg_time! { + // ===== SendTimeoutError ===== + + #[derive(Debug)] + /// Error returned by [`Sender::send_timeout`](super::Sender::send_timeout)]. + pub enum SendTimeoutError { + /// The data could not be sent on the channel because the channel is + /// full, and the timeout to send has elapsed. + Timeout(T), + + /// The receive half of the channel was explicitly closed or has been + /// dropped. + Closed(T), + } + + impl Error for SendTimeoutError {} + + impl fmt::Display for SendTimeoutError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + SendTimeoutError::Timeout(..) => "timed out waiting on send operation", + SendTimeoutError::Closed(..) => "channel closed", + } + ) + } + } +}