rt(unstable): remove alt multi-threaded runtime (#7275)

The alternative multi-threaded runtime started as an experiment. We have been
unable to find a real-world benefit, and work on this effort has halted, so
let's get rid of it.
Carl Lerche 2025-04-17 13:33:55 -07:00 committed by GitHub
parent ce87dcfbf0
commit 159a3b2c85
33 changed files with 0 additions and 5444 deletions
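
For code that had opted into the removed flavor, the migration is a one-line change at the builder: `Builder::new_multi_thread_alt()` goes away and the stable `Builder::new_multi_thread()` entry point remains. A minimal sketch of what a caller looks like after this commit (the worker count and printed message are arbitrary):

use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    // Previously (tokio_unstable builds only): Builder::new_multi_thread_alt()
    // After this commit, only the stable multi-threaded scheduler exists.
    let rt = Builder::new_multi_thread()
        .worker_threads(4)
        .build()?;

    rt.block_on(async {
        println!("running on the multi-threaded runtime");
    });
    Ok(())
}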

View File

@ -26,8 +26,6 @@ impl BlockingSchedule {
}
#[cfg(feature = "rt-multi-thread")]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
BlockingSchedule {
@ -51,8 +49,6 @@ impl task::Schedule for BlockingSchedule {
}
#[cfg(feature = "rt-multi-thread")]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
None

View File

@ -114,8 +114,6 @@ pub struct Builder {
/// How many ticks before yielding to the driver for timer and I/O events?
pub(super) event_interval: u32,
pub(super) local_queue_capacity: usize,
/// When true, the multi-threaded scheduler LIFO slot should not be used.
///
/// This option should only be exposed as unstable.
@ -222,8 +220,6 @@ pub(crate) enum Kind {
CurrentThread,
#[cfg(feature = "rt-multi-thread")]
MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
MultiThreadAlt,
}
impl Builder {
@ -255,26 +251,6 @@ impl Builder {
Builder::new(Kind::MultiThread, 61)
}
cfg_unstable! {
/// Returns a new builder with the alternate multi thread scheduler
/// selected.
///
/// The alternate multi threaded scheduler is an in-progress
/// candidate to replace the existing multi threaded scheduler. It
/// currently does not scale as well to 16+ processors.
///
/// This runtime flavor is currently **not considered production
/// ready**.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread_alt() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThreadAlt, 61)
}
}
/// Returns a new runtime builder initialized with default configuration
/// values.
///
@ -326,12 +302,6 @@ impl Builder {
global_queue_interval: None,
event_interval,
#[cfg(not(loom))]
local_queue_capacity: 256,
#[cfg(loom)]
local_queue_capacity: 4,
seed_generator: RngSeedGenerator::new(RngSeed::new()),
#[cfg(tokio_unstable)]
@ -920,8 +890,6 @@ impl Builder {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(feature = "rt-multi-thread")]
Kind::MultiThread => self.build_threaded_runtime(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
}
}
@ -962,8 +930,6 @@ impl Builder {
Kind::CurrentThread => true,
#[cfg(feature = "rt-multi-thread")]
Kind::MultiThread => false,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Kind::MultiThreadAlt => false,
},
enable_io: self.enable_io,
enable_time: self.enable_time,
@ -1451,14 +1417,6 @@ impl Builder {
}
}
cfg_loom! {
pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
assert!(value.is_power_of_two());
self.local_queue_capacity = value;
self
}
}
fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::runtime::Scheduler;
@ -1525,7 +1483,6 @@ impl Builder {
after_termination: self.after_termination.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
@ -1679,7 +1636,6 @@ cfg_rt_multi_thread! {
after_termination: self.after_termination.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
@ -1696,54 +1652,6 @@ cfg_rt_multi_thread! {
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}
cfg_unstable! {
fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::MultiThreadAlt;
let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
let (driver, driver_handle) = driver::Driver::new(self.get_cfg(worker_threads))?;
// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
let (scheduler, handle) = MultiThreadAlt::new(
worker_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
after_termination: self.after_termination.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);
Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
}
}
}
}

View File

@ -12,9 +12,6 @@ pub(crate) struct Config {
/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,
/// How big to make each worker's local queue
pub(crate) local_queue_capacity: usize,
/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,

View File

@ -63,10 +63,6 @@ impl Driver {
))
}
pub(crate) fn is_enabled(&self) -> bool {
self.inner.is_enabled()
}
pub(crate) fn park(&mut self, handle: &Handle) {
self.inner.park(handle);
}
@ -163,13 +159,6 @@ cfg_io_driver! {
}
impl IoStack {
pub(crate) fn is_enabled(&self) -> bool {
match self {
IoStack::Enabled(..) => true,
IoStack::Disabled(..) => false,
}
}
pub(crate) fn park(&mut self, handle: &Handle) {
match self {
IoStack::Enabled(v) => v.park(handle),
@ -320,13 +309,6 @@ cfg_time! {
}
impl TimeDriver {
pub(crate) fn is_enabled(&self) -> bool {
match self {
TimeDriver::Enabled { .. } => true,
TimeDriver::Disabled(inner) => inner.is_enabled(),
}
}
pub(crate) fn park(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, .. } => driver.park(handle),

View File

@ -399,8 +399,6 @@ impl Handle {
scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread,
#[cfg(feature = "rt-multi-thread")]
scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
scheduler::Handle::MultiThreadAlt(_) => RuntimeFlavor::MultiThreadAlt,
}
}
@ -429,8 +427,6 @@ impl Handle {
scheduler::Handle::CurrentThread(handle) => handle.owned_id(),
#[cfg(feature = "rt-multi-thread")]
scheduler::Handle::MultiThread(handle) => handle.owned_id(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
scheduler::Handle::MultiThreadAlt(handle) => handle.owned_id(),
};
owned_id.into()
}
@ -582,8 +578,6 @@ cfg_taskdump! {
handle.dump().await
}).await
},
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"),
}
}

View File

@ -12,10 +12,6 @@ use std::time::Duration;
cfg_rt_multi_thread! {
use crate::runtime::Builder;
use crate::runtime::scheduler::MultiThread;
cfg_unstable! {
use crate::runtime::scheduler::MultiThreadAlt;
}
}
/// The Tokio runtime.
@ -117,10 +113,6 @@ pub enum RuntimeFlavor {
CurrentThread,
/// The flavor that executes tasks across multiple threads.
MultiThread,
/// The flavor that executes tasks across multiple threads.
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
MultiThreadAlt,
}
/// The runtime scheduler is either a multi-thread or a current-thread executor.
@ -132,10 +124,6 @@ pub(super) enum Scheduler {
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-multi-thread")]
MultiThread(MultiThread),
/// Execute tasks across multiple threads.
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
MultiThreadAlt(MultiThreadAlt),
}
impl Runtime {
@ -368,8 +356,6 @@ impl Runtime {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(feature = "rt-multi-thread")]
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future),
}
}
@ -501,12 +487,6 @@ impl Drop for Runtime {
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Scheduler::MultiThreadAlt(multi_thread) => {
// The threaded scheduler drops its tasks on its worker threads, which is
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
}
}
}
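
With `MultiThreadAlt` gone from `RuntimeFlavor` and `Scheduler`, flavor checks collapse back to two meaningful cases. A rough sketch of how downstream code can inspect the flavor through the public `Handle::runtime_flavor()` accessor (the wildcard arm is purely defensive):

use tokio::runtime::{Builder, Handle, RuntimeFlavor};

fn main() -> std::io::Result<()> {
    let rt = Builder::new_multi_thread().build()?;
    rt.block_on(async {
        // Inside the runtime, the current handle reports which scheduler is driving it.
        match Handle::current().runtime_flavor() {
            RuntimeFlavor::CurrentThread => println!("current-thread scheduler"),
            RuntimeFlavor::MultiThread => println!("multi-threaded scheduler"),
            _ => println!("some other flavor"),
        }
    });
    Ok(())
}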

View File

@ -5,17 +5,5 @@ pub(crate) fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
#[cfg(tokio_unstable)]
{
use crate::runtime::{Handle, RuntimeFlavor::MultiThreadAlt};
match Handle::try_current().map(|h| h.runtime_flavor()) {
Ok(MultiThreadAlt) => {
return scheduler::multi_thread_alt::block_in_place(f);
}
_ => {}
}
}
scheduler::multi_thread::block_in_place(f)
}
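
With the alt flavor gone, `block_in_place` always routes to the stable multi-threaded scheduler. For context, a sketch of how the public `tokio::task::block_in_place` wrapper is typically used (it requires the multi-threaded flavor and panics on a current-thread runtime):

use tokio::runtime::Builder;
use tokio::task;

fn main() -> std::io::Result<()> {
    let rt = Builder::new_multi_thread().build()?;
    rt.block_on(async {
        // Run blocking work in place; the worker hands its scheduled tasks
        // off to another worker thread for the duration.
        let answer = task::block_in_place(|| {
            // e.g. a synchronous or CPU-heavy call
            40 + 2
        });
        assert_eq!(answer, 42);
    });
    Ok(())
}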

View File

@ -34,8 +34,4 @@ impl Synced {
// safety: a `Notified` is pushed into the queue and now it is popped!
Some(unsafe { task::Notified::from_raw(task) })
}
pub(crate) fn is_empty(&self) -> bool {
self.head.is_none()
}
}

View File

@ -22,11 +22,6 @@ cfg_rt_multi_thread! {
pub(crate) mod multi_thread;
pub(crate) use multi_thread::MultiThread;
cfg_unstable! {
pub(crate) mod multi_thread_alt;
pub(crate) use multi_thread_alt::MultiThread as MultiThreadAlt;
}
}
use crate::runtime::driver;
@ -39,9 +34,6 @@ pub(crate) enum Handle {
#[cfg(feature = "rt-multi-thread")]
MultiThread(Arc<multi_thread::Handle>),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
MultiThreadAlt(Arc<multi_thread_alt::Handle>),
// TODO: This is to avoid triggering "dead code" warnings many other places
// in the codebase. Remove this during a later cleanup
#[cfg(not(feature = "rt"))]
@ -55,9 +47,6 @@ pub(super) enum Context {
#[cfg(feature = "rt-multi-thread")]
MultiThread(multi_thread::Context),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
MultiThreadAlt(multi_thread_alt::Context),
}
impl Handle {
@ -70,9 +59,6 @@ impl Handle {
#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(ref h) => &h.driver,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Handle::MultiThreadAlt(ref h) => &h.driver,
#[cfg(not(feature = "rt"))]
Handle::Disabled => unreachable!(),
}
@ -95,9 +81,6 @@ cfg_rt! {
#[cfg(feature = "rt-multi-thread")]
$ty::MultiThread($h) => $e,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
$ty::MultiThreadAlt($h) => $e,
}
}
}
@ -121,9 +104,6 @@ cfg_rt! {
#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(_) => false,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Handle::MultiThreadAlt(_) => false,
}
}
@ -134,9 +114,6 @@ cfg_rt! {
#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(_) => false,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Handle::MultiThreadAlt(_) => false,
}
}
@ -150,9 +127,6 @@ cfg_rt! {
#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id),
}
}
@ -180,9 +154,6 @@ cfg_rt! {
#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(ref h) => h.shutdown(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Handle::MultiThreadAlt(ref h) => h.shutdown(),
}
}
@ -203,19 +174,6 @@ cfg_rt! {
Handle::CurrentThread(h) => &h.task_hooks,
#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(h) => &h.task_hooks,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Handle::MultiThreadAlt(h) => &h.task_hooks,
}
}
cfg_rt_multi_thread! {
cfg_unstable! {
pub(crate) fn expect_multi_thread_alt(&self) -> &Arc<multi_thread_alt::Handle> {
match self {
Handle::MultiThreadAlt(handle) => handle,
_ => panic!("not a `MultiThreadAlt` handle"),
}
}
}
}
}
@ -226,8 +184,6 @@ cfg_rt! {
Handle::CurrentThread(_) => 1,
#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(handle) => handle.num_workers(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Handle::MultiThreadAlt(handle) => handle.num_workers(),
}
}
@ -298,16 +254,6 @@ cfg_rt! {
_ => panic!("expected `MultiThread::Context`")
}
}
cfg_unstable! {
#[track_caller]
pub(crate) fn expect_multi_thread_alt(&self) -> &multi_thread_alt::Context {
match self {
Context::MultiThreadAlt(context) => context,
_ => panic!("expected `MultiThreadAlt::Context`")
}
}
}
}
}
}

View File

@ -1,166 +0,0 @@
#[cfg(tokio_internal_mt_counters)]
mod imp {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_NOTIFY_REMOTE: AtomicUsize = AtomicUsize::new(0);
static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_UNPARKS_REMOTE: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_SCHEDULES: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_CAPPED: AtomicUsize = AtomicUsize::new(0);
static NUM_STEALS: AtomicUsize = AtomicUsize::new(0);
static NUM_OVERFLOW: AtomicUsize = AtomicUsize::new(0);
static NUM_PARK: AtomicUsize = AtomicUsize::new(0);
static NUM_POLLS: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_POLLS: AtomicUsize = AtomicUsize::new(0);
static NUM_REMOTE_BATCH: AtomicUsize = AtomicUsize::new(0);
static NUM_GLOBAL_QUEUE_INTERVAL: AtomicUsize = AtomicUsize::new(0);
static NUM_NO_AVAIL_CORE: AtomicUsize = AtomicUsize::new(0);
static NUM_RELAY_SEARCH: AtomicUsize = AtomicUsize::new(0);
static NUM_SPIN_STALL: AtomicUsize = AtomicUsize::new(0);
static NUM_NO_LOCAL_WORK: AtomicUsize = AtomicUsize::new(0);
impl Drop for super::Counters {
fn drop(&mut self) {
let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
let notifies_remote = NUM_NOTIFY_REMOTE.load(Relaxed);
let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
let unparks_remote = NUM_UNPARKS_REMOTE.load(Relaxed);
let maintenance = NUM_MAINTENANCE.load(Relaxed);
let lifo_scheds = NUM_LIFO_SCHEDULES.load(Relaxed);
let lifo_capped = NUM_LIFO_CAPPED.load(Relaxed);
let num_steals = NUM_STEALS.load(Relaxed);
let num_overflow = NUM_OVERFLOW.load(Relaxed);
let num_park = NUM_PARK.load(Relaxed);
let num_polls = NUM_POLLS.load(Relaxed);
let num_lifo_polls = NUM_LIFO_POLLS.load(Relaxed);
let num_remote_batch = NUM_REMOTE_BATCH.load(Relaxed);
let num_global_queue_interval = NUM_GLOBAL_QUEUE_INTERVAL.load(Relaxed);
let num_no_avail_core = NUM_NO_AVAIL_CORE.load(Relaxed);
let num_relay_search = NUM_RELAY_SEARCH.load(Relaxed);
let num_spin_stall = NUM_SPIN_STALL.load(Relaxed);
let num_no_local_work = NUM_NO_LOCAL_WORK.load(Relaxed);
println!("---");
println!("notifies (remote): {}", notifies_remote);
println!(" notifies (local): {}", notifies_local);
println!(" unparks (local): {}", unparks_local);
println!(" unparks (remote): {}", unparks_remote);
println!(" notify, no core: {}", num_no_avail_core);
println!(" maintenance: {}", maintenance);
println!(" LIFO schedules: {}", lifo_scheds);
println!(" LIFO capped: {}", lifo_capped);
println!(" steals: {}", num_steals);
println!(" queue overflows: {}", num_overflow);
println!(" parks: {}", num_park);
println!(" polls: {}", num_polls);
println!(" polls (LIFO): {}", num_lifo_polls);
println!("remote task batch: {}", num_remote_batch);
println!("global Q interval: {}", num_global_queue_interval);
println!(" relay search: {}", num_relay_search);
println!(" spin stall: {}", num_spin_stall);
println!(" no local work: {}", num_no_local_work);
}
}
pub(crate) fn inc_num_inc_notify_local() {
NUM_NOTIFY_LOCAL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_notify_remote() {
NUM_NOTIFY_REMOTE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_unparks_local() {
NUM_UNPARKS_LOCAL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_unparks_remote() {
NUM_UNPARKS_REMOTE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_maintenance() {
NUM_MAINTENANCE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_lifo_schedules() {
NUM_LIFO_SCHEDULES.fetch_add(1, Relaxed);
}
pub(crate) fn inc_lifo_capped() {
NUM_LIFO_CAPPED.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_steals() {
NUM_STEALS.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_overflows() {
NUM_OVERFLOW.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_parks() {
NUM_PARK.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_polls() {
NUM_POLLS.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_lifo_polls() {
NUM_LIFO_POLLS.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_remote_batch() {
NUM_REMOTE_BATCH.fetch_add(1, Relaxed);
}
pub(crate) fn inc_global_queue_interval() {
NUM_GLOBAL_QUEUE_INTERVAL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_notify_no_core() {
NUM_NO_AVAIL_CORE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_relay_search() {
NUM_RELAY_SEARCH.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_spin_stall() {
NUM_SPIN_STALL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_no_local_work() {
NUM_NO_LOCAL_WORK.fetch_add(1, Relaxed);
}
}
#[cfg(not(tokio_internal_mt_counters))]
mod imp {
pub(crate) fn inc_num_inc_notify_local() {}
pub(crate) fn inc_num_notify_remote() {}
pub(crate) fn inc_num_unparks_local() {}
pub(crate) fn inc_num_unparks_remote() {}
pub(crate) fn inc_num_maintenance() {}
pub(crate) fn inc_lifo_schedules() {}
pub(crate) fn inc_lifo_capped() {}
pub(crate) fn inc_num_steals() {}
pub(crate) fn inc_num_overflows() {}
pub(crate) fn inc_num_parks() {}
pub(crate) fn inc_num_polls() {}
pub(crate) fn inc_num_lifo_polls() {}
pub(crate) fn inc_num_remote_batch() {}
pub(crate) fn inc_global_queue_interval() {}
pub(crate) fn inc_notify_no_core() {}
pub(crate) fn inc_num_relay_search() {}
pub(crate) fn inc_num_spin_stall() {}
pub(crate) fn inc_num_no_local_work() {}
}
#[derive(Debug)]
pub(crate) struct Counters;
pub(super) use imp::*;
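
The deleted file above is the alt scheduler's copy of Tokio's internal counter pattern: real atomics when built with `--cfg tokio_internal_mt_counters` (a custom cfg, normally passed via `RUSTFLAGS`), and no-op stubs otherwise, so call sites never need their own `cfg` guards. A stripped-down sketch of the same pattern; the cfg name `my_counters` and the single counter are invented for illustration:

#[cfg(my_counters)]
mod imp {
    use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

    static NUM_EVENTS: AtomicUsize = AtomicUsize::new(0);

    pub(crate) fn inc_num_events() {
        NUM_EVENTS.fetch_add(1, Relaxed);
    }

    // Dump the totals when the marker value is dropped.
    impl Drop for super::Counters {
        fn drop(&mut self) {
            println!("events: {}", NUM_EVENTS.load(Relaxed));
        }
    }
}

#[cfg(not(my_counters))]
mod imp {
    // Compiles to nothing; call sites stay unconditional.
    pub(crate) fn inc_num_events() {}
}

#[derive(Debug)]
pub(crate) struct Counters;

pub(crate) use imp::*;

fn main() {
    let _counters = Counters;
    inc_num_events();
    // Built with the cfg enabled, the totals print when `_counters` drops.
}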

View File

@ -1,85 +0,0 @@
use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread_alt::worker;
use crate::runtime::{
blocking, driver,
task::{self, JoinHandle},
TaskHooks, TaskMeta,
};
use crate::util::RngSeedGenerator;
use std::fmt;
cfg_unstable_metrics! {
mod metrics;
}
/// Handle to the multi thread scheduler
pub(crate) struct Handle {
/// Task spawner
pub(super) shared: worker::Shared,
/// Resource driver handles
pub(crate) driver: driver::Handle,
/// Blocking pool spawner
pub(crate) blocking_spawner: blocking::Spawner,
/// Current random number generator seed
pub(crate) seed_generator: RngSeedGenerator,
/// User-supplied hooks to invoke for things
pub(crate) task_hooks: TaskHooks,
}
impl Handle {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id)
}
pub(crate) fn shutdown(&self) {
self.shared.close(self);
self.driver.unpark();
}
pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
me.task_hooks.spawn(&TaskMeta {
#[cfg(tokio_unstable)]
id,
_phantom: Default::default(),
});
if let Some(notified) = notified {
me.shared.schedule_task(notified, false);
}
handle
}
}
cfg_unstable! {
use std::num::NonZeroU64;
impl Handle {
pub(crate) fn owned_id(&self) -> NonZeroU64 {
self.shared.owned.id
}
}
}
impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("multi_thread::Handle { ... }").finish()
}
}
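
This `Handle` was the alt scheduler's spawn entry point (`spawn` -> `bind_new_task` -> schedule). The surviving scheduler exposes the same behavior through the public spawn APIs; a brief sketch of the user-facing path (values are arbitrary):

use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    let rt = Builder::new_multi_thread().build()?;

    // Runtime::spawn (and Handle::spawn) bind the future to the scheduler and
    // return a JoinHandle, much like the internal bind_new_task above.
    let join = rt.spawn(async { 2 + 2 });

    let result = rt.block_on(join).expect("task panicked");
    assert_eq!(result, 4);
    Ok(())
}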

View File

@ -1,50 +0,0 @@
use super::Handle;
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
impl Handle {
pub(crate) fn num_workers(&self) -> usize {
self.shared.worker_metrics.len()
}
pub(crate) fn num_blocking_threads(&self) -> usize {
// workers are currently spawned using spawn_blocking
self.blocking_spawner
.num_threads()
.saturating_sub(self.num_workers())
}
pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}
pub(crate) fn num_alive_tasks(&self) -> usize {
self.shared.owned.num_alive_tasks()
}
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}
}
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
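
These accessors backed the runtime metrics surface for the alt scheduler; the same data remains reachable on the stable scheduler through `RuntimeMetrics`. A minimal sketch using only the stable `num_workers` accessor (several of the counters above are gated behind unstable metrics):

use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    let rt = Builder::new_multi_thread()
        .worker_threads(4)
        .build()?;

    // num_workers reports the configured worker-thread count.
    let metrics = rt.handle().metrics();
    assert_eq!(metrics.num_workers(), 4);
    Ok(())
}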

View File

@ -1,26 +0,0 @@
use super::Handle;
use crate::runtime::Dump;
impl Handle {
pub(crate) async fn dump(&self) -> Dump {
let trace_status = &self.shared.trace_status;
// If a dump is in progress, block.
trace_status.start_trace_request(&self).await;
let result = loop {
if let Some(result) = trace_status.take_result() {
break result;
} else {
self.notify_all();
trace_status.result_ready.notified().await;
}
};
// Allow other queued dumps to proceed.
trace_status.end_trace_request(&self).await;
result
}
}

View File

@ -1,423 +0,0 @@
//! Coordinates idling workers
#![allow(dead_code)]
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
pub(super) struct Idle {
/// Number of searching cores
num_searching: AtomicUsize,
/// Number of idle cores
num_idle: AtomicUsize,
/// Map of idle cores
idle_map: IdleMap,
/// Used to catch false-negatives when waking workers
needs_searching: AtomicBool,
/// Total number of cores
num_cores: usize,
}
pub(super) struct IdleMap {
chunks: Vec<AtomicUsize>,
}
pub(super) struct Snapshot {
chunks: Vec<usize>,
}
/// Data synchronized by the scheduler mutex
pub(super) struct Synced {
/// Worker IDs that are currently sleeping
sleepers: Vec<usize>,
/// Cores available for workers
available_cores: Vec<Box<Core>>,
}
impl Idle {
pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
let idle = Idle {
num_searching: AtomicUsize::new(0),
num_idle: AtomicUsize::new(cores.len()),
idle_map: IdleMap::new(&cores),
needs_searching: AtomicBool::new(false),
num_cores: cores.len(),
};
let synced = Synced {
sleepers: Vec::with_capacity(num_workers),
available_cores: cores,
};
(idle, synced)
}
pub(super) fn needs_searching(&self) -> bool {
self.needs_searching.load(Acquire)
}
pub(super) fn num_idle(&self, synced: &Synced) -> usize {
#[cfg(not(loom))]
debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
synced.available_cores.len()
}
pub(super) fn num_searching(&self) -> usize {
self.num_searching.load(Acquire)
}
pub(super) fn snapshot(&self, snapshot: &mut Snapshot) {
snapshot.update(&self.idle_map)
}
/// Try to acquire an available core
pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
let ret = synced.available_cores.pop();
if let Some(core) = &ret {
// Decrement the number of idle cores
let num_idle = self.num_idle.load(Acquire) - 1;
debug_assert_eq!(num_idle, synced.available_cores.len());
self.num_idle.store(num_idle, Release);
self.idle_map.unset(core.index);
debug_assert!(self.idle_map.matches(&synced.available_cores));
}
ret
}
/// We need at least one searching worker
pub(super) fn notify_local(&self, shared: &Shared) {
if self.num_searching.load(Acquire) != 0 {
// There already is a searching worker. Note, that this could be a
// false positive. However, because this method is called **from** a
// worker, we know that there is at least one worker currently
// awake, so the scheduler won't deadlock.
return;
}
if self.num_idle.load(Acquire) == 0 {
self.needs_searching.store(true, Release);
return;
}
// There aren't any searching workers. Try to initialize one
if self
.num_searching
.compare_exchange(0, 1, AcqRel, Acquire)
.is_err()
{
// Failing the compare_exchange means another thread concurrently
// launched a searching worker.
return;
}
super::counters::inc_num_unparks_local();
// Acquire the lock
let synced = shared.synced.lock();
self.notify_synced(synced, shared);
}
/// Notifies a single worker
pub(super) fn notify_remote(&self, synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
if synced.idle.sleepers.is_empty() {
self.needs_searching.store(true, Release);
return;
}
// We need to establish a stronger barrier than with `notify_local`
self.num_searching.fetch_add(1, AcqRel);
self.notify_synced(synced, shared);
}
/// Notify a worker while synced
fn notify_synced(&self, mut synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
// Find a sleeping worker
if let Some(worker) = synced.idle.sleepers.pop() {
// Find an available core
if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
debug_assert!(!core.is_searching);
core.is_searching = true;
// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);
// Drop the lock before notifying the condvar.
drop(synced);
super::counters::inc_num_unparks_remote();
// Notify the worker
shared.condvars[worker].notify_one();
return;
} else {
synced.idle.sleepers.push(worker);
}
}
super::counters::inc_notify_no_core();
// Set the `needs_searching` flag, this happens *while* the lock is held.
self.needs_searching.store(true, Release);
self.num_searching.fetch_sub(1, Release);
// Explicit mutex guard drop to show that holding the guard to this
// point is significant. `needs_searching` and `num_searching` must be
// updated in the critical section.
drop(synced);
}
pub(super) fn notify_mult(
&self,
synced: &mut worker::Synced,
workers: &mut Vec<usize>,
num: usize,
) {
debug_assert!(workers.is_empty());
for _ in 0..num {
if let Some(worker) = synced.idle.sleepers.pop() {
// TODO: can this be switched to use next_available_core?
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);
self.idle_map.unset(core.index);
synced.assigned_cores[worker] = Some(core);
workers.push(worker);
continue;
} else {
synced.idle.sleepers.push(worker);
}
}
break;
}
if !workers.is_empty() {
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
let num_idle = synced.idle.available_cores.len();
self.num_idle.store(num_idle, Release);
} else {
#[cfg(not(loom))]
debug_assert_eq!(
synced.idle.available_cores.len(),
self.num_idle.load(Acquire)
);
self.needs_searching.store(true, Release);
}
}
pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
// Wake every sleeping worker and assign a core to it. There may not be
// enough sleeping workers for all cores, but other workers will
// eventually find the cores and shut them down.
while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
let worker = synced.idle.sleepers.pop().unwrap();
let core = self.try_acquire_available_core(&mut synced.idle).unwrap();
synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();
}
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
// Wake up any other workers
while let Some(index) = synced.idle.sleepers.pop() {
shared.condvars[index].notify_one();
}
}
pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
// If there are any remaining cores, shut them down here.
//
// This code is a bit convoluted to avoid lock-reentry.
while let Some(core) = {
let mut synced = shared.synced.lock();
self.try_acquire_available_core(&mut synced.idle)
} {
shared.shutdown_core(handle, core);
}
}
/// The worker releases the given core, making it available to other workers
/// that are waiting.
pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
// The core should not be searching at this point
debug_assert!(!core.is_searching);
// Check that there are no pending tasks in the global queue
debug_assert!(synced.inject.is_empty());
let num_idle = synced.idle.available_cores.len();
#[cfg(not(loom))]
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
self.idle_map.set(core.index);
// Store the core in the list of available cores
synced.idle.available_cores.push(core);
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
// Update `num_idle`
self.num_idle.store(num_idle + 1, Release);
}
pub(super) fn transition_worker_to_parked(&self, synced: &mut worker::Synced, index: usize) {
// Store the worker index in the list of sleepers
synced.idle.sleepers.push(index);
// The worker's assigned core slot should be empty
debug_assert!(synced.assigned_cores[index].is_none());
}
pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
debug_assert!(!core.is_searching);
let num_searching = self.num_searching.load(Acquire);
let num_idle = self.num_idle.load(Acquire);
if 2 * num_searching >= self.num_cores - num_idle {
return;
}
self.transition_worker_to_searching(core);
}
/// Needs to happen while synchronized in order to avoid races
pub(super) fn transition_worker_to_searching_if_needed(
&self,
_synced: &mut Synced,
core: &mut Core,
) -> bool {
if self.needs_searching.load(Acquire) {
// Needs to be called while holding the lock
self.transition_worker_to_searching(core);
true
} else {
false
}
}
pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
core.is_searching = true;
self.num_searching.fetch_add(1, AcqRel);
self.needs_searching.store(false, Release);
}
/// A lightweight transition from searching -> running.
///
/// Returns `true` if this is the final searching worker. The caller
/// **must** notify a new worker.
pub(super) fn transition_worker_from_searching(&self) -> bool {
let prev = self.num_searching.fetch_sub(1, AcqRel);
debug_assert!(prev > 0);
prev == 1
}
}
const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;
impl IdleMap {
fn new(cores: &[Box<Core>]) -> IdleMap {
let ret = IdleMap::new_n(num_chunks(cores.len()));
ret.set_all(cores);
ret
}
fn new_n(n: usize) -> IdleMap {
let chunks = (0..n).map(|_| AtomicUsize::new(0)).collect();
IdleMap { chunks }
}
fn set(&self, index: usize) {
let (chunk, mask) = index_to_mask(index);
let prev = self.chunks[chunk].load(Acquire);
let next = prev | mask;
self.chunks[chunk].store(next, Release);
}
fn set_all(&self, cores: &[Box<Core>]) {
for core in cores {
self.set(core.index);
}
}
fn unset(&self, index: usize) {
let (chunk, mask) = index_to_mask(index);
let prev = self.chunks[chunk].load(Acquire);
let next = prev & !mask;
self.chunks[chunk].store(next, Release);
}
fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
let expect = IdleMap::new_n(self.chunks.len());
expect.set_all(idle_cores);
for (i, chunk) in expect.chunks.iter().enumerate() {
if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
return false;
}
}
true
}
}
impl Snapshot {
pub(crate) fn new(idle: &Idle) -> Snapshot {
let chunks = vec![0; idle.idle_map.chunks.len()];
let mut ret = Snapshot { chunks };
ret.update(&idle.idle_map);
ret
}
fn update(&mut self, idle_map: &IdleMap) {
for i in 0..self.chunks.len() {
self.chunks[i] = idle_map.chunks[i].load(Acquire);
}
}
pub(super) fn is_idle(&self, index: usize) -> bool {
let (chunk, mask) = index_to_mask(index);
debug_assert!(
chunk < self.chunks.len(),
"index={}; chunks={}",
index,
self.chunks.len()
);
self.chunks[chunk] & mask == mask
}
}
fn num_chunks(max_cores: usize) -> usize {
(max_cores / BITS) + 1
}
fn index_to_mask(index: usize) -> (usize, usize) {
let mask = 1 << (index & BIT_MASK);
let chunk = index / BITS;
(chunk, mask)
}
fn num_active_workers(synced: &Synced) -> usize {
synced.available_cores.capacity() - synced.available_cores.len()
}
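
`IdleMap` above tracks idle cores as a bitmap spread over `usize` chunks: `index_to_mask` maps a core index to a (chunk, bit) pair, `set`/`unset` flip that bit, and `Snapshot::is_idle` tests it. A standalone sketch of the same bit arithmetic, using plain integers in place of the atomics:

const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;

fn num_chunks(max_cores: usize) -> usize {
    (max_cores / BITS) + 1
}

fn index_to_mask(index: usize) -> (usize, usize) {
    // Which chunk the core lives in, and a one-bit mask within that chunk.
    (index / BITS, 1 << (index & BIT_MASK))
}

fn main() {
    // 70 cores need two 64-bit chunks (or three 32-bit chunks).
    let mut chunks = vec![0usize; num_chunks(70)];

    // Mark core 66 idle, as IdleMap::set does.
    let (chunk, mask) = index_to_mask(66);
    chunks[chunk] |= mask;

    // Snapshot::is_idle tests the same bit.
    assert!(chunks[chunk] & mask == mask);

    // IdleMap::unset clears it when the core is handed to a worker.
    chunks[chunk] &= !mask;
    assert!(chunks[chunk] & mask == 0);
}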

View File

@ -1,91 +0,0 @@
//! Multi-threaded runtime
mod counters;
use counters::Counters;
mod handle;
pub(crate) use handle::Handle;
mod overflow;
pub(crate) use overflow::Overflow;
mod idle;
use self::idle::Idle;
mod stats;
pub(crate) use stats::Stats;
pub(crate) mod queue;
mod worker;
use worker::Core;
pub(crate) use worker::{Context, Shared};
// TODO: implement task dump
mod trace_mock;
use trace_mock::TraceStatus;
pub(crate) use worker::block_in_place;
use crate::runtime::{
self, blocking,
driver::{self, Driver},
scheduler, Config,
};
use crate::util::RngSeedGenerator;
use std::fmt;
use std::future::Future;
/// Work-stealing based thread pool for executing futures.
pub(crate) struct MultiThread;
// ===== impl MultiThread =====
impl MultiThread {
pub(crate) fn new(
size: usize,
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, runtime::Handle) {
let handle = worker::create(
size,
driver,
driver_handle,
blocking_spawner,
seed_generator,
config,
);
(MultiThread, handle)
}
/// Blocks the current thread waiting for the future to complete.
///
/// The future will execute on the current thread, but all spawned tasks
/// will be executed on the thread pool.
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
F: Future,
{
crate::runtime::context::enter_runtime(handle, true, |blocking| {
blocking.block_on(future).expect("failed to park thread")
})
}
pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
match handle {
scheduler::Handle::MultiThreadAlt(handle) => handle.shutdown(),
_ => panic!("expected MultiThread scheduler"),
}
}
}
impl fmt::Debug for MultiThread {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("MultiThread").finish()
}
}

View File

@ -1,26 +0,0 @@
use crate::runtime::task;
#[cfg(test)]
use std::cell::RefCell;
pub(crate) trait Overflow<T: 'static> {
fn push(&self, task: task::Notified<T>);
fn push_batch<I>(&self, iter: I)
where
I: Iterator<Item = task::Notified<T>>;
}
#[cfg(test)]
impl<T: 'static> Overflow<T> for RefCell<Vec<task::Notified<T>>> {
fn push(&self, task: task::Notified<T>) {
self.borrow_mut().push(task);
}
fn push_batch<I>(&self, iter: I)
where
I: Iterator<Item = task::Notified<T>>,
{
self.borrow_mut().extend(iter);
}
}

View File

@ -1,232 +0,0 @@
//! Parks the runtime.
//!
//! A combination of the various resource driver park handles.
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::runtime::driver::{self, Driver};
use crate::util::TryLock;
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
pub(crate) struct Parker {
inner: Arc<Inner>,
}
pub(crate) struct Unparker {
inner: Arc<Inner>,
}
struct Inner {
/// Avoids entering the park if possible
state: AtomicUsize,
/// Used to coordinate access to the driver / condvar
mutex: Mutex<()>,
/// Condvar to block on if the driver is unavailable.
condvar: Condvar,
/// Resource (I/O, time, ...) driver
shared: Arc<Shared>,
}
const EMPTY: usize = 0;
const PARKED_CONDVAR: usize = 1;
const PARKED_DRIVER: usize = 2;
const NOTIFIED: usize = 3;
/// Shared across multiple Parker handles
struct Shared {
/// Shared driver. Only one thread at a time can use this
driver: TryLock<Driver>,
}
impl Parker {
pub(crate) fn new(driver: Driver) -> Parker {
Parker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
shared: Arc::new(Shared {
driver: TryLock::new(driver),
}),
}),
}
}
pub(crate) fn unpark(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.inner.park(handle);
}
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
// Only parking with zero is supported...
assert_eq!(duration, Duration::from_millis(0));
if let Some(mut driver) = self.inner.shared.driver.try_lock() {
driver.park_timeout(handle, duration)
}
}
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.inner.shutdown(handle);
}
}
impl Clone for Parker {
fn clone(&self) -> Parker {
Parker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
shared: self.inner.shared.clone(),
}),
}
}
}
impl Unparker {
pub(crate) fn unpark(&self, driver: &driver::Handle) {
self.inner.unpark(driver);
}
}
impl Inner {
/// Parks the current thread for at most `dur`.
fn park(&self, handle: &driver::Handle) {
// If we were previously notified then we consume this notification and
// return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
if let Some(mut driver) = self.shared.driver.try_lock() {
self.park_driver(&mut driver, handle);
} else {
self.park_condvar();
}
}
fn park_condvar(&self) {
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock();
match self
.state
.compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
{
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
loop {
m = self.condvar.wait(m).unwrap();
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// got a notification
return;
}
// spurious wakeup, go back to sleep
}
}
fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
{
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
driver.park(handle);
match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} // got a notification, hurray!
PARKED_DRIVER => {} // no notification, alas
n => panic!("inconsistent park_timeout state: {}", n),
}
}
fn unpark(&self, driver: &driver::Handle) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => {} // no one was waiting
NOTIFIED => {} // already unparked
PARKED_CONDVAR => self.unpark_condvar(),
PARKED_DRIVER => driver.unpark(),
actual => panic!("inconsistent state in unpark; actual = {}", actual),
}
}
fn unpark_condvar(&self) {
// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
// during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has
// `lock` locked at this stage so we can acquire `lock` to wait until
// it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock());
self.condvar.notify_one()
}
fn shutdown(&self, handle: &driver::Handle) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown(handle);
}
self.condvar.notify_all();
}
}
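
The parker above is a small state machine (`EMPTY`, `PARKED_CONDVAR`, `PARKED_DRIVER`, `NOTIFIED`): `unpark` always swaps in `NOTIFIED` so its write is visible to the parking thread, and `park` either consumes a pending notification or sleeps on the condvar (or in the driver). A condvar-only sketch of that handshake, with the driver path left out and a single `PARKED` state standing in for both parked variants:

use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

const EMPTY: usize = 0;
const PARKED: usize = 1;
const NOTIFIED: usize = 2;

struct Inner {
    state: AtomicUsize,
    mutex: Mutex<()>,
    condvar: Condvar,
}

impl Inner {
    fn park(&self) {
        // Consume a pending notification without sleeping.
        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
            return;
        }
        let mut guard = self.mutex.lock().unwrap();
        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            Err(NOTIFIED) => {
                // unpark raced in between the two checks; consume it.
                let old = self.state.swap(EMPTY, SeqCst);
                debug_assert_eq!(old, NOTIFIED);
                return;
            }
            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
        }
        loop {
            guard = self.condvar.wait(guard).unwrap();
            if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
                return; // real notification; anything else is a spurious wakeup
            }
        }
    }

    fn unpark(&self) {
        // Swap rather than compare-and-swap so the NOTIFIED write always lands.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY | NOTIFIED => {} // nobody parked, or already notified
            PARKED => {
                // Take the lock so the notification cannot fire in the window
                // between the parking thread setting PARKED and calling wait().
                drop(self.mutex.lock().unwrap());
                self.condvar.notify_one();
            }
            actual => panic!("inconsistent state in unpark; actual = {actual}"),
        }
    }
}

fn main() {
    let inner = Arc::new(Inner {
        state: AtomicUsize::new(EMPTY),
        mutex: Mutex::new(()),
        condvar: Condvar::new(),
    });
    let unparker = Arc::clone(&inner);
    let t = thread::spawn(move || unparker.unpark());
    inner.park(); // returns once the unpark above is observed
    t.join().unwrap();
}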

View File

@ -1,595 +0,0 @@
//! Run-queue structures to support a work-stealing scheduler
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread_alt::{Overflow, Stats};
use crate::runtime::task;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
// Use wider integers when possible to increase ABA resilience.
//
// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
cfg_has_atomic_u64! {
type UnsignedShort = u32;
type UnsignedLong = u64;
type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
type UnsignedShort = u16;
type UnsignedLong = u32;
type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}
/// Producer handle. May only be used from a single thread.
pub(crate) struct Local<T: 'static> {
inner: Arc<Inner<T>>,
}
/// Consumer handle. May be used from many threads.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
#[repr(align(128))]
pub(crate) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
///
/// Contains two `UnsignedShort` values. The `UnsignedShort` in the `LSB` half is the
/// "real" head of the queue. The `UnsignedShort` in the `MSB` half is set by a stealer
/// in the process of stealing values. It represents the first value being stolen in the
/// batch. The `UnsignedShort` indices are intentionally wider than strictly
/// required for buffer indexing in order to provide ABA mitigation and make
/// it possible to distinguish between full and empty buffers.
///
/// When both `UnsignedShort` values are the same, there is no active
/// stealer.
///
/// Tracking an in-progress stealer prevents a wrapping scenario.
head: AtomicUnsignedLong,
/// Only updated by producer thread but read by many threads.
tail: AtomicUnsignedShort,
/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
mask: usize,
}
unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}
/// Create a new local run-queue
pub(crate) fn local<T: 'static>(capacity: usize) -> (Steal<T>, Local<T>) {
assert!(capacity <= 4096);
assert!(capacity >= 1);
let mut buffer = Vec::with_capacity(capacity);
for _ in 0..capacity {
buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
}
let inner = Arc::new(Inner {
head: AtomicUnsignedLong::new(0),
tail: AtomicUnsignedShort::new(0),
buffer: buffer.into_boxed_slice(),
mask: capacity - 1,
});
let local = Local {
inner: inner.clone(),
};
let remote = Steal(inner);
(remote, local)
}
impl<T> Local<T> {
/// How many tasks can be pushed into the queue
pub(crate) fn remaining_slots(&self) -> usize {
self.inner.remaining_slots()
}
pub(crate) fn max_capacity(&self) -> usize {
self.inner.buffer.len()
}
/// Returns `true` if there are no entries in the queue
pub(crate) fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub(crate) fn can_steal(&self) -> bool {
self.remaining_slots() >= self.max_capacity() - self.max_capacity() / 2
}
/// Pushes a batch of tasks to the back of the queue. All tasks must fit in
/// the local queue.
///
/// # Panics
///
/// The method panics if there is not enough capacity to fit in the queue.
pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
let len = tasks.len();
assert!(len <= self.inner.buffer.len());
if len == 0 {
// Nothing to do
return;
}
let head = self.inner.head.load(Acquire);
let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let mut tail = unsafe { self.inner.tail.unsync_load() };
if tail.wrapping_sub(steal) <= (self.inner.buffer.len() - len) as UnsignedShort {
// Yes, this if condition is structured a bit oddly (the first block
// does nothing and the second panics). It is this way to match
// `push_back_or_overflow`.
} else {
panic!(
"not enough capacity; len={}; tail={}; steal={}; real={}",
len, tail, steal, real
);
}
for task in tasks {
let idx = tail as usize & self.inner.mask;
self.inner.buffer[idx].with_mut(|ptr| {
// Write the task to the slot
//
// Safety: There is only one producer and the above `if`
// condition ensures we don't touch a cell if there is a
// value, thus no consumer.
unsafe {
ptr::write((*ptr).as_mut_ptr(), task);
}
});
tail = tail.wrapping_add(1);
}
self.inner.tail.store(tail, Release);
}
/// Pushes a task to the back of the local queue, if there is not enough
/// capacity in the queue, this triggers the overflow operation.
///
/// When the queue overflows, half of the current contents of the queue is
/// moved to the given Injection queue. This frees up capacity for more
/// tasks to be pushed into the local queue.
pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
&mut self,
mut task: task::Notified<T>,
overflow: &O,
stats: &mut Stats,
) {
let tail = loop {
let head = self.inner.head.load(Acquire);
let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
if tail.wrapping_sub(steal) < self.inner.buffer.len() as UnsignedShort {
// There is capacity for the task
break tail;
} else if steal != real {
super::counters::inc_num_overflows();
// Concurrently stealing, this will free up capacity, so only
// push the task onto the inject queue
overflow.push(task);
return;
} else {
super::counters::inc_num_overflows();
// Push the current task and half of the queue into the
// inject queue.
match self.push_overflow(task, real, tail, overflow, stats) {
Ok(_) => return,
// Lost the race, try again
Err(v) => {
task = v;
}
}
}
};
self.push_back_finish(task, tail);
}
// Second half of `push_back`
fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
// Map the position to a slot index.
let idx = tail as usize & self.inner.mask;
self.inner.buffer[idx].with_mut(|ptr| {
// Write the task to the slot
//
// Safety: There is only one producer and the above `if`
// condition ensures we don't touch a cell if there is a
// value, thus no consumer.
unsafe {
ptr::write((*ptr).as_mut_ptr(), task);
}
});
// Make the task available. Synchronizes with a load in
// `steal_into2`.
self.inner.tail.store(tail.wrapping_add(1), Release);
}
/// Moves a batch of tasks into the inject queue.
///
/// This will temporarily make some of the tasks unavailable to stealers.
/// Once `push_overflow` is done, a notification is sent out, so if other
/// workers "missed" some of the tasks during a steal, they will get
/// another opportunity.
#[inline(never)]
fn push_overflow<O: Overflow<T>>(
&mut self,
task: task::Notified<T>,
head: UnsignedShort,
tail: UnsignedShort,
overflow: &O,
stats: &mut Stats,
) -> Result<(), task::Notified<T>> {
// How many elements are we taking from the local queue.
//
// This is one less than the number of tasks pushed to the inject
// queue as we are also inserting the `task` argument.
let num_tasks_taken: UnsignedShort = (self.inner.buffer.len() / 2) as UnsignedShort;
assert_eq!(
tail.wrapping_sub(head) as usize,
self.inner.buffer.len(),
"queue is not full; tail = {}; head = {}",
tail,
head
);
let prev = pack(head, head);
// Claim a bunch of tasks
//
// We are claiming the tasks **before** reading them out of the buffer.
// This is safe because only the **current** thread is able to push new
// tasks.
//
// There isn't really any need for memory ordering... Relaxed would
// work. This is because all tasks are pushed into the queue from the
// current thread (or memory has been acquired if the local queue handle
// moved).
if self
.inner
.head
.compare_exchange(
prev,
pack(
head.wrapping_add(num_tasks_taken),
head.wrapping_add(num_tasks_taken),
),
Release,
Relaxed,
)
.is_err()
{
// We failed to claim the tasks, losing the race. Return out of
// this function and try the full `push` routine again. The queue
// may not be full anymore.
return Err(task);
}
/// An iterator that takes elements out of the run queue.
struct BatchTaskIter<'a, T: 'static> {
buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>],
mask: usize,
head: UnsignedLong,
i: UnsignedLong,
num: UnsignedShort,
}
impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
type Item = task::Notified<T>;
#[inline]
fn next(&mut self) -> Option<task::Notified<T>> {
if self.i == UnsignedLong::from(self.num) {
None
} else {
let i_idx = self.i.wrapping_add(self.head) as usize & self.mask;
let slot = &self.buffer[i_idx];
// safety: Our CAS from before has assumed exclusive ownership
// of the task pointers in this range.
let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
self.i += 1;
Some(task)
}
}
}
// safety: The CAS above ensures that no consumer will look at these
// values again, and we are the only producer.
let batch_iter = BatchTaskIter {
buffer: &self.inner.buffer,
mask: self.inner.mask,
head: head as UnsignedLong,
i: 0,
num: num_tasks_taken,
};
overflow.push_batch(batch_iter.chain(std::iter::once(task)));
// Add 1 to factor in the task currently being scheduled.
stats.incr_overflow_count();
Ok(())
}
/// Pops a task from the local queue.
pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
let mut head = self.inner.head.load(Acquire);
let idx = loop {
let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
if real == tail {
// queue is empty
return None;
}
let next_real = real.wrapping_add(1);
// If `steal == real` there are no concurrent stealers. Both `steal`
// and `real` are updated.
let next = if steal == real {
pack(next_real, next_real)
} else {
assert_ne!(steal, next_real);
pack(steal, next_real)
};
// Attempt to claim a task.
let res = self
.inner
.head
.compare_exchange(head, next, AcqRel, Acquire);
match res {
Ok(_) => break real as usize & self.inner.mask,
Err(actual) => head = actual,
}
};
Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
}
}
impl<T> Steal<T> {
pub(crate) fn is_empty(&self) -> bool {
self.0.is_empty()
}
/// Steals half the tasks from self and place them into `dst`.
pub(crate) fn steal_into(
&self,
dst: &mut Local<T>,
dst_stats: &mut Stats,
) -> Option<task::Notified<T>> {
// Safety: the caller is the only thread that mutates `dst.tail` and
// holds a mutable reference.
let dst_tail = unsafe { dst.inner.tail.unsync_load() };
// To the caller, `dst` may **look** empty but still have values
// contained in the buffer. If another thread is concurrently stealing
// from `dst` there may not be enough capacity to steal.
let (steal, _) = unpack(dst.inner.head.load(Acquire));
if dst_tail.wrapping_sub(steal) > self.0.buffer.len() as UnsignedShort / 2 {
// we *could* try to steal less here, but for simplicity, we're just
// going to abort.
return None;
}
// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
if n == 0 {
// No tasks were stolen
return None;
}
super::counters::inc_num_steals();
dst_stats.incr_steal_count(n as u16);
dst_stats.incr_steal_operations();
// We are returning a task here
n -= 1;
let ret_pos = dst_tail.wrapping_add(n);
let ret_idx = ret_pos as usize & dst.inner.mask;
// safety: the value was written as part of `steal_into2` and not
// exposed to stealers, so no other thread can access it.
let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
if n == 0 {
// The `dst` queue is empty, but a single task was stolen
return Some(ret);
}
// Make the stolen items available to consumers
dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
Some(ret)
}
// Steal tasks from `self`, placing them into `dst`. Returns the number of
// tasks that were stolen.
fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
let mut prev_packed = self.0.head.load(Acquire);
let mut next_packed;
let n = loop {
let (src_head_steal, src_head_real) = unpack(prev_packed);
let src_tail = self.0.tail.load(Acquire);
// If these two do not match, another thread is concurrently
// stealing from the queue.
if src_head_steal != src_head_real {
return 0;
}
// Number of available tasks to steal
let n = src_tail.wrapping_sub(src_head_real);
let n = n - n / 2;
if n == 0 {
// No tasks available to steal
return 0;
}
// Update the real head index to acquire the tasks.
let steal_to = src_head_real.wrapping_add(n);
assert_ne!(src_head_steal, steal_to);
next_packed = pack(src_head_steal, steal_to);
// Claim all those tasks. This is done by incrementing the "real"
// head but not the steal. By doing this, no other thread is able to
// steal from this queue until the current thread completes.
let res = self
.0
.head
.compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
match res {
Ok(_) => break n,
Err(actual) => prev_packed = actual,
}
};
debug_assert!(
n <= (self.0.buffer.len() - self.0.buffer.len() / 2) as UnsignedShort,
"actual = {}",
n
);
let (first, _) = unpack(next_packed);
// Take all the tasks
for i in 0..n {
// Compute the positions
let src_pos = first.wrapping_add(i);
let dst_pos = dst_tail.wrapping_add(i);
// Map to slots
let src_idx = src_pos as usize & self.0.mask;
let dst_idx = dst_pos as usize & self.0.mask;
// Read the task
//
// safety: We acquired the task with the atomic exchange above.
let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
// Write the task to the new slot
//
// safety: `dst` queue is empty and we are the only producer to
// this queue.
dst.inner.buffer[dst_idx]
.with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
}
let mut prev_packed = next_packed;
// Update `src_head_steal` to match `src_head_real` signalling that the
// stealing routine is complete.
loop {
let head = unpack(prev_packed).1;
next_packed = pack(head, head);
let res = self
.0
.head
.compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
match res {
Ok(_) => return n,
Err(actual) => {
let (actual_steal, actual_real) = unpack(actual);
assert_ne!(actual_steal, actual_real);
prev_packed = actual;
}
}
}
}
}
cfg_unstable_metrics! {
impl<T> Steal<T> {
pub(crate) fn len(&self) -> usize {
self.0.len() as _
}
}
}
impl<T> Clone for Steal<T> {
fn clone(&self) -> Steal<T> {
Steal(self.0.clone())
}
}
impl<T> Drop for Local<T> {
fn drop(&mut self) {
if !std::thread::panicking() {
assert!(self.pop().is_none(), "queue not empty");
}
}
}
impl<T> Inner<T> {
fn remaining_slots(&self) -> usize {
let (steal, _) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);
self.buffer.len() - (tail.wrapping_sub(steal) as usize)
}
fn len(&self) -> UnsignedShort {
let (_, head) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);
tail.wrapping_sub(head)
}
fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
let real = n & UnsignedShort::MAX as UnsignedLong;
let steal = n >> (mem::size_of::<UnsignedShort>() * 8);
(steal as UnsignedShort, real as UnsignedShort)
}
/// Join the two head values
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
(real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
}
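
The queue packs two half-width indices into one atomic head word: the low half is the "real" head and the high half records where an in-flight steal began, so `pop` and `steal_into2` can detect a concurrent stealer and update both indices with a single `compare_exchange`. A standalone sketch of the 64-bit variant of that packing, plus the "steal half, rounded up" arithmetic from `steal_into2`:

type UnsignedShort = u32;
type UnsignedLong = u64;

// Split the packed head word into (steal, real).
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
    let real = n & UnsignedShort::MAX as UnsignedLong;
    let steal = n >> (std::mem::size_of::<UnsignedShort>() * 8);
    (steal as UnsignedShort, real as UnsignedShort)
}

// Join the two halves back into one word.
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
    (real as UnsignedLong) | ((steal as UnsignedLong) << (std::mem::size_of::<UnsignedShort>() * 8))
}

fn main() {
    // Round-trips exactly, so one compare_exchange on the packed word can
    // check and update both indices atomically.
    assert_eq!(unpack(pack(7, 42)), (7, 42));

    // No active stealer: both halves are equal.
    let (steal, real) = unpack(pack(42, 42));
    assert_eq!(steal, real);

    // steal_into2 claims "half, rounded up" of what is available: with a
    // tail of 47 and a head of 42 there are 5 tasks, and 5 - 5 / 2 = 3 are stolen.
    let src_tail: UnsignedShort = 47;
    let n = src_tail.wrapping_sub(real);
    assert_eq!(n - n / 2, 3);
}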

View File

@ -1,175 +0,0 @@
use crate::runtime::{Config, MetricsBatch, WorkerMetrics};
use std::cmp;
use std::time::{Duration, Instant};
/// Per-worker statistics. This is used for both tuning the scheduler and
/// reporting runtime-level metrics/stats.
pub(crate) struct Stats {
/// The metrics batch used to report runtime-level metrics/stats to the
/// user.
batch: MetricsBatch,
/// Exponentially-weighted moving average of the time spent polling a
/// scheduled task.
///
/// Tracked in nanoseconds, stored as an `f64` since that is what we use
/// for the EWMA calculations.
task_poll_time_ewma: f64,
}
/// Transient state
pub(crate) struct Ephemeral {
/// Instant at which work last resumed (continued after park).
///
/// This duplicates the value stored in `MetricsBatch`. We will unify
/// `Stats` and `MetricsBatch` when we stabilize metrics.
processing_scheduled_tasks_started_at: Instant,
/// Number of tasks polled in the batch of scheduled tasks
tasks_polled_in_batch: usize,
/// Used to ensure calls to start / stop batch are paired
#[cfg(debug_assertions)]
batch_started: bool,
}
impl Ephemeral {
pub(crate) fn new() -> Ephemeral {
Ephemeral {
processing_scheduled_tasks_started_at: Instant::now(),
tasks_polled_in_batch: 0,
#[cfg(debug_assertions)]
batch_started: false,
}
}
}
/// How much weight to give each individual poll time; the value is plucked from thin air.
const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;
/// Ideally, we wouldn't go above this; the value is plucked from thin air.
const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;
/// Max value for the global queue interval. This is roughly 2x the previous default.
const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;
/// This is the previous default.
const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;
impl Stats {
pub(crate) const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 =
TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL;
pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
// Seed the value with what we hope to see.
let task_poll_time_ewma =
TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;
Stats {
batch: MetricsBatch::new(worker_metrics),
task_poll_time_ewma,
}
}
pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
// If an interval is explicitly set, don't tune.
if let Some(configured) = config.global_queue_interval {
return configured;
}
// As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here.
let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;
cmp::max(
// If we are using self-tuning, we don't want to return less than 2 as that would result in the
// global queue always getting checked first.
2,
cmp::min(
MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
tasks_per_interval,
),
)
}
pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to, self.task_poll_time_ewma as u64);
}
pub(crate) fn about_to_park(&mut self) {
self.batch.about_to_park();
}
pub(crate) fn unparked(&mut self) {
self.batch.unparked();
}
pub(crate) fn inc_local_schedule_count(&mut self) {
self.batch.inc_local_schedule_count();
}
pub(crate) fn start_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
self.batch.start_processing_scheduled_tasks();
#[cfg(debug_assertions)]
{
debug_assert!(!ephemeral.batch_started);
ephemeral.batch_started = true;
}
ephemeral.processing_scheduled_tasks_started_at = Instant::now();
ephemeral.tasks_polled_in_batch = 0;
}
pub(crate) fn end_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
self.batch.end_processing_scheduled_tasks();
#[cfg(debug_assertions)]
{
debug_assert!(ephemeral.batch_started);
ephemeral.batch_started = false;
}
// Update the EWMA task poll time
if ephemeral.tasks_polled_in_batch > 0 {
let now = Instant::now();
// If we "overflow" this conversion, we have bigger problems than
// slightly off stats.
let elapsed = (now - ephemeral.processing_scheduled_tasks_started_at).as_nanos() as f64;
let num_polls = ephemeral.tasks_polled_in_batch as f64;
// Calculate the mean poll duration for a single task in the batch
let mean_poll_duration = elapsed / num_polls;
// Compute the alpha weighted by the number of tasks polled this batch.
let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
// Now compute the new weighted average task poll time.
self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
+ (1.0 - weighted_alpha) * self.task_poll_time_ewma;
}
}
pub(crate) fn start_poll(&mut self, ephemeral: &mut Ephemeral) {
self.batch.start_poll();
ephemeral.tasks_polled_in_batch += 1;
}
pub(crate) fn end_poll(&mut self) {
self.batch.end_poll();
}
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.batch.incr_steal_count(by);
}
pub(crate) fn incr_steal_operations(&mut self) {
self.batch.incr_steal_operations();
}
pub(crate) fn incr_overflow_count(&mut self) {
self.batch.incr_overflow_count();
}
}
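To make the tuning arithmetic above concrete, here is a free-standing sketch of the same two steps: the per-poll EWMA is updated with an alpha weighted by the number of polls in the batch, and the global queue interval is the 200 µs target divided by that average, clamped to the [2, 127] range. The constants mirror the ones defined in this file; the function names and `main` are illustrative only.

const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;
const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = 200_000.0; // 200 µs, in nanoseconds

// Batch-weighted EWMA update, mirroring `end_processing_scheduled_tasks`.
fn update_ewma(ewma: f64, batch_elapsed_nanos: f64, num_polls: f64) -> f64 {
    let mean_poll_duration = batch_elapsed_nanos / num_polls;
    let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
    weighted_alpha * mean_poll_duration + (1.0 - weighted_alpha) * ewma
}

// Interval derivation, mirroring `tuned_global_queue_interval`.
fn tuned_interval(ewma: f64) -> u32 {
    ((TARGET_GLOBAL_QUEUE_INTERVAL / ewma) as u32).clamp(2, 127)
}

fn main() {
    // Seeded like `Stats::new`: 200 µs / 61 tasks, roughly 3.3 µs per poll.
    let mut ewma = TARGET_GLOBAL_QUEUE_INTERVAL / 61.0;
    println!("seed:  ewma = {ewma:.0} ns, interval = {}", tuned_interval(ewma));

    // A batch of 10 slow polls (100 µs total, 10 µs each) drags the average up,
    // so the scheduler checks the global queue more often (smaller interval).
    ewma = update_ewma(ewma, 100_000.0, 10.0);
    assert!(tuned_interval(ewma) < 61);
    println!("after: ewma = {ewma:.0} ns, interval = {}", tuned_interval(ewma));
}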

View File

@ -1,61 +0,0 @@
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Barrier, Mutex};
use crate::runtime::dump::Dump;
use crate::runtime::scheduler::multi_thread_alt::Handle;
use crate::sync::notify::Notify;
/// Tracing status of the worker.
pub(super) struct TraceStatus {
pub(super) trace_requested: AtomicBool,
pub(super) trace_start: Barrier,
pub(super) trace_end: Barrier,
pub(super) result_ready: Notify,
pub(super) trace_result: Mutex<Option<Dump>>,
}
impl TraceStatus {
pub(super) fn new(remotes_len: usize) -> Self {
Self {
trace_requested: AtomicBool::new(false),
trace_start: Barrier::new(remotes_len),
trace_end: Barrier::new(remotes_len),
result_ready: Notify::new(),
trace_result: Mutex::new(None),
}
}
pub(super) fn trace_requested(&self) -> bool {
self.trace_requested.load(Ordering::Relaxed)
}
pub(super) async fn start_trace_request(&self, handle: &Handle) {
while self
.trace_requested
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
handle.notify_all();
crate::task::yield_now().await;
}
}
pub(super) fn stash_result(&self, dump: Dump) {
let _ = self.trace_result.lock().insert(dump);
self.result_ready.notify_one();
}
pub(super) fn take_result(&self) -> Option<Dump> {
self.trace_result.lock().take()
}
pub(super) async fn end_trace_request(&self, handle: &Handle) {
while self
.trace_requested
.compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
handle.notify_all();
crate::task::yield_now().await;
}
}
}

View File

@ -1,11 +0,0 @@
pub(super) struct TraceStatus {}
impl TraceStatus {
pub(super) fn new(_: usize) -> Self {
Self {}
}
pub(super) fn trace_requested(&self) -> bool {
false
}
}

File diff suppressed because it is too large

View File

@ -1,11 +0,0 @@
use super::Shared;
impl Shared {
pub(crate) fn injection_queue_depth(&self) -> usize {
self.inject.len()
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.remotes[worker].steal.len()
}
}

View File

@ -1,79 +0,0 @@
use super::{Core, Handle, Shared};
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread_alt::Stats;
use crate::runtime::task::trace::trace_multi_thread;
use crate::runtime::{dump, WorkerMetrics};
use std::time::Duration;
impl Handle {
pub(super) fn trace_core(&self, mut core: Box<Core>) -> Box<Core> {
core.is_traced = false;
if core.is_shutdown {
return core;
}
// wait for other workers, or timeout without tracing
let timeout = Duration::from_millis(250); // a _very_ generous timeout
let barrier =
if let Some(barrier) = self.shared.trace_status.trace_start.wait_timeout(timeout) {
barrier
} else {
// don't attempt to trace
return core;
};
if !barrier.is_leader() {
// wait for leader to finish tracing
self.shared.trace_status.trace_end.wait();
return core;
}
// trace
let owned = &self.shared.owned;
let mut local = self.shared.steal_all();
let synced = &self.shared.synced;
let injection = &self.shared.inject;
// safety: `trace_multi_thread` is invoked with the same `synced` that `injection`
// was created with.
let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) }
.into_iter()
.map(dump::Task::new)
.collect();
let result = dump::Dump::new(traces);
// stash the result
self.shared.trace_status.stash_result(result);
// allow other workers to proceed
self.shared.trace_status.trace_end.wait();
core
}
}
impl Shared {
/// Steal all tasks from remotes into a single local queue.
pub(super) fn steal_all(&self) -> super::queue::Local<Arc<Handle>> {
let (_steal, mut local) = super::queue::local();
let worker_metrics = WorkerMetrics::new();
let mut stats = Stats::new(&worker_metrics);
for remote in self.remotes.iter() {
let steal = &remote.steal;
while !steal.is_empty() {
if let Some(task) = steal.steal_into(&mut local, &mut stats) {
local.push_back([task].into_iter());
}
}
}
local
}
}
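The barrier handshake in `trace_core` above can be hard to follow in isolation: all workers meet at `trace_start`, exactly one becomes the leader and produces the dump while the rest block on `trace_end`, and everyone resumes once the leader arrives there too. Below is a minimal synchronous sketch of that pattern using plain std primitives in place of the runtime's loom wrappers and `Notify`; the worker count, names, and the string standing in for the `Dump` are assumptions for illustration only.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

fn main() {
    let workers = 3;
    // A pending trace request; loosely the role of `TraceStatus::trace_requested`.
    let requested = Arc::new(AtomicBool::new(true));
    let start = Arc::new(Barrier::new(workers));
    let end = Arc::new(Barrier::new(workers));
    // Stand-in for the stashed `Dump`.
    let result = Arc::new(Mutex::new(None::<String>));

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let (requested, start, end, result) =
                (requested.clone(), start.clone(), end.clone(), result.clone());
            thread::spawn(move || {
                if requested.load(Ordering::Relaxed) {
                    // All workers rendezvous; exactly one is elected leader.
                    if start.wait().is_leader() {
                        // The leader "traces" while the others block on `end`.
                        *result.lock().unwrap() = Some(format!("dump taken by worker {id}"));
                    }
                    // Everyone waits for the leader to finish before resuming.
                    end.wait();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    println!("{:?}", result.lock().unwrap().take());
}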

View File

@ -1,7 +0,0 @@
use super::{Core, Handle};
impl Handle {
pub(super) fn trace_core(&self, core: Box<Core>) -> Box<Core> {
core
}
}

View File

@ -202,8 +202,6 @@ pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
#[cfg(feature = "rt-multi-thread")]
scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
scheduler::Context::MultiThreadAlt(_) => unimplemented!(),
}
}
});

View File

@ -1,571 +0,0 @@
#![cfg(tokio_unstable)]
mod queue;
mod shutdown;
mod yield_now;
/// Full runtime loom tests. These are heavy tests and take significant time to
/// run on CI.
///
/// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
///
/// In order to speed up the C
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::{self, Runtime};
use crate::{spawn, task};
use tokio_test::assert_ok;
use loom::sync::atomic::{AtomicBool, AtomicUsize};
use loom::sync::Arc;
use pin_project_lite::pin_project;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::task::{ready, Context, Poll};
mod atomic_take {
use loom::sync::atomic::AtomicBool;
use std::mem::MaybeUninit;
use std::sync::atomic::Ordering::SeqCst;
pub(super) struct AtomicTake<T> {
inner: MaybeUninit<T>,
taken: AtomicBool,
}
impl<T> AtomicTake<T> {
pub(super) fn new(value: T) -> Self {
Self {
inner: MaybeUninit::new(value),
taken: AtomicBool::new(false),
}
}
pub(super) fn take(&self) -> Option<T> {
// safety: Only one thread will see the boolean change from false
// to true, so that thread is able to take the value.
match self.taken.fetch_or(true, SeqCst) {
false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) },
true => None,
}
}
}
impl<T> Drop for AtomicTake<T> {
fn drop(&mut self) {
drop(self.take());
}
}
}
#[derive(Clone)]
struct AtomicOneshot<T> {
value: std::sync::Arc<atomic_take::AtomicTake<oneshot::Sender<T>>>,
}
impl<T> AtomicOneshot<T> {
fn new(sender: oneshot::Sender<T>) -> Self {
Self {
value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)),
}
}
fn assert_send(&self, value: T) {
self.value.take().unwrap().send(value);
}
}
/// Tests are divided into groups to make the runs faster on CI.
mod group_a {
use super::*;
#[test]
fn racy_shutdown() {
loom::model(|| {
let pool = mk_pool(1);
// here's the case we want to exercise:
//
// a worker that still has tasks in its local queue gets sent to the blocking pool (due to
// block_in_place). the blocking pool is shut down, so drops the worker. the worker's
// shutdown method never gets run.
//
// we do this by spawning two tasks on one worker, the first of which does block_in_place,
// and then immediately drop the pool.
pool.spawn(track(async {
crate::task::block_in_place(|| {});
}));
pool.spawn(track(async {}));
drop(pool);
});
}
#[test]
fn pool_multi_spawn() {
loom::model(|| {
let pool = mk_pool(2);
let c1 = Arc::new(AtomicUsize::new(0));
let (tx, rx) = oneshot::channel();
let tx1 = AtomicOneshot::new(tx);
// Spawn a task
let c2 = c1.clone();
let tx2 = tx1.clone();
pool.spawn(track(async move {
spawn(track(async move {
if 1 == c1.fetch_add(1, Relaxed) {
tx1.assert_send(());
}
}));
}));
// Spawn a second task
pool.spawn(track(async move {
spawn(track(async move {
if 1 == c2.fetch_add(1, Relaxed) {
tx2.assert_send(());
}
}));
}));
rx.recv();
});
}
fn only_blocking_inner(first_pending: bool) {
loom::model(move || {
let pool = mk_pool(1);
let (block_tx, block_rx) = oneshot::channel();
pool.spawn(track(async move {
crate::task::block_in_place(move || {
block_tx.send(());
});
if first_pending {
task::yield_now().await
}
}));
block_rx.recv();
drop(pool);
});
}
#[test]
fn only_blocking_without_pending() {
only_blocking_inner(false)
}
#[test]
fn only_blocking_with_pending() {
only_blocking_inner(true)
}
}
mod group_b {
use super::*;
fn blocking_and_regular_inner(first_pending: bool) {
const NUM: usize = 3;
loom::model(move || {
let pool = mk_pool(1);
let cnt = Arc::new(AtomicUsize::new(0));
let (block_tx, block_rx) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let done_tx = AtomicOneshot::new(done_tx);
pool.spawn(track(async move {
crate::task::block_in_place(move || {
block_tx.send(());
});
if first_pending {
task::yield_now().await
}
}));
for _ in 0..NUM {
let cnt = cnt.clone();
let done_tx = done_tx.clone();
pool.spawn(track(async move {
if NUM == cnt.fetch_add(1, Relaxed) + 1 {
done_tx.assert_send(());
}
}));
}
done_rx.recv();
block_rx.recv();
drop(pool);
});
}
#[test]
#[ignore] // TODO: uncomment
fn blocking_and_regular_without_pending() {
blocking_and_regular_inner(false);
}
#[test]
fn blocking_and_regular_with_pending() {
blocking_and_regular_inner(true);
}
#[test]
fn join_output() {
loom::model(|| {
let rt = mk_pool(1);
rt.block_on(async {
let t = crate::spawn(track(async { "hello" }));
let out = assert_ok!(t.await);
assert_eq!("hello", out.into_inner());
});
});
}
#[test]
fn poll_drop_handle_then_drop() {
loom::model(|| {
let rt = mk_pool(1);
rt.block_on(async move {
let mut t = crate::spawn(track(async { "hello" }));
poll_fn(|cx| {
let _ = Pin::new(&mut t).poll(cx);
Poll::Ready(())
})
.await;
});
})
}
#[test]
fn complete_block_on_under_load() {
loom::model(|| {
let pool = mk_pool(1);
pool.block_on(async {
// Trigger a re-schedule
crate::spawn(track(async {
for _ in 0..2 {
task::yield_now().await;
}
}));
gated2(true).await
});
});
}
#[test]
fn shutdown_with_notification() {
use crate::sync::oneshot;
loom::model(|| {
let rt = mk_pool(2);
let (done_tx, done_rx) = oneshot::channel::<()>();
rt.spawn(track(async move {
let (tx, rx) = oneshot::channel::<()>();
crate::spawn(async move {
crate::task::spawn_blocking(move || {
let _ = tx.send(());
});
let _ = done_rx.await;
});
let _ = rx.await;
let _ = done_tx.send(());
}));
});
}
}
mod group_c {
use super::*;
#[test]
fn pool_shutdown() {
loom::model(|| {
let pool = mk_pool(2);
pool.spawn(track(async move {
gated2(true).await;
}));
pool.spawn(track(async move {
gated2(false).await;
}));
drop(pool);
});
}
#[test]
fn fill_local_queue() {
const NUM_SPAWNS: usize = 3;
loom::model(|| {
// Using the std versions here, as these are only used to control shutdown.
let cnt = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let (tx, rx) = oneshot::channel();
let tx = AtomicOneshot::new(tx);
let pool = runtime::Builder::new_multi_thread_alt()
.worker_threads(2)
// Set the intervals to avoid tuning logic
.global_queue_interval(61)
.local_queue_capacity(1)
.build()
.unwrap();
for _ in 0..NUM_SPAWNS {
let cnt = cnt.clone();
let tx = tx.clone();
pool.spawn(track(async move {
if NUM_SPAWNS == 1 + cnt.fetch_add(1, Relaxed) {
tx.assert_send(());
}
}));
}
rx.recv();
});
}
// This tests a very specific case that happened when a worker has no more
// available work to process because a peer is in the process of stealing
// (but does not finish stealing), and the worker happens to find more work
// from the injection queue *right* before parking.
#[test]
fn pool_concurrent_park_with_steal_with_inject() {
const DEPTH: usize = 4;
let mut model = loom::model::Builder::new();
model.expect_explicit_explore = true;
model.preemption_bound = Some(3);
model.check(|| {
let pool = runtime::Builder::new_multi_thread_alt()
.worker_threads(2)
// Set the intervals to avoid tuning logic
.global_queue_interval(61)
.local_queue_capacity(DEPTH)
.build()
.unwrap();
// Use std types to avoid adding backtracking.
type Flag = std::sync::Arc<std::sync::atomic::AtomicIsize>;
let flag: Flag = Default::default();
let flag1 = flag.clone();
let (tx1, rx1) = oneshot::channel();
async fn task(expect: isize, flag: Flag) {
if expect == flag.load(Relaxed) {
flag.store(expect + 1, Relaxed);
} else {
flag.store(-1, Relaxed);
loom::skip_branch();
}
}
pool.spawn(track(async move {
let flag = flag1;
// The first 2 spawned tasks should be stolen
crate::spawn(task(1, flag.clone()));
crate::spawn(task(2, flag.clone()));
crate::spawn(async move {
task(0, flag.clone()).await;
tx1.send(());
});
// One to fill the LIFO slot
crate::spawn(async move {});
loom::explore();
}));
rx1.recv();
if 1 == flag.load(Relaxed) {
loom::stop_exploring();
let (tx3, rx3) = oneshot::channel();
pool.spawn(async move {
loom::skip_branch();
tx3.send(());
});
pool.spawn(async {});
pool.spawn(async {});
loom::explore();
rx3.recv();
} else {
loom::skip_branch();
}
});
}
}
mod group_d {
use super::*;
#[test]
fn pool_multi_notify() {
loom::model(|| {
let pool = mk_pool(2);
let c1 = Arc::new(AtomicUsize::new(0));
let (done_tx, done_rx) = oneshot::channel();
let done_tx1 = AtomicOneshot::new(done_tx);
let done_tx2 = done_tx1.clone();
// Spawn a task
let c2 = c1.clone();
pool.spawn(track(async move {
multi_gated().await;
if 1 == c1.fetch_add(1, Relaxed) {
done_tx1.assert_send(());
}
}));
// Spawn a second task
pool.spawn(track(async move {
multi_gated().await;
if 1 == c2.fetch_add(1, Relaxed) {
done_tx2.assert_send(());
}
}));
done_rx.recv();
});
}
}
fn mk_pool(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread_alt()
.worker_threads(num_threads)
// Set the intervals to avoid tuning logic
.global_queue_interval(61)
.build()
.unwrap()
}
fn gated2(thread: bool) -> impl Future<Output = &'static str> {
use loom::thread;
use std::sync::Arc;
let gate = Arc::new(AtomicBool::new(false));
let mut fired = false;
poll_fn(move |cx| {
if !fired {
let gate = gate.clone();
let waker = cx.waker().clone();
if thread {
thread::spawn(move || {
gate.store(true, SeqCst);
waker.wake_by_ref();
});
} else {
spawn(track(async move {
gate.store(true, SeqCst);
waker.wake_by_ref();
}));
}
fired = true;
return Poll::Pending;
}
if gate.load(SeqCst) {
Poll::Ready("hello world")
} else {
Poll::Pending
}
})
}
async fn multi_gated() {
struct Gate {
waker: loom::future::AtomicWaker,
count: AtomicUsize,
}
let gate = Arc::new(Gate {
waker: loom::future::AtomicWaker::new(),
count: AtomicUsize::new(0),
});
{
let gate = gate.clone();
spawn(track(async move {
for i in 1..3 {
gate.count.store(i, SeqCst);
gate.waker.wake();
}
}));
}
poll_fn(move |cx| {
gate.waker.register_by_ref(cx.waker());
if gate.count.load(SeqCst) < 2 {
Poll::Pending
} else {
Poll::Ready(())
}
})
.await;
}
fn track<T: Future>(f: T) -> Track<T> {
Track {
inner: f,
arc: Arc::new(()),
}
}
pin_project! {
struct Track<T> {
#[pin]
inner: T,
// Arc is used to hook into loom's leak tracking.
arc: Arc<()>,
}
}
impl<T> Track<T> {
fn into_inner(self) -> T {
self.inner
}
}
impl<T: Future> Future for Track<T> {
type Output = Track<T::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
Poll::Ready(Track {
inner: ready!(me.inner.poll(cx)),
arc: me.arc.clone(),
})
}
}

View File

@ -1,205 +0,0 @@
use crate::runtime::scheduler::multi_thread::{queue, Stats};
use crate::runtime::tests::{unowned, NoopSchedule};
use loom::thread;
use std::cell::RefCell;
fn new_stats() -> Stats {
Stats::new(&crate::runtime::WorkerMetrics::new())
}
#[test]
fn basic() {
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = RefCell::new(vec![]);
let mut stats = new_stats();
let th = thread::spawn(move || {
let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..3 {
if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
while local.pop().is_some() {
n += 1;
}
}
n
});
let mut n = 0;
for _ in 0..2 {
for _ in 0..2 {
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
if local.pop().is_some() {
n += 1;
}
// Push another task
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
while local.pop().is_some() {
n += 1;
}
}
n += inject.borrow_mut().drain(..).count();
n += th.join().unwrap();
assert_eq!(6, n);
});
}
#[test]
fn steal_overflow() {
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = RefCell::new(vec![]);
let mut stats = new_stats();
let th = thread::spawn(move || {
let mut stats = new_stats();
let (_, mut local) = queue::local();
let mut n = 0;
if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
while local.pop().is_some() {
n += 1;
}
n
});
let mut n = 0;
// push a task, pop a task
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
if local.pop().is_some() {
n += 1;
}
for _ in 0..6 {
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
n += th.join().unwrap();
while local.pop().is_some() {
n += 1;
}
n += inject.borrow_mut().drain(..).count();
assert_eq!(7, n);
});
}
#[test]
fn multi_stealer() {
const NUM_TASKS: usize = 5;
fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
let mut stats = new_stats();
let (_, mut local) = queue::local();
if steal.steal_into(&mut local, &mut stats).is_none() {
return 0;
}
let mut n = 1;
while local.pop().is_some() {
n += 1;
}
n
}
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = RefCell::new(vec![]);
let mut stats = new_stats();
// Push work
for _ in 0..NUM_TASKS {
let (task, _) = unowned(async {});
local.push_back_or_overflow(task, &inject, &mut stats);
}
let th1 = {
let steal = steal.clone();
thread::spawn(move || steal_tasks(steal))
};
let th2 = thread::spawn(move || steal_tasks(steal));
let mut n = 0;
while local.pop().is_some() {
n += 1;
}
n += inject.borrow_mut().drain(..).count();
n += th1.join().unwrap();
n += th2.join().unwrap();
assert_eq!(n, NUM_TASKS);
});
}
#[test]
fn chained_steal() {
loom::model(|| {
let mut stats = new_stats();
let (s1, mut l1) = queue::local();
let (s2, mut l2) = queue::local();
let inject = RefCell::new(vec![]);
// Load up some tasks
for _ in 0..4 {
let (task, _) = unowned(async {});
l1.push_back_or_overflow(task, &inject, &mut stats);
let (task, _) = unowned(async {});
l2.push_back_or_overflow(task, &inject, &mut stats);
}
// Spawn a task to steal from **our** queue
let th = thread::spawn(move || {
let mut stats = new_stats();
let (_, mut local) = queue::local();
s1.steal_into(&mut local, &mut stats);
while local.pop().is_some() {}
});
// Drain our tasks, then attempt to steal
while l1.pop().is_some() {}
s2.steal_into(&mut l1, &mut stats);
th.join().unwrap();
while l1.pop().is_some() {}
while l2.pop().is_some() {}
});
}

View File

@ -1,28 +0,0 @@
use crate::runtime::{Builder, Handle};
#[test]
fn join_handle_cancel_on_shutdown() {
let mut builder = loom::model::Builder::new();
builder.preemption_bound = Some(2);
builder.check(|| {
use futures::future::FutureExt;
let rt = Builder::new_multi_thread()
.worker_threads(2)
.build()
.unwrap();
let handle = rt.block_on(async move { Handle::current() });
let jh1 = handle.spawn(futures::future::pending::<()>());
drop(rt);
let jh2 = handle.spawn(futures::future::pending::<()>());
let err1 = jh1.now_or_never().unwrap().unwrap_err();
let err2 = jh2.now_or_never().unwrap().unwrap_err();
assert!(err1.is_cancelled());
assert!(err2.is_cancelled());
});
}

View File

@ -1,38 +0,0 @@
use crate::runtime::park;
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::{self, Runtime};
#[test]
#[ignore]
fn yield_calls_park_before_scheduling_again() {
// Don't need to check all permutations
let mut loom = loom::model::Builder::default();
loom.max_permutations = Some(1);
loom.check(|| {
let rt = mk_runtime(2);
let (tx, rx) = oneshot::channel::<()>();
rt.spawn(async {
let tid = loom::thread::current().id();
let park_count = park::current_thread_park_count();
crate::task::yield_now().await;
if tid == loom::thread::current().id() {
let new_park_count = park::current_thread_park_count();
assert_eq!(park_count + 1, new_park_count);
}
tx.send(());
});
rx.recv();
});
}
fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
.build()
.unwrap()
}

View File

@ -62,7 +62,6 @@ cfg_loom! {
mod loom_join_set;
mod loom_local;
mod loom_multi_thread;
mod loom_multi_thread_alt;
mod loom_oneshot;
// Make sure debug assertions are enabled

View File

@ -663,8 +663,6 @@ cfg_rt! {
Some(scheduler::Context::CurrentThread(_ctx)) => 0,
#[cfg(feature = "rt-multi-thread")]
Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32,
None => context::thread_rng_n(shard_size),
});
id % shard_size

View File

@ -1,744 +0,0 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))]
#![cfg(tokio_unstable)]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Waker};
macro_rules! cfg_metrics {
($($t:tt)*) => {
#[cfg(all(tokio_unstable, target_has_atomic = "64"))]
{
$( $t )*
}
}
}
#[test]
fn single_thread() {
// No panic when starting a runtime w/ a single thread
let _ = runtime::Builder::new_multi_thread_alt()
.enable_all()
.worker_threads(1)
.build()
.unwrap();
}
#[test]
#[ignore] // https://github.com/tokio-rs/tokio/issues/5995
fn many_oneshot_futures() {
// used for notifying the main thread
const NUM: usize = 1_000;
for _ in 0..5 {
let (tx, rx) = mpsc::channel();
let rt = rt();
let cnt = Arc::new(AtomicUsize::new(0));
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
// Wait for the pool to shutdown
drop(rt);
}
}
#[test]
fn spawn_two() {
let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tokio::spawn(async move {
tx.send("ZOMG").unwrap();
});
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
cfg_metrics! {
let metrics = rt.metrics();
drop(rt);
assert_eq!(1, metrics.remote_schedule_count());
let mut local = 0;
for i in 0..metrics.num_workers() {
local += metrics.worker_local_schedule_count(i);
}
assert_eq!(1, local);
}
}
#[test]
fn many_multishot_futures() {
const CHAIN: usize = 200;
const CYCLES: usize = 5;
const TRACKS: usize = 50;
for _ in 0..50 {
let rt = rt();
let mut start_txs = Vec::with_capacity(TRACKS);
let mut final_rxs = Vec::with_capacity(TRACKS);
for _ in 0..TRACKS {
let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10);
for _ in 0..CHAIN {
let (next_tx, next_rx) = tokio::sync::mpsc::channel(10);
// Forward all the messages
rt.spawn(async move {
while let Some(v) = chain_rx.recv().await {
next_tx.send(v).await.unwrap();
}
});
chain_rx = next_rx;
}
// This final task cycles if needed
let (final_tx, final_rx) = tokio::sync::mpsc::channel(10);
let cycle_tx = start_tx.clone();
let mut rem = CYCLES;
rt.spawn(async move {
for _ in 0..CYCLES {
let msg = chain_rx.recv().await.unwrap();
rem -= 1;
if rem == 0 {
final_tx.send(msg).await.unwrap();
} else {
cycle_tx.send(msg).await.unwrap();
}
}
});
start_txs.push(start_tx);
final_rxs.push(final_rx);
}
{
rt.block_on(async move {
for start_tx in start_txs {
start_tx.send("ping").await.unwrap();
}
for mut final_rx in final_rxs {
final_rx.recv().await.unwrap();
}
});
}
}
}
#[test]
fn lifo_slot_budget() {
async fn my_fn() {
spawn_another();
}
fn spawn_another() {
tokio::spawn(my_fn());
}
let rt = runtime::Builder::new_multi_thread_alt()
.enable_all()
.worker_threads(1)
.build()
.unwrap();
let (send, recv) = oneshot::channel();
rt.spawn(async move {
tokio::spawn(my_fn());
let _ = send.send(());
});
let _ = rt.block_on(recv);
}
#[test]
fn spawn_shutdown() {
let rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async {
tokio::spawn(client_server(tx.clone()));
});
// Use spawner
rt.spawn(client_server(tx));
assert_ok!(rx.recv());
assert_ok!(rx.recv());
drop(rt);
assert_err!(rx.try_recv());
}
async fn client_server(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
// Spawn the server
tokio::spawn(async move {
// Accept a socket
let (mut socket, _) = server.accept().await.unwrap();
// Write some data
socket.write_all(b"hello").await.unwrap();
});
let mut client = TcpStream::connect(&addr).await.unwrap();
let mut buf = vec![];
client.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
tx.send(()).unwrap();
}
#[test]
fn drop_threadpool_drops_futures() {
for _ in 0..1_000 {
let num_inc = Arc::new(AtomicUsize::new(0));
let num_dec = Arc::new(AtomicUsize::new(0));
let num_drop = Arc::new(AtomicUsize::new(0));
struct Never(Arc<AtomicUsize>);
impl Future for Never {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
}
}
impl Drop for Never {
fn drop(&mut self) {
self.0.fetch_add(1, Relaxed);
}
}
let a = num_inc.clone();
let b = num_dec.clone();
let rt = runtime::Builder::new_multi_thread_alt()
.enable_all()
.on_thread_start(move || {
a.fetch_add(1, Relaxed);
})
.on_thread_stop(move || {
b.fetch_add(1, Relaxed);
})
.build()
.unwrap();
rt.spawn(Never(num_drop.clone()));
// Wait for the pool to shutdown
drop(rt);
// Assert that at least one thread was spawned.
let a = num_inc.load(Relaxed);
assert!(a >= 1);
// Assert that all threads shutdown
let b = num_dec.load(Relaxed);
assert_eq!(a, b);
// Assert that the future was dropped
let c = num_drop.load(Relaxed);
assert_eq!(c, 1);
}
}
#[test]
fn start_stop_callbacks_called() {
use std::sync::atomic::{AtomicUsize, Ordering};
let after_start = Arc::new(AtomicUsize::new(0));
let before_stop = Arc::new(AtomicUsize::new(0));
let after_inner = after_start.clone();
let before_inner = before_stop.clone();
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.enable_all()
.on_thread_start(move || {
after_inner.clone().fetch_add(1, Ordering::Relaxed);
})
.on_thread_stop(move || {
before_inner.clone().fetch_add(1, Ordering::Relaxed);
})
.build()
.unwrap();
let (tx, rx) = oneshot::channel();
rt.spawn(async move {
assert_ok!(tx.send(()));
});
assert_ok!(rt.block_on(rx));
drop(rt);
assert!(after_start.load(Ordering::Relaxed) > 0);
assert!(before_stop.load(Ordering::Relaxed) > 0);
}
#[test]
fn blocking_task() {
// used for notifying the main thread
const NUM: usize = 1_000;
for _ in 0..10 {
let (tx, rx) = mpsc::channel();
let rt = rt();
let cnt = Arc::new(AtomicUsize::new(0));
// there are four workers in the pool
// so, if we run 4 blocking tasks, we know that handoff must have happened
let block = Arc::new(std::sync::Barrier::new(5));
for _ in 0..4 {
let block = block.clone();
rt.spawn(async move {
tokio::task::block_in_place(move || {
block.wait();
block.wait();
})
});
}
block.wait();
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
// Wait for the pool to shutdown
block.wait();
}
}
#[test]
fn multi_threadpool() {
use tokio::sync::oneshot;
let rt1 = rt();
let rt2 = rt();
let (tx, rx) = oneshot::channel();
let (done_tx, done_rx) = mpsc::channel();
rt2.spawn(async move {
rx.await.unwrap();
done_tx.send(()).unwrap();
});
rt1.spawn(async move {
tx.send(()).unwrap();
});
done_rx.recv().unwrap();
}
// When `block_in_place` returns, it attempts to reclaim the yielded runtime
// worker. In this case, the remainder of the task is on the runtime worker and
// must take part in the cooperative task budgeting system.
//
// The test ensures that, when this happens, attempting to consume from a
// channel yields occasionally even if there are values ready to receive.
#[test]
fn coop_and_block_in_place() {
let rt = tokio::runtime::Builder::new_multi_thread_alt()
// Setting max threads to 1 prevents another thread from claiming the
// runtime worker yielded as part of `block_in_place` and guarantees the
// same thread will reclaim the worker at the end of the
// `block_in_place` call.
.max_blocking_threads(1)
.build()
.unwrap();
rt.block_on(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
// Fill the channel
for _ in 0..1024 {
tx.send(()).await.unwrap();
}
drop(tx);
tokio::spawn(async move {
// Block in place without doing anything
tokio::task::block_in_place(|| {});
// Receive all the values; this should trigger a `Pending` as the
// coop limit will be reached.
poll_fn(|cx| {
while let Poll::Ready(v) = {
tokio::pin! {
let fut = rx.recv();
}
Pin::new(&mut fut).poll(cx)
} {
if v.is_none() {
panic!("did not yield");
}
}
Poll::Ready(())
})
.await
})
.await
.unwrap();
});
}
#[test]
fn yield_after_block_in_place() {
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.worker_threads(1)
.build()
.unwrap();
rt.block_on(async {
tokio::spawn(async move {
// Block in place then enter a new runtime
tokio::task::block_in_place(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(async {});
});
// Yield, then complete
tokio::task::yield_now().await;
})
.await
.unwrap()
});
}
// Testing this does not panic
#[test]
fn max_blocking_threads() {
let _rt = tokio::runtime::Builder::new_multi_thread_alt()
.max_blocking_threads(1)
.build()
.unwrap();
}
#[test]
#[should_panic]
fn max_blocking_threads_set_to_zero() {
let _rt = tokio::runtime::Builder::new_multi_thread_alt()
.max_blocking_threads(0)
.build()
.unwrap();
}
/// Regression test for #6445.
///
/// After #6445, setting `global_queue_interval` to 1 is now technically valid.
/// This test confirms that there is no regression in `multi_thread_runtime`
/// when `global_queue_interval` is set to 1.
#[test]
fn global_queue_interval_set_to_one() {
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.global_queue_interval(1)
.build()
.unwrap();
// Perform some simple work.
let cnt = Arc::new(AtomicUsize::new(0));
rt.block_on(async {
let mut set = tokio::task::JoinSet::new();
for _ in 0..10 {
let cnt = cnt.clone();
set.spawn(async move { cnt.fetch_add(1, Relaxed) });
}
while let Some(res) = set.join_next().await {
res.unwrap();
}
});
assert_eq!(cnt.load(Relaxed), 10);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hang_on_shutdown() {
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
tokio::spawn(async move {
tokio::task::block_in_place(|| sync_rx.recv().ok());
});
tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
drop(sync_tx);
});
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
/// Demonstrates tokio-rs/tokio#3869
#[test]
fn wake_during_shutdown() {
struct Shared {
waker: Option<Waker>,
}
struct MyFuture {
shared: Arc<Mutex<Shared>>,
put_waker: bool,
}
impl MyFuture {
fn new() -> (Self, Self) {
let shared = Arc::new(Mutex::new(Shared { waker: None }));
let f1 = MyFuture {
shared: shared.clone(),
put_waker: true,
};
let f2 = MyFuture {
shared,
put_waker: false,
};
(f1, f2)
}
}
impl Future for MyFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let me = Pin::into_inner(self);
let mut lock = me.shared.lock().unwrap();
if me.put_waker {
lock.waker = Some(cx.waker().clone());
}
Poll::Pending
}
}
impl Drop for MyFuture {
fn drop(&mut self) {
let mut lock = self.shared.lock().unwrap();
if !self.put_waker {
lock.waker.take().unwrap().wake();
}
drop(lock);
}
}
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let (f1, f2) = MyFuture::new();
rt.spawn(f1);
rt.spawn(f2);
rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
}
#[should_panic]
#[tokio::test]
async fn test_block_in_place1() {
tokio::task::block_in_place(|| {});
}
#[tokio::test(flavor = "multi_thread")]
async fn test_block_in_place2() {
tokio::task::block_in_place(|| {});
}
#[should_panic]
#[tokio::main(flavor = "current_thread")]
#[test]
async fn test_block_in_place3() {
tokio::task::block_in_place(|| {});
}
#[tokio::main]
#[test]
async fn test_block_in_place4() {
tokio::task::block_in_place(|| {});
}
// Testing the tuning logic is tricky, as it is inherently timing based and more
// of a heuristic than an exact behavior. This test checks that the interval
// changes over time based on load factors. There are no assertions; completion
// is sufficient. If there is a regression, this test will hang. In theory, we
// could add limits, but those would likely fail on CI.
#[test]
#[cfg(not(tokio_no_tuning_tests))]
fn test_tuning() {
use std::sync::atomic::AtomicBool;
use std::time::Duration;
let rt = runtime::Builder::new_multi_thread_alt()
.worker_threads(1)
.build()
.unwrap();
fn iter(flag: Arc<AtomicBool>, counter: Arc<AtomicUsize>, stall: bool) {
if flag.load(Relaxed) {
if stall {
std::thread::sleep(Duration::from_micros(5));
}
counter.fetch_add(1, Relaxed);
tokio::spawn(async move { iter(flag, counter, stall) });
}
}
let flag = Arc::new(AtomicBool::new(true));
let counter = Arc::new(AtomicUsize::new(61));
let interval = Arc::new(AtomicUsize::new(61));
{
let flag = flag.clone();
let counter = counter.clone();
rt.spawn(async move { iter(flag, counter, true) });
}
// Now, hammer the injection queue until the interval drops.
let mut n = 0;
loop {
let curr = interval.load(Relaxed);
if curr <= 8 {
n += 1;
} else {
n = 0;
}
// Make sure we get a few good rounds. Jitter in the tuning could result
// in one "good" value without being representative of reaching a good
// state.
if n == 3 {
break;
}
if Arc::strong_count(&interval) < 5_000 {
let counter = counter.clone();
let interval = interval.clone();
rt.spawn(async move {
let prev = counter.swap(0, Relaxed);
interval.store(prev, Relaxed);
});
std::thread::yield_now();
}
}
flag.store(false, Relaxed);
let w = Arc::downgrade(&interval);
drop(interval);
while w.strong_count() > 0 {
std::thread::sleep(Duration::from_micros(500));
}
// Now, run it again with a faster task
let flag = Arc::new(AtomicBool::new(true));
// Set it high, we know it shouldn't ever really be this high
let counter = Arc::new(AtomicUsize::new(10_000));
let interval = Arc::new(AtomicUsize::new(10_000));
{
let flag = flag.clone();
let counter = counter.clone();
rt.spawn(async move { iter(flag, counter, false) });
}
// Now, hammer the injection queue until the interval reaches the expected range.
let mut n = 0;
loop {
let curr = interval.load(Relaxed);
if curr <= 1_000 && curr > 32 {
n += 1;
} else {
n = 0;
}
if n == 3 {
break;
}
if Arc::strong_count(&interval) <= 5_000 {
let counter = counter.clone();
let interval = interval.clone();
rt.spawn(async move {
let prev = counter.swap(0, Relaxed);
interval.store(prev, Relaxed);
});
}
std::thread::yield_now();
}
flag.store(false, Relaxed);
}
fn rt() -> runtime::Runtime {
runtime::Builder::new_multi_thread_alt()
.enable_all()
.build()
.unwrap()
}