mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
sync: umplement Stream::size_hint
for ReceiverStream
and UnboundedReceiverStream
(#7492)
This commit is contained in:
parent
0e5c5d64f5
commit
9f423053fb
@ -67,6 +67,25 @@ impl<T> Stream for ReceiverStream<T> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.inner.poll_recv(cx)
|
||||
}
|
||||
|
||||
/// Returns the bounds of the stream based on the underlying receiver.
|
||||
///
|
||||
/// For open channels, it returns `(receiver.len(), None)`.
|
||||
///
|
||||
/// For closed channels, it returns `(receiver.len(), Some(used_capacity))`
|
||||
/// where `used_capacity` is calculated as `receiver.max_capacity() -
|
||||
/// receiver.capacity()`. This accounts for any [`Permit`] that is still
|
||||
/// able to send a message.
|
||||
///
|
||||
/// [`Permit`]: struct@tokio::sync::mpsc::Permit
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
if self.inner.is_closed() {
|
||||
let used_capacity = self.inner.max_capacity() - self.inner.capacity();
|
||||
(self.inner.len(), Some(used_capacity))
|
||||
} else {
|
||||
(self.inner.len(), None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
|
||||
|
@ -61,6 +61,20 @@ impl<T> Stream for UnboundedReceiverStream<T> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.inner.poll_recv(cx)
|
||||
}
|
||||
|
||||
/// Returns the bounds of the stream based on the underlying receiver.
|
||||
///
|
||||
/// For open channels, it returns `(receiver.len(), None)`.
|
||||
///
|
||||
/// For closed channels, it returns `(receiver.len(), receiver.len())`.
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
if self.inner.is_closed() {
|
||||
let len = self.inner.len();
|
||||
(len, Some(len))
|
||||
} else {
|
||||
(self.inner.len(), None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
|
||||
|
109
tokio-stream/tests/mpsc_bounded_stream.rs
Normal file
109
tokio-stream/tests/mpsc_bounded_stream.rs
Normal file
@ -0,0 +1,109 @@
|
||||
use futures::{Stream, StreamExt};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
#[tokio::test]
|
||||
async fn size_hint_stream_open() {
|
||||
let (tx, rx) = mpsc::channel(4);
|
||||
|
||||
tx.send(1).await.unwrap();
|
||||
tx.send(2).await.unwrap();
|
||||
|
||||
let mut stream = ReceiverStream::new(rx);
|
||||
|
||||
assert_eq!(stream.size_hint(), (2, None));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (1, None));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, None));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn size_hint_stream_closed() {
|
||||
let (tx, rx) = mpsc::channel(4);
|
||||
|
||||
tx.send(1).await.unwrap();
|
||||
tx.send(2).await.unwrap();
|
||||
|
||||
let mut stream = ReceiverStream::new(rx);
|
||||
stream.close();
|
||||
|
||||
assert_eq!(stream.size_hint(), (2, Some(2)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (1, Some(1)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, Some(0)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn size_hint_sender_dropped() {
|
||||
let (tx, rx) = mpsc::channel(4);
|
||||
|
||||
tx.send(1).await.unwrap();
|
||||
tx.send(2).await.unwrap();
|
||||
|
||||
let mut stream = ReceiverStream::new(rx);
|
||||
drop(tx);
|
||||
|
||||
assert_eq!(stream.size_hint(), (2, Some(2)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (1, Some(1)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, Some(0)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn size_hint_stream_instantly_closed() {
|
||||
let (_tx, rx) = mpsc::channel::<i32>(4);
|
||||
|
||||
let mut stream = ReceiverStream::new(rx);
|
||||
stream.close();
|
||||
|
||||
assert_eq!(stream.size_hint(), (0, Some(0)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn size_hint_stream_closed_permits_send() {
|
||||
let (tx, rx) = mpsc::channel(4);
|
||||
|
||||
tx.send(1).await.unwrap();
|
||||
let permit1 = tx.reserve().await.unwrap();
|
||||
let permit2 = tx.reserve().await.unwrap();
|
||||
|
||||
let mut stream = ReceiverStream::new(rx);
|
||||
stream.close();
|
||||
|
||||
assert_eq!(stream.size_hint(), (1, Some(3)));
|
||||
permit1.send(2);
|
||||
assert_eq!(stream.size_hint(), (2, Some(3)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (1, Some(2)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, Some(1)));
|
||||
permit2.send(3);
|
||||
assert_eq!(stream.size_hint(), (1, Some(1)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, Some(0)));
|
||||
assert_eq!(stream.next().await, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn size_hint_stream_closed_permits_drop() {
|
||||
let (tx, rx) = mpsc::channel(4);
|
||||
|
||||
tx.send(1).await.unwrap();
|
||||
let permit1 = tx.reserve().await.unwrap();
|
||||
let permit2 = tx.reserve().await.unwrap();
|
||||
|
||||
let mut stream = ReceiverStream::new(rx);
|
||||
stream.close();
|
||||
|
||||
assert_eq!(stream.size_hint(), (1, Some(3)));
|
||||
drop(permit1);
|
||||
assert_eq!(stream.size_hint(), (1, Some(2)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, Some(1)));
|
||||
drop(permit2);
|
||||
assert_eq!(stream.size_hint(), (0, Some(0)));
|
||||
assert_eq!(stream.next().await, None);
|
||||
}
|
63
tokio-stream/tests/mpsc_unbounded_stream.rs
Normal file
63
tokio-stream/tests/mpsc_unbounded_stream.rs
Normal file
@ -0,0 +1,63 @@
|
||||
use futures::{Stream, StreamExt};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
#[tokio::test]
|
||||
async fn size_hint_stream_open() {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
tx.send(1).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
|
||||
let mut stream = UnboundedReceiverStream::new(rx);
|
||||
|
||||
assert_eq!(stream.size_hint(), (2, None));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (1, None));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, None));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn size_hint_stream_closed() {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
tx.send(1).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
|
||||
let mut stream = UnboundedReceiverStream::new(rx);
|
||||
stream.close();
|
||||
|
||||
assert_eq!(stream.size_hint(), (2, Some(2)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (1, Some(1)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, Some(0)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn size_hint_sender_dropped() {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
tx.send(1).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
|
||||
let mut stream = UnboundedReceiverStream::new(rx);
|
||||
drop(tx);
|
||||
|
||||
assert_eq!(stream.size_hint(), (2, Some(2)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (1, Some(1)));
|
||||
stream.next().await;
|
||||
assert_eq!(stream.size_hint(), (0, Some(0)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn size_hint_stream_instantly_closed() {
|
||||
let (_tx, rx) = mpsc::unbounded_channel::<i32>();
|
||||
|
||||
let mut stream = UnboundedReceiverStream::new(rx);
|
||||
stream.close();
|
||||
|
||||
assert_eq!(stream.size_hint(), (0, Some(0)));
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user