From e27b0a46ba916126d7c66dbb0d4b27f85aec84d2 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 3 Oct 2018 23:09:20 +0200 Subject: [PATCH] threadpool: spawn new tasks onto a random worker (#683) * threadpool: submit new tasks to a random worker * Revert unnecessary version bumps --- Cargo.toml | 17 ++++ tokio-reactor/Cargo.toml | 1 + tokio-reactor/benches/basic.rs | 156 +++++++++++++++++++++++-------- tokio-threadpool/src/pool/mod.rs | 10 ++ tokio-threadpool/src/sender.rs | 2 +- 5 files changed, 146 insertions(+), 40 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dfc8ddb4f..252013e5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,3 +89,20 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" time = "0.1" + +[patch.crates-io] +tokio = { path = "." } +tokio-async-await = { path = "./tokio-async-await" } +tokio-codec = { path = "./tokio-codec" } +tokio-current-thread = { path = "./tokio-current-thread" } +tokio-executor = { path = "./tokio-executor" } +tokio-fs = { path = "./tokio-fs" } +tokio-io = { path = "./tokio-io" } +tokio-reactor = { path = "./tokio-reactor" } +tokio-signal = { path = "./tokio-signal" } +tokio-tcp = { path = "./tokio-tcp" } +tokio-threadpool = { path = "./tokio-threadpool" } +tokio-timer = { path = "./tokio-timer" } +tokio-tls = { path = "./tokio-tls" } +tokio-udp = { path = "./tokio-udp" } +tokio-uds = { path = "./tokio-uds" } diff --git a/tokio-reactor/Cargo.toml b/tokio-reactor/Cargo.toml index e07697de7..59725bc98 100644 --- a/tokio-reactor/Cargo.toml +++ b/tokio-reactor/Cargo.toml @@ -33,3 +33,4 @@ tokio-io = { version = "0.1.6", path = "../tokio-io" } [dev-dependencies] num_cpus = "1.8.0" tokio = { version = "0.1.7", path = ".." } +tokio-io-pool = "0.1.4" diff --git a/tokio-reactor/benches/basic.rs b/tokio-reactor/benches/basic.rs index 4dbf45446..c5150f20a 100644 --- a/tokio-reactor/benches/basic.rs +++ b/tokio-reactor/benches/basic.rs @@ -6,56 +6,134 @@ extern crate mio; extern crate num_cpus; extern crate test; extern crate tokio; +extern crate tokio_io_pool; extern crate tokio_reactor; -use std::sync::mpsc; +const NUM_YIELD: usize = 500; +const TASKS_PER_CPU: usize = 100; -use futures::{future, Async}; -use self::test::Bencher; -use tokio_reactor::Registration; +mod threadpool { + use super::*; + use std::sync::mpsc; -const NUM_YIELD: usize = 1_000; -const TASKS_PER_CPU: usize = 50; + use test::Bencher; + use futures::{future, Async}; + use tokio_reactor::Registration; + use tokio::runtime::Runtime; -#[bench] -fn notify_many(b: &mut Bencher) { - let mut rt = tokio::runtime::Runtime::new().unwrap(); + #[bench] + fn notify_many(b: &mut Bencher) { + let mut rt = Runtime::new().unwrap(); + let tasks = TASKS_PER_CPU * num_cpus::get(); - let tasks = TASKS_PER_CPU * num_cpus::get(); - let (tx, rx) = mpsc::channel(); + b.iter(|| { + let (tx, rx) = mpsc::channel(); - b.iter(|| { - for _ in 0..tasks { - let (r, s) = mio::Registration::new2(); - let registration = Registration::new(); - registration.register(&r).unwrap(); + rt.block_on::<_, (), ()>(future::lazy(move || { + for _ in 0..tasks { + let tx = tx.clone(); - let mut rem = NUM_YIELD; - let mut r = Some(r); - let tx = tx.clone(); + tokio::spawn(future::lazy(move || { + let (r, s) = mio::Registration::new2(); + let registration = Registration::new(); + registration.register(&r).unwrap(); - rt.spawn(future::poll_fn(move || { - loop { - let is_ready = registration.poll_read_ready().unwrap().is_ready(); + let mut rem = NUM_YIELD; + let mut r = Some(r); + let tx = tx.clone(); - if is_ready { - rem -= 1; + tokio::spawn(future::poll_fn(move || { + loop { + let is_ready = registration.poll_read_ready().unwrap().is_ready(); - if rem == 0 { - r.take().unwrap(); - tx.send(()).unwrap(); - return Ok(Async::Ready(())); - } - } else { - s.set_readiness(mio::Ready::readable()).unwrap(); - return Ok(Async::NotReady); - } + if is_ready { + rem -= 1; + + if rem == 0 { + r.take().unwrap(); + tx.send(()).unwrap(); + return Ok(Async::Ready(())); + } + } else { + s.set_readiness(mio::Ready::readable()).unwrap(); + return Ok(Async::NotReady); + } + } + })); + + Ok(()) + })); } - })); - } - for _ in 0..tasks { - rx.recv().unwrap(); - } - }); + Ok(()) + })).unwrap(); + + for _ in 0..tasks { + rx.recv().unwrap(); + } + }) + } +} + +mod io_pool { + use super::*; + use std::sync::mpsc; + + use futures::{future, Async}; + use test::Bencher; + use tokio_io_pool::Runtime; + use tokio_reactor::Registration; + + #[bench] + fn notify_many(b: &mut Bencher) { + let mut rt = Runtime::new(); + let tasks = TASKS_PER_CPU * num_cpus::get(); + + b.iter(|| { + let (tx, rx) = mpsc::channel(); + + rt.block_on::<_, (), ()>(future::lazy(move || { + for _ in 0..tasks { + let tx = tx.clone(); + + tokio::spawn(future::lazy(move || { + let (r, s) = mio::Registration::new2(); + let registration = Registration::new(); + registration.register(&r).unwrap(); + + let mut rem = NUM_YIELD; + let mut r = Some(r); + let tx = tx.clone(); + + tokio::spawn(future::poll_fn(move || { + loop { + let is_ready = registration.poll_read_ready().unwrap().is_ready(); + + if is_ready { + rem -= 1; + + if rem == 0 { + r.take().unwrap(); + tx.send(()).unwrap(); + return Ok(Async::Ready(())); + } + } else { + s.set_readiness(mio::Ready::readable()).unwrap(); + return Ok(Async::NotReady); + } + } + })); + + Ok(()) + })); + } + + Ok(()) + })).unwrap(); + + for _ in 0..tasks { + rx.recv().unwrap(); + } + }) + } } diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs index 7a029aaec..b4e8e388f 100644 --- a/tokio-threadpool/src/pool/mod.rs +++ b/tokio-threadpool/src/pool/mod.rs @@ -317,6 +317,16 @@ impl Pool { // All workers are active, so pick a random worker and submit the // task to it. + self.submit_to_random(task, inner); + } + + /// Submit a task to a random worker + /// + /// Called from outside of the scheduler, this function is how new tasks + /// enter the system. + pub fn submit_to_random(&self, task: Arc, inner: &Arc) { + debug_assert_eq!(*self, **inner); + let len = self.workers.len(); let idx = self.rand_usize() % len; diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs index e2e6584c0..211aca4dd 100644 --- a/tokio-threadpool/src/sender.rs +++ b/tokio-threadpool/src/sender.rs @@ -161,7 +161,7 @@ impl<'a> tokio_executor::Executor for &'a Sender { // Create a new task for the future let task = Arc::new(Task::new(future)); - self.inner.submit(task, &self.inner); + self.inner.submit_to_random(task, &self.inner); Ok(()) }