mirror of
https://github.com/tokio-rs/tokio.git
synced 2026-03-12 20:19:56 +00:00
101 lines
3.8 KiB
Rust
101 lines
3.8 KiB
Rust
//! Benchmark remote task spawning (push_remote_task) at different concurrency
//! levels on the multi-threaded scheduler.
//!
//! This measures contention on the scheduler's inject queue mutex when multiple
//! external (non-worker) threads spawn tasks into the tokio runtime simultaneously.
//! Every rt.spawn() from an external thread unconditionally goes through
//! push_remote_task, making this a direct measurement of inject queue contention.
//!
//! For each parallelism level N (1, 2, 4, 8, 16, 32, 64, capped at available parallelism):
//! - Spawns N std::threads (external to the runtime)
//! - Each thread spawns TOTAL_TASKS / N tasks into the runtime via rt.spawn()
//! - All threads are synchronized with a barrier to maximize contention
//! - Tasks are trivial no-ops to isolate the push overhead
|
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use std::sync::Barrier;
use tokio::runtime::{self, Runtime};
|
/// Total number of tasks spawned across all threads per iteration.
/// Must be divisible by the largest parallelism level (64).
const TOTAL_TASKS: usize = 12_800;

// Compile-time guard: keeps TOTAL_TASKS / N exact for every benchmarked
// thread count, so each thread spawns the same number of tasks.
const _: () = assert!(TOTAL_TASKS % 64 == 0, "TOTAL_TASKS must be divisible by 64");
|
fn remote_spawn_contention(c: &mut Criterion) {
|
|
let parallelism_levels = parallelism_levels();
|
|
let mut group = c.benchmark_group("remote_spawn");
|
|
|
|
for num_threads in ¶llelism_levels {
|
|
let num_threads = *num_threads;
|
|
group.bench_with_input(
|
|
BenchmarkId::new("threads", num_threads),
|
|
&num_threads,
|
|
|b, &num_threads| {
|
|
let rt = rt();
|
|
let tasks_per_thread = TOTAL_TASKS / num_threads;
|
|
let barrier = Barrier::new(num_threads);
|
|
|
|
b.iter_custom(|iters| {
|
|
let mut total_duration = std::time::Duration::ZERO;
|
|
for _ in 0..iters {
|
|
let start = std::time::Instant::now();
|
|
|
|
let all_handles = std::thread::scope(|s| {
|
|
let handles: Vec<_> = (0..num_threads)
|
|
.map(|_| {
|
|
let barrier = &barrier;
|
|
let rt = &rt;
|
|
s.spawn(move || {
|
|
let mut join_handles = Vec::with_capacity(tasks_per_thread);
|
|
barrier.wait();
|
|
|
|
for _ in 0..tasks_per_thread {
|
|
join_handles.push(rt.spawn(async {}));
|
|
}
|
|
join_handles
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
handles
|
|
.into_iter()
|
|
.flat_map(|h| h.join().unwrap())
|
|
.collect::<Vec<_>>()
|
|
});
|
|
|
|
total_duration += start.elapsed();
|
|
|
|
rt.block_on(async {
|
|
for h in all_handles {
|
|
h.await.unwrap();
|
|
}
|
|
});
|
|
}
|
|
total_duration
|
|
});
|
|
},
|
|
);
|
|
}
|
|
|
|
group.finish();
|
|
}
|
|
|
|
/// Returns the thread counts to benchmark: powers of two from 1 through 64,
/// dropping any level above the host's available parallelism. If available
/// parallelism cannot be queried, falls back to 1 (so only level 1 survives).
fn parallelism_levels() -> Vec<usize> {
    let max_parallelism = match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    };

    let mut levels = Vec::new();
    for candidate in [1usize, 2, 4, 8, 16, 32, 64] {
        if candidate <= max_parallelism {
            levels.push(candidate);
        }
    }
    levels
}
|
|
|
|
fn rt() -> Runtime {
|
|
runtime::Builder::new_multi_thread().build().unwrap()
|
|
}
|
|
|
|
// Criterion harness entry point: register the benchmark function under the
// `remote_spawn_benches` group and generate the binary's `main`.
criterion_group!(remote_spawn_benches, remote_spawn_contention);
criterion_main!(remote_spawn_benches);
|