threadpool: spawn new tasks onto a random worker (#683)

* threadpool: submit new tasks to a random worker

* Revert unnecessary version bumps
This commit is contained in:
Stjepan Glavina 2018-10-03 23:09:20 +02:00 committed by GitHub
parent d35d0518f5
commit e27b0a46ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 146 additions and 40 deletions

View File

@ -89,3 +89,20 @@ serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
time = "0.1"
[patch.crates-io]
tokio = { path = "." }
tokio-async-await = { path = "./tokio-async-await" }
tokio-codec = { path = "./tokio-codec" }
tokio-current-thread = { path = "./tokio-current-thread" }
tokio-executor = { path = "./tokio-executor" }
tokio-fs = { path = "./tokio-fs" }
tokio-io = { path = "./tokio-io" }
tokio-reactor = { path = "./tokio-reactor" }
tokio-signal = { path = "./tokio-signal" }
tokio-tcp = { path = "./tokio-tcp" }
tokio-threadpool = { path = "./tokio-threadpool" }
tokio-timer = { path = "./tokio-timer" }
tokio-tls = { path = "./tokio-tls" }
tokio-udp = { path = "./tokio-udp" }
tokio-uds = { path = "./tokio-uds" }

View File

@ -33,3 +33,4 @@ tokio-io = { version = "0.1.6", path = "../tokio-io" }
[dev-dependencies]
num_cpus = "1.8.0"
tokio = { version = "0.1.7", path = ".." }
tokio-io-pool = "0.1.4"

View File

@ -6,56 +6,134 @@ extern crate mio;
extern crate num_cpus;
extern crate test;
extern crate tokio;
extern crate tokio_io_pool;
extern crate tokio_reactor;
use std::sync::mpsc;
const NUM_YIELD: usize = 500;
const TASKS_PER_CPU: usize = 100;
use futures::{future, Async};
use self::test::Bencher;
use tokio_reactor::Registration;
mod threadpool {
use super::*;
use std::sync::mpsc;
const NUM_YIELD: usize = 1_000;
const TASKS_PER_CPU: usize = 50;
use test::Bencher;
use futures::{future, Async};
use tokio_reactor::Registration;
use tokio::runtime::Runtime;
#[bench]
fn notify_many(b: &mut Bencher) {
let mut rt = tokio::runtime::Runtime::new().unwrap();
#[bench]
fn notify_many(b: &mut Bencher) {
let mut rt = Runtime::new().unwrap();
let tasks = TASKS_PER_CPU * num_cpus::get();
let tasks = TASKS_PER_CPU * num_cpus::get();
let (tx, rx) = mpsc::channel();
b.iter(|| {
let (tx, rx) = mpsc::channel();
b.iter(|| {
for _ in 0..tasks {
let (r, s) = mio::Registration::new2();
let registration = Registration::new();
registration.register(&r).unwrap();
rt.block_on::<_, (), ()>(future::lazy(move || {
for _ in 0..tasks {
let tx = tx.clone();
let mut rem = NUM_YIELD;
let mut r = Some(r);
let tx = tx.clone();
tokio::spawn(future::lazy(move || {
let (r, s) = mio::Registration::new2();
let registration = Registration::new();
registration.register(&r).unwrap();
rt.spawn(future::poll_fn(move || {
loop {
let is_ready = registration.poll_read_ready().unwrap().is_ready();
let mut rem = NUM_YIELD;
let mut r = Some(r);
let tx = tx.clone();
if is_ready {
rem -= 1;
tokio::spawn(future::poll_fn(move || {
loop {
let is_ready = registration.poll_read_ready().unwrap().is_ready();
if rem == 0 {
r.take().unwrap();
tx.send(()).unwrap();
return Ok(Async::Ready(()));
}
} else {
s.set_readiness(mio::Ready::readable()).unwrap();
return Ok(Async::NotReady);
}
if is_ready {
rem -= 1;
if rem == 0 {
r.take().unwrap();
tx.send(()).unwrap();
return Ok(Async::Ready(()));
}
} else {
s.set_readiness(mio::Ready::readable()).unwrap();
return Ok(Async::NotReady);
}
}
}));
Ok(())
}));
}
}));
}
for _ in 0..tasks {
rx.recv().unwrap();
}
});
Ok(())
})).unwrap();
for _ in 0..tasks {
rx.recv().unwrap();
}
})
}
}
mod io_pool {
use super::*;
use std::sync::mpsc;
use futures::{future, Async};
use test::Bencher;
use tokio_io_pool::Runtime;
use tokio_reactor::Registration;
#[bench]
fn notify_many(b: &mut Bencher) {
let mut rt = Runtime::new();
let tasks = TASKS_PER_CPU * num_cpus::get();
b.iter(|| {
let (tx, rx) = mpsc::channel();
rt.block_on::<_, (), ()>(future::lazy(move || {
for _ in 0..tasks {
let tx = tx.clone();
tokio::spawn(future::lazy(move || {
let (r, s) = mio::Registration::new2();
let registration = Registration::new();
registration.register(&r).unwrap();
let mut rem = NUM_YIELD;
let mut r = Some(r);
let tx = tx.clone();
tokio::spawn(future::poll_fn(move || {
loop {
let is_ready = registration.poll_read_ready().unwrap().is_ready();
if is_ready {
rem -= 1;
if rem == 0 {
r.take().unwrap();
tx.send(()).unwrap();
return Ok(Async::Ready(()));
}
} else {
s.set_readiness(mio::Ready::readable()).unwrap();
return Ok(Async::NotReady);
}
}
}));
Ok(())
}));
}
Ok(())
})).unwrap();
for _ in 0..tasks {
rx.recv().unwrap();
}
})
}
}

View File

@ -317,6 +317,16 @@ impl Pool {
// All workers are active, so pick a random worker and submit the
// task to it.
self.submit_to_random(task, inner);
}
/// Submit a task to a random worker
///
/// Called from outside of the scheduler, this function is how new tasks
/// enter the system.
pub fn submit_to_random(&self, task: Arc<Task>, inner: &Arc<Pool>) {
debug_assert_eq!(*self, **inner);
let len = self.workers.len();
let idx = self.rand_usize() % len;

View File

@ -161,7 +161,7 @@ impl<'a> tokio_executor::Executor for &'a Sender {
// Create a new task for the future
let task = Arc::new(Task::new(future));
self.inner.submit(task, &self.inner);
self.inner.submit_to_random(task, &self.inner);
Ok(())
}