stream: add StreamExt::chain (#2093)

Asynchronous equivalent to `Iterator::chain`.
This commit is contained in:
Carl Lerche 2020-01-11 16:33:52 -08:00 committed by GitHub
parent 64d2389911
commit 7c3f1cb4a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 166 additions and 0 deletions

57
tokio/src/stream/chain.rs Normal file
View File

@ -0,0 +1,57 @@
use crate::stream::{Fuse, Stream};
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
pin_project! {
/// Stream returned by the [`chain`](super::StreamExt::chain) method.
pub struct Chain<T, U> {
#[pin]
a: Fuse<T>,
#[pin]
b: U,
}
}
impl<T, U> Chain<T, U> {
pub(super) fn new(a: T, b: U) -> Chain<T, U>
where
T: Stream,
U: Stream,
{
Chain { a: Fuse::new(a), b }
}
}
impl<T, U> Stream for Chain<T, U>
where
T: Stream,
U: Stream<Item = T::Item>,
{
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
use Poll::Ready;
let me = self.project();
if let Some(v) = ready!(me.a.poll_next(cx)) {
return Ready(Some(v));
}
me.b.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
let (a_lower, a_upper) = self.a.size_hint();
let (b_lower, b_upper) = self.b.size_hint();
let upper = match (a_upper, b_upper) {
(Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper),
_ => None,
};
(a_lower + b_lower, upper)
}
}

View File

@ -10,6 +10,9 @@ use all::AllFuture;
mod any;
use any::AnyFuture;
mod chain;
use chain::Chain;
mod empty;
pub use empty::{empty, Empty};
@ -539,6 +542,41 @@ pub trait StreamExt: Stream {
{
AnyFuture::new(self, f)
}
/// Combine two streams into one by first returning all values from the
/// first stream then all values from the second stream.
///
/// As long as `self` still has values to emit, no values from `other` are
/// emitted, even if some are ready.
///
/// # Examples
///
/// ```
/// use tokio::stream::{self, StreamExt};
///
/// #[tokio::main]
/// async fn main() {
/// let one = stream::iter(vec![1, 2, 3]);
/// let two = stream::iter(vec![4, 5, 6]);
///
/// let mut stream = one.chain(two);
///
/// assert_eq!(stream.next().await, Some(1));
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, Some(3));
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// assert_eq!(stream.next().await, None);
/// }
/// ```
fn chain<U>(self, other: U) -> Chain<Self, U>
where
U: Stream<Item = Self::Item>,
Self: Sized,
{
Chain::new(self, other)
}
}
impl<St: ?Sized> StreamExt for St where St: Stream {}

View File

@ -0,0 +1,71 @@
use tokio::stream::{self, Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_test::{assert_pending, assert_ready, task};
#[tokio::test]
async fn basic_usage() {
let one = stream::iter(vec![1, 2, 3]);
let two = stream::iter(vec![4, 5, 6]);
let mut stream = one.chain(two);
assert_eq!(stream.size_hint(), (6, Some(6)));
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.size_hint(), (5, Some(5)));
assert_eq!(stream.next().await, Some(2));
assert_eq!(stream.size_hint(), (4, Some(4)));
assert_eq!(stream.next().await, Some(3));
assert_eq!(stream.size_hint(), (3, Some(3)));
assert_eq!(stream.next().await, Some(4));
assert_eq!(stream.size_hint(), (2, Some(2)));
assert_eq!(stream.next().await, Some(5));
assert_eq!(stream.size_hint(), (1, Some(1)));
assert_eq!(stream.next().await, Some(6));
assert_eq!(stream.size_hint(), (0, Some(0)));
assert_eq!(stream.next().await, None);
assert_eq!(stream.size_hint(), (0, Some(0)));
assert_eq!(stream.next().await, None);
}
#[tokio::test]
async fn pending_first() {
let (tx1, rx1) = mpsc::unbounded_channel();
let (tx2, rx2) = mpsc::unbounded_channel();
let mut stream = task::spawn(rx1.chain(rx2));
assert_eq!(stream.size_hint(), (0, None));
assert_pending!(stream.poll_next());
tx2.send(2).unwrap();
assert!(!stream.is_woken());
assert_pending!(stream.poll_next());
tx1.send(1).unwrap();
assert!(stream.is_woken());
assert_eq!(Some(1), assert_ready!(stream.poll_next()));
assert_pending!(stream.poll_next());
drop(tx1);
assert_eq!(stream.size_hint(), (0, None));
assert!(stream.is_woken());
assert_eq!(Some(2), assert_ready!(stream.poll_next()));
assert_eq!(stream.size_hint(), (0, None));
drop(tx2);
assert_eq!(stream.size_hint(), (0, None));
assert_eq!(None, assert_ready!(stream.poll_next()));
}