mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
sync: make oneshot::Sender::poll_closed public again (#3032)
This commit is contained in:
parent
cbb8fe6069
commit
6d0ba19af5
@ -196,54 +196,6 @@ impl<T> Sender<T> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
// Keep track of task budget
|
||||
let coop = ready!(crate::coop::poll_proceed(cx));
|
||||
|
||||
let inner = self.inner.as_ref().unwrap();
|
||||
|
||||
let mut state = State::load(&inner.state, Acquire);
|
||||
|
||||
if state.is_closed() {
|
||||
coop.made_progress();
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
if state.is_tx_task_set() {
|
||||
let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
|
||||
|
||||
if !will_notify {
|
||||
state = State::unset_tx_task(&inner.state);
|
||||
|
||||
if state.is_closed() {
|
||||
// Set the flag again so that the waker is released in drop
|
||||
State::set_tx_task(&inner.state);
|
||||
coop.made_progress();
|
||||
return Ready(());
|
||||
} else {
|
||||
unsafe { inner.drop_tx_task() };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !state.is_tx_task_set() {
|
||||
// Attempt to set the task
|
||||
unsafe {
|
||||
inner.set_tx_task(cx);
|
||||
}
|
||||
|
||||
// Update the state
|
||||
state = State::set_tx_task(&inner.state);
|
||||
|
||||
if state.is_closed() {
|
||||
coop.made_progress();
|
||||
return Ready(());
|
||||
}
|
||||
}
|
||||
|
||||
Pending
|
||||
}
|
||||
|
||||
/// Waits for the associated [`Receiver`] handle to close.
|
||||
///
|
||||
/// A [`Receiver`] is closed by either calling [`close`] explicitly or the
|
||||
@ -350,6 +302,94 @@ impl<T> Sender<T> {
|
||||
let state = State::load(&inner.state, Acquire);
|
||||
state.is_closed()
|
||||
}
|
||||
|
||||
/// Check whether the oneshot channel has been closed, and if not, schedules the
|
||||
/// `Waker` in the provided `Context` to receive a notification when the channel is
|
||||
/// closed.
|
||||
///
|
||||
/// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
|
||||
/// [`Receiver`] value is dropped.
|
||||
///
|
||||
/// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
|
||||
/// to the most recent call will be scheduled to receive a wakeup.
|
||||
///
|
||||
/// [`Receiver`]: struct@crate::sync::oneshot::Receiver
|
||||
/// [`close`]: fn@crate::sync::oneshot::Receiver::close
|
||||
///
|
||||
/// # Return value
|
||||
///
|
||||
/// This function returns:
|
||||
///
|
||||
/// * `Poll::Pending` if the channel is still open.
|
||||
/// * `Poll::Ready(())` if the channel is closed.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::sync::oneshot;
|
||||
///
|
||||
/// use futures::future::poll_fn;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let (mut tx, mut rx) = oneshot::channel::<()>();
|
||||
///
|
||||
/// tokio::spawn(async move {
|
||||
/// rx.close();
|
||||
/// });
|
||||
///
|
||||
/// poll_fn(|cx| tx.poll_closed(cx)).await;
|
||||
///
|
||||
/// println!("the receiver dropped");
|
||||
/// }
|
||||
/// ```
|
||||
pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
// Keep track of task budget
|
||||
let coop = ready!(crate::coop::poll_proceed(cx));
|
||||
|
||||
let inner = self.inner.as_ref().unwrap();
|
||||
|
||||
let mut state = State::load(&inner.state, Acquire);
|
||||
|
||||
if state.is_closed() {
|
||||
coop.made_progress();
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
if state.is_tx_task_set() {
|
||||
let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
|
||||
|
||||
if !will_notify {
|
||||
state = State::unset_tx_task(&inner.state);
|
||||
|
||||
if state.is_closed() {
|
||||
// Set the flag again so that the waker is released in drop
|
||||
State::set_tx_task(&inner.state);
|
||||
coop.made_progress();
|
||||
return Ready(());
|
||||
} else {
|
||||
unsafe { inner.drop_tx_task() };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !state.is_tx_task_set() {
|
||||
// Attempt to set the task
|
||||
unsafe {
|
||||
inner.set_tx_task(cx);
|
||||
}
|
||||
|
||||
// Update the state
|
||||
state = State::set_tx_task(&inner.state);
|
||||
|
||||
if state.is_closed() {
|
||||
coop.made_progress();
|
||||
return Ready(());
|
||||
}
|
||||
}
|
||||
|
||||
Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Sender<T> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user