From 392cd057ed95358298b683f65139f2bb64cfe8d9 Mon Sep 17 00:00:00 2001
From: Carl Lerche
Date: Mon, 12 Jun 2023 11:07:09 -0700
Subject: [PATCH] wip

---
 tokio/src/runtime/scheduler/lock.rs           |   2 +
 .../runtime/scheduler/multi_thread/worker.rs  | 234 +++++++++---------
 2 files changed, 124 insertions(+), 112 deletions(-)

diff --git a/tokio/src/runtime/scheduler/lock.rs b/tokio/src/runtime/scheduler/lock.rs
index 0901c2b37..0111e301f 100644
--- a/tokio/src/runtime/scheduler/lock.rs
+++ b/tokio/src/runtime/scheduler/lock.rs
@@ -1,3 +1,5 @@
+use crate::loom::sync::{Mutex, MutexGuard};
+
 /// A lock (mutex) yielding generic data.
 pub(crate) trait Lock<T> {
     type Handle: AsMut<T>;
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 21d68fa92..4b6c0cb98 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -231,6 +231,7 @@ pub(crate) struct Context {
 /// running the task completes, it is returned. Otherwise, the worker will need
 /// to stop processing.
 type RunResult = Result<Box<Core>, ()>;
+type NextTaskResult = Result<(Option<Task>, Box<Core>), ()>;
 
 /// A task handle
 type Task = task::Task<Arc<Handle>>;
@@ -482,7 +483,10 @@ fn run(mut worker: Worker) {
         let cx = cx.expect_multi_thread();
 
         // Run the worker
-        worker.run(&cx);
+        let res = worker.run(&cx);
+        // `err` here signifies the core was lost; this is an expected end
+        // state for a worker.
+        debug_assert!(res.is_err());
 
         // Check if there are any deferred tasks to notify. This can happen when
         // the worker core is lost due to `block_in_place()` being called from
@@ -495,76 +499,49 @@ fn run(mut worker: Worker) {
     });
 }
 
+macro_rules! n {
+    ($e:expr) => {{
+        let (task, core) = $e?;
+        if task.is_some() {
+            return Ok((task, core));
+        }
+        core
+    }};
+}
+
 impl Worker {
-    fn run(&mut self, cx: &Context) {
-        let mut core = {
+    fn run(&mut self, cx: &Context) -> RunResult {
+        let (maybe_task, mut core) = {
             let mut synced = cx.shared().synced.lock();
 
             // First try to acquire an available core
             if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
-                core
+                // Try to poll a task from the global queue
+                let maybe_task = self.next_remote_task_synced(cx, &mut synced);
+                (maybe_task, core)
             } else {
                 // block the thread to wait for a core to be assigned to us
-                match self.wait_for_core(cx, synced) {
-                    Ok(core) => core,
-                    Err(_) => return,
-                }
+                self.wait_for_core(cx, synced)?
             }
         };
 
-        while !core.is_shutdown {
-            self.assert_lifo_enabled_is_correct(&core);
+        core.stats.start_processing_scheduled_tasks();
 
-            if core.is_traced {
-                core = self.handle.trace_core(core);
-            }
+        if let Some(task) = maybe_task {
+            core = self.run_task(cx, core, task)?;
+        }
 
-            // Increment the tick
-            core.tick();
+        loop {
+            let (maybe_task, c) = self.next_task(cx, core)?;
+            core = c;
 
-            // Run maintenance, if needed
-            core = match self.maybe_maintenance(cx, core) {
-                Ok(core) => core,
-                Err(_) => return,
-            };
-
-            // First, check work available to the current worker.
-            if let Some(task) = self.next_task(cx, &mut core) {
-                core = match self.run_task(cx, core, task) {
-                    Ok(core) => core,
-                    Err(_) => return,
-                };
-
-                continue;
-            }
-
-            // We consumed all work in the queues and will start searching for work.
-            core.stats.end_processing_scheduled_tasks();
-
-            // There is no more **local** work to process, try to steal work
-            // from other workers.
-            if let Some(task) = self.steal_work(cx, &mut core) {
-                // Found work, switch back to processing
-                core.stats.start_processing_scheduled_tasks();
-
-                core = match self.run_task(cx, core, task) {
-                    Ok(core) => core,
-                    Err(_) => return,
-                };
+            if let Some(task) = maybe_task {
+                core = self.run_task(cx, core, task)?;
             } else {
-                // Wait for work
-                core = if !cx.defer.borrow().is_empty() {
-                    // Just run maintenance
-                    match self.park_yield(cx, core) {
-                        Ok(core) => core,
-                        Err(_) => return,
-                    }
-                } else {
-                    match self.park(cx, core) {
-                        Ok(core) => core,
-                        Err(_) => return,
-                    }
-                };
+                // The only reason to get `None` from `next_task` is that we
+                // have entered the shutdown phase.
+                assert!(core.is_shutdown);
+                break;
             }
         }
 
@@ -572,6 +549,8 @@ impl Worker {
 
         // Signal shutdown
        self.shutdown_core(cx, core);
+
+        Err(())
     }
 
     // Try to acquire an available core, but do not block the thread
@@ -585,7 +564,7 @@ impl Worker {
     }
 
     // Block the current thread, waiting for an available core
-    fn wait_for_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> RunResult {
+    fn wait_for_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> NextTaskResult {
         cx.shared()
             .idle
             .transition_worker_to_parked(&mut synced, self.index);
@@ -608,29 +587,19 @@ impl Worker {
 
         if core.is_shutdown {
             // Currently shutting down, don't do any more work
-            return Ok(core);
+            return Ok((None, core));
         }
 
         // The core was notified to search for work, don't try to take tasks from the injection queue
         if core.is_searching {
-            return Ok(core);
+            return Ok((None, core));
         }
 
         // TODO: don't hardcode 128
         let n = core.run_queue.max_capacity() / 2;
         let maybe_task = self.next_remote_task_batch(cx, &mut synced, &mut core, n);
 
-        drop(synced);
-
-        // Start as "processing" tasks as polling tasks from the local queue
-        // will be one of the first things we do.
-        core.stats.start_processing_scheduled_tasks();
-
-        if let Some(task) = maybe_task {
-            self.run_task(cx, core, task)
-        } else {
-            Ok(core)
-        }
+        Ok((maybe_task, core))
     }
 
     /// Ensure core's state is set correctly for the worker to start using.
@@ -646,7 +615,49 @@ impl Worker {
         self.update_global_flags(cx, synced, core);
     }
 
-    fn next_task(&self, cx: &Context, core: &mut Core) -> Option<Task> {
+    /// Finds the next task to run; this could come from a queue or from
+    /// stealing. If no task is available, the thread sleeps and tries again.
+    fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
+        while !core.is_shutdown {
+            self.assert_lifo_enabled_is_correct(&core);
+
+            if core.is_traced {
+                core = self.handle.trace_core(core);
+            }
+
+            // Increment the tick
+            core.tick();
+
+            // Runs maintenance every so often. When maintenance is run, the
+            // driver is checked, which may result in a task being found.
+            core = n!(self.maybe_maintenance(&cx, core));
+
+            // Check the LIFO slot, local run queue, and the injection queue for
+            // a notified task.
+            if let Some(task) = self.next_notified_task(cx, &mut core) {
+                return Ok((Some(task), core));
+            }
+
+            // We consumed all work in the queues and will start searching for work.
+            core.stats.end_processing_scheduled_tasks();
+
+            // Try to steal a task from other workers
+            if let Some(task) = self.steal_work(cx, &mut core) {
+                core.stats.start_processing_scheduled_tasks();
+                return Ok((Some(task), core));
+            }
+
+            if !cx.defer.borrow().is_empty() {
+                core = n!(self.park_yield(cx, core));
+            } else {
+                core = n!(self.park(cx, core));
+            }
+        }
+
+        Ok((None, core))
+    }
+
+    fn next_notified_task(&self, cx: &Context, core: &mut Core) -> Option<Task> {
         if core.tick % core.global_queue_interval == 0 {
             // Update the global queue interval, if needed
             self.tune_global_queue_interval(cx, core);
@@ -851,42 +862,46 @@ impl Worker {
         })
     }
 
-    fn schedule_deferred_with_core(
+    fn schedule_deferred_with_core<'a>(
         &mut self,
-        cx: &Context,
+        cx: &'a Context,
         core: Box<Core>,
-        mut synced: MutexGuard<'_, Synced>,
-    ) -> RunResult {
+        synced: impl FnOnce() -> MutexGuard<'a, Synced>,
+    ) -> NextTaskResult {
         let mut defer = cx.defer.borrow_mut();
 
         // Grab a task to run next
         let task = defer.pop();
 
-        // Number of tasks we want to try to spread across idle workers
-        let num_fanout = cmp::min(defer.len(), synced.available_cores.len());
-
-        if num_fanout > 0 {
-            cx.shared()
-                .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));
-
-            cx.shared()
-                .idle
-                .notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
+        if task.is_none() {
+            return Ok((None, core));
         }
 
-        // Do not run the task while holding the lock...
-        drop(synced);
+        if !defer.is_empty() {
+            let mut synced = synced();
+
+            // Number of tasks we want to try to spread across idle workers
+            let num_fanout = cmp::min(defer.len(), synced.available_cores.len());
+
+            if num_fanout > 0 {
+                cx.shared()
+                    .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));
+
+                cx.shared()
+                    .idle
+                    .notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
+            }
+
+            // Do not run the task while holding the lock...
+            drop(synced);
+        }
 
         // Notify any workers
         for worker in self.workers_to_notify.drain(..) {
             cx.shared().condvars[worker].notify_one()
         }
 
-        if let Some(task) = task {
-            self.run_task(cx, core, task)
-        } else {
-            Ok(core)
-        }
+        Ok((task, core))
     }
 
     fn schedule_deferred_without_core<'a>(&mut self, cx: &Context, synced: &mut Synced) {
@@ -909,19 +924,19 @@ impl Worker {
         }
     }
 
-    fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> RunResult {
+    fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
         if core.tick % cx.shared().config.event_interval == 0 {
             super::counters::inc_num_maintenance();
 
             core.stats.end_processing_scheduled_tasks();
 
             // Run regularly scheduled maintenance
-            core = self.park_yield(cx, core)?;
+            core = n!(self.park_yield(cx, core));
 
             core.stats.start_processing_scheduled_tasks();
         }
 
-        Ok(core)
+        Ok((None, core))
     }
 
     fn flush_metrics(&self, cx: &Context, core: &mut Core) {
@@ -938,25 +953,22 @@ impl Worker {
         }
     }
 
-    fn park_yield(&mut self, cx: &Context, mut core: Box<Core>) -> RunResult {
+    fn park_yield(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
         // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
         // to run without actually putting the thread to sleep.
         if let Some(mut driver) = cx.shared().driver.take() {
             driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
 
             // If there are more I/O events, schedule them.
-            if !cx.defer.borrow().is_empty() {
-                // TODO: don't lock if there is only one task
-                core = self.schedule_deferred_with_core(cx, core, cx.shared().synced.lock())?;
-            }
+            core = n!(self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock()));
         }
 
         self.flush_metrics(cx, &mut core);
         self.update_global_flags(cx, &mut cx.shared().synced.lock(), &mut core);
 
-        Ok(core)
+        Ok((None, core))
     }
 
-    fn park(&mut self, cx: &Context, mut core: Box<Core>) -> RunResult {
+    fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
         if let Some(f) = &cx.shared().config.before_park {
             f();
         }
@@ -965,17 +977,17 @@ impl Worker {
             debug_assert!(!core.is_shutdown);
             debug_assert!(!core.is_traced);
 
-            core = self.do_park(cx, core)?;
+            core = n!(self.do_park(cx, core));
         }
 
         if let Some(f) = &cx.shared().config.after_unpark {
             f();
        }
 
-        Ok(core)
+        Ok((None, core))
     }
 
-    fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> RunResult {
+    fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
         core.stats.about_to_park();
 
         // Flush metrics to the runtime metrics aggregator
@@ -989,7 +1001,7 @@ impl Worker {
            if let Some(task) = self.steal_one_round(cx, &mut core, 0) {
                cx.shared().notify_parked_local();
 
-                return self.run_task(cx, core, task);
+                return Ok((Some(task), core));
            }
        }
 
@@ -999,9 +1011,7 @@ impl Worker {
            // Try one last time to get tasks
            let n = core.run_queue.max_capacity() / 2;
            if let Some(task) = self.next_remote_task_batch(cx, &mut synced, &mut core, n) {
-                drop(synced);
-
-                return self.run_task(cx, core, task);
+                return Ok((Some(task), core));
            }
 
            if !was_searching {
@@ -1011,7 +1021,7 @@ impl Worker {
                    .transition_worker_to_searching_if_needed(&mut synced.idle, &mut core)
                {
                    // Skip parking, go back to searching
-                    return Ok(core);
+                    return Ok((None, core));
                }
            }
 
@@ -1036,7 +1046,7 @@ impl Worker {
            // Try to acquire an available core to schedule I/O events
            if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
                // This may result in a task being run
-                self.schedule_deferred_with_core(cx, core, synced)
+                self.schedule_deferred_with_core(cx, core, move || synced)
            } else {
                // Schedule any deferred tasks
                self.schedule_deferred_without_core(cx, &mut synced);