stream: add StreamExt::take_while (#2029)

This commit is contained in:
Artem Vorotnikov 2019-12-25 23:48:02 +03:00 committed by Carl Lerche
parent 50b91c0247
commit a515f9c459
9 changed files with 122 additions and 22 deletions

View File

@ -26,11 +26,7 @@ where
}
}
impl<St, F> Filter<St, F>
where
St: Stream,
F: FnMut(&St::Item) -> bool,
{
impl<St, F> Filter<St, F> {
pub(super) fn new(stream: St, f: F) -> Self {
Self { stream, f }
}

View File

@ -26,11 +26,7 @@ where
}
}
impl<St, F, T> FilterMap<St, F>
where
St: Stream,
F: FnMut(St::Item) -> Option<T>,
{
impl<St, F> FilterMap<St, F> {
pub(super) fn new(stream: St, f: F) -> Self {
Self { stream, f }
}

View File

@ -24,12 +24,8 @@ where
}
}
impl<St, T, F> Map<St, F>
where
St: Stream,
F: FnMut(St::Item) -> T,
{
pub(super) fn new(stream: St, f: F) -> Map<St, F> {
impl<St, F> Map<St, F> {
pub(super) fn new(stream: St, f: F) -> Self {
Map { stream, f }
}
}

View File

@ -25,6 +25,9 @@ use try_next::TryNext;
mod take;
use take::Take;
mod take_while;
use take_while::TakeWhile;
pub use futures_core::Stream;
/// An extension trait for `Stream`s that provides a variety of convenient
@ -232,6 +235,36 @@ pub trait StreamExt: Stream {
{
Take::new(self, n)
}
/// Take elements from this stream while the provided predicate
/// resolves to `true`.
///
/// This function, like `Iterator::take_while`, will take elements from the
/// stream until the predicate `f` resolves to `false`. Once one element
/// returns false it will always return that the stream is done.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
///
/// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
///
/// assert_eq!(Some(1), stream.next().await);
/// assert_eq!(Some(2), stream.next().await);
/// assert_eq!(Some(3), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
where
F: FnMut(&Self::Item) -> bool,
Self: Sized,
{
TakeWhile::new(self, f)
}
}
impl<T: ?Sized> StreamExt for T where T: Stream {}
impl<St: ?Sized> StreamExt for St where St: Stream {}

View File

@ -13,7 +13,7 @@ pub struct Next<'a, St: ?Sized> {
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}
impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
impl<'a, St: ?Sized> Next<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
Next { stream }
}

View File

@ -7,7 +7,7 @@ use core::task::{Context, Poll};
use pin_project_lite::pin_project;
pin_project! {
/// Stream for the [`map`](super::StreamExt::map) method.
/// Stream for the [`take`](super::StreamExt::take) method.
#[must_use = "streams do nothing unless polled"]
pub struct Take<St> {
#[pin]
@ -27,7 +27,7 @@ where
}
}
impl<St: Stream> Take<St> {
impl<St> Take<St> {
pub(super) fn new(stream: St, remaining: usize) -> Self {
Self { stream, remaining }
}

View File

@ -0,0 +1,79 @@
use crate::stream::Stream;
use core::fmt;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
pin_project! {
/// Stream for the [`take_while`](super::StreamExt::take_while) method.
#[must_use = "streams do nothing unless polled"]
pub struct TakeWhile<St, F> {
#[pin]
stream: St,
predicate: F,
done: bool,
}
}
impl<St, F> fmt::Debug for TakeWhile<St, F>
where
St: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TakeWhile")
.field("stream", &self.stream)
.field("done", &self.done)
.finish()
}
}
impl<St, F> TakeWhile<St, F> {
pub(super) fn new(stream: St, predicate: F) -> Self {
Self {
stream,
predicate,
done: false,
}
}
}
impl<St, F> Stream for TakeWhile<St, F>
where
St: Stream,
F: FnMut(&St::Item) -> bool,
{
type Item = St::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !*self.as_mut().project().done {
self.as_mut().project().stream.poll_next(cx).map(|ready| {
let ready = ready.and_then(|item| {
if !(self.as_mut().project().predicate)(&item) {
None
} else {
Some(item)
}
});
if ready.is_none() {
*self.as_mut().project().done = true;
}
ready
})
} else {
Poll::Ready(None)
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
if self.done {
return (0, Some(0));
}
let (_, upper) = self.stream.size_hint();
(0, upper)
}
}

View File

@ -13,7 +13,7 @@ pub struct TryNext<'a, St: ?Sized> {
impl<St: ?Sized + Unpin> Unpin for TryNext<'_, St> {}
impl<'a, St: ?Sized + Stream + Unpin> TryNext<'a, St> {
impl<'a, St: ?Sized> TryNext<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
Self {
inner: Next::new(stream),

View File

@ -85,7 +85,7 @@ impl MockClock {
let ctx = context::ThreadContext::clone_current();
let _e = ctx
.with_clock(self.clock.clone())
.with_time_handle(Some(handle.clone()))
.with_time_handle(Some(handle))
.enter();
let time = self.time.clone();