From f71c36920370685c600c9412a62ebe9ca5696485 Mon Sep 17 00:00:00 2001
From: Carl Lerche <me@carllerche.com>
Date: Mon, 12 Jun 2023 11:53:22 -0700
Subject: [PATCH] fixes

---
 .../runtime/scheduler/multi_thread/idle.rs    | 66 +++++++++++++------
 .../runtime/scheduler/multi_thread/worker.rs  | 24 +++----
 2 files changed, 55 insertions(+), 35 deletions(-)

diff --git a/tokio/src/runtime/scheduler/multi_thread/idle.rs b/tokio/src/runtime/scheduler/multi_thread/idle.rs
index b4e16000a..79e2e184d 100644
--- a/tokio/src/runtime/scheduler/multi_thread/idle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/idle.rs
@@ -24,24 +24,47 @@ pub(super) struct Idle {
 pub(super) struct Synced {
     /// Worker IDs that are currently sleeping
     sleepers: Vec<usize>,
+
+    /// Cores available for workers
+    available_cores: Vec<Box<Core>>,
 }
 
 impl Idle {
-    pub(super) fn new(num_cores: usize, num_workers: usize) -> (Idle, Synced) {
+    pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
         let idle = Idle {
             num_searching: AtomicUsize::new(0),
-            num_idle: AtomicUsize::new(0),
+            num_idle: AtomicUsize::new(cores.len()),
             needs_searching: AtomicBool::new(false),
-            num_cores,
+            num_cores: cores.len(),
         };
 
         let synced = Synced {
             sleepers: Vec::with_capacity(num_workers),
+            available_cores: cores,
         };
 
         (idle, synced)
     }
 
+    pub(super) fn num_idle(&self, synced: &Synced) -> usize {
+        debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
+        synced.available_cores.len()
+    }
+
+    /// Try to acquire an available core
+    pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
+        let ret = synced.available_cores.pop();
+
+        if ret.is_some() {
+            // Decrement the number of idle cores
+            let num_idle = self.num_idle.load(Acquire) - 1;
+            debug_assert_eq!(num_idle, synced.available_cores.len());
+            self.num_idle.store(num_idle, Release);
+        }
+
+        ret
+    }
+
     /// We need at least one searching worker
     pub(super) fn notify_local(&self, shared: &Shared) {
         if self.num_searching.load(Acquire) != 0 {
@@ -83,15 +106,15 @@ impl Idle {
         // Find a sleeping worker
         if let Some(worker) = synced.idle.sleepers.pop() {
             // Find an available core
-            if let Some(mut core) = synced.available_cores.pop() {
+            if let Some(mut core) = synced.idle.available_cores.pop() {
                 debug_assert!(!core.is_searching);
                 core.is_searching = is_searching;
 
                 // Assign the core to the worker
                 synced.assigned_cores[worker] = Some(core);
 
-                let num_idle = self.num_idle.load(Acquire) - 1;
-                debug_assert_eq!(num_idle, synced.available_cores.len());
+                let num_idle = synced.idle.available_cores.len();
+                debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);
 
                 // Update the number of sleeping workers
                 self.num_idle.store(num_idle, Release);
@@ -127,17 +150,15 @@ impl Idle {
         workers: &mut Vec<usize>,
         num: usize,
     ) {
-        let mut num_idle = self.num_idle.load(Acquire);
         let mut did_notify = false;
 
         for _ in 0..num {
             if let Some(worker) = synced.idle.sleepers.pop() {
-                if let Some(core) = synced.available_cores.pop() {
+                if let Some(core) = synced.idle.available_cores.pop() {
                     debug_assert!(!core.is_searching);
 
                     synced.assigned_cores[worker] = Some(core);
 
-                    num_idle -= 1;
                     workers.push(worker);
                     did_notify = true;
@@ -151,14 +172,21 @@ impl Idle {
         }
 
         if did_notify {
+            let num_idle = synced.idle.available_cores.len();
             self.num_idle.store(num_idle, Release);
         } else {
+            debug_assert_eq!(
+                synced.idle.available_cores.len(),
+                self.num_idle.load(Acquire)
+            );
             self.needs_searching.store(true, Release);
         }
     }
 
-    pub(super) fn notify_shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
-        while let Some(core) = synced.available_cores.pop() {
+    pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
+        while let Some(mut core) = synced.idle.available_cores.pop() {
+            core.is_shutdown = true;
+
             let worker = synced.idle.sleepers.pop().unwrap();
             synced.assigned_cores[worker] = Some(core);
 
@@ -176,13 +204,13 @@ impl Idle {
         // Check that this isn't the final worker to go idle *and*
         // `needs_searching` is set.
-        debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced) > 1);
+        debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced.idle) > 1);
 
-        let num_idle = synced.available_cores.len();
+        let num_idle = synced.idle.available_cores.len();
         debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
 
         // Store the core in the list of available cores
-        synced.available_cores.push(core);
+        synced.idle.available_cores.push(core);
 
         // Update `num_idle`
         self.num_idle.store(num_idle + 1, Release);
@@ -236,19 +264,15 @@ impl Idle {
     /// **must** notify a new worker.
     pub(super) fn transition_worker_from_searching(&self, core: &mut Core) -> bool {
         debug_assert!(core.is_searching);
+        core.is_searching = false;
 
         let prev = self.num_searching.fetch_sub(1, AcqRel);
         debug_assert!(prev > 0);
 
-        if prev == 1 {
-            false
-        } else {
-            core.is_searching = false;
-            false
-        }
+        prev == 1
     }
 }
 
-fn num_active_workers(synced: &worker::Synced) -> usize {
+fn num_active_workers(synced: &Synced) -> usize {
     synced.available_cores.capacity() - synced.available_cores.len()
 }
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 4b6c0cb98..c89491286 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -92,7 +92,7 @@ pub(super) struct Worker {
     /// Reference to scheduler's handle
     handle: Arc<Handle>,
 
-    /// This worker's index in `available_cores` and `condvars`.
+    /// This worker's index in `assigned_cores` and `condvars`.
     index: usize,
 
     /// Used to collect a list of workers to notify
@@ -126,7 +126,7 @@ pub(super) struct Core {
     pub(super) is_searching: bool,
 
     /// True if the scheduler is being shutdown
-    is_shutdown: bool,
+    pub(super) is_shutdown: bool,
 
     /// True if the scheduler is being traced
     is_traced: bool,
@@ -188,9 +188,6 @@ pub(crate) struct Shared {
 
 /// Data synchronized by the scheduler mutex
 pub(crate) struct Synced {
-    /// Cores not currently assigned to workers
-    pub(super) available_cores: Vec<Box<Core>>,
-
     /// When worker is notified, it is assigned a core. The core is placed here
     /// until the worker wakes up to take it.
     pub(super) assigned_cores: Vec<Option<Box<Core>>>,
@@ -288,7 +285,7 @@ pub(super) fn create(
 
     // Allocate num-cores + 1 workers, so one worker can handle the I/O driver,
     // if needed.
-    let (idle, idle_synced) = Idle::new(num_cores, num_workers);
+    let (idle, idle_synced) = Idle::new(cores, num_workers);
     let (inject, inject_synced) = inject::Shared::new();
 
     let handle = Arc::new(Handle {
@@ -298,7 +295,6 @@ pub(super) fn create(
         idle,
         owned: OwnedTasks::new(),
         synced: Mutex::new(Synced {
-            available_cores: cores,
             assigned_cores: Vec::with_capacity(num_cores),
             shutdown_cores: Vec::with_capacity(num_cores),
             idle: idle_synced,
@@ -555,7 +551,11 @@ impl Worker {
 
     // Try to acquire an available core, but do not block the thread
     fn try_acquire_available_core(&self, cx: &Context, synced: &mut Synced) -> Option<Box<Core>> {
-        if let Some(mut core) = synced.available_cores.pop() {
+        if let Some(mut core) = cx
+            .shared()
+            .idle
+            .try_acquire_available_core(&mut synced.idle)
+        {
             self.reset_acquired_core(cx, synced, &mut core);
             Some(core)
         } else {
@@ -881,7 +881,7 @@ impl Worker {
             let mut synced = synced();
 
             // Number of tasks we want to try to spread across idle workers
-            let num_fanout = cmp::min(defer.len(), synced.available_cores.len());
+            let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle));
 
             if num_fanout > 0 {
                 cx.shared()
@@ -1224,11 +1224,7 @@ impl Shared {
         if self.inject.close(&mut synced.inject) {
             // Set the shutdown flag on all available cores
-            for core in &mut synced.available_cores {
-                core.is_shutdown = true;
-            }
-
-            self.idle.notify_shutdown(&mut synced, self);
+            self.idle.shutdown(&mut synced, self);
         }