Carl Lerche 2023-06-12 11:07:09 -07:00
parent 31839f6ed3
commit 392cd057ed
2 changed files with 124 additions and 112 deletions

@@ -1,3 +1,5 @@
use crate::loom::sync::{Mutex, MutexGuard};
/// A lock (mutex) yielding generic data.
pub(crate) trait Lock<T> {
type Handle: AsMut<T>;

@@ -231,6 +231,7 @@ pub(crate) struct Context
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
type RunResult = Result<Box<Core>, ()>;
type NextTaskResult = Result<(Option<Notified>, Box<Core>), ()>;
/// A task handle
type Task = task::Task<Arc<Handle>>;
@@ -482,7 +483,10 @@ fn run(mut worker: Worker) {
let cx = cx.expect_multi_thread();
// Run the worker
worker.run(&cx);
let res = worker.run(&cx);
// `Err` here signifies that the core was lost; this is an expected end
// state for a worker.
debug_assert!(res.is_err());
// Check if there are any deferred tasks to notify. This can happen when
// the worker core is lost due to `block_in_place()` being called from
@@ -495,76 +499,49 @@ fn run(mut worker: Worker) {
});
}
macro_rules! n {
($e:expr) => {{
let (task, core) = $e?;
if task.is_some() {
return Ok((task, core));
}
core
}};
}
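
A minimal, self-contained sketch (not from this commit) of what a call like `core = n!(self.maybe_maintenance(cx, core));` unfolds into; `Core`, `step`, and the toy aliases below are stand-ins, not the scheduler's real types:

// Toy illustration only; `Core` and the aliases are stand-ins, not the
// scheduler's real types.
struct Core;
type NextTaskResult = Result<(Option<u32>, Box<Core>), ()>;

fn step(found: bool, core: Box<Core>) -> NextTaskResult {
    Ok((if found { Some(1) } else { None }, core))
}

// Equivalent to `core = n!(step(found, core));` inside a loop body.
fn next_task(found: bool, mut core: Box<Core>) -> NextTaskResult {
    core = {
        let (task, core) = step(found, core)?;
        if task.is_some() {
            // A task was found: bubble it (and the core) up to the caller.
            return Ok((task, core));
        }
        // No task: keep the core and fall through to the next step.
        core
    };
    Ok((None, core))
}

fn main() {
    assert!(matches!(next_task(true, Box::new(Core)), Ok((Some(1), _))));
    assert!(matches!(next_task(false, Box::new(Core)), Ok((None, _))));
}
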
impl Worker {
fn run(&mut self, cx: &Context) {
let mut core = {
fn run(&mut self, cx: &Context) -> RunResult {
let (maybe_task, mut core) = {
let mut synced = cx.shared().synced.lock();
// First try to acquire an available core
if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
core
// Try to poll a task from the global queue
let maybe_task = self.next_remote_task_synced(cx, &mut synced);
(maybe_task, core)
} else {
// block the thread to wait for a core to be assigned to us
match self.wait_for_core(cx, synced) {
Ok(core) => core,
Err(_) => return,
}
self.wait_for_core(cx, synced)?
}
};
while !core.is_shutdown {
self.assert_lifo_enabled_is_correct(&core);
core.stats.start_processing_scheduled_tasks();
if core.is_traced {
core = self.handle.trace_core(core);
}
if let Some(task) = maybe_task {
core = self.run_task(cx, core, task)?;
}
// Increment the tick
core.tick();
loop {
let (maybe_task, c) = self.next_task(cx, core)?;
core = c;
// Run maintenance, if needed
core = match self.maybe_maintenance(cx, core) {
Ok(core) => core,
Err(_) => return,
};
// First, check work available to the current worker.
if let Some(task) = self.next_task(cx, &mut core) {
core = match self.run_task(cx, core, task) {
Ok(core) => core,
Err(_) => return,
};
continue;
}
// We consumed all work in the queues and will start searching for work.
core.stats.end_processing_scheduled_tasks();
// There is no more **local** work to process, try to steal work
// from other workers.
if let Some(task) = self.steal_work(cx, &mut core) {
// Found work, switch back to processing
core.stats.start_processing_scheduled_tasks();
core = match self.run_task(cx, core, task) {
Ok(core) => core,
Err(_) => return,
};
if let Some(task) = maybe_task {
core = self.run_task(cx, core, task)?;
} else {
// Wait for work
core = if !cx.defer.borrow().is_empty() {
// Just run maintenance
match self.park_yield(cx, core) {
Ok(core) => core,
Err(_) => return,
}
} else {
match self.park(cx, core) {
Ok(core) => core,
Err(_) => return,
}
};
// The only reason to get `None` from `next_task` is that we have
// entered the shutdown phase.
assert!(core.is_shutdown);
break;
}
}
@@ -572,6 +549,8 @@ impl Worker {
// Signal shutdown
self.shutdown_core(cx, core);
Err(())
}
// Try to acquire an available core, but do not block the thread
@@ -585,7 +564,7 @@ impl Worker {
}
// Block the current thread, waiting for an available core
fn wait_for_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> RunResult {
fn wait_for_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> NextTaskResult {
cx.shared()
.idle
.transition_worker_to_parked(&mut synced, self.index);
@@ -608,29 +587,19 @@ impl Worker {
if core.is_shutdown {
// Currently shutting down, don't do any more work
return Ok(core);
return Ok((None, core));
}
// The core was notified to search for work, don't try to take tasks from the injection queue
if core.is_searching {
return Ok(core);
return Ok((None, core));
}
// TODO: don't hardcode 128
let n = core.run_queue.max_capacity() / 2;
let maybe_task = self.next_remote_task_batch(cx, &mut synced, &mut core, n);
drop(synced);
// Start as "processing" tasks as polling tasks from the local queue
// will be one of the first things we do.
core.stats.start_processing_scheduled_tasks();
if let Some(task) = maybe_task {
self.run_task(cx, core, task)
} else {
Ok(core)
}
Ok((maybe_task, core))
}
/// Ensure core's state is set correctly for the worker to start using.
@@ -646,7 +615,49 @@ impl Worker {
self.update_global_flags(cx, synced, core);
}
fn next_task(&self, cx: &Context, core: &mut Core) -> Option<Notified> {
/// Finds the next task to run; this could come from a queue or from stealing.
/// If no task is available, the thread sleeps and tries again.
fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
while !core.is_shutdown {
self.assert_lifo_enabled_is_correct(&core);
if core.is_traced {
core = self.handle.trace_core(core);
}
// Increment the tick
core.tick();
// Runs maintenance every so often. When maintenance is run, the
// driver is checked, which may result in a task being found.
core = n!(self.maybe_maintenance(&cx, core));
// Check the LIFO slot, local run queue, and the injection queue for
// a notified task.
if let Some(task) = self.next_notified_task(cx, &mut core) {
return Ok((Some(task), core));
}
// We consumed all work in the queues and will start searching for work.
core.stats.end_processing_scheduled_tasks();
// Try to steal a task from other workers
if let Some(task) = self.steal_work(cx, &mut core) {
core.stats.start_processing_scheduled_tasks();
return Ok((Some(task), core));
}
if !cx.defer.borrow().is_empty() {
core = n!(self.park_yield(cx, core));
} else {
core = n!(self.park(cx, core));
}
}
Ok((None, core))
}
fn next_notified_task(&self, cx: &Context, core: &mut Core) -> Option<Notified> {
if core.tick % core.global_queue_interval == 0 {
// Update the global queue interval, if needed
self.tune_global_queue_interval(cx, core);
@@ -851,42 +862,46 @@ impl Worker {
})
}
fn schedule_deferred_with_core(
fn schedule_deferred_with_core<'a>(
&mut self,
cx: &Context,
cx: &'a Context,
core: Box<Core>,
mut synced: MutexGuard<'_, Synced>,
) -> RunResult {
synced: impl FnOnce() -> MutexGuard<'a, Synced>,
) -> NextTaskResult {
let mut defer = cx.defer.borrow_mut();
// Grab a task to run next
let task = defer.pop();
// Number of tasks we want to try to spread across idle workers
let num_fanout = cmp::min(defer.len(), synced.available_cores.len());
if num_fanout > 0 {
cx.shared()
.push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));
cx.shared()
.idle
.notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
if task.is_none() {
return Ok((None, core));
}
// Do not run the task while holding the lock...
drop(synced);
if !defer.is_empty() {
let mut synced = synced();
// Number of tasks we want to try to spread across idle workers
let num_fanout = cmp::min(defer.len(), synced.available_cores.len());
if num_fanout > 0 {
cx.shared()
.push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));
cx.shared()
.idle
.notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
}
// Do not run the task while holding the lock...
drop(synced);
}
// Notify any workers
for worker in self.workers_to_notify.drain(..) {
cx.shared().condvars[worker].notify_one()
}
if let Some(task) = task {
self.run_task(cx, core, task)
} else {
Ok(core)
}
Ok((task, core))
}
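
The new `synced: impl FnOnce() -> MutexGuard<'a, Synced>` parameter lets the caller either hand over a guard it already holds (`move || synced`) or defer taking the lock until it is actually needed (`|| cx.shared().synced.lock()`). A self-contained sketch of that pattern, where `Queue` and `fan_out` are placeholders rather than scheduler code:

use std::sync::{Mutex, MutexGuard};

// Placeholder type standing in for the scheduler's shared state.
struct Queue {
    pending: Vec<u32>,
}

// Accept a closure producing the guard so the lock is only taken on the slow path.
fn fan_out<'a>(local: &mut Vec<u32>, lock: impl FnOnce() -> MutexGuard<'a, Queue>) {
    // Fast path: nothing to share, so the mutex is never touched.
    if local.len() <= 1 {
        return;
    }
    // Slow path: acquire (or receive) the guard only now.
    let mut shared = lock();
    shared.pending.extend(local.drain(1..));
    // Guard dropped here, before any further work.
}

fn main() {
    let shared = Mutex::new(Queue { pending: Vec::new() });
    let mut local = vec![1, 2, 3];
    // The caller decides when (and whether) the lock is taken.
    fan_out(&mut local, || shared.lock().unwrap());
    assert_eq!(shared.lock().unwrap().pending, vec![2, 3]);
}
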
fn schedule_deferred_without_core<'a>(&mut self, cx: &Context, synced: &mut Synced) {
@@ -909,19 +924,19 @@ impl Worker {
}
}
fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> RunResult {
fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
if core.tick % cx.shared().config.event_interval == 0 {
super::counters::inc_num_maintenance();
core.stats.end_processing_scheduled_tasks();
// Run regularly scheduled maintenance
core = self.park_yield(cx, core)?;
core = n!(self.park_yield(cx, core));
core.stats.start_processing_scheduled_tasks();
}
Ok(core)
Ok((None, core))
}
fn flush_metrics(&self, cx: &Context, core: &mut Core) {
@@ -938,25 +953,22 @@ impl Worker {
}
}
fn park_yield(&mut self, cx: &Context, mut core: Box<Core>) -> RunResult {
fn park_yield(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
if let Some(mut driver) = cx.shared().driver.take() {
driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
// If there are more I/O events, schedule them.
if !cx.defer.borrow().is_empty() {
// TODO: don't lock if there is only one task
core = self.schedule_deferred_with_core(cx, core, cx.shared().synced.lock())?;
}
core = n!(self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock()));
}
self.flush_metrics(cx, &mut core);
self.update_global_flags(cx, &mut cx.shared().synced.lock(), &mut core);
Ok(core)
Ok((None, core))
}
fn park(&mut self, cx: &Context, mut core: Box<Core>) -> RunResult {
fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
if let Some(f) = &cx.shared().config.before_park {
f();
}
@@ -965,17 +977,17 @@ impl Worker {
debug_assert!(!core.is_shutdown);
debug_assert!(!core.is_traced);
core = self.do_park(cx, core)?;
core = n!(self.do_park(cx, core));
}
if let Some(f) = &cx.shared().config.after_unpark {
f();
}
Ok(core)
Ok((None, core))
}
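
The `before_park` / `after_unpark` hooks checked here correspond to the callbacks configured through the runtime builder's `on_thread_park` / `on_thread_unpark` methods. A brief usage sketch, illustrative rather than part of this commit:

use tokio::runtime::Builder;

// Illustrative only; not part of this commit.
fn build_runtime() -> std::io::Result<tokio::runtime::Runtime> {
    Builder::new_multi_thread()
        // Runs on a worker thread right before it parks (goes to sleep).
        .on_thread_park(|| { /* e.g. flush a thread-local buffer */ })
        // Runs on the worker thread right after it unparks.
        .on_thread_unpark(|| { /* e.g. record a wakeup metric */ })
        .enable_all()
        .build()
}
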
fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> RunResult {
fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
core.stats.about_to_park();
// Flush metrics to the runtime metrics aggregator
@@ -989,7 +1001,7 @@ impl Worker {
if let Some(task) = self.steal_one_round(cx, &mut core, 0) {
cx.shared().notify_parked_local();
return self.run_task(cx, core, task);
return Ok((Some(task), core));
}
}
@@ -999,9 +1011,7 @@ impl Worker {
// Try one last time to get tasks
let n = core.run_queue.max_capacity() / 2;
if let Some(task) = self.next_remote_task_batch(cx, &mut synced, &mut core, n) {
drop(synced);
return self.run_task(cx, core, task);
return Ok((Some(task), core));
}
if !was_searching {
@@ -1011,7 +1021,7 @@ impl Worker {
.transition_worker_to_searching_if_needed(&mut synced.idle, &mut core)
{
// Skip parking, go back to searching
return Ok(core);
return Ok((None, core));
}
}
@@ -1036,7 +1046,7 @@ impl Worker {
// Try to acquire an available core to schedule I/O events
if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
// This may result in a task being run
self.schedule_deferred_with_core(cx, core, synced)
self.schedule_deferred_with_core(cx, core, move || synced)
} else {
// Schedule any deferred tasks
self.schedule_deferred_without_core(cx, &mut synced);