Add some thread pool docs (#421)

This commit is contained in:
Carl Lerche 2018-06-15 15:20:25 -07:00 committed by GitHub
parent 71c8f561e3
commit 3fac7ce68c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 147 additions and 4 deletions

View File

@@ -3,6 +3,116 @@
#![doc(html_root_url = "https://docs.rs/tokio-threadpool/0.1.4")]
#![deny(warnings, missing_docs, missing_debug_implementations)]
// The Tokio thread pool is designed to schedule futures in Tokio-based
// applications. The thread pool structure manages two sets of threads:
//
// * Worker threads.
// * Backup threads.
//
// Worker threads are used to schedule futures using a work-stealing strategy.
// Backup threads, on the other hand, are intended only to support the
// `blocking` API. Threads will transition between the two sets.
//
// The advantage of the work-stealing strategy is minimal cross-thread
// coordination. The thread pool attempts to make as much progress as possible
// without communicating across threads.
//
// # Crate layout
//
// The primary type, `Pool`, holds the majority of a thread pool's state,
// including the state for each worker. Each worker's state is maintained in an
// instance of `worker::Entry`.
//
// `Worker` contains the logic that runs on each worker thread. It holds an
// `Arc` to `Pool` and is able to access its state from `Pool`.
//
// `Task` is a harness around an individual future. It manages polling and
// scheduling that future.
//
// # Worker overview
//
// Each worker has two queues: a deque and an mpsc channel. The deque is the
// primary queue for tasks that are scheduled to run on the worker thread. Tasks
// can only be pushed onto the deque by the worker itself, but other workers may
// "steal" from that deque. The mpsc channel is used to submit futures from
// outside the pool.
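//
// As a rough sketch, a worker's queue pair can be pictured like this (the
// names are illustrative, not the actual `worker::Entry` layout):
//
//     struct WorkerQueues {
//         // Owned by the worker; other workers may steal from the other end.
//         deque: Deque<Task>,
//         // Receives tasks submitted from outside the pool.
//         inbound: Receiver<Task>,
//     }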
//
// As long as the thread pool has not been shut down, a worker runs in a loop.
// On each iteration, it consumes all tasks on its mpsc channel and pushes them
// onto the deque. It then pops tasks off of the deque and executes them.
//
// If a worker has no work, i.e., both queues are empty, it attempts to steal.
// To do this, it randomly scans other workers' deques and tries to pop a task.
// If it finds no work to steal, the thread goes to sleep.
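//
// Putting the two previous paragraphs together, the run loop is roughly the
// following (an illustrative sketch, not the actual `Worker` implementation):
//
//     while !pool.is_shutdown() {
//         // Move externally submitted tasks onto the local deque.
//         while let Some(task) = inbound.try_recv() {
//             deque.push(task);
//         }
//
//         if let Some(task) = deque.pop() {
//             task.run();
//         } else if let Some(task) = steal_from_random_worker() {
//             task.run();
//         } else {
//             // No local work and nothing to steal: park the thread.
//             sleep();
//         }
//     }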
//
// When the worker detects that the pool has been shut down, it exits the loop,
// cleans up its state, and shuts the thread down.
//
// # Thread pool initialization
//
// By default, no threads are spawned on creation. Instead, when new futures are
// spawned, the pool first checks if there are enough active worker threads. If
// not, a new worker thread is spawned.
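//
// In other words, worker threads are started lazily, roughly like this
// (hypothetical helper names, not the actual spawn path):
//
//     fn spawn(&self, future: impl Future) {
//         self.submit(Task::new(future));
//
//         if self.num_active_workers() < self.pool_size() {
//             self.spawn_worker_thread();
//         }
//     }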
//
// # Spawning futures
//
// The spawning behavior depends on whether a future was spawned from within a
// worker thread or from an external handle.
//
// When spawning a future from outside the thread pool, the current strategy is
// to randomly pick a worker to submit the task to. The task is then pushed onto
// that worker's mpsc channel.
//
// When spawning a future while on a worker thread, the task is pushed onto the
// back of the current worker's deque.
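//
// The two paths can be summarized as follows (hypothetical helper names, not
// the actual code):
//
//     fn submit(&self, task: Task) {
//         if let Some(worker) = Worker::current() {
//             // Called from a worker thread: push onto the local deque.
//             worker.deque.push(task);
//         } else {
//             // Called from outside the pool: pick a random worker and push
//             // onto its mpsc channel.
//             let idx = self.random_worker_index();
//             self.workers[idx].inbound_tx.send(task);
//         }
//     }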
//
// # Sleeping workers
//
// Sleeping workers are tracked using a Treiber stack [1]. As a result, the
// thread that most recently went to sleep is woken up first. When the pool is
// not under load, this helps threads shut down faster.
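//
// The stack's LIFO order can be pictured as follows (simplified; the real
// stack is a lock-free structure updated with compare-and-swap):
//
//     // A worker that finds no work pushes itself and parks.
//     sleep_stack.push(worker_id);
//
//     // A notification pops the most recently parked worker first.
//     if let Some(worker_id) = sleep_stack.pop() {
//         unpark(worker_id);
//     }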
//
// Sleeping is done by using `tokio_executor::Park` implementations. This allows
// the user of the thread pool to customize the work that is performed while a
// thread sleeps. This is how timers and other functionality are injected into
// the thread pool.
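//
// The `Park` trait has roughly the following shape (see `tokio_executor` for
// the exact definition); because the pool only ever calls `park` and
// `park_timeout`, a custom implementation can run arbitrary work before the
// thread actually blocks:
//
//     pub trait Park {
//         type Unpark: Unpark;
//         type Error;
//
//         fn unpark(&self) -> Self::Unpark;
//         fn park(&mut self) -> Result<(), Self::Error>;
//         fn park_timeout(&mut self, dur: Duration) -> Result<(), Self::Error>;
//     }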
//
// [1]: https://en.wikipedia.org/wiki/Treiber_Stack
//
// # Notifying workers
//
// When there is work to be done, workers must be notified. However, notifying a
// worker requires cross-thread coordination. Ideally, a worker would only be
// notified when it is sleeping, but there is no way to know if a worker is
// sleeping without cross-thread communication.
//
// The two cases when a worker might need to be notified are:
//
// 1) A task is externally submitted to a worker via the mpsc channel.
// 2) A worker has a backlog of work and needs other workers to steal from it.
//
// In the first case, the worker is always notified. However, it would be
// possible to avoid the notification if the mpsc channel holds two or more
// tasks *after* the task is submitted: in that case, we can assume that the
// worker has already been notified by an earlier submission.
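//
// That optimization would look roughly like this (hypothetical; today the
// worker is notified unconditionally):
//
//     let len = entry.push_inbound(task);
//
//     if len < 2 {
//         // The channel may have been empty, so the worker could be asleep
//         // and must be woken up.
//         entry.notify();
//     }
//     // With two or more queued tasks, an earlier push already notified it.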
//
// The second case is trickier. Currently, whenever a worker spawns a new future
// (pushing it onto its deque) and whenever it pops a future from its mpsc
// channel, it tries to notify a sleeping worker to wake up and start stealing.
// This generates a lot of notifications, and it **might** be possible to reduce
// them.
//
// Also, whenever a worker is woken up via a signal and does find work, it will,
// in turn, try to wake up another worker.
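//
// That chained wake-up looks roughly like this (illustrative only):
//
//     // This worker was woken by a signal and found something to run.
//     if was_signaled && found_work {
//         // Recruit one more worker; it will recruit another in turn if it
//         // also finds work, until the pool is saturated or work runs out.
//         pool.signal_work();
//     }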
//
// # `blocking`
//
// The strategy for handling blocking closures is to hand off the worker to a
// new thread. This implies handing off the `deque` and `mpsc`. Once this is
// done, the new thread continues to process the work queue and the original
// thread is free to block. Once it finishes processing the future that made the
// blocking call, the thread has no additional work and is inserted into the
// backup pool. This makes it available to other workers that encounter a
// `blocking` call.
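//
// From the user's point of view, `blocking` is called from inside a future
// running on the pool, along the lines of the following sketch (the file path
// and the error handling are placeholders):
//
//     use futures::future::poll_fn;
//     use tokio_threadpool::{blocking, ThreadPool};
//
//     let pool = ThreadPool::new();
//
//     pool.spawn(poll_fn(|| {
//         blocking(|| {
//             // The worker's queues have already been handed off, so it is
//             // fine to block here.
//             let _ = std::fs::read_to_string("data.txt");
//         }).map_err(|_| ())
//     }));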
extern crate tokio_executor;
extern crate crossbeam_deque as deque;

View File

@@ -29,10 +29,16 @@ use std::thread;
use rand::{Rng, SeedableRng, XorShiftRng};
// TODO: Rename this
#[derive(Debug)]
pub(crate) struct Pool {
// ThreadPool state
// Tracks the state of the thread pool (running, shutting down, ...).
//
// While workers check this field as a hint to detect shutdown, it is
// **not** used as a primary point of coordination for workers. The sleep
// stack is used as the primary point of coordination for workers.
//
// The value of this atomic is deserialized into a `pool::State` instance.
// See comments for that type.
pub state: AtomicUsize,
// Stack tracking sleeping workers.

View File

@@ -16,6 +16,9 @@ use deque;
// operations are thread-safe vs. which ones require ownership of the worker.
pub(crate) struct WorkerEntry {
// Worker state. This is mutated when notifying the worker.
//
// The `usize` value is deserialized to a `worker::State` instance. See
// comments on that type.
pub state: AtomicUsize,
// Next entry in the parked Treiber stack

View File

@@ -336,8 +336,29 @@ impl Worker {
state = actual;
}
// If this is the first iteration of the worker loop, then the state can
// be signaled.
// `first` is set to true the first time this function is called after
// the thread has started.
//
// This check is to handle the scenario where a worker gets signaled
// while it is already happily running. The `is_signaled` state is
// intended to wake up a worker that has previously been sleeping, in
// effect increasing the number of active workers. If this is the first
// time `check_run_state` is called, then being in a signaled state is
// normal and the thread was started to handle it. However, if this is
// **not** the first time the fn was called, then the number of active
// workers has not been increased by the signal, so `signal_work` has to
// be called again to try to wake up another worker.
//
// For example, suppose the thread pool is configured to allow 4 workers.
// Worker 1 is processing tasks from its `deque`. Worker 2 receives its
// first task. Worker 2 will pick a random worker to signal. It does
// this by popping off the sleep stack, but there is no guarantee that
// workers on the sleep stack are actually sleeping. It is possible that
// Worker 1 gets signaled.
//
// Without this check, in the above case, no additional workers will get
// started, which results in the thread pool permanently being at 2
// workers even though it should reach 4.
if !first && state.is_signaled() {
trace!("Worker::check_run_state; delegate signal");
// This worker is not ready to be signaled, so delegate the signal
@@ -537,6 +558,9 @@ impl Worker {
match task {
Empty => {
if found_work {
// TODO: Why is this called on every iteration? Would it
// not be better to only signal when work was found
// after waking up?
trace!("found work while draining; signal_work");
self.inner.signal_work(&self.inner);
}