stream: update features and doc for broadcast/watch stream (#3504)

This commit is contained in:
Alice Ryhl 2021-02-05 20:39:27 +01:00 committed by GitHub
parent fcb6d041b9
commit 0a04954d5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 37 additions and 6 deletions

View File

@ -25,12 +25,13 @@ time = ["tokio/time"]
net = ["tokio/net"]
io-util = ["tokio/io-util"]
fs = ["tokio/fs"]
sync = ["tokio/sync", "tokio-util"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.0", features = ["sync"] }
tokio-util = { version = "0.6.3" }
tokio-util = { version = "0.6.3", optional = true }
[dev-dependencies]
tokio = { version = "1.0", features = ["full", "test-util"] }

View File

@ -38,6 +38,16 @@ macro_rules! cfg_time {
}
}
macro_rules! cfg_sync {
($($item:item)*) => {
$(
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
$item
)*
}
}
macro_rules! ready {
($e:expr $(,)?) => {
match $e {

View File

@ -1,17 +1,25 @@
//! Wrappers for Tokio types that implement `Stream`.
/// Error types for the wrappers.
pub mod errors {
cfg_sync! {
pub use crate::wrappers::broadcast::BroadcastStreamRecvError;
}
}
mod mpsc_bounded;
pub use mpsc_bounded::ReceiverStream;
mod mpsc_unbounded;
pub use mpsc_unbounded::UnboundedReceiverStream;
mod broadcast;
pub use broadcast::BroadcastStream;
pub use broadcast::BroadcastStreamRecvError;
cfg_sync! {
mod broadcast;
pub use broadcast::BroadcastStream;
mod watch;
pub use watch::WatchStream;
mod watch;
pub use watch::WatchStream;
}
cfg_time! {
mod interval;

View File

@ -12,6 +12,7 @@ use std::task::{Context, Poll};
///
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct BroadcastStream<T> {
inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
}

View File

@ -12,6 +12,7 @@ use tokio::sync::watch::error::RecvError;
///
/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct WatchStream<T> {
inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
}

View File

@ -163,6 +163,11 @@ pub struct Sender<T> {
/// Must not be used concurrently. Messages may be retrieved using
/// [`recv`][Receiver::recv].
///
/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
/// wrapper.
///
/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
///
/// # Examples
///
/// ```

View File

@ -61,6 +61,11 @@ use std::ops;
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
/// wrapper.
///
/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
#[derive(Debug)]
pub struct Receiver<T> {
/// Pointer to the shared state