Carl Lerche 2023-06-09 11:41:06 -07:00
parent 97123db204
commit 6cbdb587bb
2 changed files with 70 additions and 66 deletions

View File

@@ -4,21 +4,20 @@ use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread::{worker, Core, Shared};
use std::fmt;
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
pub(super) struct Idle {
/// Number of searching workers
/// Number of searching cores
num_searching: AtomicUsize,
/// Number of sleeping workers
num_sleeping: AtomicUsize,
/// Number of idle cores
num_idle: AtomicUsize,
/// Used to catch false-negatives when waking workers
needs_searching: AtomicBool,
/// Total number of workers
num_workers: usize,
/// Total number of cores
num_cores: usize,
}
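
The new fields shift the bookkeeping from workers to cores: `num_idle` mirrors the length of the mutex-protected `available_cores` list, and `num_cores` is fixed at construction, while later hunks allocate `num_cores + 1` workers so one worker can sit on the I/O driver. Below is a minimal, hypothetical sketch of that mirroring pattern using plain std types instead of tokio's loom wrappers; the names and methods are illustrative, not the crate's API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Release}};
use std::sync::Mutex;

// Hypothetical stand-ins; the real state lives in `Idle` and `worker::Synced`.
struct Core;

struct Idle {
    /// Lock-free mirror of `available_cores.len()`, so hot paths can ask
    /// "how many cores are idle?" without taking the scheduler mutex.
    num_idle: AtomicUsize,
    /// Total number of cores, fixed at construction.
    num_cores: usize,
}

struct Synced {
    available_cores: Vec<Core>,
}

impl Idle {
    /// Park a core: push it onto the shared list, then publish the new count.
    fn release_core(&self, synced: &mut Synced, core: Core) {
        synced.available_cores.push(core);
        self.num_idle.store(synced.available_cores.len(), Release);
    }

    /// Claim an idle core, if any, keeping the counter in sync.
    fn claim_core(&self, synced: &mut Synced) -> Option<Core> {
        let core = synced.available_cores.pop()?;
        self.num_idle.store(synced.available_cores.len(), Release);
        Some(core)
    }
}

fn main() {
    let idle = Idle { num_idle: AtomicUsize::new(0), num_cores: 4 };
    let synced = Mutex::new(Synced { available_cores: Vec::with_capacity(4) });

    let mut guard = synced.lock().unwrap();
    idle.release_core(&mut guard, Core);
    assert_eq!(idle.num_idle.load(Acquire), 1);
    let _core = idle.claim_core(&mut guard).expect("one core is idle");
    assert!(idle.num_idle.load(Acquire) <= idle.num_cores);
}
```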
/// Data synchronized by the scheduler mutex
@@ -28,12 +27,12 @@ pub(super) struct Synced {
}
impl Idle {
pub(super) fn new(num_workers: usize) -> (Idle, Synced) {
pub(super) fn new(num_cores: usize, num_workers: usize) -> (Idle, Synced) {
let idle = Idle {
num_searching: AtomicUsize::new(0),
num_sleeping: AtomicUsize::new(0),
num_idle: AtomicUsize::new(0),
needs_searching: AtomicBool::new(false),
num_workers,
num_cores,
};
let synced = Synced {
@@ -91,11 +90,11 @@ impl Idle {
// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);
let num_sleeping = self.num_sleeping.load(Acquire) - 1;
debug_assert_eq!(num_sleeping, synced.idle.sleepers.len());
let num_idle = self.num_idle.load(Acquire) - 1;
debug_assert_eq!(num_idle, synced.available_cores.len());
// Update the number of sleeping workers
self.num_sleeping.store(num_sleeping, Release);
self.num_idle.store(num_idle, Release);
// Drop the lock before notifying the condvar.
drop(synced);
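
The wake path above assigns the core and updates `num_idle` while holding the scheduler mutex, then releases the lock before signalling the sleeping worker's condvar, so the woken thread does not immediately contend on the mutex it needs. A simplified sketch of that pattern with plain std types (the real code uses loom wrappers and one condvar per worker; the slot layout here is illustrative):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // One slot per worker; `Some(core_id)` means "a core was assigned to you".
    let shared = Arc::new((Mutex::new(vec![None::<usize>; 2]), Condvar::new()));

    let waiter = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let (lock, condvar) = &*shared;
            let mut slots = lock.lock().unwrap();
            // Sleep until a core shows up in this worker's slot.
            while slots[0].is_none() {
                slots = condvar.wait(slots).unwrap();
            }
            slots[0].take().unwrap()
        })
    };

    let (lock, condvar) = &*shared;
    let mut slots = lock.lock().unwrap();
    slots[0] = Some(7);   // assign the core while holding the lock
    drop(slots);          // release the mutex first...
    condvar.notify_one(); // ...so the woken worker can grab it immediately

    assert_eq!(waiter.join().unwrap(), 7);
}
```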
@@ -125,35 +124,40 @@ impl Idle {
&self,
synced: &mut worker::Synced,
core: Box<Core>,
index: usize,
index: Option<usize>,
) {
// The core should not be searching at this point
debug_assert!(!core.is_searching);
// Check that this isn't the final worker to go idle *and*
// `needs_searching` is set.
debug_assert!(!self.needs_searching.load(Acquire) || synced.idle.num_active_workers() > 1);
debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced) > 1);
let num_sleeping = synced.idle.sleepers.len();
debug_assert_eq!(num_sleeping, self.num_sleeping.load(Acquire));
// Store the worker index in the list of sleepers
synced.idle.sleepers.push(index);
let num_idle = synced.available_cores.len();
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
// Store the core in the list of available cores
synced.available_cores.push(core);
// The worker's assigned core slot should be empty
debug_assert!(synced.assigned_cores[index].is_none());
// Update `num_idle`
self.num_idle.store(num_idle + 1, Release);
if let Some(index) = index {
// Store the worker index in the list of sleepers
synced.idle.sleepers.push(index);
// The worker's assigned core slot should be empty
debug_assert!(synced.assigned_cores[index].is_none());
}
}
pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
debug_assert!(!core.is_searching);
let num_searching = self.num_searching.load(Acquire);
let num_sleeping = self.num_sleeping.load(Acquire);
let num_idle = self.num_idle.load(Acquire);
if 2 * num_searching >= self.num_workers - num_sleeping {
if 2 * num_searching >= self.num_cores - num_idle {
return;
}
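
The throttle above changes its denominator: the number of searching workers is now compared against the cores that are actually running (`num_cores - num_idle`) rather than against non-sleeping workers. In words, a worker may start work-stealing only while fewer than half of the active cores are already searching. A tiny hypothetical helper makes the arithmetic concrete (names are illustrative):

```rust
/// Mirrors the check in the diff: searching is allowed only while fewer than
/// half of the active (non-idle) cores are already in the searching state.
fn may_start_searching(num_searching: usize, num_idle: usize, num_cores: usize) -> bool {
    let active = num_cores - num_idle;
    2 * num_searching < active
}

fn main() {
    // 8 cores with 2 idle leaves 6 active cores.
    assert!(may_start_searching(2, 2, 8));  // 2 searchers: 4 < 6, allowed
    assert!(!may_start_searching(3, 2, 8)); // 3 searchers: 6 >= 6, throttled
}
```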
@@ -200,8 +204,6 @@ impl Idle {
}
}
impl Synced {
fn num_active_workers(&self) -> usize {
self.sleepers.capacity() - self.sleepers.len()
}
fn num_active_workers(synced: &worker::Synced) -> usize {
synced.available_cores.capacity() - synced.available_cores.len()
}
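
`num_active_workers` becomes a free function over `worker::Synced` and counts active workers as `available_cores.capacity() - available_cores.len()`: the vector is created with capacity equal to the total number of cores and never outgrows it, so the capacity stands in for the total while the length counts the idle cores. A small hypothetical example of the same trick; note it leans on `Vec::with_capacity` giving exactly the requested capacity in practice, which this bookkeeping relies on:

```rust
fn main() {
    let total_cores = 4;
    // In practice `Vec::with_capacity(n)` allocates room for exactly `n`
    // sized elements, and the scheduler never pushes more than `n` cores,
    // so the capacity never changes and doubles as "total number of cores".
    let mut available_cores: Vec<u32> = Vec::with_capacity(total_cores);
    assert_eq!(available_cores.capacity(), total_cores);

    available_cores.push(0); // one core goes idle
    let active = available_cores.capacity() - available_cores.len();
    assert_eq!(active, 3); // 4 cores total, 1 idle, 3 still running
}
```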

View File

@@ -239,19 +239,23 @@ type Notified = task::Notified<Arc<Handle>>;
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
pub(super) fn create(
size: usize,
num_cores: usize,
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> runtime::Handle {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
// Allocate num_cores + 1 workers so that one worker can handle the I/O
// driver, if needed.
let num_workers = num_cores + 1;
let mut cores = Vec::with_capacity(num_cores);
let mut remotes = Vec::with_capacity(num_cores);
// Worker metrics are actually core based
let mut worker_metrics = Vec::with_capacity(num_cores);
// Create the local queues
for i in 0..size {
for i in 0..num_cores {
let (steal, run_queue) = queue::local();
let metrics = WorkerMetrics::from_config(&config);
@@ -275,10 +279,11 @@ pub(super) fn create(
worker_metrics.push(metrics);
}
let (idle, idle_synced) = Idle::new(size);
// Allocate num-cores + 1 workers, so one worker can handle the I/O driver,
// if needed.
let (idle, idle_synced) = Idle::new(num_cores, num_workers);
let (inject, inject_synced) = inject::Shared::new();
let remotes_len = remotes.len();
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
@@ -286,15 +291,15 @@ pub(super) fn create(
idle,
owned: OwnedTasks::new(),
synced: Mutex::new(Synced {
available_cores: Vec::with_capacity(size),
available_cores: Vec::with_capacity(num_cores),
assigned_cores: cores,
shutdown_cores: Vec::with_capacity(size),
shutdown_cores: Vec::with_capacity(num_cores),
idle: idle_synced,
inject: inject_synced,
driver: Some(Box::new(driver)),
}),
condvars: (0..size).map(|_| Condvar::new()).collect(),
trace_status: TraceStatus::new(remotes_len),
condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
trace_status: TraceStatus::new(num_cores),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
@@ -310,7 +315,7 @@ pub(super) fn create(
};
// Eagerly start worker threads
for index in 0..size {
for index in 0..num_workers {
let handle = rt_handle.inner.expect_multi_thread();
let worker = Worker {
handle: handle.clone(),
@@ -456,15 +461,6 @@ fn run(mut worker: Worker) {
#[cfg(debug_assertions)]
let _abort_on_panic = AbortOnPanic;
/*
// Acquire a core. If this fails, then another thread is running this
// worker and there is nothing further to do.
let core = match worker.core.take() {
Some(core) => core,
None => return,
};
*/
let handle = scheduler::Handle::MultiThread(worker.handle.clone());
crate::runtime::context::enter_runtime(&handle, true, |_| {
@@ -555,8 +551,6 @@ impl Worker {
// Signal shutdown
self.shutdown_core(core);
todo!()
}
fn acquire_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> RunResult {
@@ -926,20 +920,32 @@ impl Worker {
// Core being returned must not be in the searching state
debug_assert!(!core.is_searching);
self.shared()
.idle
.transition_worker_to_parked(&mut synced, core, self.index);
if let Some(mut driver) = synced.driver.take() {
// Transition to parked without providing the worker's index.
// Because we are going to block on the driver, we don't want to be
// interrupted by scheduled tasks.
self.shared()
.idle
.transition_worker_to_parked(&mut synced, core, None);
// Drop the lock before parking on the driver
drop(synced);
// Wait for driver events
driver.park(&self.handle.driver);
// Acquire the lock again
synced = self.shared().synced.lock();
/*
if let Some(_driver) = synced.driver.take() {
todo!()
} else {
self.shared()
.idle
.transition_worker_to_parked(&mut synced, core, Some(self.index));
// Wait for a core to be assigned to us
self.acquire_core(cx, synced)
}
*/
// TODO: poll driver if needed
self.acquire_core(cx, synced)
}
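
The rewritten park path distinguishes two cases. If the shared state still holds the I/O driver, this worker takes it, returns its core with `None` so it is not registered as a sleeper (and therefore cannot be handed a core or notified while blocked on I/O), releases the scheduler lock, parks on the driver, and then falls back into `acquire_core`. The commented-out branch sketches the other case: register under `Some(self.index)` and wait to be assigned a core. A stripped-down, hypothetical sketch of that decision, with std types standing in for tokio's internals:

```rust
use std::sync::{Condvar, Mutex};

// Illustrative stand-ins; the real state lives behind `Synced`, `Idle`,
// and the driver handle.
struct Driver;
impl Driver {
    fn park(&mut self) { /* block until an I/O or timer event fires */ }
}

struct Synced {
    driver: Option<Box<Driver>>,
    sleepers: Vec<usize>, // workers waiting to be handed a core
}

struct Shared {
    synced: Mutex<Synced>,
    condvar: Condvar,
}

impl Shared {
    fn park_worker(&self, index: usize) {
        let mut synced = self.synced.lock().unwrap();

        if let Some(mut driver) = synced.driver.take() {
            // Driver path: do NOT register as a sleeper, so nobody tries to
            // assign us a core while we are blocked on I/O.
            drop(synced); // never hold the scheduler lock across the park
            driver.park();
            synced = self.synced.lock().unwrap();
            synced.driver = Some(driver);
            // ...fall through to core acquisition (elided).
        } else {
            // Sleeper path: register and wait until someone hands us a core,
            // which in this sketch is signalled by removing our entry.
            synced.sleepers.push(index);
            while synced.sleepers.contains(&index) {
                synced = self.condvar.wait(synced).unwrap();
            }
        }
    }
}

fn main() {
    let shared = Shared {
        synced: Mutex::new(Synced {
            driver: Some(Box::new(Driver)),
            sleepers: Vec::new(),
        }),
        condvar: Condvar::new(),
    };
    // The driver is available, so this call takes the driver path and returns.
    shared.park_worker(0);
    assert!(shared.synced.lock().unwrap().driver.is_some());
}
```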
fn transition_to_searching(&self, core: &mut Core) -> bool {
@@ -963,10 +969,6 @@ impl Worker {
core.lifo_slot.is_none() && core.run_queue.is_empty() && !core.is_traced
}
fn transition_from_parked(&self, core: &mut Core) -> bool {
todo!()
}
/// Signals all tasks to shut down, and waits for them to complete. Must run
/// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&self, core: &mut Core) {
@@ -982,15 +984,15 @@
/// If all workers have reached this point, the final cleanup is performed.
fn shutdown_core(&self, core: Box<Core>) {
let mut synced = self.shared().synced.lock();
synced.available_cores.push(core);
synced.shutdown_cores.push(core);
if synced.available_cores.len() != self.shared().remotes.len() {
if synced.shutdown_cores.len() != self.shared().remotes.len() {
return;
}
debug_assert!(self.shared().owned.is_empty());
for mut core in synced.available_cores.drain(..) {
for mut core in synced.shutdown_cores.drain(..) {
// Drain tasks from the local queue
while self.next_local_task(&mut core).is_some() {}
}
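
Renaming the list to `shutdown_cores` separates shutdown collection from `available_cores`, which now has its own meaning in the idle logic. The behavior is unchanged: each worker pushes its core, and only the worker that delivers the last core (when the count matches the number of remotes) runs the single-threaded cleanup. A hypothetical, self-contained sketch of that "last one out cleans up" pattern:

```rust
use std::sync::Mutex;

// Illustrative core with a local queue of pending task names.
struct Core {
    run_queue: Vec<&'static str>,
}

struct Shared {
    total_cores: usize,
    shutdown_cores: Mutex<Vec<Core>>,
}

/// Every worker parks its core here; the worker that delivers the final core
/// performs the cleanup while no other worker is touching the cores.
fn shutdown_core(shared: &Shared, core: Core) {
    let mut cores = shared.shutdown_cores.lock().unwrap();
    cores.push(core);

    if cores.len() != shared.total_cores {
        return; // other cores are still out there; their workers will finish
    }

    for core in cores.drain(..) {
        for task in core.run_queue {
            // The real code drains and drops the remaining tasks.
            println!("dropping unpolled task: {task}");
        }
    }
}

fn main() {
    let shared = Shared {
        total_cores: 2,
        shutdown_cores: Mutex::new(Vec::new()),
    };
    shutdown_core(&shared, Core { run_queue: vec!["a"] });      // just parked
    shutdown_core(&shared, Core { run_queue: vec!["b", "c"] }); // cleanup runs
}
```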