Steal multiple tasks from another worker at a time (#534)

* Steal multiple tasks from another worker at a time
* Better spinning and failing pop
* Update crossbeam-deque and simplify spinning
This commit is contained in:
Stjepan Glavina 2018-08-09 21:14:13 +02:00 committed by Carl Lerche
parent fd36054ae4
commit 96b556fbff
3 changed files with 44 additions and 23 deletions

View File

@ -19,7 +19,7 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
futures = "0.1.19"
crossbeam-deque = "0.5.0"
crossbeam-deque = "0.6.0"
crossbeam-utils = "0.5.0"
num_cpus = "1.2"
rand = "0.5"

View File

@ -188,23 +188,34 @@ impl WorkerEntry {
///
/// This **must** only be called by the thread that owns the worker entry.
/// This function is not `Sync`.
pub fn pop_task(&self) -> Option<Arc<Task>> {
pub fn pop_task(&self) -> deque::Pop<Arc<Task>> {
self.worker.pop()
}
/// Steal a task
/// Steal tasks
///
/// This is called by *other* workers to steal a task for processing. This
/// function is `Sync`.
pub fn steal_task(&self) -> Option<Arc<Task>> {
self.stealer.steal()
///
/// At the same time, this method steals some additional tasks and moves
/// them into `dest` in order to balance the work distribution among
/// workers.
pub fn steal_tasks(&self, dest: &Self) -> deque::Steal<Arc<Task>> {
self.stealer.steal_many(&dest.worker)
}
/// Drain (and drop) all tasks that are queued for work.
///
/// This is called when the pool is shutting down.
pub fn drain_tasks(&self) {
while let Some(_) = self.worker.pop() {
use deque::Pop;
loop {
match self.worker.pop() {
Pop::Data(_) => {}
Pop::Empty => break,
Pop::Retry => {}
}
}
}

View File

@ -217,7 +217,7 @@ impl Worker {
///
/// This function blocks until the worker is shutting down.
pub fn run(&self) {
const MAX_SPINS: usize = 60;
const MAX_SPINS: usize = 3;
const LIGHT_SLEEP_INTERVAL: usize = 32;
// Get the notifier.
@ -256,14 +256,14 @@ impl Worker {
}
if !consistent {
thread::yield_now();
spin_cnt = 0;
continue;
}
spin_cnt += 1;
if spin_cnt < MAX_SPINS {
// Yield the thread several times before it actually goes to sleep.
if spin_cnt <= MAX_SPINS {
thread::yield_now();
continue;
}
@ -384,13 +384,16 @@ impl Worker {
///
/// Returns `true` if work was found.
fn try_run_owned_task(&self, notify: &Arc<Notifier>, sender: &mut Sender) -> bool {
use deque::Pop;
// Poll the internal queue for a task to run
match self.entry().pop_task() {
Some(task) => {
Pop::Data(task) => {
self.run_task(task, notify, sender);
true
}
None => false,
Pop::Empty => false,
Pop::Retry => true,
}
}
@ -398,29 +401,36 @@ impl Worker {
///
/// Returns `true` if work was found
fn try_steal_task(&self, notify: &Arc<Notifier>, sender: &mut Sender) -> bool {
use deque::Steal;
debug_assert!(!self.is_blocking.get());
let len = self.inner.workers.len();
let mut idx = self.inner.rand_usize() % len;
let mut found_work = false;
let start = idx;
loop {
if idx < len {
if let Some(task) = self.inner.workers[idx].steal_task() {
trace!("stole task");
match self.inner.workers[idx].steal_tasks(self.entry()) {
Steal::Data(task) => {
trace!("stole task");
self.run_task(task, notify, sender);
self.run_task(task, notify, sender);
trace!("try_steal_task -- signal_work; self={}; from={}",
self.id.0, idx);
trace!("try_steal_task -- signal_work; self={}; from={}",
self.id.0, idx);
// Signal other workers that work is available
//
// TODO: Should this be called here or before
// `run_task`?
self.inner.signal_work(&self.inner);
// Signal other workers that work is available
//
// TODO: Should this be called here or before
// `run_task`?
self.inner.signal_work(&self.inner);
return true;
return true;
}
Steal::Empty => {}
Steal::Retry => found_work = true,
}
idx += 1;
@ -433,7 +443,7 @@ impl Worker {
}
}
false
found_work
}
fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>, sender: &mut Sender) {