mirror of
				https://github.com/tokio-rs/tokio.git
				synced 2025-11-03 14:02:47 +00:00 
			
		
		
		
	stream: add StreamExt::skip_while (#2205)
async version of Iterator::skip_while Refs: #2104
This commit is contained in:
		
							parent
							
								
									79e4514283
								
							
						
					
					
						commit
						513671f8de
					
				@ -56,6 +56,9 @@ pub use stream_map::StreamMap;
 | 
				
			|||||||
mod skip;
 | 
					mod skip;
 | 
				
			||||||
use skip::Skip;
 | 
					use skip::Skip;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					mod skip_while;
 | 
				
			||||||
 | 
					use skip_while::SkipWhile;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
mod try_next;
 | 
					mod try_next;
 | 
				
			||||||
use try_next::TryNext;
 | 
					use try_next::TryNext;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -479,6 +482,37 @@ pub trait StreamExt: Stream {
 | 
				
			|||||||
        Skip::new(self, n)
 | 
					        Skip::new(self, n)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// Skip elements from the underlying stream while the provided predicate
 | 
				
			||||||
 | 
					    /// resolves to `true`.
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// This function, like [`Iterator::skip_while`], will ignore elemets from the
 | 
				
			||||||
 | 
					    /// stream until the predicate `f` resolves to `false`. Once one element
 | 
				
			||||||
 | 
					    /// returns false, the rest of the elements will be yielded.
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// # Examples
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// ```
 | 
				
			||||||
 | 
					    /// # #[tokio::main]
 | 
				
			||||||
 | 
					    /// # async fn main() {
 | 
				
			||||||
 | 
					    /// use tokio::stream::{self, StreamExt};
 | 
				
			||||||
 | 
					    /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// assert_eq!(Some(3), stream.next().await);
 | 
				
			||||||
 | 
					    /// assert_eq!(Some(4), stream.next().await);
 | 
				
			||||||
 | 
					    /// assert_eq!(Some(1), stream.next().await);
 | 
				
			||||||
 | 
					    /// assert_eq!(None, stream.next().await);
 | 
				
			||||||
 | 
					    /// # }
 | 
				
			||||||
 | 
					    /// ```
 | 
				
			||||||
 | 
					    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
 | 
				
			||||||
 | 
					    where
 | 
				
			||||||
 | 
					        F: FnMut(&Self::Item) -> bool,
 | 
				
			||||||
 | 
					        Self: Sized,
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        SkipWhile::new(self, f)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /// Tests if every element of the stream matches a predicate.
 | 
					    /// Tests if every element of the stream matches a predicate.
 | 
				
			||||||
    ///
 | 
					    ///
 | 
				
			||||||
    /// `all()` takes a closure that returns `true` or `false`. It applies
 | 
					    /// `all()` takes a closure that returns `true` or `false`. It applies
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										73
									
								
								tokio/src/stream/skip_while.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										73
									
								
								tokio/src/stream/skip_while.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,73 @@
 | 
				
			|||||||
 | 
					use crate::stream::Stream;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use core::fmt;
 | 
				
			||||||
 | 
					use core::pin::Pin;
 | 
				
			||||||
 | 
					use core::task::{Context, Poll};
 | 
				
			||||||
 | 
					use pin_project_lite::pin_project;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pin_project! {
 | 
				
			||||||
 | 
					    /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
 | 
				
			||||||
 | 
					    #[must_use = "streams do nothing unless polled"]
 | 
				
			||||||
 | 
					    pub struct SkipWhile<St, F> {
 | 
				
			||||||
 | 
					        #[pin]
 | 
				
			||||||
 | 
					        stream: St,
 | 
				
			||||||
 | 
					        predicate: Option<F>,
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl<St, F> fmt::Debug for SkipWhile<St, F>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    St: fmt::Debug,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 | 
				
			||||||
 | 
					        f.debug_struct("SkipWhile")
 | 
				
			||||||
 | 
					            .field("stream", &self.stream)
 | 
				
			||||||
 | 
					            .finish()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl<St, F> SkipWhile<St, F> {
 | 
				
			||||||
 | 
					    pub(super) fn new(stream: St, predicate: F) -> Self {
 | 
				
			||||||
 | 
					        Self {
 | 
				
			||||||
 | 
					            stream,
 | 
				
			||||||
 | 
					            predicate: Some(predicate),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl<St, F> Stream for SkipWhile<St, F>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    St: Stream,
 | 
				
			||||||
 | 
					    F: FnMut(&St::Item) -> bool,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    type Item = St::Item;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
 | 
				
			||||||
 | 
					        let mut this = self.project();
 | 
				
			||||||
 | 
					        if let Some(predicate) = this.predicate {
 | 
				
			||||||
 | 
					            loop {
 | 
				
			||||||
 | 
					                match ready!(this.stream.as_mut().poll_next(cx)) {
 | 
				
			||||||
 | 
					                    Some(item) => {
 | 
				
			||||||
 | 
					                        if !(predicate)(&item) {
 | 
				
			||||||
 | 
					                            *this.predicate = None;
 | 
				
			||||||
 | 
					                            return Poll::Ready(Some(item));
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                    None => return Poll::Ready(None),
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            this.stream.poll_next(cx)
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn size_hint(&self) -> (usize, Option<usize>) {
 | 
				
			||||||
 | 
					        let (lower, upper) = self.stream.size_hint();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if self.predicate.is_some() {
 | 
				
			||||||
 | 
					            return (0, upper);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        (lower, upper)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user