From 64d23899118dfc8f1d4d7a9b60c015e43260df80 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sat, 11 Jan 2020 13:52:51 -0800 Subject: [PATCH] stream: add stream::once (#2094) An async equivalent to `iter::once` --- tokio/src/stream/mod.rs | 3 +++ tokio/src/stream/once.rs | 52 ++++++++++++++++++++++++++++++++++++++ tokio/tests/stream_once.rs | 12 +++++++++ 3 files changed, 67 insertions(+) create mode 100644 tokio/src/stream/once.rs create mode 100644 tokio/tests/stream_once.rs diff --git a/tokio/src/stream/mod.rs b/tokio/src/stream/mod.rs index b7b02d02f..fada4442b 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio/src/stream/mod.rs @@ -34,6 +34,9 @@ use merge::Merge; mod next; use next::Next; +mod once; +pub use once::{once, Once}; + mod pending; pub use pending::{pending, Pending}; diff --git a/tokio/src/stream/once.rs b/tokio/src/stream/once.rs new file mode 100644 index 000000000..04a642f30 --- /dev/null +++ b/tokio/src/stream/once.rs @@ -0,0 +1,52 @@ +use crate::stream::{self, Iter, Stream}; + +use core::option; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`once`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Once { + iter: Iter>, +} + +impl Unpin for Once {} + +/// Creates a stream that emits an element exactly once. +/// +/// The returned stream is immediately ready and emits the provided value once. +/// +/// # Examples +/// +/// ``` +/// use tokio::stream::{self, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// // one is the loneliest number +/// let mut one = stream::once(1); +/// +/// assert_eq!(Some(1), one.next().await); +/// +/// // just one, that's all we get +/// assert_eq!(None, one.next().await); +/// } +/// ``` +pub fn once(value: T) -> Once { + Once { + iter: stream::iter(Some(value).into_iter()), + } +} + +impl Stream for Once { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.iter).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } +} diff --git a/tokio/tests/stream_once.rs b/tokio/tests/stream_once.rs new file mode 100644 index 000000000..bb4635ac9 --- /dev/null +++ b/tokio/tests/stream_once.rs @@ -0,0 +1,12 @@ +use tokio::stream::{self, Stream, StreamExt}; + +#[tokio::test] +async fn basic_usage() { + let mut one = stream::once(1); + + assert_eq!(one.size_hint(), (1, Some(1))); + assert_eq!(Some(1), one.next().await); + + assert_eq!(one.size_hint(), (0, Some(0))); + assert_eq!(None, one.next().await); +}