From 6cbdb587bb0f63a375ee5dcefe07cacab2c9f734 Mon Sep 17 00:00:00 2001
From: Carl Lerche
Date: Fri, 9 Jun 2023 11:41:06 -0700
Subject: [PATCH] wip

---
 .../runtime/scheduler/multi_thread/idle.rs    | 58 +++++++-------
 .../runtime/scheduler/multi_thread/worker.rs  | 78 ++++++++++---------
 2 files changed, 70 insertions(+), 66 deletions(-)

diff --git a/tokio/src/runtime/scheduler/multi_thread/idle.rs b/tokio/src/runtime/scheduler/multi_thread/idle.rs
index 61bb0c338..405c811fe 100644
--- a/tokio/src/runtime/scheduler/multi_thread/idle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/idle.rs
@@ -4,21 +4,20 @@
 use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
 use crate::loom::sync::MutexGuard;
 use crate::runtime::scheduler::multi_thread::{worker, Core, Shared};
 
-use std::fmt;
-use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
 
 pub(super) struct Idle {
-    /// Number of searching workers
+    /// Number of searching cores
     num_searching: AtomicUsize,
 
-    /// Number of sleeping workers
-    num_sleeping: AtomicUsize,
+    /// Number of idle cores
+    num_idle: AtomicUsize,
 
     /// Used to catch false-negatives when waking workers
     needs_searching: AtomicBool,
 
-    /// Total number of workers
-    num_workers: usize,
+    /// Total number of cores
+    num_cores: usize,
 }
 /// Data synchronized by the scheduler mutex
@@ -28,12 +27,12 @@ pub(super) struct Synced {
 }
 
 impl Idle {
-    pub(super) fn new(num_workers: usize) -> (Idle, Synced) {
+    pub(super) fn new(num_cores: usize, num_workers: usize) -> (Idle, Synced) {
         let idle = Idle {
             num_searching: AtomicUsize::new(0),
-            num_sleeping: AtomicUsize::new(0),
+            num_idle: AtomicUsize::new(0),
             needs_searching: AtomicBool::new(false),
-            num_workers,
+            num_cores,
         };
 
         let synced = Synced {
@@ -91,11 +90,11 @@ impl Idle {
             // Assign the core to the worker
             synced.assigned_cores[worker] = Some(core);
 
-            let num_sleeping = self.num_sleeping.load(Acquire) - 1;
-            debug_assert_eq!(num_sleeping, synced.idle.sleepers.len());
+            let num_idle = self.num_idle.load(Acquire) - 1;
+            debug_assert_eq!(num_idle, synced.available_cores.len());
 
             // Update the number of sleeping workers
-            self.num_sleeping.store(num_sleeping, Release);
+            self.num_idle.store(num_idle, Release);
 
             // Drop the lock before notifying the condvar.
             drop(synced);
@@ -125,35 +124,40 @@ impl Idle {
         &self,
         synced: &mut worker::Synced,
         core: Box<Core>,
-        index: usize,
+        index: Option<usize>,
    ) {
        // The core should not be searching at this point
        debug_assert!(!core.is_searching);
 
        // Check that this isn't the final worker to go idle *and*
        // `needs_searching` is set.
-        debug_assert!(!self.needs_searching.load(Acquire) || synced.idle.num_active_workers() > 1);
+        debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced) > 1);
 
-        let num_sleeping = synced.idle.sleepers.len();
-        debug_assert_eq!(num_sleeping, self.num_sleeping.load(Acquire));
-
-        // Store the worker index in the list of sleepers
-        synced.idle.sleepers.push(index);
+        let num_idle = synced.available_cores.len();
+        debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
 
         // Store the core in the list of available cores
         synced.available_cores.push(core);
 
-        // The worker's assigned core slot should be empty
-        debug_assert!(synced.assigned_cores[index].is_none());
+        // Update `num_idle`
+        self.num_idle.store(num_idle + 1, Release);
+
+        if let Some(index) = index {
+            // Store the worker index in the list of sleepers
+            synced.idle.sleepers.push(index);
+
+            // The worker's assigned core slot should be empty
+            debug_assert!(synced.assigned_cores[index].is_none());
+        }
     }
 
     pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
         debug_assert!(!core.is_searching);
 
         let num_searching = self.num_searching.load(Acquire);
-        let num_sleeping = self.num_sleeping.load(Acquire);
+        let num_idle = self.num_idle.load(Acquire);
 
-        if 2 * num_searching >= self.num_workers - num_sleeping {
+        if 2 * num_searching >= self.num_cores - num_idle {
             return;
         }
 
@@ -200,8 +204,6 @@ impl Idle {
     }
 }
 
-impl Synced {
-    fn num_active_workers(&self) -> usize {
-        self.sleepers.capacity() - self.sleepers.len()
-    }
+fn num_active_workers(synced: &worker::Synced) -> usize {
+    synced.available_cores.capacity() - synced.available_cores.len()
 }
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 8a1a5a0b9..7b287ff28 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -239,19 +239,23 @@ type Notified = task::Notified<Arc<Handle>>;
 const MAX_LIFO_POLLS_PER_TICK: usize = 3;
 
 pub(super) fn create(
-    size: usize,
+    num_cores: usize,
     driver: Driver,
     driver_handle: driver::Handle,
     blocking_spawner: blocking::Spawner,
     seed_generator: RngSeedGenerator,
     config: Config,
 ) -> runtime::Handle {
-    let mut cores = Vec::with_capacity(size);
-    let mut remotes = Vec::with_capacity(size);
-    let mut worker_metrics = Vec::with_capacity(size);
+    // Allocate num_cores + 1 workers so that one worker can handle the I/O
+    // driver, if needed.
+    let num_workers = num_cores + 1;
+    let mut cores = Vec::with_capacity(num_cores);
+    let mut remotes = Vec::with_capacity(num_cores);
+    // Worker metrics are actually core based
+    let mut worker_metrics = Vec::with_capacity(num_cores);
 
     // Create the local queues
-    for i in 0..size {
+    for i in 0..num_cores {
         let (steal, run_queue) = queue::local();
 
         let metrics = WorkerMetrics::from_config(&config);
@@ -275,10 +279,11 @@ pub(super) fn create(
         worker_metrics.push(metrics);
     }
 
-    let (idle, idle_synced) = Idle::new(size);
+    // Allocate num-cores + 1 workers, so one worker can handle the I/O driver,
+    // if needed.
+    let (idle, idle_synced) = Idle::new(num_cores, num_workers);
     let (inject, inject_synced) = inject::Shared::new();
 
-    let remotes_len = remotes.len();
     let handle = Arc::new(Handle {
         shared: Shared {
             remotes: remotes.into_boxed_slice(),
@@ -286,15 +291,15 @@ pub(super) fn create(
             idle,
             owned: OwnedTasks::new(),
             synced: Mutex::new(Synced {
-                available_cores: Vec::with_capacity(size),
+                available_cores: Vec::with_capacity(num_cores),
                 assigned_cores: cores,
-                shutdown_cores: Vec::with_capacity(size),
+                shutdown_cores: Vec::with_capacity(num_cores),
                 idle: idle_synced,
                 inject: inject_synced,
                 driver: Some(Box::new(driver)),
             }),
-            condvars: (0..size).map(|_| Condvar::new()).collect(),
-            trace_status: TraceStatus::new(remotes_len),
+            condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
+            trace_status: TraceStatus::new(num_cores),
             config,
             scheduler_metrics: SchedulerMetrics::new(),
             worker_metrics: worker_metrics.into_boxed_slice(),
@@ -310,7 +315,7 @@ pub(super) fn create(
     };
 
     // Eagerly start worker threads
-    for index in 0..size {
+    for index in 0..num_workers {
         let handle = rt_handle.inner.expect_multi_thread();
         let worker = Worker {
             handle: handle.clone(),
@@ -456,15 +461,6 @@ fn run(mut worker: Worker) {
     #[cfg(debug_assertions)]
     let _abort_on_panic = AbortOnPanic;
 
-    /*
-    // Acquire a core. If this fails, then another thread is running this
-    // worker and there is nothing further to do.
-    let core = match worker.core.take() {
-        Some(core) => core,
-        None => return,
-    };
-    */
-
     let handle = scheduler::Handle::MultiThread(worker.handle.clone());
 
     crate::runtime::context::enter_runtime(&handle, true, |_| {
@@ -555,8 +551,6 @@ impl Worker {
         // Signal shutdown
         self.shutdown_core(core);
-
-        todo!()
     }
 
     fn acquire_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> RunResult {
@@ -926,20 +920,32 @@ impl Worker {
         // Core being returned must not be in the searching state
         debug_assert!(!core.is_searching);
 
-        self.shared()
-            .idle
-            .transition_worker_to_parked(&mut synced, core, self.index);
+        if let Some(mut driver) = synced.driver.take() {
+            // Transition to parked without providing the worker's index.
+            // Because we are going to block on the driver, we don't want to be
+            // interrupted by scheduled tasks.
+            self.shared()
+                .idle
+                .transition_worker_to_parked(&mut synced, core, None);
+
+            // Drop the lock before parking on the driver
+            drop(synced);
+
+            // Wait for driver events
+            driver.park(&self.handle.driver);
+
+            // Acquire the lock again
+            synced = self.shared().synced.lock();
 
-        /*
-        if let Some(_driver) = synced.driver.take() {
             todo!()
         } else {
+            self.shared()
+                .idle
+                .transition_worker_to_parked(&mut synced, core, Some(self.index));
+
             // Wait for a core to be assigned to us
             self.acquire_core(cx, synced)
         }
-        */
-        // TODO: poll driver if needed
-        self.acquire_core(cx, synced)
     }
 
     fn transition_to_searching(&self, core: &mut Core) -> bool {
@@ -963,10 +969,6 @@ impl Worker {
         core.lifo_slot.is_none() && core.run_queue.is_empty() && !core.is_traced
     }
 
-    fn transition_from_parked(&self, core: &mut Core) -> bool {
-        todo!()
-    }
-
     /// Signals all tasks to shut down, and waits for them to complete. Must run
     /// before we enter the single-threaded phase of shutdown processing.
     fn pre_shutdown(&self, core: &mut Core) {
@@ -982,15 +984,15 @@ impl Worker {
     /// If all workers have reached this point, the final cleanup is performed.
     fn shutdown_core(&self, core: Box<Core>) {
         let mut synced = self.shared().synced.lock();
-        synced.available_cores.push(core);
+        synced.shutdown_cores.push(core);
 
-        if synced.available_cores.len() != self.shared().remotes.len() {
+        if synced.shutdown_cores.len() != self.shared().remotes.len() {
             return;
         }
 
         debug_assert!(self.shared().owned.is_empty());
 
-        for mut core in synced.available_cores.drain(..) {
+        for mut core in synced.shutdown_cores.drain(..) {
             // Drain tasks from the local queue
             while self.next_local_task(&mut core).is_some() {}
         }
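
Note (illustration only, not part of the patch): with num_cores cores shared by
num_cores + 1 workers, a parking worker either takes the single shared I/O
driver and blocks on it (parking with index: None, so it is not registered as a
sleeper), or it registers its index as a sleeper and waits on its own condvar
until another worker assigns it a core. The sketch below models that hand-off
with std primitives only; the names (Shared, Synced, Core, Driver, park,
notify_one, wait_for_core) are invented for the sketch, and the step after
driver.park() is an assumption, since the patch still leaves it as todo!().

    use std::sync::{Arc, Condvar, Mutex, MutexGuard};
    use std::thread;
    use std::time::Duration;

    struct Core(usize);

    struct Driver; // stand-in for the I/O driver: "parking" just sleeps briefly

    impl Driver {
        fn park(&mut self) {
            thread::sleep(Duration::from_millis(5));
        }
    }

    struct Synced {
        available_cores: Vec<Core>,
        assigned_cores: Vec<Option<Core>>,
        sleepers: Vec<usize>,
        driver: Option<Driver>,
    }

    struct Shared {
        synced: Mutex<Synced>,
        condvars: Vec<Condvar>, // one condvar per worker, as in the patch
    }

    impl Shared {
        fn new(num_cores: usize) -> Shared {
            let num_workers = num_cores + 1; // one extra worker for the driver
            Shared {
                synced: Mutex::new(Synced {
                    available_cores: (0..num_cores).map(Core).collect(),
                    assigned_cores: (0..num_workers).map(|_| None).collect(),
                    sleepers: Vec::new(),
                    driver: Some(Driver),
                }),
                condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
            }
        }

        // Worker `index` returns its core and parks until it gets a core back.
        fn park(&self, index: usize, core: Core) -> Core {
            let mut synced = self.synced.lock().unwrap();
            synced.available_cores.push(core);

            if let Some(mut driver) = synced.driver.take() {
                // Driver path: don't register as a sleeper; block on the driver.
                drop(synced);
                driver.park();

                // Assumed follow-up (the patch leaves this step as `todo!()`):
                // return the driver, then fall back to the sleeper path.
                let mut synced = self.synced.lock().unwrap();
                synced.driver = Some(driver);
                synced.sleepers.push(index);
                self.wait_for_core(index, synced)
            } else {
                // Condvar path: register as a sleeper and wait for a hand-off.
                synced.sleepers.push(index);
                self.wait_for_core(index, synced)
            }
        }

        fn wait_for_core(&self, index: usize, mut synced: MutexGuard<'_, Synced>) -> Core {
            loop {
                if let Some(core) = synced.assigned_cores[index].take() {
                    return core;
                }
                synced = self.condvars[index].wait(synced).unwrap();
            }
        }

        // Hand an available core to one sleeping worker and wake it.
        fn notify_one(&self) {
            let mut synced = self.synced.lock().unwrap();
            if synced.available_cores.is_empty() || synced.sleepers.is_empty() {
                return;
            }
            let core = synced.available_cores.pop().unwrap();
            let index = synced.sleepers.pop().unwrap();
            synced.assigned_cores[index] = Some(core);
            drop(synced);
            self.condvars[index].notify_one();
        }
    }

    fn main() {
        let shared = Arc::new(Shared::new(2));
        let s = shared.clone();
        let worker = thread::spawn(move || {
            let core = s.synced.lock().unwrap().available_cores.pop().unwrap();
            let core = s.park(0, core);
            println!("worker 0 woke with core {}", core.0);
        });
        thread::sleep(Duration::from_millis(20));
        shared.notify_one();
        worker.join().unwrap();
    }

The same counters drive the search throttle in idle.rs: a worker may only start
searching while 2 * num_searching < num_cores - num_idle, i.e. at most half of
the non-idle cores are stealing at any one time.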