mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
sync: fix notifications getting dropped on receiver drop (#3652)
This commit is contained in:
parent
fee76ea7d5
commit
f7c181c2c4
@ -681,35 +681,17 @@ impl Drop for Notified<'_> {
|
|||||||
let mut waiters = notify.waiters.lock();
|
let mut waiters = notify.waiters.lock();
|
||||||
let mut notify_state = notify.state.load(SeqCst);
|
let mut notify_state = notify.state.load(SeqCst);
|
||||||
|
|
||||||
// `Notify.state` may be in any of the three states (Empty, Waiting,
|
// remove the entry from the list (if not already removed)
|
||||||
// Notified). It doesn't actually matter what the atomic is set to
|
|
||||||
// at this point. We hold the lock and will ensure the atomic is in
|
|
||||||
// the correct state once the lock is dropped.
|
|
||||||
//
|
|
||||||
// Because the atomic state is not checked, at first glance, it may
|
|
||||||
// seem like this routine does not handle the case where the
|
|
||||||
// receiver is notified but has not yet observed the notification.
|
|
||||||
// If this happens, no matter how many notifications happen between
|
|
||||||
// this receiver being notified and the receive future dropping, all
|
|
||||||
// we need to do is ensure that one notification is returned back to
|
|
||||||
// the `Notify`. This is done by calling `notify_locked` if `self`
|
|
||||||
// has the `notified` flag set.
|
|
||||||
|
|
||||||
// remove the entry from the list
|
|
||||||
//
|
//
|
||||||
// safety: the waiter is only added to `waiters` by virtue of it
|
// safety: the waiter is only added to `waiters` by virtue of it
|
||||||
// being the only `LinkedList` available to the type.
|
// being the only `LinkedList` available to the type.
|
||||||
unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };
|
unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };
|
||||||
|
|
||||||
if waiters.is_empty() {
|
if waiters.is_empty() {
|
||||||
notify_state = set_state(notify_state, EMPTY);
|
if let WAITING = get_state(notify_state) {
|
||||||
// If the state *should* be `NOTIFIED`, the call to
|
notify_state = set_state(notify_state, EMPTY);
|
||||||
// `notify_locked` below will end up doing the
|
notify.state.store(notify_state, SeqCst);
|
||||||
// `store(NOTIFIED)`. If a concurrent receiver races and
|
}
|
||||||
// observes the incorrect `EMPTY` state, it will then obtain the
|
|
||||||
// lock and block until `notify.state` is in the correct final
|
|
||||||
// state.
|
|
||||||
notify.state.store(notify_state, SeqCst);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// See if the node was notified but not received. In this case, if
|
// See if the node was notified but not received. In this case, if
|
||||||
|
@ -134,3 +134,20 @@ fn notify_in_drop_after_wake() {
|
|||||||
// Now, notifying **should not** deadlock
|
// Now, notifying **should not** deadlock
|
||||||
notify.notify_waiters();
|
notify.notify_waiters();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn notify_one_after_dropped_all() {
|
||||||
|
let notify = Notify::new();
|
||||||
|
let mut notified1 = spawn(async { notify.notified().await });
|
||||||
|
|
||||||
|
assert_pending!(notified1.poll());
|
||||||
|
|
||||||
|
notify.notify_waiters();
|
||||||
|
notify.notify_one();
|
||||||
|
|
||||||
|
drop(notified1);
|
||||||
|
|
||||||
|
let mut notified2 = spawn(async { notify.notified().await });
|
||||||
|
|
||||||
|
assert_ready!(notified2.poll());
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user