mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
runtime: fix flaky test wake_while_rt_is_dropping
(#5905)
This commit is contained in:
parent
e5e88551d2
commit
52e6510215
@ -950,10 +950,6 @@ rt_test! {
|
|||||||
#[test]
|
#[test]
|
||||||
fn wake_while_rt_is_dropping() {
|
fn wake_while_rt_is_dropping() {
|
||||||
use tokio::sync::Barrier;
|
use tokio::sync::Barrier;
|
||||||
use core::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
|
|
||||||
let drop_triggered = Arc::new(AtomicBool::new(false));
|
|
||||||
let set_drop_triggered = drop_triggered.clone();
|
|
||||||
|
|
||||||
struct OnDrop<F: FnMut()>(F);
|
struct OnDrop<F: FnMut()>(F);
|
||||||
|
|
||||||
@ -965,56 +961,39 @@ rt_test! {
|
|||||||
|
|
||||||
let (tx1, rx1) = oneshot::channel();
|
let (tx1, rx1) = oneshot::channel();
|
||||||
let (tx2, rx2) = oneshot::channel();
|
let (tx2, rx2) = oneshot::channel();
|
||||||
let (tx3, rx3) = oneshot::channel();
|
|
||||||
|
|
||||||
let barrier = Arc::new(Barrier::new(4));
|
let barrier = Arc::new(Barrier::new(3));
|
||||||
let barrier1 = barrier.clone();
|
let barrier1 = barrier.clone();
|
||||||
let barrier2 = barrier.clone();
|
let barrier2 = barrier.clone();
|
||||||
let barrier3 = barrier.clone();
|
|
||||||
|
|
||||||
let rt = rt();
|
let rt = rt();
|
||||||
|
|
||||||
rt.spawn(async move {
|
rt.spawn(async move {
|
||||||
|
let mut tx2 = Some(tx2);
|
||||||
|
let _d = OnDrop(move || {
|
||||||
|
let _ = tx2.take().unwrap().send(());
|
||||||
|
});
|
||||||
|
|
||||||
// Ensure a waker gets stored in oneshot 1.
|
// Ensure a waker gets stored in oneshot 1.
|
||||||
let _ = tokio::join!(rx1, barrier1.wait());
|
let _ = tokio::join!(rx1, barrier1.wait());
|
||||||
tx3.send(()).unwrap();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
rt.spawn(async move {
|
rt.spawn(async move {
|
||||||
let h1 = tokio::runtime::Handle::current();
|
let mut tx1 = Some(tx1);
|
||||||
// When this task is dropped, we'll be "closing remotes".
|
|
||||||
// We spawn a new task that owns the `tx1`, to move its Drop
|
|
||||||
// out of here.
|
|
||||||
//
|
|
||||||
// Importantly, the oneshot 1 has a waker already stored, so
|
|
||||||
// the eventual drop here will try to re-schedule again.
|
|
||||||
let mut opt_tx1 = Some(tx1);
|
|
||||||
let _d = OnDrop(move || {
|
let _d = OnDrop(move || {
|
||||||
let tx1 = opt_tx1.take().unwrap();
|
let _ = tx1.take().unwrap().send(());
|
||||||
h1.spawn(async move {
|
|
||||||
tx1.send(()).unwrap();
|
|
||||||
});
|
|
||||||
// Just a sanity check that this entire thing actually happened
|
|
||||||
set_drop_triggered.store(true, Ordering::Relaxed);
|
|
||||||
});
|
});
|
||||||
let _ = tokio::join!(rx2, barrier2.wait());
|
|
||||||
});
|
|
||||||
|
|
||||||
rt.spawn(async move {
|
// Ensure a waker gets stored in oneshot 2.
|
||||||
let _ = tokio::join!(rx3, barrier3.wait());
|
let _ = tokio::join!(rx2, barrier2.wait());
|
||||||
// We'll never get here, but once task 3 drops, this will
|
|
||||||
// force task 2 to re-schedule since it's waiting on oneshot 2.
|
|
||||||
tx2.send(()).unwrap();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// Wait until every oneshot channel has been polled.
|
// Wait until every oneshot channel has been polled.
|
||||||
rt.block_on(barrier.wait());
|
rt.block_on(barrier.wait());
|
||||||
|
|
||||||
// Drop the rt
|
// Drop the rt. Regardless of which task is dropped first, its destructor will wake the
|
||||||
|
// other task.
|
||||||
drop(rt);
|
drop(rt);
|
||||||
|
|
||||||
// Make sure that the spawn actually happened
|
|
||||||
assert!(drop_triggered.load(Ordering::Relaxed));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
|
#[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user