diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml index 72c090f68..975408f9a 100644 --- a/tokio-threadpool/Cargo.toml +++ b/tokio-threadpool/Cargo.toml @@ -15,7 +15,7 @@ categories = ["concurrency", "asynchronous"] [dependencies] tokio-executor = { version = "0.1", path = "../tokio-executor" } futures = "0.1" -coco = "0.3" +crossbeam-deque = "0.3" num_cpus = "1.2" rand = "0.3" log = "0.3" diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs index 6a6afe347..13dfb5d2c 100644 --- a/tokio-threadpool/src/lib.rs +++ b/tokio-threadpool/src/lib.rs @@ -4,7 +4,7 @@ extern crate tokio_executor; extern crate futures; -extern crate coco; +extern crate crossbeam_deque as deque; extern crate num_cpus; extern crate rand; @@ -15,7 +15,6 @@ mod task; use tokio_executor::{Enter, SpawnError}; -use coco::deque; use task::Task; use futures::{future, Future, Poll, Async}; @@ -246,7 +245,7 @@ struct WorkerEntry { next_sleeper: UnsafeCell, // Worker half of deque - deque: deque::Worker, + deque: deque::Deque, // Stealer half of deque steal: deque::Stealer, @@ -1435,16 +1434,16 @@ impl Worker { /// Returns `true` if work was found. #[inline] fn try_run_task(&self, notify: &Arc) -> bool { - use coco::deque::Steal::*; + use deque::Steal::*; // Poll the internal queue for a task to run - match self.entry().deque.steal_weak() { + match self.entry().deque.steal() { Data(task) => { self.run_task(task, notify); true } Empty => false, - Inconsistent => true, + Retry => true, } } @@ -1453,7 +1452,7 @@ impl Worker { /// Returns `true` if work was found #[inline] fn try_steal_task(&self, notify: &Arc) -> bool { - use coco::deque::Steal::*; + use deque::Steal::*; let len = self.inner.workers.len(); let mut idx = self.inner.rand_usize() % len; @@ -1462,7 +1461,7 @@ impl Worker { loop { if idx < len { - match self.inner.workers[idx].steal.steal_weak() { + match self.inner.workers[idx].steal.steal() { Data(task) => { trace!("stole task"); @@ -1477,7 +1476,7 @@ impl Worker { return true; } Empty => {} - Inconsistent => found_work = true, + Retry => found_work = true, } idx += 1; @@ -1923,7 +1922,8 @@ impl fmt::Debug for SleepStack { impl WorkerEntry { fn new() -> Self { - let (w, s) = deque::new(); + let w = deque::Deque::new(); + let s = w.stealer(); WorkerEntry { state: AtomicUsize::new(WorkerState::default().into()),