diff --git a/tokio/src/runtime/scheduler/multi_thread/idle.rs b/tokio/src/runtime/scheduler/multi_thread/idle.rs
index c41d80e9b..53ee01106 100644
--- a/tokio/src/runtime/scheduler/multi_thread/idle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/idle.rs
@@ -217,11 +217,6 @@ impl Idle {
     }
 
     pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
-        // First, set the shutdown flag on each core
-        for core in &mut synced.idle.available_cores {
-            core.is_shutdown = true;
-        }
-
         // Wake every sleeping worker and assign a core to it. There may not be
         // enough sleeping workers for all cores, but other workers will
         // eventually find the cores and shut them down.
diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs
index f01daaa1b..cc641df54 100644
--- a/tokio/src/runtime/scheduler/multi_thread/stats.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs
@@ -10,6 +10,16 @@ pub(crate) struct Stats {
     /// user.
     batch: MetricsBatch,
 
+    /// Exponentially-weighted moving average of time spent polling a scheduled
+    /// task.
+    ///
+    /// Tracked in nanoseconds, stored as an f64, since that is what we use
+    /// with the EWMA calculations.
+    task_poll_time_ewma: f64,
+}
+
+/// Transient state
+pub(crate) struct Ephemeral {
     /// Instant at which work last resumed (continued after park).
     ///
     /// This duplicates the value stored in `MetricsBatch`. We will unify
@@ -18,13 +28,15 @@ pub(crate) struct Stats {
 
     /// Number of tasks polled in the batch of scheduled tasks
     tasks_polled_in_batch: usize,
+}
 
-    /// Exponentially-weighted moving average of time spent polling scheduled a
-    /// task.
-    ///
-    /// Tracked in nanoseconds, stored as a f64 since that is what we use with
-    /// the EWMA calculations
-    task_poll_time_ewma: f64,
+impl Ephemeral {
+    pub(crate) fn new() -> Ephemeral {
+        Ephemeral {
+            processing_scheduled_tasks_started_at: Instant::now(),
+            tasks_polled_in_batch: 0,
+        }
+    }
 }
 
 /// How to weigh each individual poll time, value is plucked from thin air.
@@ -47,8 +59,6 @@ impl Stats {
 
         Stats {
             batch: MetricsBatch::new(worker_metrics),
-            processing_scheduled_tasks_started_at: Instant::now(),
-            tasks_polled_in_batch: 0,
             task_poll_time_ewma,
         }
     }
@@ -85,24 +95,24 @@ impl Stats {
         self.batch.inc_local_schedule_count();
     }
 
-    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
+    pub(crate) fn start_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
         self.batch.start_processing_scheduled_tasks();
 
-        self.processing_scheduled_tasks_started_at = Instant::now();
-        self.tasks_polled_in_batch = 0;
+        ephemeral.processing_scheduled_tasks_started_at = Instant::now();
+        ephemeral.tasks_polled_in_batch = 0;
     }
 
-    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
+    pub(crate) fn end_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
         self.batch.end_processing_scheduled_tasks();
 
         // Update the EWMA task poll time
-        if self.tasks_polled_in_batch > 0 {
+        if ephemeral.tasks_polled_in_batch > 0 {
             let now = Instant::now();
 
             // If we "overflow" this conversion, we have bigger problems than
             // slightly off stats.
-            let elapsed = (now - self.processing_scheduled_tasks_started_at).as_nanos() as f64;
-            let num_polls = self.tasks_polled_in_batch as f64;
+            let elapsed = (now - ephemeral.processing_scheduled_tasks_started_at).as_nanos() as f64;
+            let num_polls = ephemeral.tasks_polled_in_batch as f64;
 
             // Calculate the mean poll duration for a single task in the batch
             let mean_poll_duration = elapsed / num_polls;
@@ -116,10 +126,10 @@ impl Stats {
         }
     }
 
-    pub(crate) fn start_poll(&mut self) {
+    pub(crate) fn start_poll(&mut self, ephemeral: &mut Ephemeral) {
         self.batch.start_poll();
 
-        self.tasks_polled_in_batch += 1;
+        ephemeral.tasks_polled_in_batch += 1;
     }
 
     pub(crate) fn end_poll(&mut self) {
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index c9efa663b..1029c77d5 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -60,7 +60,7 @@ use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
 use crate::runtime;
 use crate::runtime::context;
 use crate::runtime::scheduler::multi_thread::{
-    idle, queue, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
+    idle, queue, stats, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
 };
 use crate::runtime::scheduler::{self, inject, Lock};
 use crate::runtime::task::OwnedTasks;
@@ -70,7 +70,7 @@ use crate::runtime::{
 use crate::util::atomic_cell::AtomicCell;
 use crate::util::rand::{FastRand, RngSeedGenerator};
 
-use std::cell::RefCell;
+use std::cell::{Cell, RefCell};
 use std::cmp;
 use std::task::Waker;
 use std::time::Duration;
@@ -88,36 +88,41 @@ cfg_not_taskdump! {
 }
 
 /// A scheduler worker
+///
+/// Data is stack-allocated and never migrates across threads
 pub(super) struct Worker {
-    /*
-    /// Reference to scheduler's handle
-    handle: Arc<Handle>,
-
-    /// This worker's index in `assigned_cores` and `condvars`.
-    index: usize,
-    */
-    /// Used to collect a list of workers to notify
-    workers_to_notify: Vec<usize>,
-
-    /// Snapshot of idle core list. This helps speedup stealing
-    idle_snapshot: idle::Snapshot,
-}
-
-/// Core data
-pub(super) struct Core {
-    /// Index holding this core's remote/shared state.
-    pub(super) index: usize,
-
     /// Used to schedule bookkeeping tasks every so often.
     tick: u32,
 
+    /// True if the scheduler is being shut down
+    pub(super) is_shutdown: bool,
+
+    /// True if the scheduler is being traced
+    is_traced: bool,
+
     /// Counter used to track when to poll from the local queue vs. the
     /// injection queue
     num_seq_local_queue_polls: u32,
 
-    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
-    /// they go to the back of the `run_queue`.
-    lifo_enabled: bool,
+    /// How often to check the global queue
+    global_queue_interval: u32,
+
+    /// Used to collect a list of workers to notify
+    workers_to_notify: Vec<usize>,
+
+    /// Snapshot of idle core list. This helps speed up stealing
+    idle_snapshot: idle::Snapshot,
+
+    stats: stats::Ephemeral,
+}
+
+/// Core data
+///
+/// Data is heap-allocated and migrates across threads.
+#[repr(align(128))]
+pub(super) struct Core {
+    /// Index holding this core's remote/shared state.
+    pub(super) index: usize,
 
     lifo_slot: Option<Notified>,
 
@@ -128,22 +133,19 @@ pub(super) struct Core {
     /// involves attempting to steal from other workers.
     pub(super) is_searching: bool,
 
-    /// True if the scheduler is being shutdown
-    pub(super) is_shutdown: bool,
-
-    /// True if the scheduler is being traced
-    is_traced: bool,
-
     /// Per-worker runtime stats
     stats: Stats,
 
-    /// How often to check the global queue
-    global_queue_interval: u32,
-
     /// Fast random number generator.
     rand: FastRand,
 }
 
+#[test]
+fn test_size_of_core() {
+    let size_of = std::mem::size_of::<Core>();
+    assert!(size_of <= 128, "actual={}", size_of);
+}
+
 /// State shared across all workers
 pub(crate) struct Shared {
     /// Per-core remote state.
@@ -229,6 +231,9 @@ pub(crate) struct Context {
     /// Worker index
     index: usize,
 
+    /// True when the LIFO slot is enabled
+    lifo_enabled: Cell<bool>,
+
     /// Core data
     core: RefCell<Option<Box<Core>>>,
 
@@ -286,15 +291,9 @@ pub(super) fn create(
         cores.push(Box::new(Core {
             index: i,
-            tick: 0,
-            num_seq_local_queue_polls: 0,
-            lifo_enabled: !config.disable_lifo_slot,
             lifo_slot: None,
             run_queue,
             is_searching: false,
-            is_shutdown: false,
-            is_traced: false,
-            global_queue_interval: stats.tuned_global_queue_interval(&config),
             stats,
             rand: FastRand::from_seed(config.seed_generator.next_seed()),
         }));
 
@@ -489,8 +488,14 @@ fn run(
     let num_workers = handle.shared.condvars.len();
 
     let mut worker = Worker {
+        tick: 0,
+        num_seq_local_queue_polls: 0,
+        global_queue_interval: 0,
+        is_shutdown: false,
+        is_traced: false,
         workers_to_notify: Vec::with_capacity(num_workers - 1),
         idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
+        stats: stats::Ephemeral::new(),
     };
 
     let sched_handle = scheduler::Handle::MultiThread(handle.clone());
@@ -499,6 +504,7 @@ fn run(
     // Set the worker context.
     let cx = scheduler::Context::MultiThread(Context {
         index,
+        lifo_enabled: Cell::new(!handle.shared.config.disable_lifo_slot),
         handle,
         core: RefCell::new(None),
         handoff_core,
@@ -559,7 +565,7 @@ impl Worker {
             }
         };
 
-        core.stats.start_processing_scheduled_tasks();
+        core.stats.start_processing_scheduled_tasks(&mut self.stats);
 
         if let Some(task) = maybe_task {
             core = self.run_task(cx, core, task)?;
@@ -574,7 +580,7 @@ impl Worker {
             } else {
                 // The only reason to get `None` from `next_task` is we have
                 // entered the shutdown phase.
-                assert!(core.is_shutdown);
+                assert!(self.is_shutdown);
                 break;
             }
         }
@@ -594,7 +600,11 @@ impl Worker {
     }
 
     // Try to acquire an available core, but do not block the thread
-    fn try_acquire_available_core(&self, cx: &Context, synced: &mut Synced) -> Option<Box<Core>> {
+    fn try_acquire_available_core(
+        &mut self,
+        cx: &Context,
+        synced: &mut Synced,
+    ) -> Option<Box<Core>> {
         if let Some(mut core) = cx
             .shared()
             .idle
@@ -608,7 +618,11 @@ impl Worker {
     }
 
     // Block the current thread, waiting for an available core
-    fn wait_for_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> NextTaskResult {
+    fn wait_for_core(
+        &mut self,
+        cx: &Context,
+        mut synced: MutexGuard<'_, Synced>,
+    ) -> NextTaskResult {
         cx.shared()
             .idle
             .transition_worker_to_parked(&mut synced, cx.index);
@@ -630,7 +644,7 @@ impl Worker {
 
         self.reset_acquired_core(cx, &mut synced, &mut core);
 
-        if core.is_shutdown {
+        if self.is_shutdown {
             // Currently shutting down, don't do any more work
             return Ok((None, core));
         }
@@ -647,7 +661,9 @@ impl Worker {
     }
 
     /// Ensure core's state is set correctly for the worker to start using.
-    fn reset_acquired_core(&self, cx: &Context, synced: &mut Synced, core: &mut Core) {
+    fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
+        self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);
+
         // Reset `lifo_enabled` here in case the core was previously stolen from
         // a task that had the LIFO slot disabled.
         self.reset_lifo_enabled(cx, core);
@@ -662,15 +678,15 @@ impl Worker {
     /// Finds the next task to run, this could be from a queue or stealing. If
     /// none are available, the thread sleeps and tries again.
     fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
-        while !core.is_shutdown {
+        while !self.is_shutdown {
             self.assert_lifo_enabled_is_correct(cx, &core);
 
-            if core.is_traced {
+            if self.is_traced {
                 core = cx.handle.trace_core(core);
             }
 
             // Increment the tick
-            core.tick();
+            self.tick = self.tick.wrapping_add(1);
 
             // Runs maintenance every so often. When maintenance is run, the
             // driver is checked, which may result in a task being found.
@@ -683,13 +699,13 @@ impl Worker {
             }
 
             // We consumed all work in the queues and will start searching for work.
-            core.stats.end_processing_scheduled_tasks();
+            core.stats.end_processing_scheduled_tasks(&mut self.stats);
 
             core = n!(self.poll_driver(cx, core));
 
             // Try to steal a task from other workers
             if let Some(task) = self.steal_work(cx, &mut core) {
-                core.stats.start_processing_scheduled_tasks();
+                core.stats.start_processing_scheduled_tasks(&mut self.stats);
                 return Ok((Some(task), core));
             }
 
@@ -707,11 +723,11 @@ impl Worker {
         Ok((None, core))
     }
 
-    fn next_notified_task(&self, cx: &Context, core: &mut Core) -> Option<Notified> {
-        core.num_seq_local_queue_polls += 1;
+    fn next_notified_task(&mut self, cx: &Context, core: &mut Core) -> Option<Notified> {
+        self.num_seq_local_queue_polls += 1;
 
-        if core.num_seq_local_queue_polls % core.global_queue_interval == 0 {
-            core.num_seq_local_queue_polls = 0;
+        if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
+            self.num_seq_local_queue_polls = 0;
 
             // Update the global queue interval, if needed
             self.tune_global_queue_interval(cx, core);
@@ -876,7 +892,7 @@ impl Worker {
         None
     }
 
-    fn run_task(&self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
+    fn run_task(&mut self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
         let task = cx.shared().owned.assert_owner(task);
 
         // Make sure the worker is not in the **searching** state. This enables
@@ -891,7 +907,7 @@ impl Worker {
         // tasks under this measurement. In this case, the tasks came from the
         // LIFO slot and are considered part of the current task for scheduling
         // purposes. These tasks inherent the "parent"'s limits.
-        core.stats.start_poll();
+        core.stats.start_poll(&mut self.stats);
 
         // Make the core available to the runtime context
         *cx.core.borrow_mut() = Some(core);
@@ -936,7 +952,7 @@ impl Worker {
                     .push_back_or_overflow(task, cx.shared(), &mut core.stats);
                 // If we hit this point, the LIFO slot should be enabled.
                 // There is no need to reset it.
-                debug_assert!(core.lifo_enabled);
+                debug_assert!(cx.lifo_enabled.get());
                 return Ok(core);
             }
 
@@ -952,7 +968,7 @@ impl Worker {
                 // repeatedly schedule the other. To mitigate this, we limit the
                 // number of times the LIFO slot is prioritized.
                 if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
-                    core.lifo_enabled = false;
+                    cx.lifo_enabled.set(false);
                     super::counters::inc_lifo_capped();
                 }
 
@@ -1041,15 +1057,15 @@ impl Worker {
     }
 
     fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
-        if core.tick % cx.shared().config.event_interval == 0 {
+        if self.tick % cx.shared().config.event_interval == 0 {
             super::counters::inc_num_maintenance();
-            core.stats.end_processing_scheduled_tasks();
+            core.stats.end_processing_scheduled_tasks(&mut self.stats);
 
             // Run regularly scheduled maintenance
             core = n!(self.park_yield(cx, core));
 
-            core.stats.start_processing_scheduled_tasks();
+            core.stats.start_processing_scheduled_tasks(&mut self.stats);
         }
 
         Ok((None, core))
     }
@@ -1059,13 +1075,13 @@ impl Worker {
         core.stats.submit(&cx.shared().worker_metrics[core.index]);
     }
 
-    fn update_global_flags(&self, cx: &Context, synced: &mut Synced, core: &mut Core) {
-        if !core.is_shutdown {
-            core.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
+    fn update_global_flags(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
+        if !self.is_shutdown {
+            self.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
         }
 
-        if !core.is_traced {
-            core.is_traced = cx.shared().trace_status.trace_requested();
+        if !self.is_traced {
+            self.is_traced = cx.shared().trace_status.trace_requested();
         }
     }
 
@@ -1111,8 +1127,8 @@ impl Worker {
         }
 
         if self.can_transition_to_parked(cx, &mut core) {
-            debug_assert!(!core.is_shutdown);
-            debug_assert!(!core.is_traced);
+            debug_assert!(!self.is_shutdown);
+            debug_assert!(!self.is_traced);
 
             core = n!(self.do_park(cx, core));
         }
@@ -1166,7 +1182,7 @@ impl Worker {
         // If the runtime is shutdown, skip parking
         self.update_global_flags(cx, &mut synced, &mut core);
 
-        if core.is_shutdown {
+        if self.is_shutdown {
             return Ok((None, core));
         }
 
@@ -1232,8 +1248,8 @@ impl Worker {
         core.lifo_slot.is_none()
             // cx.shared().remotes[core.index].lifo_slot.is_none()
             && core.run_queue.is_empty()
-            && !core.is_shutdown
-            && !core.is_traced
+            && !self.is_shutdown
+            && !self.is_traced
     }
 
     /// Signals all tasks to shut down, and waits for them to complete. Must run
@@ -1290,24 +1306,25 @@ impl Worker {
     }
 
     fn reset_lifo_enabled(&self, cx: &Context, core: &mut Core) {
-        core.lifo_enabled = !cx.handle.shared.config.disable_lifo_slot;
+        cx.lifo_enabled
+            .set(!cx.handle.shared.config.disable_lifo_slot);
     }
 
     fn assert_lifo_enabled_is_correct(&self, cx: &Context, core: &Core) {
         debug_assert_eq!(
-            core.lifo_enabled,
+            cx.lifo_enabled.get(),
             !cx.handle.shared.config.disable_lifo_slot
         );
     }
 
-    fn tune_global_queue_interval(&self, cx: &Context, core: &mut Core) {
+    fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
         let next = core.stats.tuned_global_queue_interval(&cx.shared().config);
 
         debug_assert!(next > 1);
 
         // Smooth out jitter
-        if abs_diff(core.global_queue_interval, next) > 2 {
-            core.global_queue_interval = next;
+        if abs_diff(self.global_queue_interval, next) > 2 {
+            self.global_queue_interval = next;
         }
     }
 
@@ -1362,7 +1379,7 @@ impl Shared {
     }
 
     fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
-        if core.lifo_enabled {
+        if cx.lifo_enabled.get() {
             // Push to the LIFO slot
             let prev = std::mem::replace(&mut core.lifo_slot, Some(task));
             // let prev = cx.shared().remotes[core.index].lifo_slot.swap_local(task);
@@ -1471,13 +1488,6 @@ impl task::Schedule for Arc<Handle> {
     }
 }
 
-impl Core {
-    /// Increment the tick
-    fn tick(&mut self) {
-        self.tick = self.tick.wrapping_add(1);
-    }
-}
-
 pub(crate) struct InjectGuard<'a> {
     lock: crate::loom::sync::MutexGuard<'a, Synced>,
 }
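
Note on the `Stats`/`Ephemeral` split above: `start_poll` and `end_processing_scheduled_tasks` now receive the transient batch counters from the `Worker` (which stays on one thread), while the EWMA itself remains in `Stats` on the `Core`, which migrates across threads. The diff only shows the mean-poll-duration step (`elapsed / num_polls`), not the full smoothing or the interval derivation, so the sketch below illustrates the general shape of a batch-weighted EWMA feeding a tuned global queue interval. It is a standalone illustration, not tokio's implementation: the constants, the batch-weighted `alpha`, and the clamp value are assumptions made for the example.

    /// Minimal sketch of a batch-weighted EWMA over task poll times.
    /// All names and constants are illustrative assumptions.
    const ALPHA: f64 = 0.1; // assumed per-poll smoothing factor
    const TARGET_INTERVAL_NANOS: f64 = 200_000.0; // assumed tuning target

    struct PollTimeEwma {
        nanos: f64, // running average poll time, in nanoseconds
    }

    impl PollTimeEwma {
        /// Fold a batch of `num_polls` polls that took `elapsed_nanos` in
        /// total into the running average.
        fn update(&mut self, elapsed_nanos: f64, num_polls: f64) {
            // Mean poll duration for a single task in the batch, as in the
            // diff above.
            let mean_poll_duration = elapsed_nanos / num_polls;
            // Weight the smoothing factor by the batch size, i.e. apply the
            // per-poll decay `num_polls` times in one step.
            let weighted_alpha = 1.0 - (1.0 - ALPHA).powf(num_polls);
            self.nanos = weighted_alpha * mean_poll_duration + (1.0 - weighted_alpha) * self.nanos;
        }

        /// Derive how many local-queue polls to run between checks of the
        /// global queue, clamped so it is always at least 2.
        fn tuned_global_queue_interval(&self) -> u32 {
            (TARGET_INTERVAL_NANOS / self.nanos).max(2.0) as u32
        }
    }

    fn main() {
        let mut ewma = PollTimeEwma { nanos: 10_000.0 };
        ewma.update(300_000.0, 20.0); // one batch: 20 polls in 0.3 ms total
        println!(
            "ewma = {:.0}ns, interval = {}",
            ewma.nanos,
            ewma.tuned_global_queue_interval()
        );
    }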
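On the `#[repr(align(128))]` added to `Core`: the usual motivation for padding heap-allocated per-core state to its own 128-byte block is to keep neighboring allocations from sharing a cache line (false sharing) as cores migrate across threads. It also constrains `test_size_of_core`, because Rust rounds a type's size up to a multiple of its alignment, so a 128-aligned struct can never be smaller than 128 bytes. A small self-contained sketch of that layout rule, using a hypothetical struct rather than tokio's `Core`:

    use std::mem::{align_of, size_of};

    // Hypothetical cache-line-padded per-core state; the fields only need
    // about 16 bytes, but the attribute rounds the size up to the alignment.
    #[repr(align(128))]
    struct PaddedCore {
        index: usize,
        is_searching: bool,
    }

    fn main() {
        // size_of is always a multiple of align_of, so this struct occupies
        // one full 128-byte block even though its fields are small.
        assert_eq!(align_of::<PaddedCore>(), 128);
        assert_eq!(size_of::<PaddedCore>(), 128);
        assert_eq!(size_of::<PaddedCore>() % align_of::<PaddedCore>(), 0);
        println!(
            "size = {}, align = {}",
            size_of::<PaddedCore>(),
            align_of::<PaddedCore>()
        );
    }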