From a8540948254ec69c630bacd0b4a58a20d701b7ac Mon Sep 17 00:00:00 2001 From: Bhargav Date: Sat, 21 Dec 2019 14:38:05 -0800 Subject: [PATCH] sync: impl `Stream` for broadcast::Receiver (#2012) --- tokio/src/sync/broadcast.rs | 23 +++++++++++++++++++++-- tokio/tests/sync_broadcast.rs | 17 +++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index fd9029a70..ce8037243 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -207,7 +207,7 @@ pub struct SendError(pub T); /// /// [`recv`]: crate::sync::broadcast::Receiver::recv /// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum RecvError { /// There are no more active senders implying no further messages will ever /// be sent. @@ -225,7 +225,7 @@ pub enum RecvError { /// /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// [`Receiver`]: crate::sync::broadcast::Receiver -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum TryRecvError { /// The channel is currently empty. There are still active /// [`Sender`][Sender] handles, so data may yet become available. @@ -861,6 +861,25 @@ where } } +#[cfg(feature = "stream")] +impl crate::stream::Stream for Receiver +where + T: Clone, +{ + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.poll_recv(cx).map(|v| match v { + Ok(v) => Some(Ok(v)), + lag @ Err(RecvError::Lagged(_)) => Some(lag), + Err(RecvError::Closed) => None, + }) + } +} + impl Drop for Receiver { fn drop(&mut self) { let mut tail = self.shared.tail.lock().unwrap(); diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index b6f48ca5f..e9e7b3661 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -80,6 +80,23 @@ fn send_two_recv() { assert_empty!(rx2); } +#[tokio::test] +async fn send_recv_stream() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = broadcast::channel::(8); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + + assert_eq!(Some(Ok(1)), rx.next().await); + assert_eq!(Some(Ok(2)), rx.next().await); + + drop(tx); + + assert_eq!(None, rx.next().await); +} + #[test] fn send_recv_bounded() { let (tx, mut rx) = broadcast::channel(16);