Add enumerate combinator to Stream (#832)

This commit is contained in:
Zahari Dichev 2019-01-24 21:50:34 +02:00 committed by Carl Lerche
parent 0ec8986b0b
commit fbad6297c5
5 changed files with 132 additions and 2 deletions

View File

@ -79,7 +79,7 @@ macro_rules! if_runtime {
)*)
}
#[cfg_attr(feature = "rt-full", macro_use)]
#[macro_use]
extern crate futures;
#[cfg(feature = "io")]

80
src/util/enumerate.rs Normal file
View File

@ -0,0 +1,80 @@
use futures::{Async, Poll, Stream, Sink, StartSend};
/// A stream combinator which combines the yields the current item
/// plus its count starting from 0.
///
/// This structure is produced by the `Stream::enumerate` method.
#[derive(Debug)]
#[must_use = "Does nothing unless polled"]
pub struct Enumerate<T> {
inner: T,
count: usize,
}
impl<T> Enumerate<T> {
pub(crate) fn new(stream: T) -> Self {
Self { inner: stream, count: 0 }
}
/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Acquires a mutable reference to the underlying stream that this
/// combinator is pulling from.
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
/// Consumes this combinator, returning the underlying stream.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T> Stream for Enumerate<T>
where
T: Stream,
{
type Item = (usize, T::Item);
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, T::Error> {
match try_ready!(self.inner.poll()) {
Some(item) => {
let ret = Some((self.count, item));
self.count += 1;
Ok(Async::Ready(ret))
}
None => return Ok(Async::Ready(None)),
}
}
}
// Forwarding impl of Sink from the underlying stream
impl<T> Sink for Enumerate<T>
where T: Sink
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}
fn close(&mut self) -> Poll<(), T::SinkError> {
self.inner.close()
}
}

View File

@ -9,6 +9,7 @@
mod future;
mod stream;
mod enumerate;
pub use self::future::FutureExt;
pub use self::stream::StreamExt;

View File

@ -8,7 +8,7 @@ use futures::Stream;
#[cfg(feature = "timer")]
use std::time::Duration;
pub use util::enumerate::Enumerate;
/// An extension trait for `Stream` that provides a variety of convenient
/// combinator functions.
@ -34,6 +34,24 @@ pub trait StreamExt: Stream {
Throttle::new(self, duration)
}
/// Creates a new stream which gives the current iteration count as well
/// as the next value.
///
/// The stream returned yields pairs `(i, val)`, where `i` is the
/// current index of iteration and `val` is the value returned by the
/// iterator.
///
/// # Overflow Behavior
///
/// The method does no guarding against overflows, so counting elements of
/// an iterator with more than [`std::usize::MAX`] elements either produces the
/// wrong result or panics.
fn enumerate(self) -> Enumerate<Self>
where Self: Sized,
{
Enumerate::new(self)
}
/// Creates a new stream which allows `self` until `timeout`.
///
/// This combinator creates a new stream which wraps the receiving stream

View File

@ -0,0 +1,31 @@
extern crate futures;
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_timer;
#[macro_use]
mod support;
use futures::prelude::*;
use futures::sync::mpsc;
use tokio::util::StreamExt;
#[test]
fn enumerate() {
use futures::*;
let (mut tx, rx) = mpsc::channel(1);
std::thread::spawn(|| {
for i in 0..5 {
tx = tx.send(i * 2).wait().unwrap();
}
});
let mut result = rx.enumerate().collect();
assert_eq!(
result.wait(),
Ok(vec![(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)])
);
}