time: stream throttle (#1949)

This commit is contained in:
Artem Vorotnikov 2019-12-14 09:06:41 +03:00 committed by Carl Lerche
parent d593c5b051
commit 4b85565bd7
3 changed files with 107 additions and 46 deletions

View File

@ -96,6 +96,11 @@ mod timeout;
#[doc(inline)] #[doc(inline)]
pub use timeout::{timeout, timeout_at, Timeout, Elapsed}; pub use timeout::{timeout, timeout_at, Timeout, Elapsed};
cfg_stream! {
mod throttle;
pub use throttle::{throttle, Throttle};
}
mod wheel; mod wheel;
#[cfg(test)] #[cfg(test)]

View File

@ -7,34 +7,62 @@ use std::marker::Unpin;
use std::pin::Pin; use std::pin::Pin;
use std::task::{self, Poll}; use std::task::{self, Poll};
use futures_core::Stream;
use pin_project_lite::pin_project;
/// Slow down a stream by enforcing a delay between items. /// Slow down a stream by enforcing a delay between items.
#[derive(Debug)] /// They will be produced not more often than the specified interval.
#[must_use = "streams do nothing unless polled"] ///
pub struct Throttle<T> { /// # Example
/// `None` when duration is zero. ///
delay: Option<Delay>, /// Create a throttled stream.
/// ```rust,norun
/// use futures::stream::StreamExt;
/// use std::time::Duration;
/// use tokio::time::throttle;
///
/// # async fn dox() {
/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one"));
///
/// loop {
/// // The string will be produced at most every 2 seconds
/// println!("{:?}", item_stream.next().await);
/// }
/// # }
/// ```
pub fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
where
T: Stream,
{
let delay = if duration == Duration::from_millis(0) {
None
} else {
Some(Delay::new_timeout(Instant::now() + duration, duration))
};
/// Set to true when `delay` has returned ready, but `stream` hasn't. Throttle {
has_delayed: bool, delay,
duration,
/// The stream to throttle has_delayed: true,
stream: T, stream,
}
} }
impl<T> Throttle<T> { pin_project! {
/// Slow down a stream by enforcing a delay between items. /// Stream for the [`throttle`](throttle) function.
pub fn new(stream: T, duration: Duration) -> Self { #[derive(Debug)]
let delay = if duration == Duration::from_millis(0) { #[must_use = "streams do nothing unless polled"]
None pub struct Throttle<T> {
} else { // `None` when duration is zero.
Some(Delay::new_timeout(Instant::now() + duration, duration)) delay: Option<Delay>,
}; duration: Duration,
Self { // Set to true when `delay` has returned ready, but `stream` hasn't.
delay, has_delayed: bool,
has_delayed: true,
stream, // The stream to throttle
} #[pin]
stream: T,
} }
} }
@ -68,29 +96,27 @@ impl<T: Stream> Stream for Throttle<T> {
type Item = T::Item; type Item = T::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { if !self.has_delayed && self.delay.is_some() {
if !self.has_delayed && self.delay.is_some() { ready!(Pin::new(self.as_mut()
ready!(self .project().delay.as_mut().unwrap())
.as_mut() .poll(cx));
.map_unchecked_mut(|me| me.delay.as_mut().unwrap()) *self.as_mut().project().has_delayed = true;
.poll(cx));
self.as_mut().get_unchecked_mut().has_delayed = true;
}
let value = ready!(self
.as_mut()
.map_unchecked_mut(|me| &mut me.stream)
.poll_next(cx));
if value.is_some() {
if let Some(ref mut delay) = self.as_mut().get_unchecked_mut().delay {
delay.reset_timeout();
}
self.as_mut().get_unchecked_mut().has_delayed = false;
}
Poll::Ready(value)
} }
let value = ready!(self
.as_mut()
.project().stream
.poll_next(cx));
if value.is_some() {
let dur = self.duration;
if let Some(ref mut delay) = self.as_mut().project().delay {
delay.reset(Instant::now() + dur);
}
*self.as_mut().project().has_delayed = false;
}
Poll::Ready(value)
} }
} }

View File

@ -0,0 +1,30 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use tokio::time::{self, throttle};
use tokio_test::*;
use std::time::Duration;
#[tokio::test]
async fn usage() {
time::pause();
let mut stream = task::spawn(throttle(
Duration::from_millis(100),
futures::stream::repeat(()),
));
assert_ready!(stream.poll_next());
assert_pending!(stream.poll_next());
time::advance(Duration::from_millis(90)).await;
assert_pending!(stream.poll_next());
time::advance(Duration::from_millis(101)).await;
assert!(stream.is_woken());
assert_ready!(stream.poll_next());
}