rt: initial implementation of new threaded runtime (#5823)

This patch includes an initial implementation of a new multi-threaded
runtime. The new runtime aims to increase scheduler throughput by
speeding up how work is dispatched to peer worker threads. With this
implementation, most benchmarks improve by roughly 10% when the number
of worker threads is below 16. As the thread count grows, mutex
contention degrades performance.

Because the new scheduler is not yet ready to replace the old one, this
patch introduces it as an unstable runtime flavor, with a warning that it
is not production ready. Work to improve the runtime's scalability will
most likely require more intrusive changes across Tokio, so I am opting
to merge it into master now to avoid larger conflicts later.
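
For context, here is a minimal sketch of how the new flavor can be selected
once this patch lands. It assumes a build with RUSTFLAGS="--cfg tokio_unstable"
(which unstable runtime APIs require) and the "rt-multi-thread" feature; it is
a usage sketch, not part of the patch itself.

use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    // The alternate scheduler is gated behind `tokio_unstable` and is not
    // considered production ready.
    let rt = Builder::new_multi_thread_alt()
        .worker_threads(8)
        .enable_all()
        .build()?;

    rt.block_on(async {
        println!("running on the alternate multi-thread scheduler");
    });
    Ok(())
}
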
Carl Lerche 2023-07-21 11:56:34 -07:00 committed by GitHub
parent 63577cd8d3
commit 4165601b1b
45 changed files with 5416 additions and 106 deletions

.github/labeler.yml

@ -1,8 +1,28 @@
R-loom:
R-loom-sync:
- tokio/src/sync/*
- tokio/src/sync/**/*
- tokio-util/src/sync/*
- tokio-util/src/sync/**/*
- tokio/src/runtime/*
- tokio/src/runtime/**/*
R-loom-time-driver:
- tokio/src/runtime/time/*
- tokio/src/runtime/time/**/*
R-loom-current-thread:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/current_thread/*
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**
R-loom-multi-thread:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/multi_thread/*
- tokio/src/runtime/scheduler/multi_thread/**
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**
R-loom-multi-thread-alt:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/multi_thread_alt/*
- tokio/src/runtime/scheduler/multi_thread_alt/**
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**


@ -8,7 +8,9 @@ on:
name: Loom
env:
RUSTFLAGS: -Dwarnings
RUSTFLAGS: -Dwarnings --cfg loom --cfg tokio_unstable -C debug-assertions
LOOM_MAX_PREEMPTIONS: 2
LOOM_MAX_BRANCHES: 10000
RUST_BACKTRACE: 1
# Change to specific Rust release to pin
rust_stable: stable
@ -17,26 +19,66 @@ permissions:
contents: read
jobs:
loom:
name: loom
loom-sync:
name: loom tokio::sync
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom') || (github.base_ref == null))
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-sync') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture sync::tests
working-directory: tokio
loom-time-driver:
name: loom time driver
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-time-driver') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture runtime::time::tests
working-directory: tokio
loom-current-thread:
name: loom current-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-current-thread') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture loom_current_thread
working-directory: tokio
loom-multi-thread:
name: loom multi-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-multi-thread') || (github.base_ref == null))
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scope: --skip loom_pool
max_preemptions: 2
- scope: loom_pool::group_a
max_preemptions: 2
- scope: loom_pool::group_b
max_preemptions: 2
- scope: loom_pool::group_c
max_preemptions: 2
- scope: loom_pool::group_d
max_preemptions: 2
- scope: time::driver
max_preemptions: 2
- scope: loom_multi_thread::group_a
- scope: loom_multi_thread::group_b
- scope: loom_multi_thread::group_c
- scope: loom_multi_thread::group_d
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
@ -45,10 +87,34 @@ jobs:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: loom ${{ matrix.scope }}
run: cargo test --lib --release --features full -- --nocapture $SCOPE
run: cargo test --lib --release --features full -- $SCOPE
working-directory: tokio
env:
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings -C debug-assertions
LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
LOOM_MAX_BRANCHES: 10000
SCOPE: ${{ matrix.scope }}
loom-multi-thread-alt:
name: loom ALT multi-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-multi-thread-alt') || (github.base_ref == null))
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scope: loom_multi_thread_alt::group_a
- scope: loom_multi_thread_alt::group_b
- scope: loom_multi_thread_alt::group_c
- scope: loom_multi_thread_alt::group_d
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: loom ${{ matrix.scope }}
run: cargo test --lib --release --features full -- $SCOPE
working-directory: tokio
env:
SCOPE: ${{ matrix.scope }}
# TODO: remove this before stabilizing
LOOM_MAX_PREEMPTIONS: 1
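
For reference, the jobs above run loom model tests (note the `--cfg loom` in
RUSTFLAGS and the preemption bounds). A minimal, hypothetical sketch of such a
test, assuming the loom 0.6 API pinned in the Cargo.toml change below:

#[cfg(loom)]
#[test]
fn concurrent_increment() {
    // loom explores the possible interleavings of the two increments.
    loom::model(|| {
        use loom::sync::atomic::{AtomicUsize, Ordering};
        use loom::sync::Arc;
        use loom::thread;

        let counter = Arc::new(AtomicUsize::new(0));
        let c2 = counter.clone();

        let th = thread::spawn(move || {
            c2.fetch_add(1, Ordering::SeqCst);
        });
        counter.fetch_add(1, Ordering::SeqCst);
        th.join().unwrap();

        assert_eq!(2, counter.load(Ordering::SeqCst));
    });
}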


@ -160,7 +160,7 @@ wasm-bindgen-test = "0.3.0"
mio-aio = { version = "0.7.0", features = ["tokio"] }
[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5.2", features = ["futures", "checkpoint"] }
loom = { version = "0.6", features = ["futures", "checkpoint"] }
[package.metadata.docs.rs]
all-features = true


@ -6,10 +6,12 @@ impl<T> UnsafeCell<T> {
UnsafeCell(std::cell::UnsafeCell::new(data))
}
#[inline(always)]
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}
#[inline(always)]
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}


@ -25,6 +25,8 @@ impl BlockingSchedule {
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
BlockingSchedule {
@ -45,6 +47,8 @@ impl task::Schedule for BlockingSchedule {
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
None


@ -199,6 +199,8 @@ pub(crate) enum Kind {
CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt,
}
impl Builder {
@ -230,6 +232,26 @@ impl Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61)
}
cfg_unstable! {
/// Returns a new builder with the alternate multi thread scheduler
/// selected.
///
/// The alternate multi threaded scheduler is an in-progress
/// candidate to replace the existing multi threaded scheduler. It
/// currently does not scale as well to 16+ processors.
///
/// This runtime flavor is currently **not considered production
/// ready**.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread_alt() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThreadAlt, 61)
}
}
}
/// Returns a new runtime builder initialized with default configuration
@ -656,6 +678,8 @@ impl Builder {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => self.build_threaded_runtime(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
}
}
@ -665,6 +689,8 @@ impl Builder {
Kind::CurrentThread => true,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => false,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThreadAlt => false,
},
enable_io: self.enable_io,
enable_time: self.enable_time,
@ -1214,6 +1240,48 @@ cfg_rt_multi_thread! {
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}
cfg_unstable! {
fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::MultiThreadAlt;
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
let (scheduler, handle) = MultiThreadAlt::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);
Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
}
}
}
}


@ -357,6 +357,8 @@ impl Handle {
scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => RuntimeFlavor::MultiThreadAlt,
}
}
@ -385,6 +387,8 @@ impl Handle {
scheduler::Handle::CurrentThread(handle) => handle.owned_id(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(handle) => handle.owned_id(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(handle) => handle.owned_id(),
};
owned_id.into()
}
@ -535,6 +539,8 @@ cfg_taskdump! {
handle.dump().await
}).await
},
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"),
}
}
}


@ -9,6 +9,10 @@ use std::time::Duration;
cfg_rt_multi_thread! {
use crate::runtime::Builder;
use crate::runtime::scheduler::MultiThread;
cfg_unstable! {
use crate::runtime::scheduler::MultiThreadAlt;
}
}
/// The Tokio runtime.
@ -109,6 +113,9 @@ pub enum RuntimeFlavor {
CurrentThread,
/// The flavor that executes tasks across multiple threads.
MultiThread,
/// The flavor that executes tasks across multiple threads.
#[cfg(tokio_unstable)]
MultiThreadAlt,
}
/// The runtime scheduler is either a multi-thread or a current-thread executor.
@ -120,6 +127,10 @@ pub(super) enum Scheduler {
/// Execute tasks across multiple threads.
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread(MultiThread),
/// Execute tasks across multiple threads.
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt(MultiThreadAlt),
}
impl Runtime {
@ -336,6 +347,8 @@ impl Runtime {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future),
}
}
@ -456,6 +469,12 @@ impl Drop for Runtime {
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThreadAlt(multi_thread) => {
// The threaded scheduler drops its tasks on its worker threads, which is
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
}
}
}
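
Because the new variant is exposed on the public `RuntimeFlavor` enum (under
`tokio_unstable`), callers can detect it at run time. A hedged sketch, assuming
a `tokio_unstable` build; `on_alt_scheduler` is an illustrative helper, not
part of the patch:

use tokio::runtime::{Handle, RuntimeFlavor};

// Returns true when called from inside the alternate multi-thread scheduler.
// `Handle::current()` panics outside of a runtime context, and the wildcard
// arm keeps the match forward compatible.
fn on_alt_scheduler() -> bool {
    match Handle::current().runtime_flavor() {
        RuntimeFlavor::MultiThreadAlt => true,
        _ => false,
    }
}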


@ -0,0 +1,21 @@
use crate::runtime::scheduler;
#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
#[cfg(tokio_unstable)]
{
use crate::runtime::{Handle, RuntimeFlavor::MultiThreadAlt};
match Handle::try_current().map(|h| h.runtime_flavor()) {
Ok(MultiThreadAlt) => {
return scheduler::multi_thread_alt::block_in_place(f);
}
_ => {}
}
}
scheduler::multi_thread::block_in_place(f)
}
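
This dispatcher backs the public `tokio::task::block_in_place` entry point,
picking the implementation that matches the current runtime flavor. A brief
usage sketch (`read_config` is illustrative, not part of the patch):

// Runs blocking work from async code; the scheduler hands the worker's other
// tasks to a peer so they are not starved. Panics on the current-thread
// runtime flavor.
async fn read_config(path: &str) -> std::io::Result<String> {
    tokio::task::block_in_place(|| std::fs::read_to_string(path))
}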


@ -523,6 +523,10 @@ cfg_metrics! {
&self.shared.worker_metrics
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.worker_metrics(worker).queue_depth()
}
pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}


@ -75,6 +75,21 @@ impl<T: 'static> Shared<T> {
debug_assert!(unsafe { batch_tail.get_queue_next().is_none() });
let mut synced = shared.lock();
if synced.as_mut().is_closed {
drop(synced);
let mut curr = Some(batch_head);
while let Some(task) = curr {
curr = task.get_queue_next();
let _ = unsafe { task::Notified::<T>::from_raw(task) };
}
return;
}
let synced = synced.as_mut();
if let Some(tail) = synced.tail {


@ -10,11 +10,19 @@ cfg_rt! {
}
cfg_rt_multi_thread! {
mod block_in_place;
pub(crate) use block_in_place::block_in_place;
mod lock;
use lock::Lock;
pub(crate) mod multi_thread;
pub(crate) use multi_thread::MultiThread;
cfg_unstable! {
pub(crate) mod multi_thread_alt;
pub(crate) use multi_thread_alt::MultiThread as MultiThreadAlt;
}
}
use crate::runtime::driver;
@ -27,6 +35,9 @@ pub(crate) enum Handle {
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread(Arc<multi_thread::Handle>),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt(Arc<multi_thread_alt::Handle>),
// TODO: This is to avoid triggering "dead code" warnings many other places
// in the codebase. Remove this during a later cleanup
#[cfg(not(feature = "rt"))]
@ -40,6 +51,9 @@ pub(super) enum Context {
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread(multi_thread::Context),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt(multi_thread_alt::Context),
}
impl Handle {
@ -52,6 +66,9 @@ impl Handle {
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(ref h) => &h.driver,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThreadAlt(ref h) => &h.driver,
#[cfg(not(feature = "rt"))]
Handle::Disabled => unreachable!(),
}
@ -67,6 +84,20 @@ cfg_rt! {
use crate::util::RngSeedGenerator;
use std::task::Waker;
macro_rules! match_flavor {
($self:expr, $ty:ident($h:ident) => $e:expr) => {
match $self {
$ty::CurrentThread($h) => $e,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
$ty::MultiThread($h) => $e,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
$ty::MultiThreadAlt($h) => $e,
}
}
}
impl Handle {
#[track_caller]
pub(crate) fn current() -> Handle {
@ -77,12 +108,7 @@ cfg_rt! {
}
pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
match self {
Handle::CurrentThread(h) => &h.blocking_spawner,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(h) => &h.blocking_spawner,
}
match_flavor!(self, Handle(h) => &h.blocking_spawner)
}
pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
@ -95,6 +121,9 @@ cfg_rt! {
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id),
}
}
@ -104,16 +133,14 @@ cfg_rt! {
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(ref h) => h.shutdown(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThreadAlt(ref h) => h.shutdown(),
}
}
pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
match self {
Handle::CurrentThread(h) => &h.seed_generator,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(h) => &h.seed_generator,
}
match_flavor!(self, Handle(h) => &h.seed_generator)
}
pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
@ -123,6 +150,17 @@ cfg_rt! {
_ => panic!("not a CurrentThread handle"),
}
}
cfg_rt_multi_thread! {
cfg_unstable! {
pub(crate) fn expect_multi_thread_alt(&self) -> &Arc<multi_thread_alt::Handle> {
match self {
Handle::MultiThreadAlt(handle) => handle,
_ => panic!("not a `MultiThreadAlt` handle"),
}
}
}
}
}
cfg_metrics! {
@ -134,71 +172,41 @@ cfg_rt! {
Handle::CurrentThread(_) => 1,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_workers(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThreadAlt(handle) => handle.num_workers(),
}
}
pub(crate) fn num_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_blocking_threads(),
}
match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
}
pub(crate) fn num_idle_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_idle_blocking_threads(),
}
match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
}
pub(crate) fn active_tasks_count(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.active_tasks_count(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.active_tasks_count(),
}
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
}
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.scheduler_metrics(),
}
match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
match self {
Handle::CurrentThread(handle) => handle.worker_metrics(worker),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.worker_metrics(worker),
}
match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
}
pub(crate) fn injection_queue_depth(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.injection_queue_depth(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.injection_queue_depth(),
}
match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
match self {
Handle::CurrentThread(handle) => handle.worker_metrics(worker).queue_depth(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker),
}
match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
}
pub(crate) fn blocking_queue_depth(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.blocking_queue_depth(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.blocking_queue_depth(),
}
match_flavor!(self, Handle(handle) => handle.blocking_queue_depth())
}
}
}
@ -214,11 +222,7 @@ cfg_rt! {
}
pub(crate) fn defer(&self, waker: &Waker) {
match self {
Context::CurrentThread(context) => context.defer(waker),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Context::MultiThread(context) => context.defer(waker),
}
match_flavor!(self, Context(context) => context.defer(waker))
}
cfg_rt_multi_thread! {
@ -229,6 +233,16 @@ cfg_rt! {
_ => panic!("expected `MultiThread::Context`")
}
}
cfg_unstable! {
#[track_caller]
pub(crate) fn expect_multi_thread_alt(&self) -> &multi_thread_alt::Context {
match self {
Context::MultiThreadAlt(context) => context,
_ => panic!("expected `MultiThreadAlt::Context`")
}
}
}
}
}
}


@ -0,0 +1,166 @@
#[cfg(tokio_internal_mt_counters)]
mod imp {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_NOTIFY_REMOTE: AtomicUsize = AtomicUsize::new(0);
static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_UNPARKS_REMOTE: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_SCHEDULES: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_CAPPED: AtomicUsize = AtomicUsize::new(0);
static NUM_STEALS: AtomicUsize = AtomicUsize::new(0);
static NUM_OVERFLOW: AtomicUsize = AtomicUsize::new(0);
static NUM_PARK: AtomicUsize = AtomicUsize::new(0);
static NUM_POLLS: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_POLLS: AtomicUsize = AtomicUsize::new(0);
static NUM_REMOTE_BATCH: AtomicUsize = AtomicUsize::new(0);
static NUM_GLOBAL_QUEUE_INTERVAL: AtomicUsize = AtomicUsize::new(0);
static NUM_NO_AVAIL_CORE: AtomicUsize = AtomicUsize::new(0);
static NUM_RELAY_SEARCH: AtomicUsize = AtomicUsize::new(0);
static NUM_SPIN_STALL: AtomicUsize = AtomicUsize::new(0);
static NUM_NO_LOCAL_WORK: AtomicUsize = AtomicUsize::new(0);
impl Drop for super::Counters {
fn drop(&mut self) {
let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
let notifies_remote = NUM_NOTIFY_REMOTE.load(Relaxed);
let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
let unparks_remote = NUM_UNPARKS_REMOTE.load(Relaxed);
let maintenance = NUM_MAINTENANCE.load(Relaxed);
let lifo_scheds = NUM_LIFO_SCHEDULES.load(Relaxed);
let lifo_capped = NUM_LIFO_CAPPED.load(Relaxed);
let num_steals = NUM_STEALS.load(Relaxed);
let num_overflow = NUM_OVERFLOW.load(Relaxed);
let num_park = NUM_PARK.load(Relaxed);
let num_polls = NUM_POLLS.load(Relaxed);
let num_lifo_polls = NUM_LIFO_POLLS.load(Relaxed);
let num_remote_batch = NUM_REMOTE_BATCH.load(Relaxed);
let num_global_queue_interval = NUM_GLOBAL_QUEUE_INTERVAL.load(Relaxed);
let num_no_avail_core = NUM_NO_AVAIL_CORE.load(Relaxed);
let num_relay_search = NUM_RELAY_SEARCH.load(Relaxed);
let num_spin_stall = NUM_SPIN_STALL.load(Relaxed);
let num_no_local_work = NUM_NO_LOCAL_WORK.load(Relaxed);
println!("---");
println!("notifies (remote): {}", notifies_remote);
println!(" notifies (local): {}", notifies_local);
println!(" unparks (local): {}", unparks_local);
println!(" unparks (remote): {}", unparks_remote);
println!(" notify, no core: {}", num_no_avail_core);
println!(" maintenance: {}", maintenance);
println!(" LIFO schedules: {}", lifo_scheds);
println!(" LIFO capped: {}", lifo_capped);
println!(" steals: {}", num_steals);
println!(" queue overflows: {}", num_overflow);
println!(" parks: {}", num_park);
println!(" polls: {}", num_polls);
println!(" polls (LIFO): {}", num_lifo_polls);
println!("remote task batch: {}", num_remote_batch);
println!("global Q interval: {}", num_global_queue_interval);
println!(" relay search: {}", num_relay_search);
println!(" spin stall: {}", num_spin_stall);
println!(" no local work: {}", num_no_local_work);
}
}
pub(crate) fn inc_num_inc_notify_local() {
NUM_NOTIFY_LOCAL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_notify_remote() {
NUM_NOTIFY_REMOTE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_unparks_local() {
NUM_UNPARKS_LOCAL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_unparks_remote() {
NUM_UNPARKS_REMOTE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_maintenance() {
NUM_MAINTENANCE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_lifo_schedules() {
NUM_LIFO_SCHEDULES.fetch_add(1, Relaxed);
}
pub(crate) fn inc_lifo_capped() {
NUM_LIFO_CAPPED.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_steals() {
NUM_STEALS.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_overflows() {
NUM_OVERFLOW.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_parks() {
NUM_PARK.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_polls() {
NUM_POLLS.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_lifo_polls() {
NUM_LIFO_POLLS.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_remote_batch() {
NUM_REMOTE_BATCH.fetch_add(1, Relaxed);
}
pub(crate) fn inc_global_queue_interval() {
NUM_GLOBAL_QUEUE_INTERVAL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_notify_no_core() {
NUM_NO_AVAIL_CORE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_relay_search() {
NUM_RELAY_SEARCH.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_spin_stall() {
NUM_SPIN_STALL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_no_local_work() {
NUM_NO_LOCAL_WORK.fetch_add(1, Relaxed);
}
}
#[cfg(not(tokio_internal_mt_counters))]
mod imp {
pub(crate) fn inc_num_inc_notify_local() {}
pub(crate) fn inc_num_notify_remote() {}
pub(crate) fn inc_num_unparks_local() {}
pub(crate) fn inc_num_unparks_remote() {}
pub(crate) fn inc_num_maintenance() {}
pub(crate) fn inc_lifo_schedules() {}
pub(crate) fn inc_lifo_capped() {}
pub(crate) fn inc_num_steals() {}
pub(crate) fn inc_num_overflows() {}
pub(crate) fn inc_num_parks() {}
pub(crate) fn inc_num_polls() {}
pub(crate) fn inc_num_lifo_polls() {}
pub(crate) fn inc_num_remote_batch() {}
pub(crate) fn inc_global_queue_interval() {}
pub(crate) fn inc_notify_no_core() {}
pub(crate) fn inc_num_relay_search() {}
pub(crate) fn inc_num_spin_stall() {}
pub(crate) fn inc_num_no_local_work() {}
}
#[derive(Debug)]
pub(crate) struct Counters;
pub(super) use imp::*;


@ -0,0 +1,75 @@
use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread_alt::worker;
use crate::runtime::{
blocking, driver,
task::{self, JoinHandle},
};
use crate::util::RngSeedGenerator;
use std::fmt;
cfg_metrics! {
mod metrics;
}
/// Handle to the multi thread scheduler
pub(crate) struct Handle {
/// Task spawner
pub(super) shared: worker::Shared,
/// Resource driver handles
pub(crate) driver: driver::Handle,
/// Blocking pool spawner
pub(crate) blocking_spawner: blocking::Spawner,
/// Current random number generator seed
pub(crate) seed_generator: RngSeedGenerator,
}
impl Handle {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id)
}
pub(crate) fn shutdown(&self) {
self.shared.close();
self.driver.unpark();
}
pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
if let Some(notified) = notified {
me.shared.schedule_task(notified, false);
}
handle
}
}
cfg_unstable! {
use std::num::NonZeroU64;
impl Handle {
pub(crate) fn owned_id(&self) -> NonZeroU64 {
self.shared.owned.id
}
}
}
impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("multi_thread::Handle { ... }").finish()
}
}


@ -0,0 +1,41 @@
use super::Handle;
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
impl Handle {
pub(crate) fn num_workers(&self) -> usize {
self.shared.worker_metrics.len()
}
pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}
pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}
pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}


@ -0,0 +1,26 @@
use super::Handle;
use crate::runtime::Dump;
impl Handle {
pub(crate) async fn dump(&self) -> Dump {
let trace_status = &self.shared.trace_status;
// If a dump is in progress, block.
trace_status.start_trace_request(&self).await;
let result = loop {
if let Some(result) = trace_status.take_result() {
break result;
} else {
self.notify_all();
trace_status.result_ready.notified().await;
}
};
// Allow other queued dumps to proceed.
trace_status.end_trace_request(&self).await;
result
}
}


@ -0,0 +1,434 @@
//! Coordinates idling workers
#![allow(dead_code)]
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Shared};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
pub(super) struct Idle {
/// Number of searching cores
num_searching: AtomicUsize,
/// Number of idle cores
num_idle: AtomicUsize,
/// Map of idle cores
// idle_map: IdleMap,
/// Used to catch false-negatives when waking workers
needs_searching: AtomicBool,
/// Total number of cores
num_cores: usize,
}
pub(super) struct IdleMap {
chunks: Vec<AtomicUsize>,
}
pub(super) struct Snapshot {
// chunks: Vec<usize>,
}
/// Data synchronized by the scheduler mutex
pub(super) struct Synced {
/// Worker IDs that are currently sleeping
sleepers: Vec<usize>,
/// Cores available for workers
available_cores: Vec<Box<Core>>,
}
impl Idle {
pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
let idle = Idle {
num_searching: AtomicUsize::new(0),
num_idle: AtomicUsize::new(cores.len()),
// idle_map: IdleMap::new(&cores),
needs_searching: AtomicBool::new(false),
num_cores: cores.len(),
};
let synced = Synced {
sleepers: Vec::with_capacity(num_workers),
available_cores: cores,
};
(idle, synced)
}
pub(super) fn num_idle(&self, synced: &Synced) -> usize {
debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
synced.available_cores.len()
}
pub(super) fn num_searching(&self) -> usize {
self.num_searching.load(Acquire)
}
pub(super) fn snapshot(&self, _snapshot: &mut Snapshot) {
// snapshot.update(&self.idle_map)
}
/// Try to acquire an available core
pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
let ret = synced.available_cores.pop();
if let Some(_core) = &ret {
// Decrement the number of idle cores
let num_idle = self.num_idle.load(Acquire) - 1;
debug_assert_eq!(num_idle, synced.available_cores.len());
self.num_idle.store(num_idle, Release);
// self.idle_map.unset(core.index);
// debug_assert!(self.idle_map.matches(&synced.available_cores));
}
ret
}
/// We need at least one searching worker
pub(super) fn notify_local(&self, shared: &Shared) {
if self.num_searching.load(Acquire) != 0 {
// There already is a searching worker. Note, that this could be a
// false positive. However, because this method is called **from** a
// worker, we know that there is at least one worker currently
// awake, so the scheduler won't deadlock.
return;
}
if self.num_idle.load(Acquire) == 0 {
self.needs_searching.store(true, Release);
return;
}
// There aren't any searching workers. Try to initialize one
if self
.num_searching
.compare_exchange(0, 1, AcqRel, Acquire)
.is_err()
{
// Failing the compare_exchange means another thread concurrently
// launched a searching worker.
return;
}
super::counters::inc_num_unparks_local();
// Acquire the lock
let synced = shared.synced.lock();
self.notify_synced(synced, shared);
}
/// Notifies a single worker
pub(super) fn notify_remote(&self, synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
if synced.idle.sleepers.is_empty() {
self.needs_searching.store(true, Release);
return;
}
// We need to establish a stronger barrier than with `notify_local`
if self
.num_searching
.compare_exchange(0, 1, AcqRel, Acquire)
.is_err()
{
return;
}
self.notify_synced(synced, shared);
}
/// Notify a worker while synced
fn notify_synced(&self, mut synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
// Find a sleeping worker
if let Some(worker) = synced.idle.sleepers.pop() {
// Find an available core
if let Some(mut core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);
core.is_searching = true;
// self.idle_map.unset(core.index);
// debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);
let num_idle = synced.idle.available_cores.len();
debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);
// Update the number of sleeping workers
self.num_idle.store(num_idle, Release);
// Drop the lock before notifying the condvar.
drop(synced);
super::counters::inc_num_unparks_remote();
// Notify the worker
shared.condvars[worker].notify_one();
return;
} else {
synced.idle.sleepers.push(worker);
}
}
super::counters::inc_notify_no_core();
// Set the `needs_searching` flag, this happens *while* the lock is held.
self.needs_searching.store(true, Release);
self.num_searching.fetch_sub(1, Release);
// Explicit mutex guard drop to show that holding the guard to this
// point is significant. `needs_searching` and `num_searching` must be
// updated in the critical section.
drop(synced);
}
pub(super) fn notify_mult(
&self,
synced: &mut worker::Synced,
workers: &mut Vec<usize>,
num: usize,
) {
debug_assert!(workers.is_empty());
for _ in 0..num {
if let Some(worker) = synced.idle.sleepers.pop() {
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);
// self.idle_map.unset(core.index);
synced.assigned_cores[worker] = Some(core);
workers.push(worker);
continue;
} else {
synced.idle.sleepers.push(worker);
}
}
break;
}
if !workers.is_empty() {
// debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
let num_idle = synced.idle.available_cores.len();
self.num_idle.store(num_idle, Release);
} else {
debug_assert_eq!(
synced.idle.available_cores.len(),
self.num_idle.load(Acquire)
);
self.needs_searching.store(true, Release);
}
}
pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
// Wake every sleeping worker and assign a core to it. There may not be
// enough sleeping workers for all cores, but other workers will
// eventually find the cores and shut them down.
while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
let worker = synced.idle.sleepers.pop().unwrap();
let core = synced.idle.available_cores.pop().unwrap();
// self.idle_map.unset(core.index);
synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();
self.num_idle
.store(synced.idle.available_cores.len(), Release);
}
// debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
// Wake up any other workers
while let Some(index) = synced.idle.sleepers.pop() {
shared.condvars[index].notify_one();
}
}
/// The worker releases the given core, making it available to other workers
/// that are waiting.
pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
// The core should not be searching at this point
debug_assert!(!core.is_searching);
// Check that this isn't the final worker to go idle *and*
// `needs_searching` is set.
debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced.idle) > 1);
let num_idle = synced.idle.available_cores.len();
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
// self.idle_map.set(core.index);
// Store the core in the list of available cores
synced.idle.available_cores.push(core);
// debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
// Update `num_idle`
self.num_idle.store(num_idle + 1, Release);
}
pub(super) fn transition_worker_to_parked(&self, synced: &mut worker::Synced, index: usize) {
// Store the worker index in the list of sleepers
synced.idle.sleepers.push(index);
// The worker's assigned core slot should be empty
debug_assert!(synced.assigned_cores[index].is_none());
}
pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
debug_assert!(!core.is_searching);
let num_searching = self.num_searching.load(Acquire);
let num_idle = self.num_idle.load(Acquire);
if 2 * num_searching >= self.num_cores - num_idle {
return;
}
self.transition_worker_to_searching(core);
}
/// Needs to happen while synchronized in order to avoid races
pub(super) fn transition_worker_to_searching_if_needed(
&self,
_synced: &mut Synced,
core: &mut Core,
) -> bool {
if self.needs_searching.load(Acquire) {
// Needs to be called while holding the lock
self.transition_worker_to_searching(core);
true
} else {
false
}
}
fn transition_worker_to_searching(&self, core: &mut Core) {
core.is_searching = true;
self.num_searching.fetch_add(1, AcqRel);
self.needs_searching.store(false, Release);
}
/// A lightweight transition from searching -> running.
///
/// Returns `true` if this is the final searching worker. The caller
/// **must** notify a new worker.
pub(super) fn transition_worker_from_searching(&self, core: &mut Core) -> bool {
debug_assert!(core.is_searching);
core.is_searching = false;
let prev = self.num_searching.fetch_sub(1, AcqRel);
debug_assert!(prev > 0);
prev == 1
}
}
const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;
impl IdleMap {
fn new(cores: &[Box<Core>]) -> IdleMap {
let ret = IdleMap::new_n(num_chunks(cores.len()));
ret.set_all(cores);
ret
}
fn new_n(n: usize) -> IdleMap {
let chunks = (0..n).map(|_| AtomicUsize::new(0)).collect();
IdleMap { chunks }
}
fn set(&self, index: usize) {
let (chunk, mask) = index_to_mask(index);
let prev = self.chunks[chunk].load(Acquire);
let next = prev | mask;
self.chunks[chunk].store(next, Release);
}
fn set_all(&self, cores: &[Box<Core>]) {
for core in cores {
self.set(core.index);
}
}
fn unset(&self, index: usize) {
let (chunk, mask) = index_to_mask(index);
let prev = self.chunks[chunk].load(Acquire);
let next = prev & !mask;
self.chunks[chunk].store(next, Release);
}
fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
let expect = IdleMap::new_n(self.chunks.len());
expect.set_all(idle_cores);
for (i, chunk) in expect.chunks.iter().enumerate() {
if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
return false;
}
}
true
}
}
impl Snapshot {
pub(crate) fn new(_idle: &Idle) -> Snapshot {
/*
let chunks = vec![0; idle.idle_map.chunks.len()];
let mut ret = Snapshot { chunks };
ret.update(&idle.idle_map);
ret
*/
Snapshot {}
}
fn update(&mut self, _idle_map: &IdleMap) {
/*
for i in 0..self.chunks.len() {
self.chunks[i] = idle_map.chunks[i].load(Acquire);
}
*/
}
/*
pub(super) fn is_idle(&self, index: usize) -> bool {
let (chunk, mask) = index_to_mask(index);
debug_assert!(
chunk < self.chunks.len(),
"index={}; chunks={}",
index,
self.chunks.len()
);
self.chunks[chunk] & mask == mask
}
*/
}
fn num_chunks(max_cores: usize) -> usize {
(max_cores / BITS) + 1
}
fn index_to_mask(index: usize) -> (usize, usize) {
let mask = 1 << (index & BIT_MASK);
let chunk = index / BITS;
(chunk, mask)
}
fn num_active_workers(synced: &Synced) -> usize {
synced.available_cores.capacity() - synced.available_cores.len()
}


@ -0,0 +1,91 @@
//! Multi-threaded runtime
mod counters;
use counters::Counters;
mod handle;
pub(crate) use handle::Handle;
mod overflow;
pub(crate) use overflow::Overflow;
mod idle;
use self::idle::Idle;
mod stats;
pub(crate) use stats::Stats;
pub(crate) mod queue;
mod worker;
use worker::Core;
pub(crate) use worker::{Context, Shared};
// TODO: implement task dump
mod trace_mock;
use trace_mock::TraceStatus;
pub(crate) use worker::block_in_place;
use crate::runtime::{
self, blocking,
driver::{self, Driver},
scheduler, Config,
};
use crate::util::RngSeedGenerator;
use std::fmt;
use std::future::Future;
/// Work-stealing based thread pool for executing futures.
pub(crate) struct MultiThread;
// ===== impl MultiThread =====
impl MultiThread {
pub(crate) fn new(
size: usize,
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, runtime::Handle) {
let handle = worker::create(
size,
driver,
driver_handle,
blocking_spawner,
seed_generator,
config,
);
(MultiThread, handle)
}
/// Blocks the current thread waiting for the future to complete.
///
/// The future will execute on the current thread, but all spawned tasks
/// will be executed on the thread pool.
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
F: Future,
{
crate::runtime::context::enter_runtime(handle, true, |blocking| {
blocking.block_on(future).expect("failed to park thread")
})
}
pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
match handle {
scheduler::Handle::MultiThreadAlt(handle) => handle.shutdown(),
_ => panic!("expected MultiThread scheduler"),
}
}
}
impl fmt::Debug for MultiThread {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("MultiThread").finish()
}
}
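
The `block_on` semantics described above mirror the public `Runtime::block_on`:
the outer future is polled on the calling thread, while spawned tasks run on
the worker pool. A short illustrative sketch:

fn demo(rt: &tokio::runtime::Runtime) {
    // The future passed to `block_on` is polled on the calling thread...
    rt.block_on(async {
        // ...while anything handed to `tokio::spawn` runs on the worker pool.
        let join = tokio::spawn(async { 1 + 1 });
        assert_eq!(join.await.unwrap(), 2);
    });
}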


@ -0,0 +1,26 @@
use crate::runtime::task;
#[cfg(test)]
use std::cell::RefCell;
pub(crate) trait Overflow<T: 'static> {
fn push(&self, task: task::Notified<T>);
fn push_batch<I>(&self, iter: I)
where
I: Iterator<Item = task::Notified<T>>;
}
#[cfg(test)]
impl<T: 'static> Overflow<T> for RefCell<Vec<task::Notified<T>>> {
fn push(&self, task: task::Notified<T>) {
self.borrow_mut().push(task);
}
fn push_batch<I>(&self, iter: I)
where
I: Iterator<Item = task::Notified<T>>,
{
self.borrow_mut().extend(iter);
}
}


@ -0,0 +1,232 @@
//! Parks the runtime.
//!
//! A combination of the various resource driver park handles.
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::runtime::driver::{self, Driver};
use crate::util::TryLock;
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
pub(crate) struct Parker {
inner: Arc<Inner>,
}
pub(crate) struct Unparker {
inner: Arc<Inner>,
}
struct Inner {
/// Avoids entering the park if possible
state: AtomicUsize,
/// Used to coordinate access to the driver / condvar
mutex: Mutex<()>,
/// Condvar to block on if the driver is unavailable.
condvar: Condvar,
/// Resource (I/O, time, ...) driver
shared: Arc<Shared>,
}
const EMPTY: usize = 0;
const PARKED_CONDVAR: usize = 1;
const PARKED_DRIVER: usize = 2;
const NOTIFIED: usize = 3;
/// Shared across multiple Parker handles
struct Shared {
/// Shared driver. Only one thread at a time can use this
driver: TryLock<Driver>,
}
impl Parker {
pub(crate) fn new(driver: Driver) -> Parker {
Parker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
shared: Arc::new(Shared {
driver: TryLock::new(driver),
}),
}),
}
}
pub(crate) fn unpark(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.inner.park(handle);
}
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
// Only parking with zero is supported...
assert_eq!(duration, Duration::from_millis(0));
if let Some(mut driver) = self.inner.shared.driver.try_lock() {
driver.park_timeout(handle, duration)
}
}
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.inner.shutdown(handle);
}
}
impl Clone for Parker {
fn clone(&self) -> Parker {
Parker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
shared: self.inner.shared.clone(),
}),
}
}
}
impl Unparker {
pub(crate) fn unpark(&self, driver: &driver::Handle) {
self.inner.unpark(driver);
}
}
impl Inner {
/// Parks the current thread until it is unparked.
fn park(&self, handle: &driver::Handle) {
// If we were previously notified then we consume this notification and
// return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
if let Some(mut driver) = self.shared.driver.try_lock() {
self.park_driver(&mut driver, handle);
} else {
self.park_condvar();
}
}
fn park_condvar(&self) {
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock();
match self
.state
.compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
{
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
loop {
m = self.condvar.wait(m).unwrap();
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// got a notification
return;
}
// spurious wakeup, go back to sleep
}
}
fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
{
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
driver.park(handle);
match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} // got a notification, hurray!
PARKED_DRIVER => {} // no notification, alas
n => panic!("inconsistent park_timeout state: {}", n),
}
}
fn unpark(&self, driver: &driver::Handle) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => {} // no one was waiting
NOTIFIED => {} // already unparked
PARKED_CONDVAR => self.unpark_condvar(),
PARKED_DRIVER => driver.unpark(),
actual => panic!("inconsistent state in unpark; actual = {}", actual),
}
}
fn unpark_condvar(&self) {
// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
// during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has
// `lock` locked at this stage so we can acquire `lock` to wait until
// it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock());
self.condvar.notify_one()
}
fn shutdown(&self, handle: &driver::Handle) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown(handle);
}
self.condvar.notify_all();
}
}


@ -0,0 +1,601 @@
//! Run-queue structures to support a work-stealing scheduler
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread_alt::{Overflow, Stats};
use crate::runtime::task;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
// Use wider integers when possible to increase ABA resilience.
//
// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
cfg_has_atomic_u64! {
type UnsignedShort = u32;
type UnsignedLong = u64;
type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
type UnsignedShort = u16;
type UnsignedLong = u32;
type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}
/// Producer handle. May only be used from a single thread.
pub(crate) struct Local<T: 'static> {
inner: Arc<Inner<T>>,
}
/// Consumer handle. May be used from many threads.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
#[repr(align(128))]
pub(crate) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
///
/// Contains two `UnsignedShort` values. The low half is the "real" head of
/// the queue. The `UnsignedShort` in the high half is set by a stealer in the
/// process of stealing values. It represents the first value being stolen in the
/// batch. The `UnsignedShort` indices are intentionally wider than strictly
/// required for buffer indexing in order to provide ABA mitigation and make
/// it possible to distinguish between full and empty buffers.
///
/// When both `UnsignedShort` values are the same, there is no active
/// stealer.
///
/// Tracking an in-progress stealer prevents a wrapping scenario.
head: AtomicUnsignedLong,
/// Only updated by producer thread but read by many threads.
tail: AtomicUnsignedShort,
/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}
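
// A hedged sketch (not shown in this hunk): the packed `head` above can be
// split into and rebuilt from its (steal, real) halves roughly as follows,
// using the `UnsignedShort` / `UnsignedLong` aliases defined earlier in this
// file. The file's actual helpers may differ in detail.
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
    let real = n & UnsignedShort::MAX as UnsignedLong;
    let steal = n >> (std::mem::size_of::<UnsignedShort>() * 8);
    (steal as UnsignedShort, real as UnsignedShort)
}

fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
    (real as UnsignedLong) | ((steal as UnsignedLong) << (std::mem::size_of::<UnsignedShort>() * 8))
}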
unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}
#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;
// Shrink the size of the local queue when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasonable amount of
// time.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 4;
const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
// Constructing the fixed size array directly is very awkward. The only way to
// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
// the contents are not Copy. The trick with defining a const doesn't work for
// generic types.
fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);
// safety: We check that the length is correct.
unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
}
/// Create a new local run-queue
pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
for _ in 0..LOCAL_QUEUE_CAPACITY {
buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
}
let inner = Arc::new(Inner {
head: AtomicUnsignedLong::new(0),
tail: AtomicUnsignedShort::new(0),
buffer: make_fixed_size(buffer.into_boxed_slice()),
});
let local = Local {
inner: inner.clone(),
};
let remote = Steal(inner);
(remote, local)
}
impl<T> Local<T> {
/// How many tasks can be pushed into the queue
pub(crate) fn remaining_slots(&self) -> usize {
self.inner.remaining_slots()
}
pub(crate) fn max_capacity(&self) -> usize {
LOCAL_QUEUE_CAPACITY
}
/// Returns `true` if there are no entries in the queue
pub(crate) fn is_empty(&self) -> bool {
self.inner.is_empty()
}
/// Pushes a batch of tasks to the back of the queue. All tasks must fit in
/// the local queue.
///
/// # Panics
///
/// The method panics if there is not enough capacity to fit in the queue.
pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
let len = tasks.len();
assert!(len <= LOCAL_QUEUE_CAPACITY);
if len == 0 {
// Nothing to do
return;
}
let head = self.inner.head.load(Acquire);
let (steal, _) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let mut tail = unsafe { self.inner.tail.unsync_load() };
if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
// Yes, this if condition is structured a bit oddly (the first block
// does nothing, the second panics). It is this way to match
// `push_back_or_overflow`.
} else {
panic!()
}
for task in tasks {
let idx = tail as usize & MASK;
self.inner.buffer[idx].with_mut(|ptr| {
// Write the task to the slot
//
// Safety: There is only one producer and the above `if`
// condition ensures we don't touch a cell if there is a
// value, thus no consumer.
unsafe {
ptr::write((*ptr).as_mut_ptr(), task);
}
});
tail = tail.wrapping_add(1);
}
self.inner.tail.store(tail, Release);
}
/// Pushes a task to the back of the local queue, if there is not enough
/// capacity in the queue, this triggers the overflow operation.
///
/// When the queue overflows, half of the current contents of the queue are
/// moved to the given Injection queue. This frees up capacity for more
/// tasks to be pushed into the local queue.
pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
&mut self,
mut task: task::Notified<T>,
overflow: &O,
stats: &mut Stats,
) {
let tail = loop {
let head = self.inner.head.load(Acquire);
let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
// There is capacity for the task
break tail;
} else if steal != real {
super::counters::inc_num_overflows();
// Concurrently stealing, this will free up capacity, so only
// push the task onto the inject queue
overflow.push(task);
return;
} else {
super::counters::inc_num_overflows();
// Push the current task and half of the queue into the
// inject queue.
match self.push_overflow(task, real, tail, overflow, stats) {
Ok(_) => return,
// Lost the race, try again
Err(v) => {
task = v;
}
}
}
};
self.push_back_finish(task, tail);
}
// Second half of `push_back`
fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
// Map the position to a slot index.
let idx = tail as usize & MASK;
self.inner.buffer[idx].with_mut(|ptr| {
// Write the task to the slot
//
// Safety: There is only one producer and the above `if`
// condition ensures we don't touch a cell if there is a
// value, thus no consumer.
unsafe {
ptr::write((*ptr).as_mut_ptr(), task);
}
});
// Make the task available. Synchronizes with a load in
// `steal_into2`.
self.inner.tail.store(tail.wrapping_add(1), Release);
}
/// Moves a batch of tasks into the inject queue.
///
/// This will temporarily make some of the tasks unavailable to stealers.
/// Once `push_overflow` is done, a notification is sent out, so if other
/// workers "missed" some of the tasks during a steal, they will get
/// another opportunity.
#[inline(never)]
fn push_overflow<O: Overflow<T>>(
&mut self,
task: task::Notified<T>,
head: UnsignedShort,
tail: UnsignedShort,
overflow: &O,
stats: &mut Stats,
) -> Result<(), task::Notified<T>> {
/// How many elements are we taking from the local queue.
///
/// This is one less than the number of tasks pushed to the inject
/// queue as we are also inserting the `task` argument.
const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;
assert_eq!(
tail.wrapping_sub(head) as usize,
LOCAL_QUEUE_CAPACITY,
"queue is not full; tail = {}; head = {}",
tail,
head
);
let prev = pack(head, head);
// Claim a bunch of tasks
//
// We are claiming the tasks **before** reading them out of the buffer.
// This is safe because only the **current** thread is able to push new
// tasks.
//
// There isn't really any need for memory ordering... Relaxed would
// work. This is because all tasks are pushed into the queue from the
// current thread (or memory has been acquired if the local queue handle
// moved).
if self
.inner
.head
.compare_exchange(
prev,
pack(
head.wrapping_add(NUM_TASKS_TAKEN),
head.wrapping_add(NUM_TASKS_TAKEN),
),
Release,
Relaxed,
)
.is_err()
{
// We failed to claim the tasks, losing the race. Return out of
// this function and try the full `push` routine again. The queue
// may not be full anymore.
return Err(task);
}
/// An iterator that takes elements out of the run queue.
struct BatchTaskIter<'a, T: 'static> {
buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
head: UnsignedLong,
i: UnsignedLong,
}
impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
type Item = task::Notified<T>;
#[inline]
fn next(&mut self) -> Option<task::Notified<T>> {
if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
None
} else {
let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
let slot = &self.buffer[i_idx];
// safety: Our CAS from before has assumed exclusive ownership
// of the task pointers in this range.
let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
self.i += 1;
Some(task)
}
}
}
// safety: The CAS above ensures that no consumer will look at these
// values again, and we are the only producer.
let batch_iter = BatchTaskIter {
buffer: &self.inner.buffer,
head: head as UnsignedLong,
i: 0,
};
overflow.push_batch(batch_iter.chain(std::iter::once(task)));
// Add 1 to factor in the task currently being scheduled.
stats.incr_overflow_count();
Ok(())
}
/// Pops a task from the local queue.
pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
let mut head = self.inner.head.load(Acquire);
let idx = loop {
let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
if real == tail {
// queue is empty
return None;
}
let next_real = real.wrapping_add(1);
// If `steal == real` there are no concurrent stealers. Both `steal`
// and `real` are updated.
let next = if steal == real {
pack(next_real, next_real)
} else {
assert_ne!(steal, next_real);
pack(steal, next_real)
};
// Attempt to claim a task.
let res = self
.inner
.head
.compare_exchange(head, next, AcqRel, Acquire);
match res {
Ok(_) => break real as usize & MASK,
Err(actual) => head = actual,
}
};
Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
}
}
impl<T> Steal<T> {
/// Steals half the tasks from `self` and places them into `dst`.
pub(crate) fn steal_into(
&self,
dst: &mut Local<T>,
dst_stats: &mut Stats,
) -> Option<task::Notified<T>> {
// Safety: the caller is the only thread that mutates `dst.tail` and
// holds a mutable reference.
let dst_tail = unsafe { dst.inner.tail.unsync_load() };
// To the caller, `dst` may **look** empty but still have values
// contained in the buffer. If another thread is concurrently stealing
// from `dst` there may not be enough capacity to steal.
let (steal, _) = unpack(dst.inner.head.load(Acquire));
if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
// we *could* try to steal less here, but for simplicity, we're just
// going to abort.
return None;
}
// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
if n == 0 {
// No tasks were stolen
return None;
}
super::counters::inc_num_steals();
dst_stats.incr_steal_count(n as u16);
dst_stats.incr_steal_operations();
// We are returning a task here
n -= 1;
let ret_pos = dst_tail.wrapping_add(n);
let ret_idx = ret_pos as usize & MASK;
// safety: the value was written as part of `steal_into2` and not
// exposed to stealers, so no other thread can access it.
let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
if n == 0 {
// The `dst` queue is empty, but a single task was stolen
return Some(ret);
}
// Make the stolen items available to consumers
dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
Some(ret)
}
// Steal tasks from `self`, placing them into `dst`. Returns the number of
// tasks that were stolen.
fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
let mut prev_packed = self.0.head.load(Acquire);
let mut next_packed;
let n = loop {
let (src_head_steal, src_head_real) = unpack(prev_packed);
let src_tail = self.0.tail.load(Acquire);
// If these two do not match, another thread is concurrently
// stealing from the queue.
if src_head_steal != src_head_real {
return 0;
}
// Number of available tasks to steal
let n = src_tail.wrapping_sub(src_head_real);
let n = n - n / 2;
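// `n - n / 2` rounds up, so we claim the larger half of the available
// tasks (e.g. 3 out of 5).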
if n == 0 {
// No tasks available to steal
return 0;
}
// Update the real head index to acquire the tasks.
let steal_to = src_head_real.wrapping_add(n);
assert_ne!(src_head_steal, steal_to);
next_packed = pack(src_head_steal, steal_to);
// Claim all those tasks. This is done by incrementing the "real"
// head but not the steal. By doing this, no other thread is able to
// steal from this queue until the current thread completes.
let res = self
.0
.head
.compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
match res {
Ok(_) => break n,
Err(actual) => prev_packed = actual,
}
};
assert!(
n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
"actual = {}",
n
);
let (first, _) = unpack(next_packed);
// Take all the tasks
for i in 0..n {
// Compute the positions
let src_pos = first.wrapping_add(i);
let dst_pos = dst_tail.wrapping_add(i);
// Map to slots
let src_idx = src_pos as usize & MASK;
let dst_idx = dst_pos as usize & MASK;
// Read the task
//
// safety: We acquired the task with the atomic exchange above.
let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
// Write the task to the new slot
//
// safety: `dst` queue is empty and we are the only producer to
// this queue.
dst.inner.buffer[dst_idx]
.with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
}
let mut prev_packed = next_packed;
// Update `src_head_steal` to match `src_head_real` signalling that the
// stealing routine is complete.
loop {
let head = unpack(prev_packed).1;
next_packed = pack(head, head);
let res = self
.0
.head
.compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
match res {
Ok(_) => return n,
Err(actual) => {
let (actual_steal, actual_real) = unpack(actual);
assert_ne!(actual_steal, actual_real);
prev_packed = actual;
}
}
}
}
}
cfg_metrics! {
impl<T> Steal<T> {
pub(crate) fn len(&self) -> usize {
self.0.len() as _
}
}
}
impl<T> Clone for Steal<T> {
fn clone(&self) -> Steal<T> {
Steal(self.0.clone())
}
}
impl<T> Drop for Local<T> {
fn drop(&mut self) {
if !std::thread::panicking() {
assert!(self.pop().is_none(), "queue not empty");
}
}
}
impl<T> Inner<T> {
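/// Number of slots still available to the producer. This is computed from
/// the `steal` half of the head: a slot cannot be reused until any
/// in-progress steal has finished reading from it.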
fn remaining_slots(&self) -> usize {
let (steal, _) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);
LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
}
fn len(&self) -> UnsignedShort {
let (_, head) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);
tail.wrapping_sub(head)
}
fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
let real = n & UnsignedShort::MAX as UnsignedLong;
let steal = n >> (mem::size_of::<UnsignedShort>() * 8);
(steal as UnsignedShort, real as UnsignedShort)
}
/// Join the two head values.
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
(real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
}
#[test]
fn test_local_queue_capacity() {
assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize);
}
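// Illustrative sketch (not part of the patch): the `pack`/`unpack` round trip
// with the integer aliases pinned to concrete widths. Using `u16`/`u32` here
// is an assumption for readability; the actual `UnsignedShort`/`UnsignedLong`
// aliases are defined earlier in the file and depend on platform atomics
// support.
#[test]
fn pack_unpack_round_trip_sketch() {
    fn pack(steal: u16, real: u16) -> u32 {
        (real as u32) | ((steal as u32) << 16)
    }
    fn unpack(n: u32) -> (u16, u16) {
        let real = (n & u16::MAX as u32) as u16;
        let steal = (n >> 16) as u16;
        (steal, real)
    }
    // The steal index lands in the upper 16 bits, the real head in the lower.
    assert_eq!(pack(3, 7), (3 << 16) | 7);
    assert_eq!(unpack(pack(3, 7)), (3, 7));
    // Wrapping indices survive the round trip unchanged.
    assert_eq!(unpack(pack(u16::MAX, 0)), (u16::MAX, 0));
}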

View File

@ -0,0 +1,171 @@
use crate::runtime::{Config, MetricsBatch, WorkerMetrics};
use std::cmp;
use std::time::{Duration, Instant};
/// Per-worker statistics. This is used for both tuning the scheduler and
/// reporting runtime-level metrics/stats.
pub(crate) struct Stats {
/// The metrics batch used to report runtime-level metrics/stats to the
/// user.
batch: MetricsBatch,
/// Exponentially-weighted moving average of the time spent polling a
/// scheduled task.
///
/// Tracked in nanoseconds, stored as an f64 since that is what we use with
/// the EWMA calculations.
task_poll_time_ewma: f64,
}
/// Transient state
pub(crate) struct Ephemeral {
/// Instant at which work last resumed (continued after park).
///
/// This duplicates the value stored in `MetricsBatch`. We will unify
/// `Stats` and `MetricsBatch` when we stabilize metrics.
processing_scheduled_tasks_started_at: Instant,
/// Number of tasks polled in the batch of scheduled tasks
tasks_polled_in_batch: usize,
/// Used to ensure calls to start / stop batch are paired
#[cfg(debug_assertions)]
batch_started: bool,
}
impl Ephemeral {
pub(crate) fn new() -> Ephemeral {
Ephemeral {
processing_scheduled_tasks_started_at: Instant::now(),
tasks_polled_in_batch: 0,
#[cfg(debug_assertions)]
batch_started: false,
}
}
}
/// How much weight to give each individual poll time; the value is plucked from thin air.
const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;
/// Ideally, we wouldn't go above this; the value is plucked from thin air.
const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;
/// Max value for the global queue interval. This is roughly 2x the previous default of 61.
const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;
/// This is the previous default.
const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;
impl Stats {
pub(crate) const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 =
TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL;
pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
// Seed the value with what we hope to see.
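// (With TARGET_GLOBAL_QUEUE_INTERVAL at 200µs and a target of 61 polls per
// interval, this seeds the EWMA at roughly 3.3µs per poll.)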
let task_poll_time_ewma =
TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;
Stats {
batch: MetricsBatch::new(worker_metrics),
task_poll_time_ewma,
}
}
pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
// If an interval is explicitly set, don't tune.
if let Some(configured) = config.global_queue_interval {
return configured;
}
// As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here.
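// For example, with the seeded EWMA of roughly 3.3µs per poll this works
// out to 61 tasks per interval, which the clamp below leaves unchanged.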
let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;
cmp::max(
// We don't want to return less than 2 as that would result in the
// global queue always getting checked first.
2,
cmp::min(
MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
tasks_per_interval,
),
)
}
pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to);
}
pub(crate) fn about_to_park(&mut self) {
self.batch.about_to_park();
}
pub(crate) fn inc_local_schedule_count(&mut self) {
self.batch.inc_local_schedule_count();
}
pub(crate) fn start_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
self.batch.start_processing_scheduled_tasks();
#[cfg(debug_assertions)]
{
debug_assert!(!ephemeral.batch_started);
ephemeral.batch_started = true;
}
ephemeral.processing_scheduled_tasks_started_at = Instant::now();
ephemeral.tasks_polled_in_batch = 0;
}
pub(crate) fn end_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
self.batch.end_processing_scheduled_tasks();
#[cfg(debug_assertions)]
{
debug_assert!(ephemeral.batch_started);
ephemeral.batch_started = false;
}
// Update the EWMA task poll time
if ephemeral.tasks_polled_in_batch > 0 {
let now = Instant::now();
// If we "overflow" this conversion, we have bigger problems than
// slightly off stats.
let elapsed = (now - ephemeral.processing_scheduled_tasks_started_at).as_nanos() as f64;
let num_polls = ephemeral.tasks_polled_in_batch as f64;
// Calculate the mean poll duration for a single task in the batch
let mean_poll_duration = elapsed / num_polls;
// Compute the alpha weighted by the number of tasks polled this batch.
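// Applying the per-poll alpha once per task would leave a weight of
// (1 - alpha)^num_polls on the old value, so the batch-level alpha is
// one minus that factor.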
let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
// Now compute the new weighted average task poll time.
self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
+ (1.0 - weighted_alpha) * self.task_poll_time_ewma;
}
}
pub(crate) fn start_poll(&mut self, ephemeral: &mut Ephemeral) {
self.batch.start_poll();
ephemeral.tasks_polled_in_batch += 1;
}
pub(crate) fn end_poll(&mut self) {
self.batch.end_poll();
}
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.batch.incr_steal_count(by);
}
pub(crate) fn incr_steal_operations(&mut self) {
self.batch.incr_steal_operations();
}
pub(crate) fn incr_overflow_count(&mut self) {
self.batch.incr_overflow_count();
}
}
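// Illustrative sketch (not part of the patch): the batch-weighted EWMA update
// performed in `end_processing_scheduled_tasks`, pulled out as a free function
// so the arithmetic can be followed in isolation. Names mirror the code above,
// but this is a standalone approximation, not the runtime implementation.
fn ewma_update_sketch(prev_ewma_nanos: f64, batch_elapsed_nanos: f64, num_polls: f64) -> f64 {
    const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;
    // Mean poll time observed for this batch.
    let mean_poll_duration = batch_elapsed_nanos / num_polls;
    // The old value keeps a weight of (1 - alpha)^num_polls, as if the
    // per-poll alpha had been applied once per task in the batch.
    let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
    weighted_alpha * mean_poll_duration + (1.0 - weighted_alpha) * prev_ewma_nanos
}
// Starting from the seeded ~3.3µs and observing a batch of 10 polls that took
// 50µs in total (5µs each) moves the EWMA to roughly 4.4µs:
// ewma_update_sketch(3279.0, 50_000.0, 10.0) ≈ 4400.0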

View File

@ -0,0 +1,61 @@
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Barrier, Mutex};
use crate::runtime::dump::Dump;
use crate::runtime::scheduler::multi_thread_alt::Handle;
use crate::sync::notify::Notify;
/// Tracing status of the worker.
pub(super) struct TraceStatus {
pub(super) trace_requested: AtomicBool,
pub(super) trace_start: Barrier,
pub(super) trace_end: Barrier,
pub(super) result_ready: Notify,
pub(super) trace_result: Mutex<Option<Dump>>,
}
impl TraceStatus {
pub(super) fn new(remotes_len: usize) -> Self {
Self {
trace_requested: AtomicBool::new(false),
trace_start: Barrier::new(remotes_len),
trace_end: Barrier::new(remotes_len),
result_ready: Notify::new(),
trace_result: Mutex::new(None),
}
}
pub(super) fn trace_requested(&self) -> bool {
self.trace_requested.load(Ordering::Relaxed)
}
pub(super) async fn start_trace_request(&self, handle: &Handle) {
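// Spin until we are the caller that flips `trace_requested` from false to
// true. Waking all workers and yielding lets any in-flight trace complete
// and release the flag.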
while self
.trace_requested
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
handle.notify_all();
crate::task::yield_now().await;
}
}
pub(super) fn stash_result(&self, dump: Dump) {
let _ = self.trace_result.lock().insert(dump);
self.result_ready.notify_one();
}
pub(super) fn take_result(&self) -> Option<Dump> {
self.trace_result.lock().take()
}
pub(super) async fn end_trace_request(&self, handle: &Handle) {
while self
.trace_requested
.compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
handle.notify_all();
crate::task::yield_now().await;
}
}
}

View File

@ -0,0 +1,11 @@
pub(super) struct TraceStatus {}
impl TraceStatus {
pub(super) fn new(_: usize) -> Self {
Self {}
}
pub(super) fn trace_requested(&self) -> bool {
false
}
}

File diff suppressed because it is too large

View File

@ -0,0 +1,11 @@
use super::Shared;
impl Shared {
pub(crate) fn injection_queue_depth(&self) -> usize {
self.inject.len()
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.remotes[worker].steal.len()
}
}

View File

@ -0,0 +1,79 @@
use super::{Core, Handle, Shared};
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread_alt::Stats;
use crate::runtime::task::trace::trace_multi_thread;
use crate::runtime::{dump, WorkerMetrics};
use std::time::Duration;
impl Handle {
pub(super) fn trace_core(&self, mut core: Box<Core>) -> Box<Core> {
core.is_traced = false;
if core.is_shutdown {
return core;
}
// wait for other workers, or timeout without tracing
let timeout = Duration::from_millis(250); // a _very_ generous timeout
let barrier =
if let Some(barrier) = self.shared.trace_status.trace_start.wait_timeout(timeout) {
barrier
} else {
// don't attempt to trace
return core;
};
if !barrier.is_leader() {
// wait for leader to finish tracing
self.shared.trace_status.trace_end.wait();
return core;
}
// trace
let owned = &self.shared.owned;
let mut local = self.shared.steal_all();
let synced = &self.shared.synced;
let injection = &self.shared.inject;
// safety: `trace_multi_thread` is invoked with the same `synced` that `injection`
// was created with.
let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) }
.into_iter()
.map(dump::Task::new)
.collect();
let result = dump::Dump::new(traces);
// stash the result
self.shared.trace_status.stash_result(result);
// allow other workers to proceed
self.shared.trace_status.trace_end.wait();
core
}
}
impl Shared {
/// Steal all tasks from remotes into a single local queue.
pub(super) fn steal_all(&self) -> super::queue::Local<Arc<Handle>> {
let (_steal, mut local) = super::queue::local();
let worker_metrics = WorkerMetrics::new();
let mut stats = Stats::new(&worker_metrics);
for remote in self.remotes.iter() {
let steal = &remote.steal;
while !steal.is_empty() {
if let Some(task) = steal.steal_into(&mut local, &mut stats) {
local.push_back([task].into_iter());
}
}
}
local
}
}

View File

@ -0,0 +1,7 @@
use super::{Core, Handle};
impl Handle {
pub(super) fn trace_core(&self, core: Box<Core>) -> Box<Core> {
core
}
}

View File

@ -128,7 +128,7 @@ impl<S: 'static> OwnedTasks<S> {
/// a LocalNotified, giving the thread permission to poll this task.
#[inline]
pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
assert_eq!(task.header().get_owner_id(), Some(self.id));
debug_assert_eq!(task.header().get_owner_id(), Some(self.id));
// safety: All tasks bound to this OwnedTasks are Send, so it is safe
// to poll it on this thread no matter what thread we are on.

View File

@ -186,6 +186,8 @@ pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Context::MultiThreadAlt(_) => unimplemented!(),
}
}
});

View File

@ -1,3 +1,5 @@
mod yield_now;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::loom::thread;

View File

@ -1,3 +1,7 @@
mod queue;
mod shutdown;
mod yield_now;
/// Full runtime loom tests. These are heavy tests and take significant time to
/// run on CI.
///
@ -412,8 +416,8 @@ async fn multi_gated() {
}
poll_fn(move |cx| {
gate.waker.register_by_ref(cx.waker());
if gate.count.load(SeqCst) < 2 {
gate.waker.register_by_ref(cx.waker());
Poll::Pending
} else {
Poll::Ready(())

View File

@ -1,5 +1,5 @@
use crate::runtime::scheduler::multi_thread::{queue, Stats};
use crate::runtime::tests::NoopSchedule;
use crate::runtime::tests::{unowned, NoopSchedule};
use loom::thread;
use std::cell::RefCell;
@ -37,7 +37,7 @@ fn basic() {
for _ in 0..2 {
for _ in 0..2 {
let (task, _) = super::unowned(async {});
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
@ -46,7 +46,7 @@ fn basic() {
}
// Push another task
let (task, _) = super::unowned(async {});
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
while local.pop().is_some() {
@ -88,7 +88,7 @@ fn steal_overflow() {
let mut n = 0;
// push a task, pop a task
let (task, _) = super::unowned(async {});
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
if local.pop().is_some() {
@ -96,7 +96,7 @@ fn steal_overflow() {
}
for _ in 0..6 {
let (task, _) = super::unowned(async {});
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
@ -140,7 +140,7 @@ fn multi_stealer() {
// Push work
for _ in 0..NUM_TASKS {
let (task, _) = super::unowned(async {});
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
@ -176,10 +176,10 @@ fn chained_steal() {
// Load up some tasks
for _ in 0..4 {
let (task, _) = super::unowned(async {});
let (task, _) = unowned(async {});
l1.push_back_or_overflow(task, &inject, &mut stats);
let (task, _) = super::unowned(async {});
let (task, _) = unowned(async {});
l2.push_back_or_overflow(task, &inject, &mut stats);
}

View File

@ -0,0 +1,37 @@
use crate::runtime::park;
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::{self, Runtime};
#[test]
fn yield_calls_park_before_scheduling_again() {
// Don't need to check all permutations
let mut loom = loom::model::Builder::default();
loom.max_permutations = Some(1);
loom.check(|| {
let rt = mk_runtime(2);
let (tx, rx) = oneshot::channel::<()>();
rt.spawn(async {
let tid = loom::thread::current().id();
let park_count = park::current_thread_park_count();
crate::task::yield_now().await;
if tid == loom::thread::current().id() {
let new_park_count = park::current_thread_park_count();
assert_eq!(park_count + 1, new_park_count);
}
tx.send(());
});
rx.recv();
});
}
fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
.build()
.unwrap()
}

View File

@ -0,0 +1,463 @@
mod queue;
mod shutdown;
mod yield_now;
/// Full runtime loom tests. These are heavy tests and take significant time to
/// run on CI.
///
/// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
///
/// In order to speed up the CI, tests are split into groups.
use crate::future::poll_fn;
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::{self, Runtime};
use crate::{spawn, task};
use tokio_test::assert_ok;
use loom::sync::atomic::{AtomicBool, AtomicUsize};
use loom::sync::Arc;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::task::{Context, Poll};
mod atomic_take {
use loom::sync::atomic::AtomicBool;
use std::mem::MaybeUninit;
use std::sync::atomic::Ordering::SeqCst;
pub(super) struct AtomicTake<T> {
inner: MaybeUninit<T>,
taken: AtomicBool,
}
impl<T> AtomicTake<T> {
pub(super) fn new(value: T) -> Self {
Self {
inner: MaybeUninit::new(value),
taken: AtomicBool::new(false),
}
}
pub(super) fn take(&self) -> Option<T> {
// safety: Only one thread will see the boolean change from false
// to true, so that thread is able to take the value.
match self.taken.fetch_or(true, SeqCst) {
false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) },
true => None,
}
}
}
impl<T> Drop for AtomicTake<T> {
fn drop(&mut self) {
drop(self.take());
}
}
}
#[derive(Clone)]
struct AtomicOneshot<T> {
value: std::sync::Arc<atomic_take::AtomicTake<oneshot::Sender<T>>>,
}
impl<T> AtomicOneshot<T> {
fn new(sender: oneshot::Sender<T>) -> Self {
Self {
value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)),
}
}
fn assert_send(&self, value: T) {
self.value.take().unwrap().send(value);
}
}
/// Tests are divided into groups to make the runs faster on CI.
mod group_a {
use super::*;
#[test]
fn racy_shutdown() {
loom::model(|| {
let pool = mk_pool(1);
// here's the case we want to exercise:
//
// a worker that still has tasks in its local queue gets sent to the blocking pool (due to
// block_in_place). the blocking pool is shut down, so drops the worker. the worker's
// shutdown method never gets run.
//
// we do this by spawning two tasks on one worker, the first of which does block_in_place,
// and then immediately drop the pool.
pool.spawn(track(async {
crate::task::block_in_place(|| {});
}));
pool.spawn(track(async {}));
drop(pool);
});
}
#[test]
fn pool_multi_spawn() {
loom::model(|| {
let pool = mk_pool(2);
let c1 = Arc::new(AtomicUsize::new(0));
let (tx, rx) = oneshot::channel();
let tx1 = AtomicOneshot::new(tx);
// Spawn a task
let c2 = c1.clone();
let tx2 = tx1.clone();
pool.spawn(track(async move {
spawn(track(async move {
if 1 == c1.fetch_add(1, Relaxed) {
tx1.assert_send(());
}
}));
}));
// Spawn a second task
pool.spawn(track(async move {
spawn(track(async move {
if 1 == c2.fetch_add(1, Relaxed) {
tx2.assert_send(());
}
}));
}));
rx.recv();
});
}
fn only_blocking_inner(first_pending: bool) {
loom::model(move || {
let pool = mk_pool(1);
let (block_tx, block_rx) = oneshot::channel();
pool.spawn(track(async move {
crate::task::block_in_place(move || {
block_tx.send(());
});
if first_pending {
task::yield_now().await
}
}));
block_rx.recv();
drop(pool);
});
}
#[test]
fn only_blocking_without_pending() {
only_blocking_inner(false)
}
#[test]
fn only_blocking_with_pending() {
only_blocking_inner(true)
}
}
mod group_b {
use super::*;
fn blocking_and_regular_inner(first_pending: bool) {
const NUM: usize = 3;
loom::model(move || {
let pool = mk_pool(1);
let cnt = Arc::new(AtomicUsize::new(0));
let (block_tx, block_rx) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let done_tx = AtomicOneshot::new(done_tx);
pool.spawn(track(async move {
crate::task::block_in_place(move || {
block_tx.send(());
});
if first_pending {
task::yield_now().await
}
}));
for _ in 0..NUM {
let cnt = cnt.clone();
let done_tx = done_tx.clone();
pool.spawn(track(async move {
if NUM == cnt.fetch_add(1, Relaxed) + 1 {
done_tx.assert_send(());
}
}));
}
done_rx.recv();
block_rx.recv();
drop(pool);
});
}
#[test]
#[ignore] // TODO: uncomment
fn blocking_and_regular_without_pending() {
blocking_and_regular_inner(false);
}
#[test]
fn blocking_and_regular_with_pending() {
blocking_and_regular_inner(true);
}
#[test]
fn join_output() {
loom::model(|| {
let rt = mk_pool(1);
rt.block_on(async {
let t = crate::spawn(track(async { "hello" }));
let out = assert_ok!(t.await);
assert_eq!("hello", out.into_inner());
});
});
}
#[test]
fn poll_drop_handle_then_drop() {
loom::model(|| {
let rt = mk_pool(1);
rt.block_on(async move {
let mut t = crate::spawn(track(async { "hello" }));
poll_fn(|cx| {
let _ = Pin::new(&mut t).poll(cx);
Poll::Ready(())
})
.await;
});
})
}
#[test]
fn complete_block_on_under_load() {
loom::model(|| {
let pool = mk_pool(1);
pool.block_on(async {
// Trigger a re-schedule
crate::spawn(track(async {
for _ in 0..2 {
task::yield_now().await;
}
}));
gated2(true).await
});
});
}
#[test]
fn shutdown_with_notification() {
use crate::sync::oneshot;
loom::model(|| {
let rt = mk_pool(2);
let (done_tx, done_rx) = oneshot::channel::<()>();
rt.spawn(track(async move {
let (tx, rx) = oneshot::channel::<()>();
crate::spawn(async move {
crate::task::spawn_blocking(move || {
let _ = tx.send(());
});
let _ = done_rx.await;
});
let _ = rx.await;
let _ = done_tx.send(());
}));
});
}
}
mod group_c {
use super::*;
#[test]
fn pool_shutdown() {
loom::model(|| {
let pool = mk_pool(2);
pool.spawn(track(async move {
gated2(true).await;
}));
pool.spawn(track(async move {
gated2(false).await;
}));
drop(pool);
});
}
}
mod group_d {
use super::*;
#[test]
fn pool_multi_notify() {
loom::model(|| {
let pool = mk_pool(2);
let c1 = Arc::new(AtomicUsize::new(0));
let (done_tx, done_rx) = oneshot::channel();
let done_tx1 = AtomicOneshot::new(done_tx);
let done_tx2 = done_tx1.clone();
// Spawn a task
let c2 = c1.clone();
pool.spawn(track(async move {
multi_gated().await;
if 1 == c1.fetch_add(1, Relaxed) {
done_tx1.assert_send(());
}
}));
// Spawn a second task
pool.spawn(track(async move {
multi_gated().await;
if 1 == c2.fetch_add(1, Relaxed) {
done_tx2.assert_send(());
}
}));
done_rx.recv();
});
}
}
fn mk_pool(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread_alt()
.worker_threads(num_threads)
// Set the intervals to avoid tuning logic
.global_queue_interval(61)
.build()
.unwrap()
}
fn gated2(thread: bool) -> impl Future<Output = &'static str> {
use loom::thread;
use std::sync::Arc;
let gate = Arc::new(AtomicBool::new(false));
let mut fired = false;
poll_fn(move |cx| {
if !fired {
let gate = gate.clone();
let waker = cx.waker().clone();
if thread {
thread::spawn(move || {
gate.store(true, SeqCst);
waker.wake_by_ref();
});
} else {
spawn(track(async move {
gate.store(true, SeqCst);
waker.wake_by_ref();
}));
}
fired = true;
return Poll::Pending;
}
if gate.load(SeqCst) {
Poll::Ready("hello world")
} else {
Poll::Pending
}
})
}
async fn multi_gated() {
struct Gate {
waker: loom::future::AtomicWaker,
count: AtomicUsize,
}
let gate = Arc::new(Gate {
waker: loom::future::AtomicWaker::new(),
count: AtomicUsize::new(0),
});
{
let gate = gate.clone();
spawn(track(async move {
for i in 1..3 {
gate.count.store(i, SeqCst);
gate.waker.wake();
}
}));
}
poll_fn(move |cx| {
gate.waker.register_by_ref(cx.waker());
if gate.count.load(SeqCst) < 2 {
Poll::Pending
} else {
Poll::Ready(())
}
})
.await;
}
fn track<T: Future>(f: T) -> Track<T> {
Track {
inner: f,
arc: Arc::new(()),
}
}
pin_project! {
struct Track<T> {
#[pin]
inner: T,
// Arc is used to hook into loom's leak tracking.
arc: Arc<()>,
}
}
impl<T> Track<T> {
fn into_inner(self) -> T {
self.inner
}
}
impl<T: Future> Future for Track<T> {
type Output = Track<T::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
Poll::Ready(Track {
inner: ready!(me.inner.poll(cx)),
arc: me.arc.clone(),
})
}
}

View File

@ -0,0 +1,205 @@
use crate::runtime::scheduler::multi_thread::{queue, Stats};
use crate::runtime::tests::{unowned, NoopSchedule};
use loom::thread;
use std::cell::RefCell;
fn new_stats() -> Stats {
Stats::new(&crate::runtime::WorkerMetrics::new())
}
#[test]
fn basic() {
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = RefCell::new(vec![]);
let mut stats = new_stats();
let th = thread::spawn(move || {
let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..3 {
if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
while local.pop().is_some() {
n += 1;
}
}
n
});
let mut n = 0;
for _ in 0..2 {
for _ in 0..2 {
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
if local.pop().is_some() {
n += 1;
}
// Push another task
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
while local.pop().is_some() {
n += 1;
}
}
n += inject.borrow_mut().drain(..).count();
n += th.join().unwrap();
assert_eq!(6, n);
});
}
#[test]
fn steal_overflow() {
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = RefCell::new(vec![]);
let mut stats = new_stats();
let th = thread::spawn(move || {
let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
while local.pop().is_some() {
n += 1;
}
n
});
let mut n = 0;
// push a task, pop a task
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
if local.pop().is_some() {
n += 1;
}
for _ in 0..6 {
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
n += th.join().unwrap();
while local.pop().is_some() {
n += 1;
}
n += inject.borrow_mut().drain(..).count();
assert_eq!(7, n);
});
}
#[test]
fn multi_stealer() {
const NUM_TASKS: usize = 5;
fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
let mut stats = new_stats();
let (_, mut local) = queue::local();
if steal.steal_into(&mut local, &mut stats).is_none() {
return 0;
}
let mut n = 1;
while local.pop().is_some() {
n += 1;
}
n
}
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = RefCell::new(vec![]);
let mut stats = new_stats();
// Push work
for _ in 0..NUM_TASKS {
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
let th1 = {
let steal = steal.clone();
thread::spawn(move || steal_tasks(steal))
};
let th2 = thread::spawn(move || steal_tasks(steal));
let mut n = 0;
while local.pop().is_some() {
n += 1;
}
n += inject.borrow_mut().drain(..).count();
n += th1.join().unwrap();
n += th2.join().unwrap();
assert_eq!(n, NUM_TASKS);
});
}
#[test]
fn chained_steal() {
loom::model(|| {
let mut stats = new_stats();
let (s1, mut l1) = queue::local();
let (s2, mut l2) = queue::local();
let inject = RefCell::new(vec![]);
// Load up some tasks
for _ in 0..4 {
let (task, _) = unowned(async {});
l1.push_back_or_overflow(task, &inject, &mut stats);
let (task, _) = unowned(async {});
l2.push_back_or_overflow(task, &inject, &mut stats);
}
// Spawn a task to steal from **our** queue
let th = thread::spawn(move || {
let mut stats = new_stats();
let (_, mut local) = queue::local();
s1.steal_into(&mut local, &mut stats);
while local.pop().is_some() {}
});
// Drain our tasks, then attempt to steal
while l1.pop().is_some() {}
s2.steal_into(&mut l1, &mut stats);
th.join().unwrap();
while l1.pop().is_some() {}
while l2.pop().is_some() {}
});
}

View File

@ -0,0 +1,28 @@
use crate::runtime::{Builder, Handle};
#[test]
fn join_handle_cancel_on_shutdown() {
let mut builder = loom::model::Builder::new();
builder.preemption_bound = Some(2);
builder.check(|| {
use futures::future::FutureExt;
let rt = Builder::new_multi_thread()
.worker_threads(2)
.build()
.unwrap();
let handle = rt.block_on(async move { Handle::current() });
let jh1 = handle.spawn(futures::future::pending::<()>());
drop(rt);
let jh2 = handle.spawn(futures::future::pending::<()>());
let err1 = jh1.now_or_never().unwrap().unwrap_err();
let err2 = jh2.now_or_never().unwrap().unwrap_err();
assert!(err1.is_cancelled());
assert!(err2.is_cancelled());
});
}

View File

@ -0,0 +1,37 @@
use crate::runtime::park;
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::{self, Runtime};
#[test]
fn yield_calls_park_before_scheduling_again() {
// Don't need to check all permutations
let mut loom = loom::model::Builder::default();
loom.max_permutations = Some(1);
loom.check(|| {
let rt = mk_runtime(2);
let (tx, rx) = oneshot::channel::<()>();
rt.spawn(async {
let tid = loom::thread::current().id();
let park_count = park::current_thread_park_count();
crate::task::yield_now().await;
if tid == loom::thread::current().id() {
let new_park_count = park::current_thread_park_count();
assert_eq!(park_count + 1, new_park_count);
}
tx.send(());
});
rx.recv();
});
}
fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
.build()
.unwrap()
}

View File

@ -52,14 +52,12 @@ mod unowned_wrapper {
cfg_loom! {
mod loom_blocking;
mod loom_current_thread_scheduler;
mod loom_local;
mod loom_oneshot;
mod loom_pool;
mod loom_queue;
mod loom_shutdown_join;
mod loom_current_thread;
mod loom_join_set;
mod loom_yield;
mod loom_local;
mod loom_multi_thread;
mod loom_multi_thread_alt;
mod loom_oneshot;
// Make sure debug assertions are enabled
#[cfg(not(debug_assertions))]

View File

@ -1,11 +1,10 @@
use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::tests::NoopSchedule;
use crate::util::TryLock;
use std::collections::VecDeque;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
struct AssertDropHandle {
is_dropped: Arc<AtomicBool>,
@ -243,7 +242,7 @@ fn with(f: impl FnOnce(Runtime)) {
let rt = Runtime(Arc::new(Inner {
owned: OwnedTasks::new(),
core: TryLock::new(Core {
core: Mutex::new(Core {
queue: VecDeque::new(),
}),
}));
@ -256,7 +255,7 @@ fn with(f: impl FnOnce(Runtime)) {
struct Runtime(Arc<Inner>);
struct Inner {
core: TryLock<Core>,
core: Mutex<Core>,
owned: OwnedTasks<Runtime>,
}
@ -264,7 +263,7 @@ struct Core {
queue: VecDeque<task::Notified<Runtime>>,
}
static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
static CURRENT: Mutex<Option<Runtime>> = Mutex::new(None);
impl Runtime {
fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>

View File

@ -75,7 +75,7 @@ cfg_rt_multi_thread! {
where
F: FnOnce() -> R,
{
crate::runtime::scheduler::multi_thread::block_in_place(f)
crate::runtime::scheduler::block_in_place(f)
}
}

View File

@ -52,6 +52,40 @@ macro_rules! rt_test {
.into()
}
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[cfg(tokio_unstable)]
mod alt_threaded_scheduler_4_threads {
$($t)*
const NUM_WORKERS: usize = 4;
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread_alt()
.worker_threads(4)
.enable_all()
.build()
.unwrap()
.into()
}
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[cfg(tokio_unstable)]
mod alt_threaded_scheduler_1_thread {
$($t)*
const NUM_WORKERS: usize = 1;
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread_alt()
.worker_threads(1)
.enable_all()
.build()
.unwrap()
.into()
}
}
}
}

View File

@ -0,0 +1,717 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(tokio_wasi)))]
#![cfg(tokio_unstable)]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Waker};
macro_rules! cfg_metrics {
($($t:tt)*) => {
#[cfg(tokio_unstable)]
{
$( $t )*
}
}
}
#[test]
fn single_thread() {
// No panic when starting a runtime w/ a single thread
let _ = runtime::Builder::new_multi_thread_alt()
.enable_all()
.worker_threads(1)
.build()
.unwrap();
}
#[test]
fn many_oneshot_futures() {
// used for notifying the main thread
const NUM: usize = 1_000;
for _ in 0..5 {
let (tx, rx) = mpsc::channel();
let rt = rt();
let cnt = Arc::new(AtomicUsize::new(0));
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
// Wait for the pool to shutdown
drop(rt);
}
}
#[test]
fn spawn_two() {
let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tokio::spawn(async move {
tx.send("ZOMG").unwrap();
});
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
cfg_metrics! {
let metrics = rt.metrics();
drop(rt);
assert_eq!(1, metrics.remote_schedule_count());
let mut local = 0;
for i in 0..metrics.num_workers() {
local += metrics.worker_local_schedule_count(i);
}
assert_eq!(1, local);
}
}
#[test]
fn many_multishot_futures() {
const CHAIN: usize = 200;
const CYCLES: usize = 5;
const TRACKS: usize = 50;
for _ in 0..50 {
let rt = rt();
let mut start_txs = Vec::with_capacity(TRACKS);
let mut final_rxs = Vec::with_capacity(TRACKS);
for _ in 0..TRACKS {
let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10);
for _ in 0..CHAIN {
let (next_tx, next_rx) = tokio::sync::mpsc::channel(10);
// Forward all the messages
rt.spawn(async move {
while let Some(v) = chain_rx.recv().await {
next_tx.send(v).await.unwrap();
}
});
chain_rx = next_rx;
}
// This final task cycles if needed
let (final_tx, final_rx) = tokio::sync::mpsc::channel(10);
let cycle_tx = start_tx.clone();
let mut rem = CYCLES;
rt.spawn(async move {
for _ in 0..CYCLES {
let msg = chain_rx.recv().await.unwrap();
rem -= 1;
if rem == 0 {
final_tx.send(msg).await.unwrap();
} else {
cycle_tx.send(msg).await.unwrap();
}
}
});
start_txs.push(start_tx);
final_rxs.push(final_rx);
}
{
rt.block_on(async move {
for start_tx in start_txs {
start_tx.send("ping").await.unwrap();
}
for mut final_rx in final_rxs {
final_rx.recv().await.unwrap();
}
});
}
}
}
#[test]
fn lifo_slot_budget() {
async fn my_fn() {
spawn_another();
}
fn spawn_another() {
tokio::spawn(my_fn());
}
let rt = runtime::Builder::new_multi_thread_alt()
.enable_all()
.worker_threads(1)
.build()
.unwrap();
let (send, recv) = oneshot::channel();
rt.spawn(async move {
tokio::spawn(my_fn());
let _ = send.send(());
});
let _ = rt.block_on(recv);
}
#[test]
fn spawn_shutdown() {
let rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async {
tokio::spawn(client_server(tx.clone()));
});
// Use spawner
rt.spawn(client_server(tx));
assert_ok!(rx.recv());
assert_ok!(rx.recv());
drop(rt);
assert_err!(rx.try_recv());
}
async fn client_server(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
// Spawn the server
tokio::spawn(async move {
// Accept a socket
let (mut socket, _) = server.accept().await.unwrap();
// Write some data
socket.write_all(b"hello").await.unwrap();
});
let mut client = TcpStream::connect(&addr).await.unwrap();
let mut buf = vec![];
client.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
tx.send(()).unwrap();
}
#[test]
fn drop_threadpool_drops_futures() {
for _ in 0..1_000 {
let num_inc = Arc::new(AtomicUsize::new(0));
let num_dec = Arc::new(AtomicUsize::new(0));
let num_drop = Arc::new(AtomicUsize::new(0));
struct Never(Arc<AtomicUsize>);
impl Future for Never {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
}
}
impl Drop for Never {
fn drop(&mut self) {
self.0.fetch_add(1, Relaxed);
}
}
let a = num_inc.clone();
let b = num_dec.clone();
let rt = runtime::Builder::new_multi_thread_alt()
.enable_all()
.on_thread_start(move || {
a.fetch_add(1, Relaxed);
})
.on_thread_stop(move || {
b.fetch_add(1, Relaxed);
})
.build()
.unwrap();
rt.spawn(Never(num_drop.clone()));
// Wait for the pool to shutdown
drop(rt);
// Assert that at least one thread was spawned.
let a = num_inc.load(Relaxed);
assert!(a >= 1);
// Assert that all threads shut down
let b = num_dec.load(Relaxed);
assert_eq!(a, b);
// Assert that the future was dropped
let c = num_drop.load(Relaxed);
assert_eq!(c, 1);
}
}
#[test]
fn start_stop_callbacks_called() {
use std::sync::atomic::{AtomicUsize, Ordering};
let after_start = Arc::new(AtomicUsize::new(0));
let before_stop = Arc::new(AtomicUsize::new(0));
let after_inner = after_start.clone();
let before_inner = before_stop.clone();
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.enable_all()
.on_thread_start(move || {
after_inner.clone().fetch_add(1, Ordering::Relaxed);
})
.on_thread_stop(move || {
before_inner.clone().fetch_add(1, Ordering::Relaxed);
})
.build()
.unwrap();
let (tx, rx) = oneshot::channel();
rt.spawn(async move {
assert_ok!(tx.send(()));
});
assert_ok!(rt.block_on(rx));
drop(rt);
assert!(after_start.load(Ordering::Relaxed) > 0);
assert!(before_stop.load(Ordering::Relaxed) > 0);
}
#[test]
fn blocking() {
// used for notifying the main thread
const NUM: usize = 1_000;
for _ in 0..10 {
let (tx, rx) = mpsc::channel();
let rt = rt();
let cnt = Arc::new(AtomicUsize::new(0));
// there are four workers in the pool
// so, if we run 4 blocking tasks, we know that handoff must have happened
let block = Arc::new(std::sync::Barrier::new(5));
for _ in 0..4 {
let block = block.clone();
rt.spawn(async move {
tokio::task::block_in_place(move || {
block.wait();
block.wait();
})
});
}
block.wait();
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
// Wait for the pool to shutdown
block.wait();
}
}
#[test]
fn multi_threadpool() {
use tokio::sync::oneshot;
let rt1 = rt();
let rt2 = rt();
let (tx, rx) = oneshot::channel();
let (done_tx, done_rx) = mpsc::channel();
rt2.spawn(async move {
rx.await.unwrap();
done_tx.send(()).unwrap();
});
rt1.spawn(async move {
tx.send(()).unwrap();
});
done_rx.recv().unwrap();
}
// When `block_in_place` returns, it attempts to reclaim the yielded runtime
// worker. In this case, the remainder of the task is on the runtime worker and
// must take part in the cooperative task budgeting system.
//
// The test ensures that, when this happens, attempting to consume from a
// channel yields occasionally even if there are values ready to receive.
#[test]
fn coop_and_block_in_place() {
let rt = tokio::runtime::Builder::new_multi_thread_alt()
// Setting max threads to 1 prevents another thread from claiming the
// runtime worker yielded as part of `block_in_place` and guarantees the
// same thread will reclaim the worker at the end of the
// `block_in_place` call.
.max_blocking_threads(1)
.build()
.unwrap();
rt.block_on(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
// Fill the channel
for _ in 0..1024 {
tx.send(()).await.unwrap();
}
drop(tx);
tokio::spawn(async move {
// Block in place without doing anything
tokio::task::block_in_place(|| {});
// Receive all the values, this should trigger a `Pending` as the
// coop limit will be reached.
poll_fn(|cx| {
while let Poll::Ready(v) = {
tokio::pin! {
let fut = rx.recv();
}
Pin::new(&mut fut).poll(cx)
} {
if v.is_none() {
panic!("did not yield");
}
}
Poll::Ready(())
})
.await
})
.await
.unwrap();
});
}
#[test]
fn yield_after_block_in_place() {
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.worker_threads(1)
.build()
.unwrap();
rt.block_on(async {
tokio::spawn(async move {
// Block in place then enter a new runtime
tokio::task::block_in_place(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(async {});
});
// Yield, then complete
tokio::task::yield_now().await;
})
.await
.unwrap()
});
}
// Testing this does not panic
#[test]
fn max_blocking_threads() {
let _rt = tokio::runtime::Builder::new_multi_thread_alt()
.max_blocking_threads(1)
.build()
.unwrap();
}
#[test]
#[should_panic]
fn max_blocking_threads_set_to_zero() {
let _rt = tokio::runtime::Builder::new_multi_thread_alt()
.max_blocking_threads(0)
.build()
.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hang_on_shutdown() {
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
tokio::spawn(async move {
tokio::task::block_in_place(|| sync_rx.recv().ok());
});
tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
drop(sync_tx);
});
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
/// Demonstrates tokio-rs/tokio#3869
#[test]
fn wake_during_shutdown() {
struct Shared {
waker: Option<Waker>,
}
struct MyFuture {
shared: Arc<Mutex<Shared>>,
put_waker: bool,
}
impl MyFuture {
fn new() -> (Self, Self) {
let shared = Arc::new(Mutex::new(Shared { waker: None }));
let f1 = MyFuture {
shared: shared.clone(),
put_waker: true,
};
let f2 = MyFuture {
shared,
put_waker: false,
};
(f1, f2)
}
}
impl Future for MyFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let me = Pin::into_inner(self);
let mut lock = me.shared.lock().unwrap();
if me.put_waker {
lock.waker = Some(cx.waker().clone());
}
Poll::Pending
}
}
impl Drop for MyFuture {
fn drop(&mut self) {
let mut lock = self.shared.lock().unwrap();
if !self.put_waker {
lock.waker.take().unwrap().wake();
}
drop(lock);
}
}
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let (f1, f2) = MyFuture::new();
rt.spawn(f1);
rt.spawn(f2);
rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
}
#[should_panic]
#[tokio::test]
async fn test_block_in_place1() {
tokio::task::block_in_place(|| {});
}
#[tokio::test(flavor = "multi_thread")]
async fn test_block_in_place2() {
tokio::task::block_in_place(|| {});
}
#[should_panic]
#[tokio::main(flavor = "current_thread")]
#[test]
async fn test_block_in_place3() {
tokio::task::block_in_place(|| {});
}
#[tokio::main]
#[test]
async fn test_block_in_place4() {
tokio::task::block_in_place(|| {});
}
// Testing the tuning logic is tricky as it is inherently timing based, and more
// of a heuristic than an exact behavior. This test checks that the interval
// changes over time based on load factors. There are no assertions, completion
// is sufficient. If there is a regression, this test will hang. In theory, we
// could add limits, but that would be likely to fail on CI.
#[test]
#[cfg(not(tokio_no_tuning_tests))]
fn test_tuning() {
use std::sync::atomic::AtomicBool;
use std::time::Duration;
let rt = runtime::Builder::new_multi_thread_alt()
.worker_threads(1)
.build()
.unwrap();
fn iter(flag: Arc<AtomicBool>, counter: Arc<AtomicUsize>, stall: bool) {
if flag.load(Relaxed) {
if stall {
std::thread::sleep(Duration::from_micros(5));
}
counter.fetch_add(1, Relaxed);
tokio::spawn(async move { iter(flag, counter, stall) });
}
}
let flag = Arc::new(AtomicBool::new(true));
let counter = Arc::new(AtomicUsize::new(61));
let interval = Arc::new(AtomicUsize::new(61));
{
let flag = flag.clone();
let counter = counter.clone();
rt.spawn(async move { iter(flag, counter, true) });
}
// Now, hammer the injection queue until the interval drops.
let mut n = 0;
loop {
let curr = interval.load(Relaxed);
if curr <= 8 {
n += 1;
} else {
n = 0;
}
// Make sure we get a few good rounds. Jitter in the tuning could result
// in one "good" value without being representative of reaching a good
// state.
if n == 3 {
break;
}
if Arc::strong_count(&interval) < 5_000 {
let counter = counter.clone();
let interval = interval.clone();
rt.spawn(async move {
let prev = counter.swap(0, Relaxed);
interval.store(prev, Relaxed);
});
std::thread::yield_now();
}
}
flag.store(false, Relaxed);
let w = Arc::downgrade(&interval);
drop(interval);
while w.strong_count() > 0 {
std::thread::sleep(Duration::from_micros(500));
}
// Now, run it again with a faster task
let flag = Arc::new(AtomicBool::new(true));
// Set it high; we know it shouldn't ever really be this high
let counter = Arc::new(AtomicUsize::new(10_000));
let interval = Arc::new(AtomicUsize::new(10_000));
{
let flag = flag.clone();
let counter = counter.clone();
rt.spawn(async move { iter(flag, counter, false) });
}
// Now, hammer the injection queue until the interval reaches the expected range.
let mut n = 0;
loop {
let curr = interval.load(Relaxed);
if curr <= 1_000 && curr > 32 {
n += 1;
} else {
n = 0;
}
if n == 3 {
break;
}
if Arc::strong_count(&interval) <= 5_000 {
let counter = counter.clone();
let interval = interval.clone();
rt.spawn(async move {
let prev = counter.swap(0, Relaxed);
interval.store(prev, Relaxed);
});
}
std::thread::yield_now();
}
flag.store(false, Relaxed);
}
fn rt() -> runtime::Runtime {
runtime::Builder::new_multi_thread_alt()
.enable_all()
.build()
.unwrap()
}