chore: fix thread_pool benchmarks (#1947)

Update the rotted thread_pool benchmarks. These benchmarks are not the
greatest, but as of now it is all we have for micro benchmarks.

Adds a little yielding in the parker as it helps a bit.
This commit is contained in:
Carl Lerche 2019-12-11 12:44:45 -08:00 committed by GitHub
parent 24cd6d67f7
commit c0953d41a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 27 deletions

View File

@ -2,7 +2,7 @@
extern crate test;
use tokio::executor::thread_pool::{Builder, Spawner};
use tokio::runtime::Builder;
use tokio::sync::oneshot;
use std::future::Future;
@ -28,13 +28,11 @@ impl Future for Backoff {
}
}
const NUM_THREADS: usize = 6;
#[bench]
fn spawn_many(b: &mut test::Bencher) {
const NUM_SPAWN: usize = 10_000;
let threadpool = Builder::new().num_threads(NUM_THREADS).build();
let rt = Builder::new().threaded_scheduler().build().unwrap();
let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
@ -46,7 +44,7 @@ fn spawn_many(b: &mut test::Bencher) {
let tx = tx.clone();
let rem = rem.clone();
threadpool.spawn(async move {
rt.spawn(async move {
if 1 == rem.fetch_sub(1, Relaxed) {
tx.send(()).unwrap();
}
@ -62,7 +60,7 @@ fn yield_many(b: &mut test::Bencher) {
const NUM_YIELD: usize = 1_000;
const TASKS_PER_CPU: usize = 50;
let threadpool = Builder::new().num_threads(NUM_THREADS).build();
let rt = Builder::new().threaded_scheduler().build().unwrap();
let tasks = TASKS_PER_CPU * num_cpus::get_physical();
let (tx, rx) = mpsc::sync_channel(tasks);
@ -71,7 +69,7 @@ fn yield_many(b: &mut test::Bencher) {
for _ in 0..tasks {
let tx = tx.clone();
threadpool.spawn(async move {
rt.spawn(async move {
let backoff = Backoff(NUM_YIELD);
backoff.await;
tx.send(()).unwrap();
@ -88,7 +86,7 @@ fn yield_many(b: &mut test::Bencher) {
fn ping_pong(b: &mut test::Bencher) {
const NUM_PINGS: usize = 1_000;
let threadpool = Builder::new().num_threads(NUM_THREADS).build();
let rt = Builder::new().threaded_scheduler().build().unwrap();
let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
@ -98,20 +96,16 @@ fn ping_pong(b: &mut test::Bencher) {
let rem = rem.clone();
rem.store(NUM_PINGS, Relaxed);
let spawner = threadpool.spawner().clone();
threadpool.spawn(async move {
rt.spawn(async move {
for _ in 0..NUM_PINGS {
let rem = rem.clone();
let done_tx = done_tx.clone();
let spawner2 = spawner.clone();
spawner.spawn(async move {
tokio::spawn(async move {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
spawner2.spawn(async move {
tokio::spawn(async move {
rx1.await.unwrap();
tx2.send(()).unwrap();
});
@ -134,15 +128,14 @@ fn ping_pong(b: &mut test::Bencher) {
fn chained_spawn(b: &mut test::Bencher) {
const ITER: usize = 1_000;
let threadpool = Builder::new().num_threads(NUM_THREADS).build();
let rt = Builder::new().threaded_scheduler().build().unwrap();
fn iter(spawner: Spawner, done_tx: mpsc::SyncSender<()>, n: usize) {
fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {
done_tx.send(()).unwrap();
} else {
let s2 = spawner.clone();
spawner.spawn(async move {
iter(s2, done_tx, n - 1);
tokio::spawn(async move {
iter(done_tx, n - 1);
});
}
}
@ -151,9 +144,8 @@ fn chained_spawn(b: &mut test::Bencher) {
b.iter(move || {
let done_tx = done_tx.clone();
let spawner = threadpool.spawner().clone();
threadpool.spawn(async move {
iter(spawner, done_tx, ITER);
rt.spawn(async move {
iter(done_tx, ITER);
});
done_rx.recv().unwrap();

View File

@ -4,6 +4,7 @@
use crate::loom::sync::{Arc, Mutex, Condvar};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::thread;
use crate::park::{Park, Unpark};
use crate::runtime::time;
use crate::util::TryLock;
@ -113,10 +114,14 @@ impl Unpark for Unparker {
impl Inner {
/// Park the current thread for at most `dur`.
fn park(&self) {
// If we were previously notified then we consume this notification and
// return quickly.
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
return;
for _ in 0..3 {
// If we were previously notified then we consume this notification and
// return quickly.
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
return;
}
thread::yield_now();
}
if let Some(mut driver) = self.shared.driver.try_lock() {