diff --git a/benches/sync_watch.rs b/benches/sync_watch.rs index 63370edcb..94f3339fb 100644 --- a/benches/sync_watch.rs +++ b/benches/sync_watch.rs @@ -3,7 +3,8 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::{watch, Notify}; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use criterion::measurement::WallTime; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion}; fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_multi_thread() @@ -25,16 +26,15 @@ fn do_work(rng: &mut impl RngCore) -> u32 { .fold(0, u32::wrapping_add) } -fn contention_resubscribe(c: &mut Criterion) { - const NTASK: u64 = 1000; - +fn contention_resubscribe(g: &mut BenchmarkGroup) { let rt = rt(); - let (snd, rcv) = watch::channel(0i32); + let (snd, _) = watch::channel(0i32); + let snd = Arc::new(snd); let wg = Arc::new((AtomicU64::new(0), Notify::new())); - for n in 0..NTASK { - let mut rcv = rcv.clone(); + for n in 0..N_TASKS { + let mut rcv = snd.subscribe(); let wg = wg.clone(); - let mut rng = rand::rngs::StdRng::seed_from_u64(n); + let mut rng = rand::rngs::StdRng::seed_from_u64(n as u64); rt.spawn(async move { while rcv.changed().await.is_ok() { let _ = *rcv.borrow(); // contend on rwlock @@ -47,21 +47,39 @@ fn contention_resubscribe(c: &mut Criterion) { }); } - c.bench_function("contention_resubscribe", |b| { + const N_ITERS: usize = 100; + g.bench_function(N_TASKS.to_string(), |b| { b.iter(|| { - rt.block_on(async { - for _ in 0..100 { - assert_eq!(wg.0.fetch_add(NTASK, Ordering::Relaxed), 0); - let _ = snd.send(black_box(42)); - while wg.0.load(Ordering::Acquire) > 0 { - wg.1.notified().await; - } + rt.block_on({ + let snd = snd.clone(); + let wg = wg.clone(); + async move { + tokio::spawn(async move { + for _ in 0..N_ITERS { + assert_eq!(wg.0.fetch_add(N_TASKS as u64, Ordering::Relaxed), 0); + let _ = snd.send(black_box(42)); + while wg.0.load(Ordering::Acquire) > 0 { + wg.1.notified().await; + } + } + }) + .await + .unwrap(); } }); }) }); } -criterion_group!(contention, contention_resubscribe); +fn bench_contention_resubscribe(c: &mut Criterion) { + let mut group = c.benchmark_group("contention_resubscribe"); + contention_resubscribe::<10>(&mut group); + contention_resubscribe::<100>(&mut group); + contention_resubscribe::<500>(&mut group); + contention_resubscribe::<1000>(&mut group); + group.finish(); +} + +criterion_group!(contention, bench_contention_resubscribe); criterion_main!(contention);