move fields around

Carl Lerche 2023-06-19 15:10:58 -07:00
parent ce19836b9a
commit bc5128f255
3 changed files with 122 additions and 106 deletions

tokio/src/runtime/scheduler/multi_thread/idle.rs

@ -217,11 +217,6 @@ impl Idle {
}
pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
// First, set the shutdown flag on each core
for core in &mut synced.idle.available_cores {
core.is_shutdown = true;
}
// Wake every sleeping worker and assign a core to it. There may not be
// enough sleeping workers for all cores, but other workers will
// eventually find the cores and shut them down.

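The hunk above implements a two-phase hand-off. A minimal sketch of the idea with stand-in types (not the real Idle/Synced/Shared API):

// Stand-in types; the real code works on idle and worker synced state.
struct CoreState {
    is_shutdown: bool,
}

fn shutdown(available_cores: &mut [CoreState], sleeping_workers: &mut Vec<usize>) {
    // Phase 1: flag every idle core so whichever worker acquires it next
    // sees the flag and drains the core instead of running new tasks.
    for core in available_cores.iter_mut() {
        core.is_shutdown = true;
    }
    // Phase 2: wake the sleepers and hand each one a core. There may be
    // fewer sleepers than cores; leftover cores are found and shut down
    // by other workers as they finish their current work.
    while let Some(_worker_index) = sleeping_workers.pop() {
        // Real code: assign a core and notify the worker's condvar.
    }
}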
tokio/src/runtime/scheduler/multi_thread/stats.rs

@ -1,6 +1,7 @@
use crate::runtime::{Config, MetricsBatch, WorkerMetrics};
use std::cmp;
use std::f32::consts::E;
use std::time::{Duration, Instant};
/// Per-worker statistics. This is used for both tuning the scheduler and
@ -10,6 +11,16 @@ pub(crate) struct Stats {
/// user.
batch: MetricsBatch,
/// Exponentially-weighted moving average of time spent polling a scheduled
/// task.
///
/// Tracked in nanoseconds, stored as a f64 since that is what we use with
/// the EWMA calculations
task_poll_time_ewma: f64,
}
/// Transient state
pub(crate) struct Ephemeral {
/// Instant at which work last resumed (continued after park).
///
/// This duplicates the value stored in `MetricsBatch`. We will unify
@ -18,13 +29,15 @@ pub(crate) struct Stats {
/// Number of tasks polled in the batch of scheduled tasks
tasks_polled_in_batch: usize,
}
/// Exponentially-weighted moving average of time spent polling a scheduled
/// task.
///
/// Tracked in nanoseconds, stored as a f64 since that is what we use with
/// the EWMA calculations
task_poll_time_ewma: f64,
impl Ephemeral {
pub(crate) fn new() -> Ephemeral {
Ephemeral {
processing_scheduled_tasks_started_at: Instant::now(),
tasks_polled_in_batch: 0,
}
}
}
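The new signatures in this file show the intended calling pattern: Stats travels with the migrating core, while Ephemeral is batch-local scratch owned by the worker thread and lent to Stats for the duration of a batch. A sketch of a call site, assuming the Stats/Ephemeral API above (not a real call site from the diff):

fn process_batch(stats: &mut Stats, ephemeral: &mut Ephemeral) {
    stats.start_processing_scheduled_tasks(ephemeral);
    // ... call stats.start_poll(ephemeral) once per task polled ...
    stats.end_processing_scheduled_tasks(ephemeral);
}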
/// How to weigh each individual poll time; the value is plucked from thin air.
@ -47,8 +60,6 @@ impl Stats {
Stats {
batch: MetricsBatch::new(worker_metrics),
processing_scheduled_tasks_started_at: Instant::now(),
tasks_polled_in_batch: 0,
task_poll_time_ewma,
}
}
@ -85,24 +96,24 @@ impl Stats {
self.batch.inc_local_schedule_count();
}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
pub(crate) fn start_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
self.batch.start_processing_scheduled_tasks();
self.processing_scheduled_tasks_started_at = Instant::now();
self.tasks_polled_in_batch = 0;
ephemeral.processing_scheduled_tasks_started_at = Instant::now();
ephemeral.tasks_polled_in_batch = 0;
}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {
pub(crate) fn end_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
self.batch.end_processing_scheduled_tasks();
// Update the EWMA task poll time
if self.tasks_polled_in_batch > 0 {
if ephemeral.tasks_polled_in_batch > 0 {
let now = Instant::now();
// If we "overflow" this conversion, we have bigger problems than
// slightly off stats.
let elapsed = (now - self.processing_scheduled_tasks_started_at).as_nanos() as f64;
let num_polls = self.tasks_polled_in_batch as f64;
let elapsed = (now - ephemeral.processing_scheduled_tasks_started_at).as_nanos() as f64;
let num_polls = ephemeral.tasks_polled_in_batch as f64;
// Calculate the mean poll duration for a single task in the batch
let mean_poll_duration = elapsed / num_polls;
@ -116,10 +127,10 @@ impl Stats {
}
}
pub(crate) fn start_poll(&mut self) {
pub(crate) fn start_poll(&mut self, ephemeral: &mut Ephemeral) {
self.batch.start_poll();
self.tasks_polled_in_batch += 1;
ephemeral.tasks_polled_in_batch += 1;
}
pub(crate) fn end_poll(&mut self) {

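The EWMA update itself is cut off by the hunk above. A self-contained sketch of the calculation as the comments describe it; the batch-weighted alpha is an assumption about how a whole batch is folded into a single update:

const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1; // "plucked from thin air"

fn update_ewma(prev_ewma: f64, elapsed_nanos: f64, num_polls: f64) -> f64 {
    // Mean poll duration for a single task in the batch.
    let mean_poll_duration = elapsed_nanos / num_polls;
    // Weight the batch as if the alpha had been applied once per poll.
    let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
    // Standard EWMA blend, tracked in nanoseconds as an f64.
    weighted_alpha * mean_poll_duration + (1.0 - weighted_alpha) * prev_ewma
}
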
tokio/src/runtime/scheduler/multi_thread/worker.rs

@ -60,7 +60,7 @@ use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
use crate::runtime;
use crate::runtime::context;
use crate::runtime::scheduler::multi_thread::{
idle, queue, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
idle, queue, stats, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
};
use crate::runtime::scheduler::{self, inject, Lock};
use crate::runtime::task::OwnedTasks;
@ -70,7 +70,7 @@ use crate::runtime::{
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};
use std::cell::RefCell;
use std::cell::{Cell, RefCell};
use std::cmp;
use std::task::Waker;
use std::time::Duration;
@ -88,36 +88,41 @@ cfg_not_taskdump! {
}
/// A scheduler worker
///
/// Data is stack-allocated and never migrates threads
pub(super) struct Worker {
/*
/// Reference to scheduler's handle
handle: Arc<Handle>,
/// This worker's index in `assigned_cores` and `condvars`.
index: usize,
*/
/// Used to collect a list of workers to notify
workers_to_notify: Vec<usize>,
/// Snapshot of idle core list. This helps speed up stealing
idle_snapshot: idle::Snapshot,
}
/// Core data
pub(super) struct Core {
/// Index holding this core's remote/shared state.
pub(super) index: usize,
/// Used to schedule bookkeeping tasks every so often.
tick: u32,
/// True if the scheduler is being shutdown
pub(super) is_shutdown: bool,
/// True if the scheduler is being traced
is_traced: bool,
/// Counter used to track when to poll from the local queue vs. the
/// injection queue
num_seq_local_queue_polls: u32,
/// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
/// they go to the back of the `run_queue`.
lifo_enabled: bool,
/// How often to check the global queue
global_queue_interval: u32,
/// Used to collect a list of workers to notify
workers_to_notify: Vec<usize>,
/// Snapshot of idle core list. This helps speed up stealing
idle_snapshot: idle::Snapshot,
stats: stats::Ephemeral,
}
/// Core data
///
/// Data is heap-allocated and migrates threads.
#[repr(align(128))]
pub(super) struct Core {
/// Index holding this core's remote/shared state.
pub(super) index: usize,
lifo_slot: Option<Notified>,
@ -128,22 +133,19 @@ pub(super) struct Core {
/// involves attempting to steal from other workers.
pub(super) is_searching: bool,
/// True if the scheduler is being shutdown
pub(super) is_shutdown: bool,
/// True if the scheduler is being traced
is_traced: bool,
/// Per-worker runtime stats
stats: Stats,
/// How often to check the global queue
global_queue_interval: u32,
/// Fast random number generator.
rand: FastRand,
}
#[test]
fn test_size_of_core() {
let size_of = std::mem::size_of::<Core>();
assert!(size_of <= 64, "actual={}", size_of);
}
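The doc comments above carry the commit's core rule: Worker state is stack-allocated and pinned to one thread, while Core is boxed, cache-line aligned to avoid false sharing, and migrates between threads through the idle set. A minimal illustration with hypothetical types:

// Hypothetical stand-ins, not the real Worker/Core definitions.
struct ThreadLocalWorker {
    tick: u32, // never leaves the thread that created it
}

#[repr(align(128))] // keep two cores off the same cache line
struct MigratingCore {
    index: usize,
}

fn park(core: Box<MigratingCore>, idle_cores: &mut Vec<Box<MigratingCore>>) {
    // Releasing the box is what lets a different thread resume this core.
    idle_cores.push(core);
}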
/// State shared across all workers
pub(crate) struct Shared {
/// Per-core remote state.
@ -229,6 +231,9 @@ pub(crate) struct Context {
/// Worker index
index: usize,
/// True when the LIFO slot is enabled
lifo_enabled: Cell<bool>,
/// Core data
core: RefCell<Option<Box<Core>>>,
@ -286,15 +291,9 @@ pub(super) fn create(
cores.push(Box::new(Core {
index: i,
tick: 0,
num_seq_local_queue_polls: 0,
lifo_enabled: !config.disable_lifo_slot,
lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
is_traced: false,
global_queue_interval: stats.tuned_global_queue_interval(&config),
stats,
rand: FastRand::from_seed(config.seed_generator.next_seed()),
}));
@ -489,8 +488,14 @@ fn run(
let num_workers = handle.shared.condvars.len();
let mut worker = Worker {
tick: 0,
num_seq_local_queue_polls: 0,
global_queue_interval: 0,
is_shutdown: false,
is_traced: false,
workers_to_notify: Vec::with_capacity(num_workers - 1),
idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
stats: stats::Ephemeral::new(),
};
let sched_handle = scheduler::Handle::MultiThread(handle.clone());
@ -499,6 +504,7 @@ fn run(
// Set the worker context.
let cx = scheduler::Context::MultiThread(Context {
index,
lifo_enabled: Cell::new(!handle.shared.config.disable_lifo_slot),
handle,
core: RefCell::new(None),
handoff_core,
@ -559,7 +565,7 @@ impl Worker {
}
};
core.stats.start_processing_scheduled_tasks();
core.stats.start_processing_scheduled_tasks(&mut self.stats);
if let Some(task) = maybe_task {
core = self.run_task(cx, core, task)?;
@ -574,7 +580,7 @@ impl Worker {
} else {
// The only reason to get `None` from `next_task` is we have
// entered the shutdown phase.
assert!(core.is_shutdown);
assert!(self.is_shutdown);
break;
}
}
@ -594,7 +600,11 @@ impl Worker {
}
// Try to acquire an available core, but do not block the thread
fn try_acquire_available_core(&self, cx: &Context, synced: &mut Synced) -> Option<Box<Core>> {
fn try_acquire_available_core(
&mut self,
cx: &Context,
synced: &mut Synced,
) -> Option<Box<Core>> {
if let Some(mut core) = cx
.shared()
.idle
@ -608,7 +618,11 @@ impl Worker {
}
// Block the current thread, waiting for an available core
fn wait_for_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> NextTaskResult {
fn wait_for_core(
&mut self,
cx: &Context,
mut synced: MutexGuard<'_, Synced>,
) -> NextTaskResult {
cx.shared()
.idle
.transition_worker_to_parked(&mut synced, cx.index);
@ -630,7 +644,7 @@ impl Worker {
self.reset_acquired_core(cx, &mut synced, &mut core);
if core.is_shutdown {
if self.is_shutdown {
// Currently shutting down, don't do any more work
return Ok((None, core));
}
@ -647,7 +661,9 @@ impl Worker {
}
/// Ensure core's state is set correctly for the worker to start using.
fn reset_acquired_core(&self, cx: &Context, synced: &mut Synced, core: &mut Core) {
fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);
// Reset `lifo_enabled` here in case the core was previously stolen from
// a task that had the LIFO slot disabled.
self.reset_lifo_enabled(cx, core);
@ -662,15 +678,15 @@ impl Worker {
/// Finds the next task to run; this could come from a queue or from stealing. If
/// none are available, the thread sleeps and tries again.
fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
while !core.is_shutdown {
while !self.is_shutdown {
self.assert_lifo_enabled_is_correct(cx, &core);
if core.is_traced {
if self.is_traced {
core = cx.handle.trace_core(core);
}
// Increment the tick
core.tick();
self.tick = self.tick.wrapping_add(1);
// Runs maintenance every so often. When maintenance is run, the
// driver is checked, which may result in a task being found.
@ -683,13 +699,13 @@ impl Worker {
}
// We consumed all work in the queues and will start searching for work.
core.stats.end_processing_scheduled_tasks();
core.stats.end_processing_scheduled_tasks(&mut self.stats);
core = n!(self.poll_driver(cx, core));
// Try to steal a task from other workers
if let Some(task) = self.steal_work(cx, &mut core) {
core.stats.start_processing_scheduled_tasks();
core.stats.start_processing_scheduled_tasks(&mut self.stats);
return Ok((Some(task), core));
}
@ -707,11 +723,11 @@ impl Worker {
Ok((None, core))
}
fn next_notified_task(&self, cx: &Context, core: &mut Core) -> Option<Notified> {
core.num_seq_local_queue_polls += 1;
fn next_notified_task(&mut self, cx: &Context, core: &mut Core) -> Option<Notified> {
self.num_seq_local_queue_polls += 1;
if core.num_seq_local_queue_polls % core.global_queue_interval == 0 {
core.num_seq_local_queue_polls = 0;
if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
self.num_seq_local_queue_polls = 0;
// Update the global queue interval, if needed
self.tune_global_queue_interval(cx, core);
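The counter above enforces fairness between the local and global queues. A stand-alone sketch of the check, simplified from the hunk:

// Every `interval`-th poll is taken from the global (injection) queue so
// remote wakeups are not starved by a busy local queue.
fn poll_global_this_time(num_seq_local_queue_polls: &mut u32, interval: u32) -> bool {
    *num_seq_local_queue_polls += 1;
    if *num_seq_local_queue_polls % interval == 0 {
        *num_seq_local_queue_polls = 0;
        true
    } else {
        false
    }
}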
@ -876,7 +892,7 @@ impl Worker {
None
}
fn run_task(&self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
fn run_task(&mut self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
let task = cx.shared().owned.assert_owner(task);
// Make sure the worker is not in the **searching** state. This enables
@ -891,7 +907,7 @@ impl Worker {
// tasks under this measurement. In this case, the tasks came from the
// LIFO slot and are considered part of the current task for scheduling
// purposes. These tasks inherit the "parent"'s limits.
core.stats.start_poll();
core.stats.start_poll(&mut self.stats);
// Make the core available to the runtime context
*cx.core.borrow_mut() = Some(core);
@ -936,7 +952,7 @@ impl Worker {
.push_back_or_overflow(task, cx.shared(), &mut core.stats);
// If we hit this point, the LIFO slot should be enabled.
// There is no need to reset it.
debug_assert!(core.lifo_enabled);
debug_assert!(cx.lifo_enabled.get());
return Ok(core);
}
@ -952,7 +968,7 @@ impl Worker {
// repeatedly schedule the other. To mitigate this, we limit the
// number of times the LIFO slot is prioritized.
if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
core.lifo_enabled = false;
cx.lifo_enabled.set(false);
super::counters::inc_lifo_capped();
}
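A stand-alone sketch of the cap above; the value of MAX_LIFO_POLLS_PER_TICK here is an assumption, not taken from the diff:

use std::cell::Cell;

const MAX_LIFO_POLLS_PER_TICK: usize = 3; // assumed value

fn lifo_still_allowed(lifo_polls: usize, lifo_enabled: &Cell<bool>) -> bool {
    if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
        // Disabled until the next reset_lifo_enabled() on core acquisition.
        lifo_enabled.set(false);
    }
    lifo_enabled.get()
}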
@ -1041,15 +1057,15 @@ impl Worker {
}
fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
if core.tick % cx.shared().config.event_interval == 0 {
if self.tick % cx.shared().config.event_interval == 0 {
super::counters::inc_num_maintenance();
core.stats.end_processing_scheduled_tasks();
core.stats.end_processing_scheduled_tasks(&mut self.stats);
// Run regularly scheduled maintenance
core = n!(self.park_yield(cx, core));
core.stats.start_processing_scheduled_tasks();
core.stats.start_processing_scheduled_tasks(&mut self.stats);
}
Ok((None, core))
@ -1059,13 +1075,13 @@ impl Worker {
core.stats.submit(&cx.shared().worker_metrics[core.index]);
}
fn update_global_flags(&self, cx: &Context, synced: &mut Synced, core: &mut Core) {
if !core.is_shutdown {
core.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
fn update_global_flags(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
if !self.is_shutdown {
self.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
}
if !core.is_traced {
core.is_traced = cx.shared().trace_status.trace_requested();
if !self.is_traced {
self.is_traced = cx.shared().trace_status.trace_requested();
}
}
@ -1111,8 +1127,8 @@ impl Worker {
}
if self.can_transition_to_parked(cx, &mut core) {
debug_assert!(!core.is_shutdown);
debug_assert!(!core.is_traced);
debug_assert!(!self.is_shutdown);
debug_assert!(!self.is_traced);
core = n!(self.do_park(cx, core));
}
@ -1166,7 +1182,7 @@ impl Worker {
// If the runtime is shutdown, skip parking
self.update_global_flags(cx, &mut synced, &mut core);
if core.is_shutdown {
if self.is_shutdown {
return Ok((None, core));
}
@ -1232,8 +1248,8 @@ impl Worker {
core.lifo_slot.is_none()
// cx.shared().remotes[core.index].lifo_slot.is_none()
&& core.run_queue.is_empty()
&& !core.is_shutdown
&& !core.is_traced
&& !self.is_shutdown
&& !self.is_traced
}
/// Signals all tasks to shut down, and waits for them to complete. Must run
@ -1290,24 +1306,25 @@ impl Worker {
}
fn reset_lifo_enabled(&self, cx: &Context, core: &mut Core) {
core.lifo_enabled = !cx.handle.shared.config.disable_lifo_slot;
cx.lifo_enabled
.set(!cx.handle.shared.config.disable_lifo_slot);
}
fn assert_lifo_enabled_is_correct(&self, cx: &Context, core: &Core) {
debug_assert_eq!(
core.lifo_enabled,
cx.lifo_enabled.get(),
!cx.handle.shared.config.disable_lifo_slot
);
}
fn tune_global_queue_interval(&self, cx: &Context, core: &mut Core) {
fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
let next = core.stats.tuned_global_queue_interval(&cx.shared().config);
debug_assert!(next > 1);
// Smooth out jitter
if abs_diff(core.global_queue_interval, next) > 2 {
core.global_queue_interval = next;
if abs_diff(self.global_queue_interval, next) > 2 {
self.global_queue_interval = next;
}
}
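A stand-alone version of the hysteresis above, using the std u32::abs_diff method in place of the crate's abs_diff helper:

fn smooth_global_queue_interval(current: &mut u32, next: u32) {
    debug_assert!(next > 1);
    // Only adopt the newly tuned value when it moves by more than 2, so
    // EWMA noise does not make the interval flap on every tune.
    if current.abs_diff(next) > 2 {
        *current = next;
    }
}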
@ -1362,7 +1379,7 @@ impl Shared {
}
fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
if core.lifo_enabled {
if cx.lifo_enabled.get() {
// Push to the LIFO slot
let prev = std::mem::replace(&mut core.lifo_slot, Some(task));
// let prev = cx.shared().remotes[core.index].lifo_slot.swap_local(task);
@ -1471,13 +1488,6 @@ impl task::Schedule for Arc<Handle> {
}
}
impl Core {
/// Increment the tick
fn tick(&mut self) {
self.tick = self.tick.wrapping_add(1);
}
}
pub(crate) struct InjectGuard<'a> {
lock: crate::loom::sync::MutexGuard<'a, Synced>,
}