mpsc: add Sender::send_timeout (#2227)

This commit is contained in:
Jake 2020-02-25 09:06:02 -08:00 committed by GitHub
parent 4213b79461
commit b9cc032d3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 111 additions and 1 deletions

View File

@ -2,6 +2,11 @@ use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
use crate::sync::semaphore_ll as semaphore;
cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
use crate::time::Duration;
}
use std::fmt;
use std::task::{Context, Poll};
@ -322,3 +327,77 @@ impl<T> Sender<T> {
}
}
}
cfg_time! {
impl<T> Sender<T> {
/// Sends a value, waiting until there is capacity, but only for a limited time.
///
/// Shares the same success and error conditions as [`send`], adding one more
/// condition for an unsuccessful send, which is when the provided timeout has
/// elapsed, and there is no capacity available.
///
/// [`send`]: Sender::send
///
/// # Errors
///
/// If the receive half of the channel is closed, either due to [`close`] being
/// called or the [`Receiver`] handle dropping, or if the timeout specified
/// elapses before the capacity is available the function returns an error.
/// The error includes the value passed to `send_timeout`.
///
/// [`close`]: Receiver::close
/// [`Receiver`]: Receiver
///
/// # Examples
///
/// In the following example, each call to `send_timeout` will block until the
/// previously sent value was received, unless the timeout has elapsed.
///
/// ```rust
/// use tokio::sync::mpsc;
/// use tokio::time::{delay_for, Duration};
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx, mut rx) = mpsc::channel(1);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
/// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
/// println!("send error: #{:?}", e);
/// return;
/// }
/// }
/// });
///
/// while let Some(i) = rx.recv().await {
/// println!("got = {}", i);
/// delay_for(Duration::from_millis(200)).await;
/// }
/// }
/// ```
pub async fn send_timeout(
&mut self,
value: T,
timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
use crate::future::poll_fn;
match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await {
Err(_) => {
return Err(SendTimeoutError::Timeout(value));
}
Ok(Err(_)) => {
return Err(SendTimeoutError::Closed(value));
}
Ok(_) => {}
}
match self.try_send(value) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => unreachable!(),
Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)),
}
}
}
}

View File

@ -96,7 +96,7 @@ impl Error for TryRecvError {}
// ===== ClosedError =====
/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
/// Error returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
#[derive(Debug)]
pub struct ClosedError(());
@ -113,3 +113,34 @@ impl fmt::Display for ClosedError {
}
impl Error for ClosedError {}
cfg_time! {
// ===== SendTimeoutError =====
#[derive(Debug)]
/// Error returned by [`Sender::send_timeout`](super::Sender::send_timeout)].
pub enum SendTimeoutError<T> {
/// The data could not be sent on the channel because the channel is
/// full, and the timeout to send has elapsed.
Timeout(T),
/// The receive half of the channel was explicitly closed or has been
/// dropped.
Closed(T),
}
impl<T: fmt::Debug> Error for SendTimeoutError<T> {}
impl<T> fmt::Display for SendTimeoutError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"{}",
match self {
SendTimeoutError::Timeout(..) => "timed out waiting on send operation",
SendTimeoutError::Closed(..) => "channel closed",
}
)
}
}
}