diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs
index e412f06f0..c37d4179f 100644
--- a/tokio-threadpool/src/lib.rs
+++ b/tokio-threadpool/src/lib.rs
@@ -3,6 +3,116 @@
 #![doc(html_root_url = "https://docs.rs/tokio-threadpool/0.1.4")]
 #![deny(warnings, missing_docs, missing_debug_implementations)]
 
+// The Tokio thread pool is designed to schedule futures in Tokio-based
+// applications. The thread pool structure manages two sets of threads:
+//
+// * Worker threads.
+// * Backup threads.
+//
+// Worker threads are used to schedule futures using a work-stealing strategy.
+// Backup threads, on the other hand, are intended only to support the
+// `blocking` API. Threads will transition between the two sets.
+//
+// The advantage of the work-stealing strategy is minimal cross-thread
+// coordination. The thread pool attempts to make as much progress as possible
+// without communicating across threads.
+//
+// # Crate layout
+//
+// The primary type, `Pool`, holds the majority of a thread pool's state,
+// including the state for each worker. Each worker's state is maintained in
+// an instance of `worker::Entry`.
+//
+// `Worker` contains the logic that runs on each worker thread. It holds an
+// `Arc` to `Pool` and is able to access its state from `Pool`.
+//
+// `Task` is a harness around an individual future. It manages polling and
+// scheduling that future.
+//
+// # Worker overview
+//
+// Each worker has two queues: a deque and an mpsc channel. The deque is the
+// primary queue for tasks that are scheduled to run on the worker thread.
+// Tasks can only be pushed onto the deque by the worker, but other workers
+// may "steal" from that deque. The mpsc channel is used to submit futures
+// from outside of the pool.
+//
+// As long as the thread pool has not been shut down, a worker runs in a
+// loop. On each iteration, it consumes all tasks on its mpsc channel and
+// pushes them onto the deque. It then pops tasks off of the deque and
+// executes them.
+//
+// If a worker has no work, i.e., both queues are empty, it attempts to
+// steal. To do this, it randomly scans other workers' deques and tries to
+// pop a task. If it finds no work to steal, the thread goes to sleep.
+//
+// When the worker detects that the pool has been shut down, it exits the
+// loop, cleans up its state, and shuts the thread down.
+//
+// # Thread pool initialization
+//
+// By default, no threads are spawned on creation. Instead, when new futures
+// are spawned, the pool first checks if there are enough active worker
+// threads. If not, a new worker thread is spawned.
+//
+// # Spawning futures
+//
+// The spawning behavior depends on whether a future was spawned from within
+// a worker thread or from an external handle.
+//
+// When spawning a future from outside of the thread pool, the current
+// strategy is to randomly pick a worker to submit the task to. The task is
+// then pushed onto that worker's mpsc channel.
+//
+// When spawning a future while on a worker thread, the task is pushed onto
+// the back of the current worker's deque.
+//
+// # Sleeping workers
+//
+// Sleeping workers are tracked using a Treiber stack [1]. This results in
+// the thread that most recently went to sleep getting woken up first. When
+// the pool is not under load, this helps threads shut down faster.
+//
+// Sleeping is done by using `tokio_executor::Park` implementations. This
+// allows the user of the thread pool to customize the work that is performed
+// to sleep. This is how injecting timers and other functionality into the
+// thread pool is done.
+//
+// [1]: https://en.wikipedia.org/wiki/Treiber_Stack
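+//
+// As a rough sketch (with illustrative names; the real implementation packs
+// additional state, including an ABA guard, into the atomics), the sleep
+// stack can be pictured as a chain of worker indices linked through the
+// workers' entries:
+//
+//     use std::sync::atomic::AtomicUsize;
+//     use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
+//
+//     const EMPTY: usize = std::usize::MAX; // sentinel: no sleeping workers
+//
+//     struct SleepStack {
+//         head: AtomicUsize, // index of the most recently parked worker
+//     }
+//
+//     struct Entry {
+//         next_sleeper: AtomicUsize, // next worker down the stack
+//     }
+//
+//     impl SleepStack {
+//         // Push a worker onto the stack before it goes to sleep.
+//         fn push(&self, entries: &[Entry], idx: usize) {
+//             let mut head = self.head.load(Acquire);
+//             loop {
+//                 entries[idx].next_sleeper.store(head, Relaxed);
+//
+//                 match self.head.compare_exchange(head, idx, AcqRel, Acquire) {
+//                     Ok(_) => return,
+//                     Err(actual) => head = actual,
+//                 }
+//             }
+//         }
+//
+//         // Pop the most recently parked worker, if any, so it can be woken.
+//         fn pop(&self, entries: &[Entry]) -> Option<usize> {
+//             let mut head = self.head.load(Acquire);
+//             loop {
+//                 if head == EMPTY {
+//                     return None;
+//                 }
+//
+//                 let next = entries[head].next_sleeper.load(Relaxed);
+//
+//                 match self.head.compare_exchange(head, next, AcqRel, Acquire) {
+//                     Ok(_) => return Some(head),
+//                     Err(actual) => head = actual,
+//                 }
+//             }
+//         }
+//     }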
+//
+// # Notifying workers
+//
+// When there is work to be done, workers must be notified. However, notifying
+// a worker requires cross-thread coordination. Ideally, a worker would only
+// be notified when it is sleeping, but there is no way to know if a worker is
+// sleeping without cross-thread communication.
+//
+// The two cases when a worker might need to be notified are:
+//
+// 1) A task is externally submitted to a worker via the mpsc channel.
+// 2) A worker has a backlog of work and needs other workers to steal from it.
+//
+// In the first case, the worker will always be notified. However, the
+// notification could be skipped if the mpsc channel holds two or more tasks
+// *after* the task is submitted. In that case, we are able to assume that the
+// worker has previously been notified.
+//
+// The second case is trickier. Currently, whenever a worker spawns a new
+// future (pushing it onto its deque) and whenever it pops a future from its
+// mpsc channel, it tries to notify a sleeping worker to wake up and start
+// stealing. This is a lot of notification, and it **might** be possible to
+// reduce it.
+//
+// Also, whenever a worker is woken up via a signal and does find work, it,
+// in turn, tries to wake up another worker.
+//
+// # `blocking`
+//
+// The strategy for handling blocking closures is to hand off the worker to a
+// new thread. This implies handing off the `deque` and `mpsc`. Once this is
+// done, the new thread continues to process the work queue and the original
+// thread is able to block. Once it finishes running the blocking closure, the
+// thread has no additional work and is inserted into the backup pool. This
+// makes it available to other workers that encounter a `blocking` call.
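+//
+// A rough sketch of the handoff (all names below are illustrative, not the
+// actual API):
+//
+//     fn blocking<F, R>(worker: &mut Worker, f: F) -> R
+//     where
+//         F: FnOnce() -> R,
+//     {
+//         // Hand this worker's `deque` and `mpsc` to a thread from the
+//         // backup pool (or a freshly spawned one) so task processing
+//         // continues while the current thread blocks.
+//         worker.pool.handoff_worker(worker.take_entry());
+//
+//         // Run the blocking closure on the current thread.
+//         let ret = f();
+//
+//         // This thread no longer owns a worker entry; park it in the
+//         // backup pool so it can take over on a future `blocking` call.
+//         worker.pool.park_backup_thread();
+//
+//         ret
+//     }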
+
 extern crate tokio_executor;
 
 extern crate crossbeam_deque as deque;
diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs
index 94e1704fd..5a79b0dee 100644
--- a/tokio-threadpool/src/pool/mod.rs
+++ b/tokio-threadpool/src/pool/mod.rs
@@ -29,10 +29,16 @@ use std::thread;
 
 use rand::{Rng, SeedableRng, XorShiftRng};
 
-// TODO: Rename this
 #[derive(Debug)]
 pub(crate) struct Pool {
-    // ThreadPool state
+    // Tracks the state of the thread pool (running, shutting down, ...).
+    //
+    // Workers check this field as a hint to detect shutdown, but it is
+    // **not** the primary point of coordination between workers; the sleep
+    // stack serves that role.
+    //
+    // The value of this atomic is deserialized into a `pool::State` instance.
+    // See the comments on that type.
     pub state: AtomicUsize,
 
     // Stack tracking sleeping workers.
diff --git a/tokio-threadpool/src/worker/entry.rs b/tokio-threadpool/src/worker/entry.rs
index 99c28962f..752ad2955 100644
--- a/tokio-threadpool/src/worker/entry.rs
+++ b/tokio-threadpool/src/worker/entry.rs
@@ -16,6 +16,9 @@ use deque;
 // operations are thread-safe vs. which ones require ownership of the worker.
 pub(crate) struct WorkerEntry {
     // Worker state. This is mutated when notifying the worker.
+    //
+    // The `usize` value is deserialized to a `worker::State` instance. See
+    // the comments on that type.
     pub state: AtomicUsize,
 
     // Next entry in the parked Trieber stack
diff --git a/tokio-threadpool/src/worker/mod.rs b/tokio-threadpool/src/worker/mod.rs
index 775dafcdf..66c9170cf 100644
--- a/tokio-threadpool/src/worker/mod.rs
+++ b/tokio-threadpool/src/worker/mod.rs
@@ -336,8 +336,29 @@ impl Worker {
             state = actual;
         }
 
-        // If this is the first iteration of the worker loop, then the state can
-        // be signaled.
+        // `first` is set to true the first time this function is called after
+        // the thread has started.
+        //
+        // This check handles the scenario where a worker gets signaled while
+        // it is already happily running. The `is_signaled` state is intended
+        // to wake up a worker that has previously been sleeping, in effect
+        // increasing the number of active workers. If this is the first time
+        // `check_run_state` is called, then being in a signaled state is
+        // normal and the thread was started to handle it. However, if this is
+        // **not** the first time the fn was called, then the number of active
+        // workers has not been increased by the signal, so `signal_work` has
+        // to be called again to try to wake up another worker.
+        //
+        // For example, suppose the thread pool is configured to allow 4
+        // workers. Worker 1 is processing tasks from its `deque`. Worker 2
+        // receives its first task. Worker 2 will pick a random worker to
+        // signal. It does this by popping off the sleep stack, but there is
+        // no guarantee that workers on the sleep stack are actually sleeping.
+        // It is possible that Worker 1 gets signaled.
+        //
+        // Without this check, in the above case, no additional workers would
+        // be started, leaving the thread pool permanently at 2 workers even
+        // though it should reach 4.
         if !first && state.is_signaled() {
             trace!("Worker::check_run_state; delegate signal");
             // This worker is not ready to be signaled, so delegate the signal
@@ -537,6 +558,9 @@ impl Worker {
         match task {
             Empty => {
                 if found_work {
+                    // TODO: Why is this called on every iteration? Would it
+                    // not be better to only signal when work was found after
+                    // waking up?
                     trace!("found work while draining; signal_work");
                     self.inner.signal_work(&self.inner);
                 }
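
A rough sketch of the delegation described in the comment above (simplified;
`is_signaled` and `signal_work` appear in the diff, while the surrounding
structure and the other helpers are hypothetical):

    // Returns `false` once the pool shuts down and the worker should exit
    // its run loop.
    fn check_run_state(&self, first: bool) -> bool {
        let state: State = self.entry().state.load(Acquire).into();

        // A signal that arrived while this worker was already running did
        // not increase the number of active workers, so pass it along:
        // pop another sleeper off the stack and wake it instead.
        if !first && state.is_signaled() {
            self.inner.signal_work(&self.inner);
        }

        !state.is_terminated()
    }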