stream: add Stream wrappers in tokio-stream (#3343)

This commit is contained in:
Alice Ryhl 2021-01-04 19:03:18 +01:00 committed by GitHub
parent 3b6bee822d
commit 7f17822ed9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 553 additions and 8 deletions

View File

@ -22,6 +22,9 @@ categories = ["asynchronous"]
[features]
default = ["time"]
time = ["tokio/time"]
net = ["tokio/net"]
io-util = ["tokio/io-util"]
fs = ["tokio/fs"]
[dependencies]
futures-core = { version = "0.3.0" }

View File

@ -81,6 +81,8 @@
#[macro_use]
mod macros;
pub mod wrappers;
mod stream_ext;
pub use stream_ext::{collect::FromStream, StreamExt};

View File

@ -1,3 +1,33 @@
macro_rules! cfg_fs {
($($item:item)*) => {
$(
#[cfg(feature = "fs")]
#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
$item
)*
}
}
macro_rules! cfg_io_util {
($($item:item)*) => {
$(
#[cfg(feature = "io-util")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
$item
)*
}
}
macro_rules! cfg_net {
($($item:item)*) => {
$(
#[cfg(feature = "net")]
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
$item
)*
}
}
macro_rules! cfg_time {
($($item:item)*) => {
$(

View File

@ -0,0 +1,35 @@
//! Wrappers for Tokio types that implement `Stream`.
mod mpsc_bounded;
pub use mpsc_bounded::ReceiverStream;
mod mpsc_unbounded;
pub use mpsc_unbounded::UnboundedReceiverStream;
cfg_time! {
mod interval;
pub use interval::IntervalStream;
}
cfg_net! {
mod tcp_listener;
pub use tcp_listener::TcpListenerStream;
#[cfg(unix)]
mod unix_listener;
#[cfg(unix)]
pub use unix_listener::UnixListenerStream;
}
cfg_io_util! {
mod split;
pub use split::SplitStream;
mod lines;
pub use lines::LinesStream;
}
cfg_fs! {
mod read_dir;
pub use read_dir::ReadDirStream;
}

View File

@ -0,0 +1,50 @@
use crate::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{Instant, Interval};
/// A wrapper around [`Interval`] that implements [`Stream`].
///
/// [`Interval`]: struct@tokio::time::Interval
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub struct IntervalStream {
inner: Interval,
}
impl IntervalStream {
/// Create a new `IntervalStream`.
pub fn new(interval: Interval) -> Self {
Self { inner: interval }
}
/// Get back the inner `Interval`.
pub fn into_inner(self) -> Interval {
self.inner
}
}
impl Stream for IntervalStream {
type Item = Instant;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
self.inner.poll_tick(cx).map(Some)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(std::usize::MAX, None)
}
}
impl AsRef<Interval> for IntervalStream {
fn as_ref(&self) -> &Interval {
&self.inner
}
}
impl AsMut<Interval> for IntervalStream {
fn as_mut(&mut self) -> &mut Interval {
&mut self.inner
}
}

View File

@ -0,0 +1,59 @@
use crate::Stream;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, Lines};
pin_project! {
/// A wrapper around [`tokio::io::Lines`] that implements [`Stream`].
///
/// [`tokio::io::Lines`]: struct@tokio::io::Lines
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct LinesStream<R> {
#[pin]
inner: Lines<R>,
}
}
impl<R> LinesStream<R> {
/// Create a new `LinesStream`.
pub fn new(lines: Lines<R>) -> Self {
Self { inner: lines }
}
/// Get back the inner `Lines`.
pub fn into_inner(self) -> Lines<R> {
self.inner
}
/// Obtain a pinned reference to the inner `Lines<R>`.
pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>> {
self.project().inner
}
}
impl<R: AsyncBufRead> Stream for LinesStream<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project()
.inner
.poll_next_line(cx)
.map(Result::transpose)
}
}
impl<R> AsRef<Lines<R>> for LinesStream<R> {
fn as_ref(&self) -> &Lines<R> {
&self.inner
}
}
impl<R> AsMut<Lines<R>> for LinesStream<R> {
fn as_mut(&mut self) -> &mut Lines<R> {
&mut self.inner
}
}

View File

@ -0,0 +1,59 @@
use crate::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::Receiver;
/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
///
/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
pub struct ReceiverStream<T> {
inner: Receiver<T>,
}
impl<T> ReceiverStream<T> {
/// Create a new `ReceiverStream`.
pub fn new(recv: Receiver<T>) -> Self {
Self { inner: recv }
}
/// Get back the inner `Receiver`.
pub fn into_inner(self) -> Receiver<T> {
self.inner
}
/// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered. Any
/// outstanding [`Permit`] values will still be able to send messages.
///
/// To guarantee no messages are dropped, after calling `close()`, you must
/// receive all items from the stream until `None` is returned.
///
/// [`Permit`]: struct@tokio::sync::mpsc::Permit
pub fn close(&mut self) {
self.inner.close()
}
}
impl<T> Stream for ReceiverStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}
impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
fn as_ref(&self) -> &Receiver<T> {
&self.inner
}
}
impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
fn as_mut(&mut self) -> &mut Receiver<T> {
&mut self.inner
}
}

View File

@ -0,0 +1,53 @@
use crate::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::UnboundedReceiver;
/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
///
/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
pub struct UnboundedReceiverStream<T> {
inner: UnboundedReceiver<T>,
}
impl<T> UnboundedReceiverStream<T> {
/// Create a new `UnboundedReceiverStream`.
pub fn new(recv: UnboundedReceiver<T>) -> Self {
Self { inner: recv }
}
/// Get back the inner `UnboundedReceiver`.
pub fn into_inner(self) -> UnboundedReceiver<T> {
self.inner
}
/// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
self.inner.close()
}
}
impl<T> Stream for UnboundedReceiverStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}
impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
fn as_ref(&self) -> &UnboundedReceiver<T> {
&self.inner
}
}
impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
&mut self.inner
}
}

View File

@ -0,0 +1,47 @@
use crate::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{DirEntry, ReadDir};
/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`].
///
/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
pub struct ReadDirStream {
inner: ReadDir,
}
impl ReadDirStream {
/// Create a new `ReadDirStream`.
pub fn new(read_dir: ReadDir) -> Self {
Self { inner: read_dir }
}
/// Get back the inner `ReadDir`.
pub fn into_inner(self) -> ReadDir {
self.inner
}
}
impl Stream for ReadDirStream {
type Item = io::Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_entry(cx).map(Result::transpose)
}
}
impl AsRef<ReadDir> for ReadDirStream {
fn as_ref(&self) -> &ReadDir {
&self.inner
}
}
impl AsMut<ReadDir> for ReadDirStream {
fn as_mut(&mut self) -> &mut ReadDir {
&mut self.inner
}
}

View File

@ -0,0 +1,59 @@
use crate::Stream;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, Split};
pin_project! {
/// A wrapper around [`tokio::io::Split`] that implements [`Stream`].
///
/// [`tokio::io::Split`]: struct@tokio::io::Split
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct SplitStream<R> {
#[pin]
inner: Split<R>,
}
}
impl<R> SplitStream<R> {
/// Create a new `SplitStream`.
pub fn new(split: Split<R>) -> Self {
Self { inner: split }
}
/// Get back the inner `Split`.
pub fn into_inner(self) -> Split<R> {
self.inner
}
/// Obtain a pinned reference to the inner `Split<R>`.
pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Split<R>> {
self.project().inner
}
}
impl<R: AsyncBufRead> Stream for SplitStream<R> {
type Item = io::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project()
.inner
.poll_next_segment(cx)
.map(Result::transpose)
}
}
impl<R> AsRef<Split<R>> for SplitStream<R> {
fn as_ref(&self) -> &Split<R> {
&self.inner
}
}
impl<R> AsMut<Split<R>> for SplitStream<R> {
fn as_mut(&mut self) -> &mut Split<R> {
&mut self.inner
}
}

View File

@ -0,0 +1,54 @@
use crate::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::net::{TcpListener, TcpStream};
/// A wrapper around [`TcpListener`] that implements [`Stream`].
///
/// [`TcpListener`]: struct@tokio::net::TcpListener
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
pub struct TcpListenerStream {
inner: TcpListener,
}
impl TcpListenerStream {
/// Create a new `TcpListenerStream`.
pub fn new(listener: TcpListener) -> Self {
Self { inner: listener }
}
/// Get back the inner `TcpListener`.
pub fn into_inner(self) -> TcpListener {
self.inner
}
}
impl Stream for TcpListenerStream {
type Item = io::Result<TcpStream>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<TcpStream>>> {
match self.inner.poll_accept(cx) {
Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
Poll::Pending => Poll::Pending,
}
}
}
impl AsRef<TcpListener> for TcpListenerStream {
fn as_ref(&self) -> &TcpListener {
&self.inner
}
}
impl AsMut<TcpListener> for TcpListenerStream {
fn as_mut(&mut self) -> &mut TcpListener {
&mut self.inner
}
}

View File

@ -0,0 +1,54 @@
use crate::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::net::{UnixListener, UnixStream};
/// A wrapper around [`UnixListener`] that implements [`Stream`].
///
/// [`UnixListener`]: struct@tokio::net::UnixListener
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))]
pub struct UnixListenerStream {
inner: UnixListener,
}
impl UnixListenerStream {
/// Create a new `UnixListenerStream`.
pub fn new(listener: UnixListener) -> Self {
Self { inner: listener }
}
/// Get back the inner `UnixListener`.
pub fn into_inner(self) -> UnixListener {
self.inner
}
}
impl Stream for UnixListenerStream {
type Item = io::Result<UnixStream>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<UnixStream>>> {
match self.inner.poll_accept(cx) {
Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
Poll::Pending => Poll::Pending,
}
}
}
impl AsRef<UnixListener> for UnixListenerStream {
fn as_ref(&self) -> &UnixListener {
&self.inner
}
}
impl AsMut<UnixListener> for UnixListenerStream {
fn as_mut(&mut self) -> &mut UnixListener {
&mut self.inner
}
}

View File

@ -20,12 +20,15 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
Ok(ReadDir(State::Idle(Some(std))))
}
/// Stream of the entries in a directory.
/// Read the the entries in a directory.
///
/// This stream is returned from the [`read_dir`] function of this module and
/// will yield instances of [`DirEntry`]. Through a [`DirEntry`]
/// information like the entry's path and possibly other metadata can be
/// learned.
/// This struct is returned from the [`read_dir`] function of this module and
/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information
/// like the entry's path and possibly other metadata can be learned.
///
/// A `ReadDir` can be turned into a `Stream` with [`ReadDirStream`].
///
/// [`ReadDirStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReadDirStream.html
///
/// # Errors
///

View File

@ -8,7 +8,15 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// Stream for the [`lines`](crate::io::AsyncBufReadExt::lines) method.
/// Read lines from an [`AsyncBufRead`].
///
/// A `Lines` can be turned into a `Stream` with [`LinesStream`].
///
/// This type is usually created using the [`lines`] method.
///
/// [`AsyncBufRead`]: crate::io::AsyncBufRead
/// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html
/// [`lines`]: crate::io::AsyncBufReadExt::lines
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]

View File

@ -8,7 +8,11 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// Stream for the [`split`](crate::io::AsyncBufReadExt::split) method.
/// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method.
///
/// A `Split` can be turned into a `Stream` with [`SplitStream`].
///
/// [`SplitStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.SplitStream.html
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]

View File

@ -14,6 +14,10 @@ cfg_net! {
/// You can accept a new connection by using the [`accept`](`TcpListener::accept`)
/// method.
///
/// A `TcpListener` can be turned into a `Stream` with [`TcpListenerStream`].
///
/// [`TcpListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.TcpListenerStream.html
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all

View File

@ -14,6 +14,10 @@ cfg_net_unix! {
///
/// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method.
///
/// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`].
///
/// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all

View File

@ -33,6 +33,10 @@ pub struct Permit<'a, T> {
/// Receive values from the associated `Sender`.
///
/// Instances are created by the [`channel`](channel) function.
///
/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
///
/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
/// The channel receiver
chan: chan::Rx<T, Semaphore>,

View File

@ -33,6 +33,10 @@ impl<T> fmt::Debug for UnboundedSender<T> {
///
/// Instances are created by the
/// [`unbounded_channel`](unbounded_channel) function.
///
/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
///
/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
pub struct UnboundedReceiver<T> {
/// The channel receiver
chan: chan::Rx<T, Semaphore>,

View File

@ -106,7 +106,16 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval {
}
}
/// Stream returned by [`interval`](interval) and [`interval_at`](interval_at).
/// Interval returned by [`interval`](interval) and [`interval_at`](interval_at).
///
/// This type allows you to wait on a sequence of instants with a certain
/// duration between each instant. Unlike calling [`sleep`](crate::time::sleep)
/// in a loop, this lets you count the time spent between the calls to `sleep`
/// as well.
///
/// An `Interval` can be turned into a `Stream` with [`IntervalStream`].
///
/// [`IntervalStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.IntervalStream.html
#[derive(Debug)]
pub struct Interval {
/// Future that completes the next time the `Interval` yields a value.