sync: implement oneshot::Receiver::is_terminated() (#7152)

This commit is contained in:
katelyn martin 2025-02-16 14:25:50 -05:00 committed by GitHub
parent aa70f6c5f0
commit 17117b591e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 155 additions and 3 deletions

View File

@ -931,6 +931,64 @@ impl<T> Receiver<T> {
}
}
/// Checks if this receiver is terminated.
///
/// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
/// If so, this receiver should no longer be polled.
///
/// # Examples
///
/// Sending a value and polling it.
///
/// ```
/// use tokio::sync::oneshot;
///
/// use std::task::Poll;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel();
///
/// // A receiver is not terminated when it is initialized.
/// assert!(!rx.is_terminated());
///
/// // A receiver is not terminated it is polled and is still pending.
/// let poll = futures::poll!(&mut rx);
/// assert_eq!(poll, Poll::Pending);
/// assert!(!rx.is_terminated());
///
/// // A receiver is not terminated if a value has been sent, but not yet read.
/// tx.send(0).unwrap();
/// assert!(!rx.is_terminated());
///
/// // A receiver *is* terminated after it has been polled and yielded a value.
/// assert_eq!((&mut rx).await, Ok(0));
/// assert!(rx.is_terminated());
/// }
/// ```
///
/// Dropping the sender.
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel::<()>();
///
/// // A receiver is not immediately terminated when the sender is dropped.
/// drop(tx);
/// assert!(!rx.is_terminated());
///
/// // A receiver *is* terminated after it has been polled and yielded an error.
/// let _ = (&mut rx).await.unwrap_err();
/// assert!(rx.is_terminated());
/// }
/// ```
pub fn is_terminated(&self) -> bool {
self.inner.is_none()
}
/// Attempts to receive a value.
///
/// If a pending value exists in the channel, it is returned. If no value
@ -1106,10 +1164,10 @@ impl<T> Future for Receiver<T> {
let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let res = ready!(inner.poll_recv(cx))?;
let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
res
} else {
@ -1117,7 +1175,7 @@ impl<T> Future for Receiver<T> {
};
self.inner = None;
Ready(Ok(ret))
Ready(ret)
}
}

View File

@ -292,3 +292,97 @@ fn sender_changes_task() {
assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
}
#[test]
fn receiver_is_terminated_send() {
let (tx, mut rx) = oneshot::channel::<i32>();
assert!(
!rx.is_terminated(),
"channel is NOT terminated before value is sent"
);
tx.send(17).unwrap();
assert!(
!rx.is_terminated(),
"channel is NOT terminated after value is sent"
);
let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_eq!(poll, Ok(17));
assert!(
rx.is_terminated(),
"channel IS terminated after value is read"
);
}
#[test]
fn receiver_is_terminated_try_recv() {
let (tx, mut rx) = oneshot::channel::<i32>();
assert!(
!rx.is_terminated(),
"channel is NOT terminated before value is sent"
);
tx.send(17).unwrap();
assert!(
!rx.is_terminated(),
"channel is NOT terminated after value is sent"
);
let value = rx.try_recv().expect("value is waiting");
assert_eq!(value, 17);
assert!(
rx.is_terminated(),
"channel IS terminated after value is read"
);
}
#[test]
fn receiver_is_terminated_drop() {
let (tx, mut rx) = oneshot::channel::<i32>();
assert!(
!rx.is_terminated(),
"channel is NOT terminated before sender is dropped"
);
drop(tx);
assert!(
!rx.is_terminated(),
"channel is NOT terminated after sender is dropped"
);
let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);
assert!(
rx.is_terminated(),
"channel IS terminated after value is read"
);
}
#[test]
fn receiver_is_terminated_rx_close() {
let (_tx, mut rx) = oneshot::channel::<i32>();
assert!(
!rx.is_terminated(),
"channel is NOT terminated before closing"
);
rx.close();
assert!(
!rx.is_terminated(),
"channel is NOT terminated before closing"
);
let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);
assert!(
rx.is_terminated(),
"channel IS terminated after value is read"
);
}