mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
ci: speed up multi-threaded runtime loom tests. (#5723)
Increase max preemption back to 2 while running the tests in under 90 minutes.
This commit is contained in:
parent
080d52902f
commit
98c8c38e96
6
.github/workflows/loom.yml
vendored
6
.github/workflows/loom.yml
vendored
@ -28,13 +28,13 @@ jobs:
|
|||||||
- scope: --skip loom_pool
|
- scope: --skip loom_pool
|
||||||
max_preemptions: 2
|
max_preemptions: 2
|
||||||
- scope: loom_pool::group_a
|
- scope: loom_pool::group_a
|
||||||
max_preemptions: 1
|
max_preemptions: 2
|
||||||
- scope: loom_pool::group_b
|
- scope: loom_pool::group_b
|
||||||
max_preemptions: 2
|
max_preemptions: 2
|
||||||
- scope: loom_pool::group_c
|
- scope: loom_pool::group_c
|
||||||
max_preemptions: 1
|
max_preemptions: 2
|
||||||
- scope: loom_pool::group_d
|
- scope: loom_pool::group_d
|
||||||
max_preemptions: 1
|
max_preemptions: 2
|
||||||
- scope: time::driver
|
- scope: time::driver
|
||||||
max_preemptions: 2
|
max_preemptions: 2
|
||||||
steps:
|
steps:
|
||||||
|
@ -4,7 +4,6 @@
|
|||||||
|
|
||||||
use crate::loom::sync::atomic::AtomicUsize;
|
use crate::loom::sync::atomic::AtomicUsize;
|
||||||
use crate::loom::sync::{Arc, Condvar, Mutex};
|
use crate::loom::sync::{Arc, Condvar, Mutex};
|
||||||
use crate::loom::thread;
|
|
||||||
use crate::runtime::driver::{self, Driver};
|
use crate::runtime::driver::{self, Driver};
|
||||||
use crate::util::TryLock;
|
use crate::util::TryLock;
|
||||||
|
|
||||||
@ -104,18 +103,14 @@ impl Unparker {
|
|||||||
impl Inner {
|
impl Inner {
|
||||||
/// Parks the current thread for at most `dur`.
|
/// Parks the current thread for at most `dur`.
|
||||||
fn park(&self, handle: &driver::Handle) {
|
fn park(&self, handle: &driver::Handle) {
|
||||||
for _ in 0..3 {
|
// If we were previously notified then we consume this notification and
|
||||||
// If we were previously notified then we consume this notification and
|
// return quickly.
|
||||||
// return quickly.
|
if self
|
||||||
if self
|
.state
|
||||||
.state
|
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
|
||||||
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
|
.is_ok()
|
||||||
.is_ok()
|
{
|
||||||
{
|
return;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
thread::yield_now();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(mut driver) = self.shared.driver.try_lock() {
|
if let Some(mut driver) = self.shared.driver.try_lock() {
|
||||||
|
@ -323,8 +323,7 @@ mod group_d {
|
|||||||
// Spawn a task
|
// Spawn a task
|
||||||
let c2 = c1.clone();
|
let c2 = c1.clone();
|
||||||
pool.spawn(track(async move {
|
pool.spawn(track(async move {
|
||||||
gated().await;
|
multi_gated().await;
|
||||||
gated().await;
|
|
||||||
|
|
||||||
if 1 == c1.fetch_add(1, Relaxed) {
|
if 1 == c1.fetch_add(1, Relaxed) {
|
||||||
done_tx1.assert_send(());
|
done_tx1.assert_send(());
|
||||||
@ -333,8 +332,7 @@ mod group_d {
|
|||||||
|
|
||||||
// Spawn a second task
|
// Spawn a second task
|
||||||
pool.spawn(track(async move {
|
pool.spawn(track(async move {
|
||||||
gated().await;
|
multi_gated().await;
|
||||||
gated().await;
|
|
||||||
|
|
||||||
if 1 == c2.fetch_add(1, Relaxed) {
|
if 1 == c2.fetch_add(1, Relaxed) {
|
||||||
done_tx2.assert_send(());
|
done_tx2.assert_send(());
|
||||||
@ -349,14 +347,11 @@ mod group_d {
|
|||||||
fn mk_pool(num_threads: usize) -> Runtime {
|
fn mk_pool(num_threads: usize) -> Runtime {
|
||||||
runtime::Builder::new_multi_thread()
|
runtime::Builder::new_multi_thread()
|
||||||
.worker_threads(num_threads)
|
.worker_threads(num_threads)
|
||||||
|
.event_interval(2)
|
||||||
.build()
|
.build()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn gated() -> impl Future<Output = &'static str> {
|
|
||||||
gated2(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn gated2(thread: bool) -> impl Future<Output = &'static str> {
|
fn gated2(thread: bool) -> impl Future<Output = &'static str> {
|
||||||
use loom::thread;
|
use loom::thread;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -394,6 +389,38 @@ fn gated2(thread: bool) -> impl Future<Output = &'static str> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn multi_gated() {
|
||||||
|
struct Gate {
|
||||||
|
waker: loom::future::AtomicWaker,
|
||||||
|
count: AtomicUsize,
|
||||||
|
}
|
||||||
|
|
||||||
|
let gate = Arc::new(Gate {
|
||||||
|
waker: loom::future::AtomicWaker::new(),
|
||||||
|
count: AtomicUsize::new(0),
|
||||||
|
});
|
||||||
|
|
||||||
|
{
|
||||||
|
let gate = gate.clone();
|
||||||
|
spawn(track(async move {
|
||||||
|
for i in 1..3 {
|
||||||
|
gate.count.store(i, SeqCst);
|
||||||
|
gate.waker.wake();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
poll_fn(move |cx| {
|
||||||
|
if gate.count.load(SeqCst) < 2 {
|
||||||
|
gate.waker.register_by_ref(cx.waker());
|
||||||
|
Poll::Pending
|
||||||
|
} else {
|
||||||
|
Poll::Ready(())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
fn track<T: Future>(f: T) -> Track<T> {
|
fn track<T: Future>(f: T) -> Track<T> {
|
||||||
Track {
|
Track {
|
||||||
inner: f,
|
inner: f,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user