diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index a51829927..417c3b470 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -28,13 +28,13 @@ jobs: - scope: --skip loom_pool max_preemptions: 2 - scope: loom_pool::group_a - max_preemptions: 1 + max_preemptions: 2 - scope: loom_pool::group_b max_preemptions: 2 - scope: loom_pool::group_c - max_preemptions: 1 + max_preemptions: 2 - scope: loom_pool::group_d - max_preemptions: 1 + max_preemptions: 2 - scope: time::driver max_preemptions: 2 steps: diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index 6bdbff961..0a00ea004 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -4,7 +4,6 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; -use crate::loom::thread; use crate::runtime::driver::{self, Driver}; use crate::util::TryLock; @@ -104,18 +103,14 @@ impl Unparker { impl Inner { /// Parks the current thread for at most `dur`. fn park(&self, handle: &driver::Handle) { - for _ in 0..3 { - // If we were previously notified then we consume this notification and - // return quickly. - if self - .state - .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) - .is_ok() - { - return; - } - - thread::yield_now(); + // If we were previously notified then we consume this notification and + // return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; } if let Some(mut driver) = self.shared.driver.try_lock() { diff --git a/tokio/src/runtime/tests/loom_pool.rs b/tokio/src/runtime/tests/loom_pool.rs index b3ecd4312..e2c826f07 100644 --- a/tokio/src/runtime/tests/loom_pool.rs +++ b/tokio/src/runtime/tests/loom_pool.rs @@ -323,8 +323,7 @@ mod group_d { // Spawn a task let c2 = c1.clone(); pool.spawn(track(async move { - gated().await; - gated().await; + multi_gated().await; if 1 == c1.fetch_add(1, Relaxed) { done_tx1.assert_send(()); @@ -333,8 +332,7 @@ mod group_d { // Spawn a second task pool.spawn(track(async move { - gated().await; - gated().await; + multi_gated().await; if 1 == c2.fetch_add(1, Relaxed) { done_tx2.assert_send(()); @@ -349,14 +347,11 @@ mod group_d { fn mk_pool(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) + .event_interval(2) .build() .unwrap() } -fn gated() -> impl Future { - gated2(false) -} - fn gated2(thread: bool) -> impl Future { use loom::thread; use std::sync::Arc; @@ -394,6 +389,38 @@ fn gated2(thread: bool) -> impl Future { }) } +async fn multi_gated() { + struct Gate { + waker: loom::future::AtomicWaker, + count: AtomicUsize, + } + + let gate = Arc::new(Gate { + waker: loom::future::AtomicWaker::new(), + count: AtomicUsize::new(0), + }); + + { + let gate = gate.clone(); + spawn(track(async move { + for i in 1..3 { + gate.count.store(i, SeqCst); + gate.waker.wake(); + } + })); + } + + poll_fn(move |cx| { + if gate.count.load(SeqCst) < 2 { + gate.waker.register_by_ref(cx.waker()); + Poll::Pending + } else { + Poll::Ready(()) + } + }) + .await; +} + fn track(f: T) -> Track { Track { inner: f,