From 1a5de2c79d9d825c367f7881646306fe7f9aaa0c Mon Sep 17 00:00:00 2001 From: Tore Pettersen Date: Sat, 1 Feb 2020 23:03:34 +0100 Subject: [PATCH] stream: add StreamExt::skip (#2204) skip version of take Refs: #2104 --- tokio/src/stream/mod.rs | 28 ++++++++++++++++++ tokio/src/stream/skip.rs | 63 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 tokio/src/stream/skip.rs diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index 3cc7e68fb..2ee278bf5 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -53,6 +53,9 @@ pub use pending::{pending, Pending}; mod stream_map; pub use stream_map::StreamMap; +mod skip; +use skip::Skip; + mod try_next; use try_next::TryNext; @@ -451,6 +454,31 @@ pub trait StreamExt: Stream { TakeWhile::new(self, f) } + /// Creates a new stream that will skip the `n` first items of the + /// underlying stream. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).skip(7); + /// + /// assert_eq!(Some(8), stream.next().await); + /// assert_eq!(Some(9), stream.next().await); + /// assert_eq!(Some(10), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip(self, n: usize) -> Skip + where + Self: Sized, + { + Skip::new(self, n) + } + /// Tests if every element of the stream matches a predicate. /// /// `all()` takes a closure that returns `true` or `false`. It applies diff --git a/tokio/src/stream/skip.rs b/tokio/src/stream/skip.rs new file mode 100644 index 000000000..39540cc98 --- /dev/null +++ b/tokio/src/stream/skip.rs @@ -0,0 +1,63 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip`](super::StreamExt::skip) method. + #[must_use = "streams do nothing unless polled"] + pub struct Skip { + #[pin] + stream: St, + remaining: usize, + } +} + +impl fmt::Debug for Skip +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Skip") + .field("stream", &self.stream) + .finish() + } +} + +impl Skip { + pub(super) fn new(stream: St, remaining: usize) -> Self { + Self { stream, remaining } + } +} + +impl Stream for Skip +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if self.remaining == 0 { + return Poll::Ready(Some(e)); + } + *self.as_mut().project().remaining -= 1; + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_sub(self.remaining); + let upper = upper.map(|x| x.saturating_sub(self.remaining)); + + (lower, upper) + } +}