mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
stream: add StreamExt::all (#2035)
This commit is contained in:
parent
e8fcf55881
commit
3cf91db4b6
45
tokio/src/stream/all.rs
Normal file
45
tokio/src/stream/all.rs
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
use crate::stream::Stream;
|
||||||
|
|
||||||
|
use core::future::Future;
|
||||||
|
use core::pin::Pin;
|
||||||
|
use core::task::{Context, Poll};
|
||||||
|
|
||||||
|
/// Future for the [`all`](super::StreamExt::all) method.
|
||||||
|
#[derive(Debug)]
|
||||||
|
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||||
|
pub struct AllFuture<'a, St: ?Sized, F> {
|
||||||
|
stream: &'a mut St,
|
||||||
|
f: F,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, St: ?Sized, F> AllFuture<'a, St, F> {
|
||||||
|
pub(super) fn new(stream: &'a mut St, f: F) -> Self {
|
||||||
|
Self { stream, f }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<St: ?Sized + Unpin, F> Unpin for AllFuture<'_, St, F> {}
|
||||||
|
|
||||||
|
impl<St, F> Future for AllFuture<'_, St, F>
|
||||||
|
where
|
||||||
|
St: ?Sized + Stream + Unpin,
|
||||||
|
F: FnMut(St::Item) -> bool,
|
||||||
|
{
|
||||||
|
type Output = bool;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
|
||||||
|
|
||||||
|
match next {
|
||||||
|
Some(v) => {
|
||||||
|
if !(&mut self.f)(v) {
|
||||||
|
Poll::Ready(false)
|
||||||
|
} else {
|
||||||
|
cx.waker().wake_by_ref();
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => Poll::Ready(true),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,9 @@
|
|||||||
//!
|
//!
|
||||||
//! This module provides helpers to work with them.
|
//! This module provides helpers to work with them.
|
||||||
|
|
||||||
|
mod all;
|
||||||
|
use all::AllFuture;
|
||||||
|
|
||||||
mod filter;
|
mod filter;
|
||||||
use filter::Filter;
|
use filter::Filter;
|
||||||
|
|
||||||
@ -265,6 +268,59 @@ pub trait StreamExt: Stream {
|
|||||||
{
|
{
|
||||||
TakeWhile::new(self, f)
|
TakeWhile::new(self, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tests if every element of the stream matches a predicate.
|
||||||
|
/// `all()` takes a closure that returns `true` or `false`. It applies
|
||||||
|
/// this closure to each element of the stream, and if they all return
|
||||||
|
/// `true`, then so does `all`. If any of them return `false`, it
|
||||||
|
/// returns `false`. An empty stream returns `true`.
|
||||||
|
///
|
||||||
|
/// `all()` is short-circuiting; in other words, it will stop processing
|
||||||
|
/// as soon as it finds a `false`, given that no matter what else happens,
|
||||||
|
/// the result will also be `false`.
|
||||||
|
///
|
||||||
|
/// An empty stream returns `true`.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// Basic usage:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # #[tokio::main]
|
||||||
|
/// # async fn main() {
|
||||||
|
/// use tokio::stream::{self, StreamExt};
|
||||||
|
///
|
||||||
|
/// let a = [1, 2, 3];
|
||||||
|
///
|
||||||
|
/// assert!(stream::iter(&a).all(|&x| x > 0).await);
|
||||||
|
///
|
||||||
|
/// assert!(!stream::iter(&a).all(|&x| x > 2).await);
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// Stopping at the first `false`:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # #[tokio::main]
|
||||||
|
/// # async fn main() {
|
||||||
|
/// use tokio::stream::{self, StreamExt};
|
||||||
|
///
|
||||||
|
/// let a = [1, 2, 3];
|
||||||
|
///
|
||||||
|
/// let mut iter = stream::iter(&a);
|
||||||
|
///
|
||||||
|
/// assert!(!iter.all(|&x| x != 2).await);
|
||||||
|
///
|
||||||
|
/// // we can still use `iter`, as there are more elements.
|
||||||
|
/// assert_eq!(iter.next().await, Some(&3));
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
|
||||||
|
where
|
||||||
|
Self: Unpin,
|
||||||
|
{
|
||||||
|
AllFuture::new(self, f)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<St: ?Sized> StreamExt for St where St: Stream {}
|
impl<St: ?Sized> StreamExt for St where St: Stream {}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user