threadpool: introduce a global task queue (#798)

Stjepan Glavina 2018-12-28 20:34:54 +01:00 committed by Toby Lawrence
parent 201b6ce53a
commit fdf4aba621
10 changed files with 90 additions and 288 deletions

View File

@ -20,6 +20,7 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
futures = "0.1.19"
crossbeam-channel = "0.3.3"
crossbeam-deque = "0.6.1"
crossbeam-utils = "0.6.0"
num_cpus = "1.2"

View File

@ -3,6 +3,7 @@ use config::{Config, MAX_WORKERS};
use park::{BoxPark, BoxedPark, DefaultPark};
use shutdown::ShutdownTrigger;
use pool::{Pool, MAX_BACKUP};
use task::Queue;
use thread_pool::ThreadPool;
use worker::{self, Worker, WorkerId};
@ -413,11 +414,13 @@ impl Builder {
workers.into()
};
let queue = Arc::new(Queue::new());
// Create a trigger that will clean up resources on shutdown.
//
// The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain
// strong references.
let trigger = Arc::new(ShutdownTrigger::new(workers.clone()));
let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone()));
// Create the pool
let pool = Arc::new(Pool::new(
@ -425,6 +428,7 @@ impl Builder {
Arc::downgrade(&trigger),
self.max_blocking,
self.config.clone(),
queue,
));
ThreadPool::new2(pool, trigger)
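The wiring above is the heart of the change: a single `Arc<Queue>` is shared by the `ShutdownTrigger` and the `Pool`, while the pool holds only a `Weak` reference to the trigger. A minimal sketch of that ownership pattern, with `Queue`, `ShutdownTrigger`, and `Pool` reduced to hypothetical placeholders rather than the real definitions:

```rust
use std::sync::{Arc, Weak};

// Simplified placeholders, not the real types.
struct Queue;
struct ShutdownTrigger { queue: Arc<Queue> }
struct Pool { trigger: Weak<ShutdownTrigger>, queue: Arc<Queue> }

fn build() -> (Arc<Pool>, Arc<ShutdownTrigger>) {
    let queue = Arc::new(Queue);

    // The trigger keeps a strong handle to the queue so it can drain it
    // when the last strong reference to the trigger goes away.
    let trigger = Arc::new(ShutdownTrigger { queue: queue.clone() });

    // The pool only downgrades the trigger: once the `ThreadPool` and all
    // `Worker`s (the strong holders) are dropped, the trigger drops too
    // and shutdown completes.
    let pool = Arc::new(Pool {
        trigger: Arc::downgrade(&trigger),
        queue,
    });

    (pool, trigger)
}
```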

View File

@ -79,6 +79,7 @@
extern crate tokio_executor;
extern crate crossbeam_channel;
extern crate crossbeam_deque as deque;
extern crate crossbeam_utils;
#[macro_use]

View File

@ -15,7 +15,7 @@ use self::backup_stack::BackupStack;
use config::Config;
use shutdown::ShutdownTrigger;
use task::{Task, Blocking};
use task::{Blocking, Queue, Task};
use worker::{self, Worker, WorkerId};
use futures::Poll;
@ -59,6 +59,12 @@ pub(crate) struct Pool {
// The number of workers will *usually* be small.
pub workers: Arc<[worker::Entry]>,
// The global MPMC queue of tasks.
//
// Spawned tasks are pushed into this queue. Although worker threads have their own dedicated
// task queues, they periodically steal tasks from this global queue, too.
pub queue: Arc<Queue>,
// Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
//
// When spawning a new `Worker`, this weak reference is upgraded and handed out to the new
@ -90,6 +96,7 @@ impl Pool {
trigger: Weak<ShutdownTrigger>,
max_blocking: usize,
config: Config,
queue: Arc<Queue>,
) -> Pool {
let pool_size = workers.len();
let total_size = max_blocking + pool_size;
@ -117,6 +124,7 @@ impl Pool {
sleep_stack: CachePadded::new(worker::Stack::new()),
num_workers: AtomicUsize::new(0),
workers,
queue,
trigger,
backup,
backup_stack,
@ -264,50 +272,10 @@ impl Pool {
pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
debug_assert_eq!(*self, **pool);
use worker::Lifecycle::Notified;
trace!(" -> submit external");
// First try to get a handle to a sleeping worker. This ensures that
// sleeping workers get woken up.
if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Notified, false) {
trace!("submit to existing worker; idx={}; state={:?}", idx, worker_state);
self.submit_to_external(idx, task, worker_state, pool);
return;
}
// All workers are active, so pick a random worker and submit the
// task to it.
self.submit_to_random(task, pool);
}
/// Submit a task to a random worker
///
/// Called from outside of the scheduler, this function is how new tasks
/// enter the system.
pub fn submit_to_random(&self, task: Arc<Task>, pool: &Arc<Pool>) {
debug_assert_eq!(*self, **pool);
let len = self.workers.len();
let idx = self.rand_usize() % len;
trace!(" -> submitting to random; idx={}", idx);
let state = self.workers[idx].load_state();
self.submit_to_external(idx, task, state, pool);
}
fn submit_to_external(&self,
idx: usize,
task: Arc<Task>,
state: worker::State,
pool: &Arc<Pool>)
{
debug_assert_eq!(*self, **pool);
let entry = &self.workers[idx];
if !entry.submit_external(task, state) {
self.spawn_thread(WorkerId::new(idx), pool);
}
self.queue.push(task);
self.signal_work(pool);
}
pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> {
@ -438,43 +406,21 @@ impl Pool {
pub fn signal_work(&self, pool: &Arc<Pool>) {
debug_assert_eq!(*self, **pool);
use worker::Lifecycle::*;
use worker::Lifecycle::Signaled;
if let Some((idx, mut worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
let entry = &self.workers[idx];
debug_assert!(worker_state.lifecycle() != Signaled, "actual={:?}", worker_state.lifecycle());
debug_assert!(
worker_state.lifecycle() != Signaled,
"actual={:?}", worker_state.lifecycle(),
);
// Transition the worker state to signaled
loop {
let mut next = worker_state;
trace!("signal_work -- notify; idx={}", idx);
next.set_lifecycle(Signaled);
let actual = entry.state.compare_and_swap(
worker_state.into(), next.into(), AcqRel).into();
if actual == worker_state {
break;
}
worker_state = actual;
}
// The state has been transitioned to signaled; now we need to wake up
// the worker if necessary.
match worker_state.lifecycle() {
Sleeping => {
trace!("signal_work -- wakeup; idx={}", idx);
self.workers[idx].wakeup();
}
Shutdown => {
trace!("signal_work -- spawn; idx={}", idx);
self.spawn_thread(WorkerId(idx), pool);
}
Running | Notified | Signaled => {
// The workers are already active. No need to wake them up.
}
if !entry.notify(worker_state) {
trace!("signal_work -- spawn; idx={}", idx);
self.spawn_thread(WorkerId(idx), pool);
}
}
}
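With those two hunks, external submission collapses to "push, then signal": the task goes into the shared queue where every worker can see it, and `signal_work` wakes at most one sleeper. A hedged sketch of that shape, using `crossbeam_channel` in place of the real `Queue` and stubbing out the sleep-stack machinery:

```rust
use std::sync::Arc;
use crossbeam_channel::{unbounded, Receiver, Sender};

struct Task; // placeholder for the real `Task`

struct Pool {
    queue: (Sender<Arc<Task>>, Receiver<Arc<Task>>),
}

impl Pool {
    fn new() -> Pool {
        Pool { queue: unbounded() }
    }

    // The new `submit_external`: no worker is chosen up front.
    fn submit_external(&self, task: Arc<Task>) {
        self.queue.0.send(task).unwrap(); // visible to every worker
        self.signal_work();
    }

    fn signal_work(&self) {
        // Real code: pop a sleeping worker off `sleep_stack`, call
        // `entry.notify(state)`, and spawn a thread if notify fails.
    }
}
```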

View File

@ -161,7 +161,10 @@ impl<'a> tokio_executor::Executor for &'a Sender {
// Create a new task for the future
let task = Arc::new(Task::new(future));
self.pool.submit_to_random(task, &self.pool);
// Call `submit_external()` to place the task into the global queue. This
// way, all workers have an equal chance of running this task, which means
// IO handles will be assigned to reactors more evenly.
self.pool.submit_external(task, &self.pool);
Ok(())
}
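From the caller's side nothing changes: `spawn` still hands the future to the pool, and only the internal routing (global queue instead of a randomly chosen worker) differs. A usage example against the public `tokio-threadpool` API of this era (futures 0.1 style):

```rust
extern crate futures;
extern crate tokio_threadpool;

use futures::{future::lazy, Future};
use tokio_threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new();

    // The spawned task lands in the global queue, so whichever worker
    // pops it first runs it; previously a random worker was picked.
    pool.spawn(lazy(|| {
        println!("hello from the pool");
        Ok(())
    }));

    pool.shutdown().wait().unwrap();
}
```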

View File

@ -1,3 +1,4 @@
use task::Queue;
use worker;
use futures::{Future, Poll, Async};
@ -61,25 +62,30 @@ impl Future for Shutdown {
pub(crate) struct ShutdownTrigger {
inner: Arc<Mutex<Inner>>,
workers: Arc<[worker::Entry]>,
queue: Arc<Queue>,
}
unsafe impl Send for ShutdownTrigger {}
unsafe impl Sync for ShutdownTrigger {}
impl ShutdownTrigger {
pub(crate) fn new(workers: Arc<[worker::Entry]>) -> ShutdownTrigger {
pub(crate) fn new(workers: Arc<[worker::Entry]>, queue: Arc<Queue>) -> ShutdownTrigger {
ShutdownTrigger {
inner: Arc::new(Mutex::new(Inner {
task: AtomicTask::new(),
completed: false,
})),
workers,
queue,
}
}
}
impl Drop for ShutdownTrigger {
fn drop(&mut self) {
// Drain the global task queue.
while self.queue.pop().is_some() {}
// Notify the task interested in shutdown.
let mut inner = self.inner.lock().unwrap();
inner.completed = true;
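Draining in `Drop` ensures that any tasks still sitting in the global queue are released before shutdown is reported complete. A minimal sketch of that pattern, assuming a channel-backed queue like the one this commit introduces:

```rust
use crossbeam_channel::{unbounded, Receiver};

struct Trigger {
    queue: Receiver<String>, // stands in for the real queue of `Arc<Task>`s
}

impl Drop for Trigger {
    fn drop(&mut self) {
        // Pop everything that is left so the remaining task handles are
        // dropped before anyone waiting on shutdown gets notified.
        while self.queue.try_recv().is_ok() {}
        // ...then flag completion and wake the waiting task, as above.
    }
}

fn main() {
    let (tx, rx) = unbounded();
    tx.send(String::from("leftover task")).unwrap();
    drop(Trigger { queue: rx }); // drains, then "completes" shutdown
}
```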

View File

@ -4,7 +4,7 @@ mod queue;
mod state;
pub(crate) use self::blocking::{Blocking, CanBlock};
pub(crate) use self::queue::{Queue, Poll};
pub(crate) use self::queue::Queue;
use self::blocking_state::BlockingState;
use self::state::State;
@ -31,9 +31,6 @@ pub(crate) struct Task {
/// Task blocking related state
blocking: AtomicUsize,
/// Next pointer in the queue that submits tasks to a worker.
next: AtomicPtr<Task>,
/// Next pointer in the queue of tasks pending blocking capacity.
next_blocking: AtomicPtr<Task>,
@ -63,7 +60,6 @@ impl Task {
Task {
state: AtomicUsize::new(State::new().into()),
blocking: AtomicUsize::new(BlockingState::new().into()),
next: AtomicPtr::new(ptr::null_mut()),
next_blocking: AtomicPtr::new(ptr::null_mut()),
future: UnsafeCell::new(Some(task_fut)),
}
@ -78,7 +74,6 @@ impl Task {
Task {
state: AtomicUsize::new(State::stub().into()),
blocking: AtomicUsize::new(BlockingState::new().into()),
next: AtomicPtr::new(ptr::null_mut()),
next_blocking: AtomicPtr::new(ptr::null_mut()),
future: UnsafeCell::new(Some(task_fut)),
}
@ -237,7 +232,6 @@ impl Task {
impl fmt::Debug for Task {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Task")
.field("next", &self.next)
.field("state", &self.state)
.field("future", &"Spawn<BoxFuture>")
.finish()

View File

@ -1,33 +1,13 @@
use task::Task;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::Arc;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{Acquire, Release, AcqRel, Relaxed};
use crossbeam_utils::CachePadded;
use crossbeam_channel::{unbounded, Receiver, Sender};
#[derive(Debug)]
pub(crate) struct Queue {
/// Queue head.
///
/// This is a strong reference to `Task` (i.e, `Arc<Task>`)
head: CachePadded<AtomicPtr<Task>>,
/// Tail pointer. This is `Arc<Task>` unless it points to `stub`.
tail: UnsafeCell<*mut Task>,
/// Stub pointer, used as part of the intrusive mpsc channel algorithm
/// described by 1024cores.
stub: Box<Task>,
}
#[derive(Debug)]
pub(crate) enum Poll {
Empty,
Inconsistent,
Data(Arc<Task>),
// TODO(stjepang): Use a custom, faster MPMC queue implementation that supports `steal_many()`.
chan: (Sender<Arc<Task>>, Receiver<Arc<Task>>),
}
// ===== impl Queue =====
@ -35,93 +15,20 @@ pub(crate) enum Poll {
impl Queue {
/// Create a new, empty `Queue`.
pub fn new() -> Queue {
let stub = Box::new(Task::stub());
let ptr = &*stub as *const _ as *mut _;
Queue {
head: CachePadded::new(AtomicPtr::new(ptr)),
tail: UnsafeCell::new(ptr),
stub: stub,
chan: unbounded(),
}
}
/// Push a task onto the queue.
///
/// This function is `Sync`.
#[inline]
pub fn push(&self, task: Arc<Task>) {
unsafe {
self.push2(Arc::into_raw(task));
}
self.chan.0.send(task).unwrap();
}
unsafe fn push2(&self, task: *const Task) {
let task = task as *mut Task;
// Set the next pointer. This does not require an atomic operation as
// this node is not yet accessible to other threads. The write will be
// flushed with the next operation.
(*task).next.store(ptr::null_mut(), Relaxed);
// Update the head to point to the new node. We need to see the previous
// node in order to update the next pointer as well as release `task`
// to any other threads calling `push`.
let prev = self.head.swap(task, AcqRel);
// Release `task` to the consume end.
(*prev).next.store(task, Release);
}
/// Poll a task from the queue.
///
/// This function is **not** `Sync` and requires coordination by the caller.
pub unsafe fn poll(&self) -> Poll {
let mut tail = *self.tail.get();
let mut next = (*tail).next.load(Acquire);
let stub = &*self.stub as *const _ as *mut _;
if tail == stub {
if next.is_null() {
return Poll::Empty;
}
*self.tail.get() = next;
tail = next;
next = (*next).next.load(Acquire);
}
if !next.is_null() {
*self.tail.get() = next;
// No ref_count inc is necessary here as this poll is paired
// with a `push` which "forgets" the handle.
return Poll::Data(Arc::from_raw(tail));
}
if self.head.load(Acquire) != tail {
return Poll::Inconsistent;
}
self.push2(stub);
next = (*tail).next.load(Acquire);
if !next.is_null() {
*self.tail.get() = next;
return Poll::Data(Arc::from_raw(tail));
}
Poll::Inconsistent
}
}
impl Drop for Queue {
fn drop(&mut self) {
loop {
if let Poll::Empty = unsafe { self.poll() } {
break
}
}
/// Pop a task from the queue.
#[inline]
pub fn pop(&self) -> Option<Arc<Task>> {
self.chan.1.try_recv().ok()
}
}
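The rewritten `Queue` is now little more than a pair of channel endpoints. Because `crossbeam_channel`'s unbounded channel is MPMC, `pop` needs no single-consumer coordination and no `unsafe`, and the manual `Drop` drain disappears (the channel drops its remaining contents itself). A self-contained sketch of the new shape, made generic here only so the example stands alone:

```rust
use std::sync::Arc;
use crossbeam_channel::{unbounded, Receiver, Sender};

struct Queue<T> {
    chan: (Sender<Arc<T>>, Receiver<Arc<T>>),
}

impl<T> Queue<T> {
    fn new() -> Queue<T> {
        Queue { chan: unbounded() }
    }

    fn push(&self, task: Arc<T>) {
        // The receiver half lives inside `self`, so the channel cannot be
        // disconnected while we hold `&self`; the unwrap never fires.
        self.chan.0.send(task).unwrap();
    }

    fn pop(&self) -> Option<Arc<T>> {
        // Non-blocking; `None` simply means "nothing available right now".
        self.chan.1.try_recv().ok()
    }
}

fn main() {
    let q = Queue::new();
    q.push(Arc::new(1u32));
    assert_eq!(q.pop().map(|t| *t), Some(1));
    assert!(q.pop().is_none());
}
```

Any number of workers can call `pop` concurrently, which is exactly what lets every worker steal from the global queue in `sleep_light` further down.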

View File

@ -1,12 +1,12 @@
use park::{BoxPark, BoxUnpark};
use task::{Task, Queue};
use task::Task;
use worker::state::{State, PUSHED_MASK};
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed};
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use crossbeam_utils::CachePadded;
use deque;
@ -36,9 +36,6 @@ pub(crate) struct WorkerEntry {
// Thread unparker
pub unpark: BoxUnpark,
// MPSC queue of jobs submitted to the worker from an external source.
pub inbound: Queue,
}
impl WorkerEntry {
@ -50,21 +47,11 @@ impl WorkerEntry {
next_sleeper: UnsafeCell::new(0),
worker: w,
stealer: s,
inbound: Queue::new(),
park: UnsafeCell::new(park),
unpark,
}
}
/// Atomically load the worker's state
///
/// # Ordering
///
/// An `Acquire` ordering is established on the entry's state variable.
pub fn load_state(&self) -> State {
self.state.load(Acquire).into()
}
/// Atomically unset the pushed flag.
///
/// # Return
@ -85,20 +72,15 @@ impl WorkerEntry {
self.push_internal(task);
}
/// Submits a task to the worker. This assumes that the caller is external
/// to the worker. Internal submissions go through another path.
///
/// Returns `false` if the worker needs to be spawned.
/// Notifies the worker and returns `false` if it needs to be spawned.
///
/// # Ordering
///
/// The `state` must have been obtained with an `Acquire` ordering.
pub fn submit_external(&self, task: Arc<Task>, mut state: State) -> bool {
#[inline]
pub fn notify(&self, mut state: State) -> bool {
use worker::Lifecycle::*;
// Push the task onto the external queue
self.push_external(task);
loop {
let mut next = state;
next.notify();
@ -188,6 +170,7 @@ impl WorkerEntry {
///
/// This **must** only be called by the thread that owns the worker entry.
/// This function is not `Sync`.
#[inline]
pub fn pop_task(&self) -> deque::Pop<Arc<Task>> {
self.worker.pop()
}
@ -208,22 +191,17 @@ impl WorkerEntry {
///
/// This is called when the pool is shutting down.
pub fn drain_tasks(&self) {
use deque::Pop;
use deque::Pop::*;
loop {
match self.worker.pop() {
Pop::Data(_) => {}
Pop::Empty => break,
Pop::Retry => {}
Data(_) => {}
Empty => break,
Retry => {}
}
}
}
#[inline]
fn push_external(&self, task: Arc<Task>) {
self.inbound.push(task);
}
#[inline]
pub fn push_internal(&self, task: Arc<Task>) {
self.worker.push(task);
@ -254,7 +232,6 @@ impl fmt::Debug for WorkerEntry {
.field("stealer", &self.stealer)
.field("park", &"UnsafeCell<BoxPark>")
.field("unpark", &"BoxUnpark")
.field("inbound", &self.inbound)
.finish()
}
}
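The compare-and-swap loop that previously lived inline in `Pool::signal_work` now sits behind `WorkerEntry::notify`: transition the lifecycle to `Signaled`, then act on whatever state the worker was in before. A hedged sketch with the lifecycle collapsed to a bare `usize` (the real `State` packs more bits):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::AcqRel};

const RUNNING: usize = 0;
const SLEEPING: usize = 1;
const SHUTDOWN: usize = 2;
const SIGNALED: usize = 3;

struct Entry {
    state: AtomicUsize,
}

impl Entry {
    /// Returns `false` if the worker thread needs to be spawned, matching
    /// the contract of `WorkerEntry::notify` in the diff above.
    fn notify(&self, mut state: usize) -> bool {
        // Transition to `Signaled`, retrying if another thread raced us.
        loop {
            let actual = self.state.compare_and_swap(state, SIGNALED, AcqRel);
            if actual == state {
                break;
            }
            state = actual;
        }

        // Decide based on what the worker was doing before the transition.
        match state {
            SLEEPING => { /* unpark the worker thread */ true }
            SHUTDOWN => false, // the thread is gone; caller must spawn one
            _ => true,         // already running or signaled; nothing to do
        }
    }
}
```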

View File

@ -242,10 +242,6 @@ impl Worker {
while self.check_run_state(first) {
first = false;
// Poll inbound until empty, transferring all tasks to the internal
// queue.
let consistent = self.drain_inbound();
// Run the next available task
if self.try_run_task(&notify) {
if self.is_blocking.get() {
@ -253,6 +249,8 @@ impl Worker {
return;
}
// Poll the reactor and the global queue every now and then to
// ensure no task gets left behind.
if tick % LIGHT_SLEEP_INTERVAL == 0 {
self.sleep_light();
}
@ -264,11 +262,6 @@ impl Worker {
continue;
}
if !consistent {
spin_cnt = 0;
continue;
}
spin_cnt += 1;
// Yield the thread several times before it actually goes to sleep.
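With `drain_inbound` gone, the loop's new cadence is: run tasks as usual, and only on every `LIGHT_SLEEP_INTERVAL`-th tick take a "light sleep" that polls the reactor and the global queue. A standalone sketch of that cadence; the interval value and the closure shapes are illustrative assumptions, not the crate's actual definitions:

```rust
// Assumed value for illustration only.
const LIGHT_SLEEP_INTERVAL: u64 = 32;

fn run_loop<F, G>(mut run_task: F, mut sleep_light: G)
where
    F: FnMut() -> bool, // runs one task; false when none was available
    G: FnMut(),         // polls the reactor and steals from the global queue
{
    let mut tick: u64 = 0;
    while run_task() {
        // Check the global queue every now and then, not every iteration.
        if tick % LIGHT_SLEEP_INTERVAL == 0 {
            sleep_light();
        }
        tick = tick.wrapping_add(1);
    }
    // No runnable task: the real worker spins, yields, then sleeps (elided).
}
```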
@ -423,7 +416,7 @@ impl Worker {
if idx < len {
match self.pool.workers[idx].steal_tasks(self.entry()) {
Steal::Data(task) => {
trace!("stole task");
trace!("stole task from another worker");
self.run_task(task, notify);
@ -562,48 +555,6 @@ impl Worker {
task.run(notify)
}
/// Drains all tasks from the external queue and pushes them onto the
/// internal queue.
///
/// Returns `true` if the operation was able to complete in a consistent
/// state.
#[inline]
fn drain_inbound(&self) -> bool {
use task::Poll::*;
let mut found_work = false;
loop {
let task = unsafe { self.entry().inbound.poll() };
match task {
Empty => {
if found_work {
// TODO: Why is this called on every iteration? Would it
// not be better to only signal when work was found
// after waking up?
trace!("found work while draining; signal_work");
self.pool.signal_work(&self.pool);
}
return true;
}
Inconsistent => {
if found_work {
trace!("found work while draining; signal_work");
self.pool.signal_work(&self.pool);
}
return false;
}
Data(task) => {
found_work = true;
self.entry().push_internal(task);
}
}
}
}
/// Put the worker to sleep
///
/// Returns `true` if woken up due to new work arriving.
@ -679,18 +630,16 @@ impl Worker {
trace!(" -> starting to sleep; idx={}", self.id.0);
// Do a quick check to see if there are any notifications in the
// reactor or new tasks in the global queue. Since this call will
// clear the wakeup token, we need to check the state again and
// only after that go to sleep.
self.sleep_light();
// The state has been transitioned to sleeping; we can now wait by
// calling the parker. This is done in a loop because condvars can wake
// up spuriously.
loop {
unsafe {
(*self.entry().park.get())
.park()
.unwrap();
}
trace!(" -> wakeup; idx={}", self.id.0);
// Reload the state
state = self.entry().state.load(Acquire).into();
@ -722,18 +671,38 @@ impl Worker {
unreachable!();
}
}
unsafe {
(*self.entry().park.get())
.park()
.unwrap();
}
trace!(" -> wakeup; idx={}", self.id.0);
}
}
/// This doesn't actually put the thread to sleep. It calls
/// `park.park_timeout` with a duration of 0. This allows the park
/// implementation to perform any work that might be done on an interval.
///
/// It also pops up to `STEAL_COUNT` tasks from the global queue and
/// submits them to the pool, so sleeping never leaves work behind.
fn sleep_light(&self) {
const STEAL_COUNT: usize = 32;
unsafe {
(*self.entry().park.get())
.park_timeout(Duration::from_millis(0))
.unwrap();
}
for _ in 0..STEAL_COUNT {
if let Some(task) = self.pool.queue.pop() {
self.pool.submit(task, &self.pool);
} else {
break;
}
}
}
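The `STEAL_COUNT` bound matters: each light sleep moves at most 32 tasks from the shared queue back into the pool, so a single worker cannot drain the whole global queue while its siblings starve. A standalone sketch of that bounded drain; the real loop calls `self.pool.submit(task, &self.pool)` instead of collecting:

```rust
use crossbeam_channel::Receiver;

// Pop at most `limit` tasks from the shared queue, stopping early when
// the queue is (momentarily) empty.
fn steal_batch<T>(global: &Receiver<T>, limit: usize) -> Vec<T> {
    let mut batch = Vec::new();
    for _ in 0..limit {
        match global.try_recv() {
            Ok(task) => batch.push(task), // real code: pool.submit(task, ..)
            Err(_) => break,
        }
    }
    batch
}
```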
fn entry(&self) -> &Entry {
@ -747,14 +716,8 @@ impl Drop for Worker {
trace!("shutting down thread; idx={}", self.id.0);
if self.should_finalize.get() {
// Get all inbound work and push it onto the work queue. The work
// queue is drained in the next step.
self.drain_inbound();
// Drain the work queue
self.entry().drain_tasks();
// TODO: Drain the work queue...
}
}
}