benches: use criterion instead of bencher (#5981)

Authored by M.Amin Rayej on 2023-09-10 18:12:53 +03:30; committed by GitHub
parent 737dff40cb
commit b046c0dcbb
14 changed files with 779 additions and 646 deletions

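For orientation, every file in this commit follows the same mechanical pattern; in the hunks below, the removed bencher lines and their criterion replacements appear next to each other. A bencher benchmark took &mut Bencher and called b.iter(...) directly; the criterion version takes &mut Criterion, registers a named benchmark with c.bench_function(...), and moves the timing closure onto the nested Bencher that criterion supplies, while benchmark_group!/benchmark_main! become criterion_group!/criterion_main!. Both harnesses generate their own main, so the [[bench]] targets typically keep harness = false in Cargo.toml and only the dependency line changes (first hunk below). A minimal sketch of the pattern, using a hypothetical benchmark name that does not appear in the diff:

use criterion::{black_box, criterion_group, criterion_main, Criterion};

// Old shape (bencher), shown for comparison:
//
//     fn demo(b: &mut Bencher) {
//         b.iter(|| black_box(1 + 1));
//     }
//     benchmark_group!(group, demo);
//     benchmark_main!(group);

// New shape (criterion): the outer function receives &mut Criterion,
// bench_function gives the benchmark a name, and the timing loop moves
// into the nested closure's Bencher.
fn demo(c: &mut Criterion) {
    c.bench_function("demo", |b| {
        b.iter(|| black_box(1 + 1));
    });
}

criterion_group!(group, demo);
criterion_main!(group);

Criterion adds warm-up, statistical analysis, and per-benchmark reports on top of plain timing, which is presumably the motivation for the switch.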
View File

@ -9,7 +9,7 @@ test-util = ["tokio/test-util"]
[dependencies]
tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
bencher = "0.1.5"
criterion = "0.5.1"
rand = "0.8"
rand_chacha = "0.3"

View File

@ -1,4 +1,4 @@
use bencher::{benchmark_group, benchmark_main, Bencher};
use criterion::{criterion_group, criterion_main, Criterion};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;
@ -174,65 +174,77 @@ fn rt() -> tokio::runtime::Runtime {
.unwrap()
}
fn copy_mem_to_mem(b: &mut Bencher) {
fn copy_mem_to_mem(c: &mut Criterion) {
let rt = rt();
b.iter(|| {
let task = || async {
let mut source = repeat(0).take(SOURCE_SIZE);
let mut dest = Vec::new();
copy(&mut source, &mut dest).await.unwrap();
};
c.bench_function("copy_mem_to_mem", |b| {
b.iter(|| {
let task = || async {
let mut source = repeat(0).take(SOURCE_SIZE);
let mut dest = Vec::new();
copy(&mut source, &mut dest).await.unwrap();
};
rt.block_on(task());
})
rt.block_on(task());
})
});
}
fn copy_mem_to_slow_hdd(b: &mut Bencher) {
fn copy_mem_to_slow_hdd(c: &mut Criterion) {
let rt = rt();
b.iter(|| {
let task = || async {
let mut source = repeat(0).take(SOURCE_SIZE);
let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
copy(&mut source, &mut dest).await.unwrap();
};
c.bench_function("copy_mem_to_slow_hdd", |b| {
b.iter(|| {
let task = || async {
let mut source = repeat(0).take(SOURCE_SIZE);
let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
copy(&mut source, &mut dest).await.unwrap();
};
rt.block_on(task());
})
rt.block_on(task());
})
});
}
fn copy_chunk_to_mem(b: &mut Bencher) {
fn copy_chunk_to_mem(c: &mut Criterion) {
let rt = rt();
b.iter(|| {
let task = || async {
let mut source = ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
let mut dest = Vec::new();
copy(&mut source, &mut dest).await.unwrap();
};
rt.block_on(task());
})
c.bench_function("copy_chunk_to_mem", |b| {
b.iter(|| {
let task = || async {
let mut source =
ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
let mut dest = Vec::new();
copy(&mut source, &mut dest).await.unwrap();
};
rt.block_on(task());
})
});
}
fn copy_chunk_to_slow_hdd(b: &mut Bencher) {
fn copy_chunk_to_slow_hdd(c: &mut Criterion) {
let rt = rt();
b.iter(|| {
let task = || async {
let mut source = ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
copy(&mut source, &mut dest).await.unwrap();
};
rt.block_on(task());
})
c.bench_function("copy_chunk_to_slow_hdd", |b| {
b.iter(|| {
let task = || async {
let mut source =
ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
copy(&mut source, &mut dest).await.unwrap();
};
rt.block_on(task());
})
});
}
benchmark_group!(
criterion_group!(
copy_bench,
copy_mem_to_mem,
copy_mem_to_slow_hdd,
copy_chunk_to_mem,
copy_chunk_to_slow_hdd,
);
benchmark_main!(copy_bench);
criterion_main!(copy_bench);

View File

@ -6,7 +6,7 @@ use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::codec::{BytesCodec, FramedRead /*FramedWrite*/};
use bencher::{benchmark_group, benchmark_main, Bencher};
use criterion::{criterion_group, criterion_main, Criterion};
use std::fs::File as StdFile;
use std::io::Read as StdRead;
@ -23,81 +23,90 @@ const BLOCK_COUNT: usize = 1_000;
const BUFFER_SIZE: usize = 4096;
const DEV_ZERO: &str = "/dev/zero";
fn async_read_codec(b: &mut Bencher) {
fn async_read_codec(c: &mut Criterion) {
let rt = rt();
b.iter(|| {
let task = || async {
let file = File::open(DEV_ZERO).await.unwrap();
let mut input_stream = FramedRead::with_capacity(file, BytesCodec::new(), BUFFER_SIZE);
c.bench_function("async_read_codec", |b| {
b.iter(|| {
let task = || async {
let file = File::open(DEV_ZERO).await.unwrap();
let mut input_stream =
FramedRead::with_capacity(file, BytesCodec::new(), BUFFER_SIZE);
for _i in 0..BLOCK_COUNT {
let _bytes = input_stream.next().await.unwrap();
}
};
rt.block_on(task());
});
}
fn async_read_buf(b: &mut Bencher) {
let rt = rt();
b.iter(|| {
let task = || async {
let mut file = File::open(DEV_ZERO).await.unwrap();
let mut buffer = [0u8; BUFFER_SIZE];
for _i in 0..BLOCK_COUNT {
let count = file.read(&mut buffer).await.unwrap();
if count == 0 {
break;
for _i in 0..BLOCK_COUNT {
let _bytes = input_stream.next().await.unwrap();
}
}
};
};
rt.block_on(task());
rt.block_on(task());
})
});
}
fn async_read_std_file(b: &mut Bencher) {
fn async_read_buf(c: &mut Criterion) {
let rt = rt();
let task = || async {
let mut file = tokio::task::block_in_place(|| Box::pin(StdFile::open(DEV_ZERO).unwrap()));
c.bench_function("async_read_buf", |b| {
b.iter(|| {
let task = || async {
let mut file = File::open(DEV_ZERO).await.unwrap();
let mut buffer = [0u8; BUFFER_SIZE];
for _i in 0..BLOCK_COUNT {
for _i in 0..BLOCK_COUNT {
let count = file.read(&mut buffer).await.unwrap();
if count == 0 {
break;
}
}
};
rt.block_on(task());
});
});
}
fn async_read_std_file(c: &mut Criterion) {
let rt = rt();
c.bench_function("async_read_std_file", |b| {
b.iter(|| {
let task = || async {
let mut file =
tokio::task::block_in_place(|| Box::pin(StdFile::open(DEV_ZERO).unwrap()));
for _i in 0..BLOCK_COUNT {
let mut buffer = [0u8; BUFFER_SIZE];
let mut file_ref = file.as_mut();
tokio::task::block_in_place(move || {
file_ref.read_exact(&mut buffer).unwrap();
});
}
};
rt.block_on(task());
});
});
}
fn sync_read(c: &mut Criterion) {
c.bench_function("sync_read", |b| {
b.iter(|| {
let mut file = StdFile::open(DEV_ZERO).unwrap();
let mut buffer = [0u8; BUFFER_SIZE];
let mut file_ref = file.as_mut();
tokio::task::block_in_place(move || {
file_ref.read_exact(&mut buffer).unwrap();
});
}
};
b.iter(|| {
rt.block_on(task());
for _i in 0..BLOCK_COUNT {
file.read_exact(&mut buffer).unwrap();
}
})
});
}
fn sync_read(b: &mut Bencher) {
b.iter(|| {
let mut file = StdFile::open(DEV_ZERO).unwrap();
let mut buffer = [0u8; BUFFER_SIZE];
for _i in 0..BLOCK_COUNT {
file.read_exact(&mut buffer).unwrap();
}
});
}
benchmark_group!(
criterion_group!(
file,
async_read_std_file,
async_read_buf,
async_read_codec,
sync_read
);
benchmark_main!(file);
criterion_main!(file);

View File

@ -4,46 +4,50 @@
use tokio::runtime::{self, Runtime};
use bencher::{benchmark_group, benchmark_main, Bencher};
use criterion::{criterion_group, criterion_main, Criterion};
const NUM_SPAWN: usize = 1_000;
fn spawn_many_local(b: &mut Bencher) {
fn spawn_many_local(c: &mut Criterion) {
let rt = rt();
let mut handles = Vec::with_capacity(NUM_SPAWN);
b.iter(|| {
rt.block_on(async {
for _ in 0..NUM_SPAWN {
handles.push(tokio::spawn(async move {}));
}
c.bench_function("spawn_many_local", |b| {
b.iter(|| {
rt.block_on(async {
for _ in 0..NUM_SPAWN {
handles.push(tokio::spawn(async move {}));
}
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
}
fn spawn_many_remote_idle(b: &mut Bencher) {
fn spawn_many_remote_idle(c: &mut Criterion) {
let rt = rt();
let rt_handle = rt.handle();
let mut handles = Vec::with_capacity(NUM_SPAWN);
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
c.bench_function("spawn_many_remote_idle", |b| {
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
});
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
}
fn spawn_many_remote_busy(b: &mut Bencher) {
fn spawn_many_remote_busy(c: &mut Criterion) {
let rt = rt();
let rt_handle = rt.handle();
let mut handles = Vec::with_capacity(NUM_SPAWN);
@ -56,16 +60,18 @@ fn spawn_many_remote_busy(b: &mut Bencher) {
iter()
});
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
c.bench_function("spawn_many_remote_busy", |b| {
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
});
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
}
@ -73,11 +79,11 @@ fn rt() -> Runtime {
runtime::Builder::new_current_thread().build().unwrap()
}
benchmark_group!(
criterion_group!(
scheduler,
spawn_many_local,
spawn_many_remote_idle,
spawn_many_remote_busy
);
benchmark_main!(scheduler);
criterion_main!(scheduler);

View File

@ -5,63 +5,68 @@
use tokio::runtime::{self, Runtime};
use tokio::sync::oneshot;
use bencher::{benchmark_group, benchmark_main, Bencher};
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{mpsc, Arc};
use std::time::{Duration, Instant};
use criterion::{criterion_group, criterion_main, Criterion};
const NUM_WORKERS: usize = 4;
const NUM_SPAWN: usize = 10_000;
const STALL_DUR: Duration = Duration::from_micros(10);
fn spawn_many_local(b: &mut Bencher) {
fn spawn_many_local(c: &mut Criterion) {
let rt = rt();
let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
b.iter(|| {
rem.store(NUM_SPAWN, Relaxed);
c.bench_function("spawn_many_local", |b| {
b.iter(|| {
rem.store(NUM_SPAWN, Relaxed);
rt.block_on(async {
for _ in 0..NUM_SPAWN {
let tx = tx.clone();
let rem = rem.clone();
rt.block_on(async {
for _ in 0..NUM_SPAWN {
let tx = tx.clone();
let rem = rem.clone();
tokio::spawn(async move {
if 1 == rem.fetch_sub(1, Relaxed) {
tx.send(()).unwrap();
}
});
}
tokio::spawn(async move {
if 1 == rem.fetch_sub(1, Relaxed) {
tx.send(()).unwrap();
}
});
}
let _ = rx.recv().unwrap();
});
let _ = rx.recv().unwrap();
});
})
});
}
fn spawn_many_remote_idle(b: &mut Bencher) {
fn spawn_many_remote_idle(c: &mut Criterion) {
let rt = rt();
let mut handles = Vec::with_capacity(NUM_SPAWN);
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt.spawn(async {}));
}
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
c.bench_function("spawn_many_remote_idle", |b| {
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt.spawn(async {}));
}
});
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
}
// The runtime is busy with tasks that consume CPU time and yield. Yielding is a
// lower notification priority than spawning / regular notification.
fn spawn_many_remote_busy1(b: &mut Bencher) {
fn spawn_many_remote_busy1(c: &mut Criterion) {
let rt = rt();
let rt_handle = rt.handle();
let mut handles = Vec::with_capacity(NUM_SPAWN);
@ -78,16 +83,18 @@ fn spawn_many_remote_busy1(b: &mut Bencher) {
});
}
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
c.bench_function("spawn_many_remote_busy1", |b| {
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
});
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
flag.store(false, Relaxed);
@ -95,7 +102,7 @@ fn spawn_many_remote_busy1(b: &mut Bencher) {
// The runtime is busy with tasks that consume CPU time and spawn new high-CPU
// tasks. Spawning goes via a higher notification priority than yielding.
fn spawn_many_remote_busy2(b: &mut Bencher) {
fn spawn_many_remote_busy2(c: &mut Criterion) {
const NUM_SPAWN: usize = 1_000;
let rt = rt();
@ -119,49 +126,52 @@ fn spawn_many_remote_busy2(b: &mut Bencher) {
});
}
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
c.bench_function("spawn_many_remote_busy2", |b| {
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
});
rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
flag.store(false, Relaxed);
}
fn yield_many(b: &mut Bencher) {
fn yield_many(c: &mut Criterion) {
const NUM_YIELD: usize = 1_000;
const TASKS: usize = 200;
let rt = rt();
c.bench_function("yield_many", |b| {
let rt = rt();
let (tx, rx) = mpsc::sync_channel(TASKS);
let (tx, rx) = mpsc::sync_channel(TASKS);
b.iter(move || {
for _ in 0..TASKS {
let tx = tx.clone();
b.iter(move || {
for _ in 0..TASKS {
let tx = tx.clone();
rt.spawn(async move {
for _ in 0..NUM_YIELD {
tokio::task::yield_now().await;
}
rt.spawn(async move {
for _ in 0..NUM_YIELD {
tokio::task::yield_now().await;
}
tx.send(()).unwrap();
});
}
tx.send(()).unwrap();
});
}
for _ in 0..TASKS {
let _ = rx.recv().unwrap();
}
for _ in 0..TASKS {
let _ = rx.recv().unwrap();
}
})
});
}
fn ping_pong(b: &mut Bencher) {
fn ping_pong(c: &mut Criterion) {
const NUM_PINGS: usize = 1_000;
let rt = rt();
@ -169,46 +179,46 @@ fn ping_pong(b: &mut Bencher) {
let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
b.iter(|| {
let done_tx = done_tx.clone();
let rem = rem.clone();
rem.store(NUM_PINGS, Relaxed);
c.bench_function("ping_pong", |b| {
b.iter(|| {
let done_tx = done_tx.clone();
let rem = rem.clone();
rem.store(NUM_PINGS, Relaxed);
rt.block_on(async {
tokio::spawn(async move {
for _ in 0..NUM_PINGS {
let rem = rem.clone();
let done_tx = done_tx.clone();
tokio::spawn(async move {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
rt.block_on(async {
tokio::spawn(async move {
for _ in 0..NUM_PINGS {
let rem = rem.clone();
let done_tx = done_tx.clone();
tokio::spawn(async move {
rx1.await.unwrap();
tx2.send(()).unwrap();
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async move {
rx1.await.unwrap();
tx2.send(()).unwrap();
});
tx1.send(()).unwrap();
rx2.await.unwrap();
if 1 == rem.fetch_sub(1, Relaxed) {
done_tx.send(()).unwrap();
}
});
}
});
tx1.send(()).unwrap();
rx2.await.unwrap();
if 1 == rem.fetch_sub(1, Relaxed) {
done_tx.send(()).unwrap();
}
});
}
done_rx.recv().unwrap();
});
done_rx.recv().unwrap();
});
})
});
}
fn chained_spawn(b: &mut Bencher) {
fn chained_spawn(c: &mut Criterion) {
const ITER: usize = 1_000;
let rt = rt();
fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {
done_tx.send(()).unwrap();
@ -219,18 +229,21 @@ fn chained_spawn(b: &mut Bencher) {
}
}
let (done_tx, done_rx) = mpsc::sync_channel(1000);
c.bench_function("chained_spawn", |b| {
let rt = rt();
let (done_tx, done_rx) = mpsc::sync_channel(1000);
b.iter(move || {
let done_tx = done_tx.clone();
b.iter(move || {
let done_tx = done_tx.clone();
rt.block_on(async {
tokio::spawn(async move {
iter(done_tx, ITER);
rt.block_on(async {
tokio::spawn(async move {
iter(done_tx, ITER);
});
done_rx.recv().unwrap();
});
done_rx.recv().unwrap();
});
})
});
}
@ -249,7 +262,7 @@ fn stall() {
}
}
benchmark_group!(
criterion_group!(
scheduler,
spawn_many_local,
spawn_many_remote_idle,
@ -260,4 +273,4 @@ benchmark_group!(
chained_spawn,
);
benchmark_main!(scheduler);
criterion_main!(scheduler);

View File

@ -1,7 +1,7 @@
//! Benchmark the delay in propagating OS signals to any listeners.
#![cfg(unix)]
use bencher::{benchmark_group, benchmark_main, Bencher};
use criterion::{criterion_group, criterion_main, Criterion};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -41,7 +41,7 @@ pub fn send_signal(signal: libc::c_int) {
}
}
fn many_signals(bench: &mut Bencher) {
fn many_signals(c: &mut Criterion) {
let num_signals = 10;
let (tx, mut rx) = mpsc::channel(num_signals);
@ -75,21 +75,23 @@ fn many_signals(bench: &mut Bencher) {
// tasks have been polled at least once
rt.block_on(Spinner::new());
bench.iter(|| {
rt.block_on(async {
send_signal(libc::SIGCHLD);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
c.bench_function("many_signals", |b| {
b.iter(|| {
rt.block_on(async {
send_signal(libc::SIGCHLD);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
send_signal(libc::SIGIO);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
});
send_signal(libc::SIGIO);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
});
})
});
}
benchmark_group!(signal_group, many_signals,);
criterion_group!(signal_group, many_signals);
benchmark_main!(signal_group);
criterion_main!(signal_group);

View File

@ -2,10 +2,7 @@
//! This essentially measure the time to enqueue a task in the local and remote
//! case.
#[macro_use]
extern crate bencher;
use bencher::{black_box, Bencher};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
async fn work() -> usize {
let val = 1 + 1;
@ -13,67 +10,77 @@ async fn work() -> usize {
black_box(val)
}
fn basic_scheduler_spawn(bench: &mut Bencher) {
fn basic_scheduler_spawn(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
bench.iter(|| {
runtime.block_on(async {
let h = tokio::spawn(work());
assert_eq!(h.await.unwrap(), 2);
});
c.bench_function("basic_scheduler_spawn", |b| {
b.iter(|| {
runtime.block_on(async {
let h = tokio::spawn(work());
assert_eq!(h.await.unwrap(), 2);
});
})
});
}
fn basic_scheduler_spawn10(bench: &mut Bencher) {
fn basic_scheduler_spawn10(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
bench.iter(|| {
runtime.block_on(async {
let mut handles = Vec::with_capacity(10);
for _ in 0..10 {
handles.push(tokio::spawn(work()));
}
for handle in handles {
assert_eq!(handle.await.unwrap(), 2);
}
});
c.bench_function("basic_scheduler_spawn10", |b| {
b.iter(|| {
runtime.block_on(async {
let mut handles = Vec::with_capacity(10);
for _ in 0..10 {
handles.push(tokio::spawn(work()));
}
for handle in handles {
assert_eq!(handle.await.unwrap(), 2);
}
});
})
});
}
fn threaded_scheduler_spawn(bench: &mut Bencher) {
fn threaded_scheduler_spawn(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();
bench.iter(|| {
runtime.block_on(async {
let h = tokio::spawn(work());
assert_eq!(h.await.unwrap(), 2);
});
c.bench_function("threaded_scheduler_spawn", |b| {
b.iter(|| {
runtime.block_on(async {
let h = tokio::spawn(work());
assert_eq!(h.await.unwrap(), 2);
});
})
});
}
fn threaded_scheduler_spawn10(bench: &mut Bencher) {
fn threaded_scheduler_spawn10(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();
bench.iter(|| {
runtime.block_on(async {
let mut handles = Vec::with_capacity(10);
for _ in 0..10 {
handles.push(tokio::spawn(work()));
}
for handle in handles {
assert_eq!(handle.await.unwrap(), 2);
}
});
c.bench_function("threaded_scheduler_spawn10", |b| {
b.iter(|| {
runtime.block_on(async {
let mut handles = Vec::with_capacity(10);
for _ in 0..10 {
handles.push(tokio::spawn(work()));
}
for handle in handles {
assert_eq!(handle.await.unwrap(), 2);
}
});
})
});
}
bencher::benchmark_group!(
criterion_group!(
spawn,
basic_scheduler_spawn,
basic_scheduler_spawn10,
@ -81,4 +88,4 @@ bencher::benchmark_group!(
threaded_scheduler_spawn10,
);
bencher::benchmark_main!(spawn);
criterion_main!(spawn);

View File

@ -1,8 +1,23 @@
use bencher::{black_box, Bencher};
use tokio::sync::mpsc;
type Medium = [usize; 64];
type Large = [Medium; 64];
use criterion::measurement::WallTime;
use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion};
#[derive(Debug, Copy, Clone)]
struct Medium([usize; 64]);
impl Default for Medium {
fn default() -> Self {
Medium([0; 64])
}
}
#[derive(Debug, Copy, Clone)]
struct Large([Medium; 64]);
impl Default for Large {
fn default() -> Self {
Large([Medium::default(); 64])
}
}
fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
@ -11,169 +26,176 @@ fn rt() -> tokio::runtime::Runtime {
.unwrap()
}
fn create_1_medium(b: &mut Bencher) {
b.iter(|| {
black_box(&mpsc::channel::<Medium>(1));
});
}
fn create_100_medium(b: &mut Bencher) {
b.iter(|| {
black_box(&mpsc::channel::<Medium>(100));
});
}
fn create_100_000_medium(b: &mut Bencher) {
b.iter(|| {
black_box(&mpsc::channel::<Medium>(100_000));
});
}
fn send_medium(b: &mut Bencher) {
let rt = rt();
b.iter(|| {
let (tx, mut rx) = mpsc::channel::<Medium>(1000);
let _ = rt.block_on(tx.send([0; 64]));
rt.block_on(rx.recv()).unwrap();
});
}
fn send_large(b: &mut Bencher) {
let rt = rt();
b.iter(|| {
let (tx, mut rx) = mpsc::channel::<Large>(1000);
let _ = rt.block_on(tx.send([[0; 64]; 64]));
rt.block_on(rx.recv()).unwrap();
});
}
fn contention_bounded(b: &mut Bencher) {
let rt = rt();
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);
for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).await.unwrap();
}
});
}
for _ in 0..1_000 * 5 {
let _ = rx.recv().await;
}
fn create_medium<const SIZE: usize>(g: &mut BenchmarkGroup<WallTime>) {
g.bench_function(SIZE.to_string(), |b| {
b.iter(|| {
black_box(&mpsc::channel::<Medium>(SIZE));
})
});
}
fn contention_bounded_full(b: &mut Bencher) {
fn send_data<T: Default, const SIZE: usize>(g: &mut BenchmarkGroup<WallTime>, prefix: &str) {
let rt = rt();
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(100);
g.bench_function(format!("{}_{}", prefix, SIZE), |b| {
b.iter(|| {
let (tx, mut rx) = mpsc::channel::<T>(SIZE);
for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).await.unwrap();
}
});
}
let _ = rt.block_on(tx.send(T::default()));
for _ in 0..1_000 * 5 {
let _ = rx.recv().await;
}
rt.block_on(rx.recv()).unwrap();
})
});
}
fn contention_unbounded(b: &mut Bencher) {
fn contention_bounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
g.bench_function("bounded", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);
for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).unwrap();
}
});
}
for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).await.unwrap();
}
});
}
for _ in 0..1_000 * 5 {
let _ = rx.recv().await;
}
for _ in 0..1_000 * 5 {
let _ = rx.recv().await;
}
})
})
});
}
fn uncontented_bounded(b: &mut Bencher) {
fn contention_bounded_full(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);
g.bench_function("bounded_full", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(100);
for i in 0..5000 {
tx.send(i).await.unwrap();
}
for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).await.unwrap();
}
});
}
for _ in 0..5_000 {
let _ = rx.recv().await;
}
for _ in 0..1_000 * 5 {
let _ = rx.recv().await;
}
})
})
});
}
fn uncontented_unbounded(b: &mut Bencher) {
fn contention_unbounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
g.bench_function("unbounded", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
for i in 0..5000 {
tx.send(i).unwrap();
}
for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).unwrap();
}
});
}
for _ in 0..5_000 {
let _ = rx.recv().await;
}
for _ in 0..1_000 * 5 {
let _ = rx.recv().await;
}
})
})
});
}
bencher::benchmark_group!(
create,
create_1_medium,
create_100_medium,
create_100_000_medium
);
fn uncontented_bounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();
bencher::benchmark_group!(send, send_medium, send_large);
g.bench_function("bounded", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);
bencher::benchmark_group!(
contention,
contention_bounded,
contention_bounded_full,
contention_unbounded,
uncontented_bounded,
uncontented_unbounded
);
for i in 0..5000 {
tx.send(i).await.unwrap();
}
bencher::benchmark_main!(create, send, contention);
for _ in 0..5_000 {
let _ = rx.recv().await;
}
})
})
});
}
fn uncontented_unbounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();
g.bench_function("unbounded", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
for i in 0..5000 {
tx.send(i).unwrap();
}
for _ in 0..5_000 {
let _ = rx.recv().await;
}
})
})
});
}
fn bench_create_medium(c: &mut Criterion) {
let mut group = c.benchmark_group("create_medium");
create_medium::<1>(&mut group);
create_medium::<100>(&mut group);
create_medium::<100_000>(&mut group);
group.finish();
}
fn bench_send(c: &mut Criterion) {
let mut group = c.benchmark_group("send");
send_data::<Medium, 1000>(&mut group, "medium");
send_data::<Large, 1000>(&mut group, "large");
group.finish();
}
fn bench_contention(c: &mut Criterion) {
let mut group = c.benchmark_group("contention");
contention_bounded(&mut group);
contention_bounded_full(&mut group);
contention_unbounded(&mut group);
group.finish();
}
fn bench_uncontented(c: &mut Criterion) {
let mut group = c.benchmark_group("uncontented");
uncontented_bounded(&mut group);
uncontented_unbounded(&mut group);
group.finish();
}
criterion_group!(create, bench_create_medium);
criterion_group!(send, bench_send);
criterion_group!(contention, bench_contention);
criterion_group!(uncontented, bench_uncontented);
criterion_main!(create, send, contention, uncontented);

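The mpsc bench above, like the notify, rwlock, and semaphore benches later in the commit, also uses criterion's grouping idiom: helper functions take &mut BenchmarkGroup<WallTime> so related measurements are reported under one group name, and a top-level fn(&mut Criterion) opens the group with c.benchmark_group(...), runs the helpers, and calls finish(). A self-contained sketch of that shape, with illustrative names that do not appear in the diff:

use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};

// Helper parameterized over a capacity; each call contributes one entry
// ("1", "100", ...) to whichever group it is handed.
fn push_pop<const CAP: usize>(g: &mut BenchmarkGroup<WallTime>) {
    g.bench_function(CAP.to_string(), |b| {
        b.iter(|| {
            let mut v = Vec::with_capacity(CAP);
            v.push(1u32);
            v.pop()
        })
    });
}

// Top-level wrapper: owns the group, runs the helpers, then finishes the
// group so criterion can emit its per-group summary.
fn bench_push_pop(c: &mut Criterion) {
    let mut group = c.benchmark_group("push_pop");
    push_pop::<1>(&mut group);
    push_pop::<100>(&mut group);
    group.finish();
}

criterion_group!(push_pop_group, bench_push_pop);
criterion_main!(push_pop_group);

Grouping this way keeps the per-size entries (1, 100, 100_000, and so on) under a common heading in criterion's output instead of defining one free-standing benchmark per size.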
View File

@ -1,27 +1,28 @@
use bencher::{benchmark_group, benchmark_main, Bencher};
use tokio::{
runtime::Runtime,
sync::{mpsc, oneshot},
};
fn request_reply_current_thread(b: &mut Bencher) {
use criterion::{criterion_group, criterion_main, Criterion};
fn request_reply_current_thread(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
request_reply(b, rt);
request_reply(c, rt);
}
fn request_reply_multi_threaded(b: &mut Bencher) {
fn request_reply_multi_threaded(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();
request_reply(b, rt);
request_reply(c, rt);
}
fn request_reply(b: &mut Bencher, rt: Runtime) {
fn request_reply(b: &mut Criterion, rt: Runtime) {
let tx = rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<oneshot::Sender<()>>(10);
tokio::spawn(async move {
@ -32,22 +33,24 @@ fn request_reply(b: &mut Bencher, rt: Runtime) {
tx
});
b.iter(|| {
let task_tx = tx.clone();
rt.block_on(async move {
for _ in 0..1_000 {
let (o_tx, o_rx) = oneshot::channel();
task_tx.send(o_tx).await.unwrap();
let _ = o_rx.await;
}
b.bench_function("request_reply", |b| {
b.iter(|| {
let task_tx = tx.clone();
rt.block_on(async move {
for _ in 0..1_000 {
let (o_tx, o_rx) = oneshot::channel();
task_tx.send(o_tx).await.unwrap();
let _ = o_rx.await;
}
})
})
});
}
benchmark_group!(
criterion_group!(
sync_mpsc_oneshot_group,
request_reply_current_thread,
request_reply_multi_threaded,
);
benchmark_main!(sync_mpsc_oneshot_group);
criterion_main!(sync_mpsc_oneshot_group);

View File

@ -1,9 +1,11 @@
use bencher::Bencher;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(6)
@ -11,7 +13,7 @@ fn rt() -> tokio::runtime::Runtime {
.unwrap()
}
fn notify_waiters<const N_WAITERS: usize>(b: &mut Bencher) {
fn notify_waiters<const N_WAITERS: usize>(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();
let notify = Arc::new(Notify::new());
let counter = Arc::new(AtomicUsize::new(0));
@ -29,18 +31,20 @@ fn notify_waiters<const N_WAITERS: usize>(b: &mut Bencher) {
}
const N_ITERS: usize = 500;
b.iter(|| {
counter.store(0, Ordering::Relaxed);
loop {
notify.notify_waiters();
if counter.load(Ordering::Relaxed) >= N_ITERS {
break;
g.bench_function(N_WAITERS.to_string(), |b| {
b.iter(|| {
counter.store(0, Ordering::Relaxed);
loop {
notify.notify_waiters();
if counter.load(Ordering::Relaxed) >= N_ITERS {
break;
}
}
}
})
});
}
fn notify_one<const N_WAITERS: usize>(b: &mut Bencher) {
fn notify_one<const N_WAITERS: usize>(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();
let notify = Arc::new(Notify::new());
let counter = Arc::new(AtomicUsize::new(0));
@ -58,33 +62,43 @@ fn notify_one<const N_WAITERS: usize>(b: &mut Bencher) {
}
const N_ITERS: usize = 500;
b.iter(|| {
counter.store(0, Ordering::Relaxed);
loop {
notify.notify_one();
if counter.load(Ordering::Relaxed) >= N_ITERS {
break;
g.bench_function(N_WAITERS.to_string(), |b| {
b.iter(|| {
counter.store(0, Ordering::Relaxed);
loop {
notify.notify_one();
if counter.load(Ordering::Relaxed) >= N_ITERS {
break;
}
}
}
})
});
}
bencher::benchmark_group!(
fn bench_notify_one(c: &mut Criterion) {
let mut group = c.benchmark_group("notify_one");
notify_one::<10>(&mut group);
notify_one::<50>(&mut group);
notify_one::<100>(&mut group);
notify_one::<200>(&mut group);
notify_one::<500>(&mut group);
group.finish();
}
fn bench_notify_waiters(c: &mut Criterion) {
let mut group = c.benchmark_group("notify_waiters");
notify_waiters::<10>(&mut group);
notify_waiters::<50>(&mut group);
notify_waiters::<100>(&mut group);
notify_waiters::<200>(&mut group);
notify_waiters::<500>(&mut group);
group.finish();
}
criterion_group!(
notify_waiters_simple,
notify_waiters::<10>,
notify_waiters::<50>,
notify_waiters::<100>,
notify_waiters::<200>,
notify_waiters::<500>
bench_notify_one,
bench_notify_waiters
);
bencher::benchmark_group!(
notify_one_simple,
notify_one::<10>,
notify_one::<50>,
notify_one::<100>,
notify_one::<200>,
notify_one::<500>
);
bencher::benchmark_main!(notify_waiters_simple, notify_one_simple);
criterion_main!(notify_waiters_simple);

View File

@ -1,26 +1,30 @@
use bencher::{black_box, Bencher};
use std::sync::Arc;
use tokio::{sync::RwLock, task};
fn read_uncontended(b: &mut Bencher) {
use criterion::measurement::WallTime;
use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion};
fn read_uncontended(g: &mut BenchmarkGroup<WallTime>) {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(6)
.build()
.unwrap();
let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
for _ in 0..6 {
let read = lock.read().await;
let _read = black_box(read);
}
g.bench_function("read", |b| {
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
for _ in 0..6 {
let read = lock.read().await;
let _read = black_box(read);
}
})
})
});
}
fn read_concurrent_uncontended_multi(b: &mut Bencher) {
fn read_concurrent_uncontended_multi(g: &mut BenchmarkGroup<WallTime>) {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(6)
.build()
@ -32,23 +36,25 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) {
}
let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone()))
};
j.unwrap();
g.bench_function("read_concurrent_multi", |b| {
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone()))
};
j.unwrap();
})
})
});
}
fn read_concurrent_uncontended(b: &mut Bencher) {
fn read_concurrent_uncontended(g: &mut BenchmarkGroup<WallTime>) {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
@ -59,22 +65,24 @@ fn read_concurrent_uncontended(b: &mut Bencher) {
}
let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
tokio::join! {
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone())
};
g.bench_function("read_concurrent", |b| {
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
tokio::join! {
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone())
};
})
})
});
}
fn read_concurrent_contended_multi(b: &mut Bencher) {
fn read_concurrent_contended_multi(g: &mut BenchmarkGroup<WallTime>) {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(6)
.build()
@ -86,24 +94,26 @@ fn read_concurrent_contended_multi(b: &mut Bencher) {
}
let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let write = lock.write().await;
let j = tokio::try_join! {
async move { drop(write); Ok(()) },
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
};
j.unwrap();
g.bench_function("read_concurrent_multi", |b| {
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let write = lock.write().await;
let j = tokio::try_join! {
async move { drop(write); Ok(()) },
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
task::spawn(task(lock.clone())),
};
j.unwrap();
})
})
});
}
fn read_concurrent_contended(b: &mut Bencher) {
fn read_concurrent_contended(g: &mut BenchmarkGroup<WallTime>) {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
@ -114,29 +124,40 @@ fn read_concurrent_contended(b: &mut Bencher) {
}
let lock = Arc::new(RwLock::new(()));
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let write = lock.write().await;
tokio::join! {
async move { drop(write) },
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
};
g.bench_function("read_concurrent", |b| {
b.iter(|| {
let lock = lock.clone();
rt.block_on(async move {
let write = lock.write().await;
tokio::join! {
async move { drop(write) },
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
task(lock.clone()),
};
})
})
});
}
bencher::benchmark_group!(
sync_rwlock,
read_uncontended,
read_concurrent_uncontended,
read_concurrent_uncontended_multi,
read_concurrent_contended,
read_concurrent_contended_multi
);
fn bench_contention(c: &mut Criterion) {
let mut group = c.benchmark_group("contention");
read_concurrent_contended(&mut group);
read_concurrent_contended_multi(&mut group);
group.finish();
}
bencher::benchmark_main!(sync_rwlock);
fn bench_uncontented(c: &mut Criterion) {
let mut group = c.benchmark_group("uncontented");
read_uncontended(&mut group);
read_concurrent_uncontended(&mut group);
read_concurrent_uncontended_multi(&mut group);
group.finish();
}
criterion_group!(contention, bench_contention);
criterion_group!(uncontented, bench_uncontented);
criterion_main!(contention, uncontented);

View File

@ -1,21 +1,36 @@
use bencher::Bencher;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::{sync::Semaphore, task};
fn uncontended(b: &mut Bencher) {
let rt = tokio::runtime::Builder::new_multi_thread()
use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
fn single_rt() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
}
fn multi_rt() -> Runtime {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(6)
.build()
.unwrap();
.unwrap()
}
fn uncontended(g: &mut BenchmarkGroup<WallTime>) {
let rt = multi_rt();
let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
for _ in 0..6 {
let permit = s.acquire().await;
drop(permit);
}
g.bench_function("multi", |b| {
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
for _ in 0..6 {
let permit = s.acquire().await;
drop(permit);
}
})
})
});
}
@ -25,101 +40,108 @@ async fn task(s: Arc<Semaphore>) {
drop(permit);
}
fn uncontended_concurrent_multi(b: &mut Bencher) {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(6)
.build()
.unwrap();
fn uncontended_concurrent_multi(g: &mut BenchmarkGroup<WallTime>) {
let rt = multi_rt();
let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone()))
};
j.unwrap();
g.bench_function("concurrent_multi", |b| {
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone()))
};
j.unwrap();
})
})
});
}
fn uncontended_concurrent_single(b: &mut Bencher) {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
fn uncontended_concurrent_single(g: &mut BenchmarkGroup<WallTime>) {
let rt = single_rt();
let s = Arc::new(Semaphore::new(10));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
tokio::join! {
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone())
};
g.bench_function("concurrent_single", |b| {
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
tokio::join! {
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone())
};
})
})
});
}
fn contended_concurrent_multi(b: &mut Bencher) {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(6)
.build()
.unwrap();
fn contended_concurrent_multi(g: &mut BenchmarkGroup<WallTime>) {
let rt = multi_rt();
let s = Arc::new(Semaphore::new(5));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone()))
};
j.unwrap();
g.bench_function("concurrent_multi", |b| {
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
let j = tokio::try_join! {
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone())),
task::spawn(task(s.clone()))
};
j.unwrap();
})
})
});
}
fn contended_concurrent_single(b: &mut Bencher) {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
fn contended_concurrent_single(g: &mut BenchmarkGroup<WallTime>) {
let rt = single_rt();
let s = Arc::new(Semaphore::new(5));
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
tokio::join! {
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone())
};
g.bench_function("concurrent_single", |b| {
b.iter(|| {
let s = s.clone();
rt.block_on(async move {
tokio::join! {
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone()),
task(s.clone())
};
})
})
});
}
bencher::benchmark_group!(
sync_semaphore,
uncontended,
uncontended_concurrent_multi,
uncontended_concurrent_single,
contended_concurrent_multi,
contended_concurrent_single
);
fn bench_contention(c: &mut Criterion) {
let mut group = c.benchmark_group("contention");
contended_concurrent_multi(&mut group);
contended_concurrent_single(&mut group);
group.finish();
}
bencher::benchmark_main!(sync_semaphore);
fn bench_uncontented(c: &mut Criterion) {
let mut group = c.benchmark_group("uncontented");
uncontended(&mut group);
uncontended_concurrent_multi(&mut group);
uncontended_concurrent_single(&mut group);
group.finish();
}
criterion_group!(contention, bench_contention);
criterion_group!(uncontented, bench_uncontented);
criterion_main!(contention, uncontented);

View File

@ -1,9 +1,10 @@
use bencher::{black_box, Bencher};
use rand::prelude::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{watch, Notify};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(6)
@ -24,7 +25,7 @@ fn do_work(rng: &mut impl RngCore) -> u32 {
.fold(0, u32::wrapping_add)
}
fn contention_resubscribe(b: &mut Bencher) {
fn contention_resubscribe(c: &mut Criterion) {
const NTASK: u64 = 1000;
let rt = rt();
@ -46,19 +47,21 @@ fn contention_resubscribe(b: &mut Bencher) {
});
}
b.iter(|| {
rt.block_on(async {
for _ in 0..100 {
assert_eq!(wg.0.fetch_add(NTASK, Ordering::Relaxed), 0);
let _ = snd.send(black_box(42));
while wg.0.load(Ordering::Acquire) > 0 {
wg.1.notified().await;
c.bench_function("contention_resubscribe", |b| {
b.iter(|| {
rt.block_on(async {
for _ in 0..100 {
assert_eq!(wg.0.fetch_add(NTASK, Ordering::Relaxed), 0);
let _ = snd.send(black_box(42));
while wg.0.load(Ordering::Acquire) > 0 {
wg.1.notified().await;
}
}
}
});
});
})
});
}
bencher::benchmark_group!(contention, contention_resubscribe);
criterion_group!(contention, contention_resubscribe);
bencher::benchmark_main!(contention);
criterion_main!(contention);

View File

@ -2,24 +2,23 @@
//! This essentially measure the time to enqueue a task in the local and remote
//! case.
#[macro_use]
extern crate bencher;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use bencher::{black_box, Bencher};
fn time_now_current_thread(bench: &mut Bencher) {
fn time_now_current_thread(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap();
bench.iter(|| {
rt.block_on(async {
black_box(tokio::time::Instant::now());
c.bench_function("time_now_current_thread", |b| {
b.iter(|| {
rt.block_on(async {
black_box(tokio::time::Instant::now());
})
})
})
});
}
bencher::benchmark_group!(time_now, time_now_current_thread,);
criterion_group!(time_now, time_now_current_thread);
bencher::benchmark_main!(time_now);
criterion_main!(time_now);