sync: impl Stream for broadcast::Receiver (#2012)

This commit is contained in:
Bhargav 2019-12-21 14:38:05 -08:00 committed by Carl Lerche
parent 3d1b4b3058
commit a854094825
2 changed files with 38 additions and 2 deletions

View File

@ -207,7 +207,7 @@ pub struct SendError<T>(pub T);
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum RecvError {
/// There are no more active senders implying no further messages will ever
/// be sent.
@ -225,7 +225,7 @@ pub enum RecvError {
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
/// The channel is currently empty. There are still active
/// [`Sender`][Sender] handles, so data may yet become available.
@ -861,6 +861,25 @@ where
}
}
#[cfg(feature = "stream")]
impl<T> crate::stream::Stream for Receiver<T>
where
T: Clone,
{
type Item = Result<T, RecvError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<T, RecvError>>> {
self.poll_recv(cx).map(|v| match v {
Ok(v) => Some(Ok(v)),
lag @ Err(RecvError::Lagged(_)) => Some(lag),
Err(RecvError::Closed) => None,
})
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut tail = self.shared.tail.lock().unwrap();

View File

@ -80,6 +80,23 @@ fn send_two_recv() {
assert_empty!(rx2);
}
#[tokio::test]
async fn send_recv_stream() {
use tokio::stream::StreamExt;
let (tx, mut rx) = broadcast::channel::<i32>(8);
assert_ok!(tx.send(1));
assert_ok!(tx.send(2));
assert_eq!(Some(Ok(1)), rx.next().await);
assert_eq!(Some(Ok(2)), rx.next().await);
drop(tx);
assert_eq!(None, rx.next().await);
}
#[test]
fn send_recv_bounded() {
let (tx, mut rx) = broadcast::channel(16);