benches: move sender to a spawned task in watch benchmark (#6034)

This commit is contained in:
Tymoteusz Wiśniewski 2023-09-28 11:28:12 +02:00 committed by GitHub
parent 453c720709
commit ca89c5b2ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,7 +3,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{watch, Notify}; use tokio::sync::{watch, Notify};
use criterion::{black_box, criterion_group, criterion_main, Criterion}; use criterion::measurement::WallTime;
use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion};
fn rt() -> tokio::runtime::Runtime { fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
@ -25,16 +26,15 @@ fn do_work(rng: &mut impl RngCore) -> u32 {
.fold(0, u32::wrapping_add) .fold(0, u32::wrapping_add)
} }
fn contention_resubscribe(c: &mut Criterion) { fn contention_resubscribe<const N_TASKS: usize>(g: &mut BenchmarkGroup<WallTime>) {
const NTASK: u64 = 1000;
let rt = rt(); let rt = rt();
let (snd, rcv) = watch::channel(0i32); let (snd, _) = watch::channel(0i32);
let snd = Arc::new(snd);
let wg = Arc::new((AtomicU64::new(0), Notify::new())); let wg = Arc::new((AtomicU64::new(0), Notify::new()));
for n in 0..NTASK { for n in 0..N_TASKS {
let mut rcv = rcv.clone(); let mut rcv = snd.subscribe();
let wg = wg.clone(); let wg = wg.clone();
let mut rng = rand::rngs::StdRng::seed_from_u64(n); let mut rng = rand::rngs::StdRng::seed_from_u64(n as u64);
rt.spawn(async move { rt.spawn(async move {
while rcv.changed().await.is_ok() { while rcv.changed().await.is_ok() {
let _ = *rcv.borrow(); // contend on rwlock let _ = *rcv.borrow(); // contend on rwlock
@ -47,21 +47,39 @@ fn contention_resubscribe(c: &mut Criterion) {
}); });
} }
c.bench_function("contention_resubscribe", |b| { const N_ITERS: usize = 100;
g.bench_function(N_TASKS.to_string(), |b| {
b.iter(|| { b.iter(|| {
rt.block_on(async { rt.block_on({
for _ in 0..100 { let snd = snd.clone();
assert_eq!(wg.0.fetch_add(NTASK, Ordering::Relaxed), 0); let wg = wg.clone();
let _ = snd.send(black_box(42)); async move {
while wg.0.load(Ordering::Acquire) > 0 { tokio::spawn(async move {
wg.1.notified().await; for _ in 0..N_ITERS {
} assert_eq!(wg.0.fetch_add(N_TASKS as u64, Ordering::Relaxed), 0);
let _ = snd.send(black_box(42));
while wg.0.load(Ordering::Acquire) > 0 {
wg.1.notified().await;
}
}
})
.await
.unwrap();
} }
}); });
}) })
}); });
} }
criterion_group!(contention, contention_resubscribe); fn bench_contention_resubscribe(c: &mut Criterion) {
let mut group = c.benchmark_group("contention_resubscribe");
contention_resubscribe::<10>(&mut group);
contention_resubscribe::<100>(&mut group);
contention_resubscribe::<500>(&mut group);
contention_resubscribe::<1000>(&mut group);
group.finish();
}
criterion_group!(contention, bench_contention_resubscribe);
criterion_main!(contention); criterion_main!(contention);