stream: add StreamExt::fuse (#2085)

This commit is contained in:
Carl Lerche 2020-01-09 20:51:06 -08:00 committed by GitHub
parent f5c20cd228
commit cfd9b36d89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 171 additions and 0 deletions

53
tokio/src/stream/fuse.rs Normal file
View File

@ -0,0 +1,53 @@
use crate::stream::Stream;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// Stream returned by [`fuse()`][super::StreamExt::fuse].
#[derive(Debug)]
pub struct Fuse<T> {
#[pin]
stream: Option<T>,
}
}
impl<T> Fuse<T>
where
T: Stream,
{
pub(crate) fn new(stream: T) -> Fuse<T> {
Fuse {
stream: Some(stream),
}
}
}
impl<T> Stream for Fuse<T>
where
T: Stream,
{
type Item = T::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
let res = match Option::as_pin_mut(self.as_mut().project().stream) {
Some(stream) => ready!(stream.poll_next(cx)),
None => return Poll::Ready(None),
};
if res.is_none() {
// Do not poll the stream anymore
self.as_mut().project().stream.set(None);
}
Poll::Ready(res)
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self.stream {
Some(ref stream) => stream.size_hint(),
None => (0, Some(0)),
}
}
}

View File

@ -16,6 +16,9 @@ use filter::Filter;
mod filter_map;
use filter_map::FilterMap;
mod fuse;
use fuse::Fuse;
mod iter;
pub use iter::{iter, Iter};
@ -222,6 +225,71 @@ pub trait StreamExt: Stream {
FilterMap::new(self, f)
}
/// Creates a stream which ends after the first `None`.
///
/// After a stream returns `None`, behavior is undefined. Future calls to
/// `poll_next` may or may not return `Some(T)` again or they may panic.
/// `fuse()` adapts a stream, ensuring that after `None` is given, it will
/// return `None` forever.
///
/// # Examples
///
/// ```
/// use tokio::stream::{Stream, StreamExt};
///
/// use std::pin::Pin;
/// use std::task::{Context, Poll};
///
/// // a stream which alternates between Some and None
/// struct Alternate {
/// state: i32,
/// }
///
/// impl Stream for Alternate {
/// type Item = i32;
///
/// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
/// let val = self.state;
/// self.state = self.state + 1;
///
/// // if it's even, Some(i32), else None
/// if val % 2 == 0 {
/// Poll::Ready(Some(val))
/// } else {
/// Poll::Ready(None)
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let mut stream = Alternate { state: 0 };
///
/// // the stream goes back and forth
/// assert_eq!(stream.next().await, Some(0));
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, None);
///
/// // however, once it is fused
/// let mut stream = stream.fuse();
///
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, None);
///
/// // it will always return `None` after the first time.
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, None);
/// }
/// ```
fn fuse(self) -> Fuse<Self>
where
Self: Sized,
{
Fuse::new(self)
}
/// Creates a new stream of at most `n` items of the underlying stream.
///
/// Once `n` items have been yielded from this stream then it will always

View File

@ -0,0 +1,50 @@
use tokio::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
// a stream which alternates between Some and None
struct Alternate {
state: i32,
}
impl Stream for Alternate {
type Item = i32;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
let val = self.state;
self.state = self.state + 1;
// if it's even, Some(i32), else None
if val % 2 == 0 {
Poll::Ready(Some(val))
} else {
Poll::Ready(None)
}
}
}
#[tokio::test]
async fn basic_usage() {
let mut stream = Alternate { state: 0 };
// the stream goes back and forth
assert_eq!(stream.next().await, Some(0));
assert_eq!(stream.next().await, None);
assert_eq!(stream.next().await, Some(2));
assert_eq!(stream.next().await, None);
// however, once it is fused
let mut stream = stream.fuse();
assert_eq!(stream.size_hint(), (0, None));
assert_eq!(stream.next().await, Some(4));
assert_eq!(stream.size_hint(), (0, None));
assert_eq!(stream.next().await, None);
// it will always return `None` after the first time.
assert_eq!(stream.size_hint(), (0, Some(0)));
assert_eq!(stream.next().await, None);
assert_eq!(stream.size_hint(), (0, Some(0)));
}