Each multi-threaded runtime worker prioritizes pulling tasks off of its
local queue. Every so often, it checks the injection (global) queue for
work submitted there. Previously, "every so often" meant a constant
number of tasks polled: Tokio sets a default of 61, but allows users to
configure this value.
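For reference, a minimal sketch of configuring that interval, assuming the
`global_queue_interval` option on `tokio::runtime::Builder` (available in
current releases; the name may differ across versions):

```rust
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    // Check the injection (global) queue every 31 locally polled tasks
    // instead of the default of 61.
    let rt = Builder::new_multi_thread()
        .global_queue_interval(31)
        .enable_all()
        .build()?;

    rt.block_on(async {
        // application code
    });
    Ok(())
}
```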
If workers are under load with tasks that are slow to poll, the
injection queue can be starved. To prevent starvation in this case, this
commit implements some basic self-tuning. The multi-threaded scheduler
tracks the mean task poll time using an exponentially-weighted moving
average. It then uses this value to pick an interval at which to check
the injection queue.
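As a rough illustration (not the actual scheduler code), the tuning amounts
to maintaining an EWMA of poll times and deriving the check interval from
it; the constants and names below are illustrative:

```rust
/// Sketch of EWMA-based tuning; constants and field names are illustrative.
struct TuneState {
    /// Exponentially-weighted moving average of task poll time (ns).
    mean_poll_time_ns: f64,
}

impl TuneState {
    /// Weight given to the newest sample.
    const ALPHA: f64 = 0.1;
    /// Hypothetical budget of wall time between injection-queue checks.
    const TARGET_NS_BETWEEN_CHECKS: f64 = 61.0 * 20_000.0;

    fn record_poll(&mut self, elapsed_ns: f64) {
        self.mean_poll_time_ns =
            Self::ALPHA * elapsed_ns + (1.0 - Self::ALPHA) * self.mean_poll_time_ns;
    }

    /// How many local polls to perform before checking the injection queue:
    /// slow tasks yield a small interval, fast tasks a larger one.
    fn check_interval(&self) -> u32 {
        let polls = Self::TARGET_NS_BETWEEN_CHECKS / self.mean_poll_time_ns;
        (polls as u32).clamp(2, 127)
    }
}
```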
This commit is a first pass at adding self-tuning to the scheduler.
There are other values in the scheduler that could benefit from
self-tuning (e.g. the maintenance interval). Additionally, the
current-thread scheduler could also benefit from self-tuning. However, we
have reached the point where we should start investigating ways to unify
logic in both schedulers. Adding self-tuning to the current-thread
scheduler will be punted until after this unification.
In the multi-threaded scheduler, when there are no tasks on the local
queue, a worker will attempt to pull tasks from the injection queue.
Previously, the worker would only attempt to poll one task from the
injection queue and then continue looking for work from other sources.
This can result in the injection queue backing up when there are many
tasks being scheduled from outside of the runtime.
This patch updates the worker to try to poll more than one task from the
injection queue when it has no more local work. Note that we also don't
want a single worker to poll **all** tasks on the injection queue as
that would result in work becoming unbalanced.
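A sketch of the kind of bound involved: claim roughly a fair share of the
injection queue, capped so one worker never drains it. The numbers and
names are illustrative, not the actual heuristic:

```rust
/// How many injection-queue tasks a worker might claim at once.
fn tasks_to_claim(injection_len: usize, num_workers: usize, local_capacity: usize) -> usize {
    // A fair share of the queue, so remaining tasks stay available
    // to the other workers...
    let fair_share = injection_len / num_workers + 1;
    // ...capped by how much room the local queue has, and at least one.
    fair_share.min(local_capacity / 2).max(1)
}
```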
Previously, the current_thread scheduler used its own injection queue
instead of sharing the same one as the multi-threaded scheduler. This
patch updates the current_thread scheduler to use the same injection
queue as the multi-threaded one (`task::Inject`).
`task::Inject` includes an optimization where it does not need to
acquire the mutex if the queue is empty.
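A minimal sketch of that fast path, with the queue length mirrored in an
atomic so `pop` can bail out without locking (field and method names are
illustrative, not the real `task::Inject` internals):

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Inject<T> {
    /// Mirrors the queue length so readers can skip the lock when empty.
    len: AtomicUsize,
    queue: Mutex<VecDeque<T>>,
}

impl<T> Inject<T> {
    fn push(&self, task: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(task);
        self.len.store(queue.len(), Ordering::Release);
    }

    fn pop(&self) -> Option<T> {
        // Fast path: nothing was injected, so don't touch the mutex.
        if self.len.load(Ordering::Acquire) == 0 {
            return None;
        }
        let mut queue = self.queue.lock().unwrap();
        let task = queue.pop_front();
        self.len.store(queue.len(), Ordering::Release);
        task
    }
}
```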
The test-util feature flag is only intended to be used with tests.
However, it is possible to enable it in release mode accidentally. This
patch reduces the overhead of `Instant::now()` when the `test-util`
feature flag is enabled but `time::pause()` is not called.
The optimization is implemented by adding a static atomic flag that
tracks if `time::pause()` has ever been called. In `Instant::now()`, the
atomic flag is first checked before the thread-local and mutex are
accessed.
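A minimal sketch of the idea, assuming a hypothetical `frozen_now()` helper
standing in for the thread-local/mutex path (not Tokio's actual internals):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;

/// Flipped to `true` the first time `time::pause()` is called (sketch).
static CLOCK_EVER_PAUSED: AtomicBool = AtomicBool::new(false);

fn now() -> Instant {
    // Fast path: the clock has never been paused, so skip the
    // thread-local and mutex entirely.
    if !CLOCK_EVER_PAUSED.load(Ordering::Acquire) {
        return Instant::now();
    }
    // Slow path: consult the mocked clock state.
    frozen_now()
}

/// Placeholder for the mocked-clock lookup that takes the lock.
fn frozen_now() -> Instant {
    unimplemented!()
}
```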
This additional benchmark exercises a common request/reply pattern: an MPSC channel carries requests, and each request carries a oneshot channel used for the reply. On my machine, the benchmark is 17 times faster on the current-thread runtime than on the multi-threaded runtime with one worker thread. Increasing the number of worker threads to 6 degrades performance further.
Does this suggest a scheduling problem with the multi-threaded runtime?
No matter what, hopefully the benchmarks are a useful addition.
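For concreteness, the pattern the benchmark exercises looks roughly like
this (a sketch, not the benchmark source itself):

```rust
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // Requests carry a oneshot sender so the handler can reply directly.
    let (tx, mut rx) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(100);

    // "Server" task: receive a request, reply on its oneshot.
    tokio::spawn(async move {
        while let Some((req, reply)) = rx.recv().await {
            let _ = reply.send(req * 2);
        }
    });

    // "Client": send a request and await the reply.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send((21, reply_tx)).await.unwrap();
    assert_eq!(reply_rx.await.unwrap(), 42);
}
```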
This change removes all references to `Stream` from
within the `tokio` crate and moves them into a new
`tokio-stream` crate. Most types have had their
`impl Stream` removed as well in favor of their
inherent methods.
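For code that still needs a `Stream`, the wrappers live in the new crate.
A sketch assuming today's `tokio_stream::wrappers::ReceiverStream` and
`StreamExt`:

```rust
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel(8);
    tx.send(1).await.unwrap();
    drop(tx); // close the channel so the stream ends

    // The receiver no longer implements `Stream` itself; wrap it instead.
    let mut stream = ReceiverStream::new(rx);
    while let Some(value) = stream.next().await {
        println!("{}", value);
    }
}
```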
Closes #2870
## Motivation
Many of Tokio's synchronization primitives (`RwLock`, `Mutex`,
`Semaphore`, and the bounded MPSC channel) are based on the internal
semaphore implementation, called `semaphore_ll`. This semaphore type
provides a lower-level internal API for the semaphore implementation
than the public `Semaphore` type, and supports "batch" operations, where
waiters may acquire more than one permit at a time, and batches of
permits may be released back to the semaphore.
Currently, `semaphore_ll` uses an atomic singly-linked list for the
waiter queue. The linked list implementation is specific to the
semaphore. This implementation therefore requires a heap allocation for
every waiter in the queue. These allocations are owned by the semaphore,
rather than by the task awaiting permits from the semaphore. Critically,
they are only _deallocated_ when permits are released back to the
semaphore, at which point it dequeues as many waiters from the front of
the queue as can be satisfied with the released permits. If a task
attempts to acquire permits from the semaphore and is cancelled (such as
by timing out), its waiter node remains in the list until it is dequeued
while releasing permits. In cases where large numbers of tasks
are cancelled while waiting for permits, this results in extremely high
memory use for the semaphore (see #2237).
## Solution
@Matthias247 has proposed that Tokio adopt the approach used in his
`futures-intrusive` crate: using an _intrusive_ linked list to store the
wakers of tasks waiting on a synchronization primitive. In an intrusive
list, each list node is stored as part of the entry that node
represents, rather than in a heap allocation that owns the entry.
Because futures must be pinned in order to be polled, the necessary
invariant of such a list --- that entries may not move while in the list
--- may be upheld by making the waiter node `!Unpin`. In this approach,
the waiter node can be stored inline in the future, rather than
requiring separate heap allocation, and cancelled futures may remove
their nodes from the list.
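As a sketch of what such a node might look like (field names are
illustrative, not the actual `Waiter` type):

```rust
use std::marker::PhantomPinned;
use std::ptr::NonNull;
use std::task::Waker;

/// An intrusive wait-list node. It lives inside the future that is
/// waiting, so no separate heap allocation is needed, and `PhantomPinned`
/// keeps the future `!Unpin` so the node's address stays stable while it
/// is linked into the list.
struct Waiter {
    /// Permits this waiter still needs before it can be woken.
    remaining_permits: usize,
    /// Waker to notify once enough permits have been assigned.
    waker: Option<Waker>,
    /// Intrusive links to the neighboring waiters in the queue.
    prev: Option<NonNull<Waiter>>,
    next: Option<NonNull<Waiter>>,
    /// Makes the containing future `!Unpin`.
    _pin: PhantomPinned,
}
```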
This branch adds a new semaphore implementation that uses the intrusive
list added to Tokio in #2210. The implementation is essentially a hybrid
of the old `semaphore_ll` and the semaphore used in `futures-intrusive`:
while a `Mutex` around the wait list is necessary, since the intrusive
list is not thread-safe, the permit state is stored outside of the mutex
and updated atomically.
The mutex is acquired only when accessing the wait list — if a task
can acquire sufficient permits without waiting, it does not need to
acquire the lock. When releasing permits, we iterate over the wait
list from the end of the queue until we run out of permits to release,
and split off all the nodes that received enough permits to wake up
into a separate list. Then, we can drain the new list and notify those
wakers *after* releasing the lock. Because the split operation only
modifies the pointers on the head node of the split-off list and the
new tail node of the old list, it is O(1) and does not require an
allocation to return a variable number of waiters to notify.
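A sketch of the uncontended acquire path this enables, with the permit
count held in an atomic outside the mutex (illustrative only):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Try to take `num` permits without touching the wait-list lock.
/// Returns `false` if the caller must lock the list and enqueue a waiter.
fn try_acquire(permits: &AtomicUsize, num: usize) -> bool {
    let mut curr = permits.load(Ordering::Acquire);
    loop {
        if curr < num {
            // Not enough permits available without waiting.
            return false;
        }
        match permits.compare_exchange(
            curr,
            curr - num,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return true,
            Err(actual) => curr = actual,
        }
    }
}
```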
Because of the intrusive list invariants, the API provided by the new
`batch_semaphore` is somewhat different than that of `semaphore_ll`. In
particular, the `Permit` type has been removed. This type was primarily
intended to allow the reuse of a wait list node allocated on the heap.
Since the intrusive list means we can avoid heap-allocating waiters,
this is no longer necessary. Instead, acquiring permits is done by
polling an `Acquire` future returned by the `Semaphore` type. The use of
a future here ensures that the waiter node is always pinned while
waiting to acquire permits, and that a reference to the semaphore is
available to remove the waiter if the future is cancelled.
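From the user's side this looks like the following (a usage sketch against
the public `tokio::sync::Semaphore` API; exact signatures vary by version):

```rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(3);

    // `acquire()` returns a future; the waiter node lives inside that
    // future and stays pinned while it is awaited, and dropping the
    // future before completion removes the node from the wait list.
    let permit = semaphore.acquire().await.unwrap();
    // ... do work while holding the permit ...
    drop(permit);
}
```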
Unfortunately, the current implementation of the bounded MPSC requires a
`poll_acquire` operation, and has methods that call it while outside of
a pinned context. Therefore, I've left the old `semaphore_ll`
implementation in place to be used by the bounded MPSC, and updated the
`Mutex`, `RwLock`, and `Semaphore` APIs to use the new implementation.
Hopefully, a subsequent change can update the bounded MPSC to use the
new semaphore as well.
Fixes #2237
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
A refactor of the scheduler internals focusing on simplifying and
reducing unsafety. There are no fundamental logic changes.
* The state transitions of the core task component are refined and
reduced.
* `basic_scheduler` has most unsafety removed.
* `local_set` has most unsafety removed.
* `threaded_scheduler` limits most unsafety to its queue implementation.
Some of the benchmarks were broken and/or using deprecated APIs. This
patch updates the benches and requires them all to compile without
warnings in order to pass CI.