sync: broadcast channel API tweaks (#2898)

Removes deprecated APIs and makes some small breaking changes.
This commit is contained in:
Alice Ryhl 2020-10-05 18:30:48 +02:00 committed by GitHub
parent 1684e1c809
commit 242ea01189
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 222 deletions

View File

@ -107,6 +107,7 @@
//! assert_eq!(20, rx.recv().await.unwrap());
//! assert_eq!(30, rx.recv().await.unwrap());
//! }
//! ```
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
@ -194,9 +195,6 @@ pub struct Receiver<T> {
/// Next position to read from
next: u64,
/// Used to support the deprecated `poll_recv` fn
waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>,
}
/// Error returned by [`Sender::send`][Sender::send].
@ -400,7 +398,7 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
/// tx.send(20).unwrap();
/// }
/// ```
pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
assert!(capacity > 0, "capacity is empty");
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
@ -433,7 +431,6 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
let rx = Receiver {
shared: shared.clone(),
next: 0,
waiter: None,
};
let tx = Sender { shared };
@ -540,11 +537,7 @@ impl<T> Sender<T> {
drop(tail);
Receiver {
shared,
next,
waiter: None,
}
Receiver { shared, next }
}
/// Returns the number of active receivers
@ -784,107 +777,7 @@ impl<T> Receiver<T> {
}
}
impl<T> Receiver<T>
where
T: Clone,
{
/// Attempts to return a pending value on this receiver without awaiting.
///
/// This is useful for a flavor of "optimistic check" before deciding to
/// await on a receiver.
///
/// Compared with [`recv`], this function has three failure cases instead of two
/// (one for closed, one for an empty buffer, one for a lagging receiver).
///
/// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
/// dropped, indicating that no further values can be sent on the channel.
///
/// If the [`Receiver`] handle falls behind, once the channel is full, newly
/// sent values will overwrite old values. At this point, a call to [`recv`]
/// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
/// internal cursor is updated to point to the oldest value still held by
/// the channel. A subsequent call to [`try_recv`] will return this value
/// **unless** it has been since overwritten. If there are no values to
/// receive, `Err(TryRecvError::Empty)` is returned.
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = broadcast::channel(16);
///
/// assert!(rx.try_recv().is_err());
///
/// tx.send(10).unwrap();
///
/// let value = rx.try_recv().unwrap();
/// assert_eq!(10, value);
/// }
/// ```
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let guard = self.recv_ref(None)?;
guard.clone_value().ok_or(TryRecvError::Closed)
}
#[doc(hidden)]
#[deprecated(since = "0.2.21", note = "use async fn recv()")]
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
use Poll::{Pending, Ready};
// The borrow checker prohibits calling `self.poll_ref` while passing in
// a mutable ref to a field (as it should). To work around this,
// `waiter` is first *removed* from `self` then `poll_recv` is called.
//
// However, for safety, we must ensure that `waiter` is **not** dropped.
// It could be contained in the intrusive linked list. The `Receiver`
// drop implementation handles cleanup.
//
// The guard pattern is used to ensure that, on return, even due to
// panic, the waiter node is replaced on `self`.
struct Guard<'a, T> {
waiter: Option<Pin<Box<UnsafeCell<Waiter>>>>,
receiver: &'a mut Receiver<T>,
}
impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) {
self.receiver.waiter = self.waiter.take();
}
}
let waiter = self.waiter.take().or_else(|| {
Some(Box::pin(UnsafeCell::new(Waiter {
queued: false,
waker: None,
pointers: linked_list::Pointers::new(),
_p: PhantomPinned,
})))
});
let guard = Guard {
waiter,
receiver: self,
};
let res = guard
.receiver
.recv_ref(Some((&guard.waiter.as_ref().unwrap(), cx.waker())));
match res {
Ok(guard) => Ready(guard.clone_value().ok_or(RecvError::Closed)),
Err(TryRecvError::Closed) => Ready(Err(RecvError::Closed)),
Err(TryRecvError::Lagged(n)) => Ready(Err(RecvError::Lagged(n))),
Err(TryRecvError::Empty) => Pending,
}
}
impl<T: Clone> Receiver<T> {
/// Receives the next value for this receiver.
///
/// Each [`Receiver`] handle will receive a clone of all values sent
@ -949,31 +842,97 @@ where
/// assert_eq!(20, rx.recv().await.unwrap());
/// assert_eq!(30, rx.recv().await.unwrap());
/// }
/// ```
pub async fn recv(&mut self) -> Result<T, RecvError> {
let fut = Recv::<_, T>::new(Borrow(self));
fut.await
}
/// Attempts to return a pending value on this receiver without awaiting.
///
/// This is useful for a flavor of "optimistic check" before deciding to
/// await on a receiver.
///
/// Compared with [`recv`], this function has three failure cases instead of two
/// (one for closed, one for an empty buffer, one for a lagging receiver).
///
/// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
/// dropped, indicating that no further values can be sent on the channel.
///
/// If the [`Receiver`] handle falls behind, once the channel is full, newly
/// sent values will overwrite old values. At this point, a call to [`recv`]
/// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
/// internal cursor is updated to point to the oldest value still held by
/// the channel. A subsequent call to [`try_recv`] will return this value
/// **unless** it has been since overwritten. If there are no values to
/// receive, `Err(TryRecvError::Empty)` is returned.
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = broadcast::channel(16);
///
/// assert!(rx.try_recv().is_err());
///
/// tx.send(10).unwrap();
///
/// let value = rx.try_recv().unwrap();
/// assert_eq!(10, value);
/// }
/// ```
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let guard = self.recv_ref(None)?;
guard.clone_value().ok_or(TryRecvError::Closed)
}
/// Convert the receiver into a `Stream`.
///
/// The conversion allows using `Receiver` with APIs that require stream
/// values.
///
/// # Examples
///
/// ```
/// use tokio::stream::StreamExt;
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = broadcast::channel(128);
///
/// tokio::spawn(async move {
/// for i in 0..10_i32 {
/// tx.send(i).unwrap();
/// }
/// });
///
/// // Streams must be pinned to iterate.
/// tokio::pin! {
/// let stream = rx
/// .into_stream()
/// .filter(Result::is_ok)
/// .map(Result::unwrap)
/// .filter(|v| v % 2 == 0)
/// .map(|v| v + 1);
/// }
///
/// while let Some(i) = stream.next().await {
/// println!("{}", i);
/// }
/// }
/// ```
#[cfg(feature = "stream")]
#[doc(hidden)]
#[deprecated(since = "0.2.21", note = "use `into_stream()`")]
impl<T> crate::stream::Stream for Receiver<T>
where
T: Clone,
{
type Item = Result<T, RecvError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<T, RecvError>>> {
#[allow(deprecated)]
self.poll_recv(cx).map(|v| match v {
Ok(v) => Some(Ok(v)),
lag @ Err(RecvError::Lagged(_)) => Some(lag),
Err(RecvError::Closed) => None,
})
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> {
Recv::new(Borrow(self))
}
}
@ -981,23 +940,6 @@ impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut tail = self.shared.tail.lock();
if let Some(waiter) = &self.waiter {
// safety: tail lock is held
let queued = waiter.with(|ptr| unsafe { (*ptr).queued });
if queued {
// Remove the node
//
// safety: tail lock is held and the wait node is verified to be in
// the list.
unsafe {
waiter.with_mut(|ptr| {
tail.waiters.remove((&mut *ptr).into());
});
}
}
}
tail.rx_cnt -= 1;
let until = tail.pos;
@ -1071,48 +1013,6 @@ where
cfg_stream! {
use futures_core::Stream;
impl<T: Clone> Receiver<T> {
/// Convert the receiver into a `Stream`.
///
/// The conversion allows using `Receiver` with APIs that require stream
/// values.
///
/// # Examples
///
/// ```
/// use tokio::stream::StreamExt;
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = broadcast::channel(128);
///
/// tokio::spawn(async move {
/// for i in 0..10_i32 {
/// tx.send(i).unwrap();
/// }
/// });
///
/// // Streams must be pinned to iterate.
/// tokio::pin! {
/// let stream = rx
/// .into_stream()
/// .filter(Result::is_ok)
/// .map(Result::unwrap)
/// .filter(|v| v % 2 == 0)
/// .map(|v| v + 1);
/// }
///
/// while let Some(i) = stream.next().await {
/// println!("{}", i);
/// }
/// }
/// ```
pub fn into_stream(self) -> impl Stream<Item = Result<T, RecvError>> {
Recv::new(Borrow(self))
}
}
impl<R, T: Clone> Stream for Recv<R, T>
where
R: AsMut<Receiver<T>>,

View File

@ -491,39 +491,6 @@ fn lagging_receiver_recovers_after_wrap_open() {
assert_empty!(rx);
}
#[tokio::test]
async fn send_recv_stream_ready_deprecated() {
use tokio::stream::StreamExt;
let (tx, mut rx) = broadcast::channel::<i32>(8);
assert_ok!(tx.send(1));
assert_ok!(tx.send(2));
assert_eq!(Some(Ok(1)), rx.next().await);
assert_eq!(Some(Ok(2)), rx.next().await);
drop(tx);
assert_eq!(None, rx.next().await);
}
#[tokio::test]
async fn send_recv_stream_pending_deprecated() {
use tokio::stream::StreamExt;
let (tx, mut rx) = broadcast::channel::<i32>(8);
let mut recv = task::spawn(rx.next());
assert_pending!(recv.poll());
assert_ok!(tx.send(1));
assert!(recv.is_woken());
let val = assert_ready!(recv.poll());
assert_eq!(val, Some(Ok(1)));
}
fn is_closed(err: broadcast::RecvError) -> bool {
match err {
broadcast::RecvError::Closed => true,