This commit is contained in:
Carl Lerche 2023-06-12 11:53:22 -07:00
parent 392cd057ed
commit f71c369203
No known key found for this signature in database
GPG Key ID: FC5ADF3A4B2E5977
2 changed files with 55 additions and 35 deletions

View File

@ -24,24 +24,47 @@ pub(super) struct Idle {
pub(super) struct Synced {
/// Worker IDs that are currently sleeping
sleepers: Vec<usize>,
/// Cores available for workers
available_cores: Vec<Box<Core>>,
}
impl Idle {
pub(super) fn new(num_cores: usize, num_workers: usize) -> (Idle, Synced) {
pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
let idle = Idle {
num_searching: AtomicUsize::new(0),
num_idle: AtomicUsize::new(0),
num_idle: AtomicUsize::new(cores.len()),
needs_searching: AtomicBool::new(false),
num_cores,
num_cores: cores.len(),
};
let synced = Synced {
sleepers: Vec::with_capacity(num_workers),
available_cores: cores,
};
(idle, synced)
}
pub(super) fn num_idle(&self, synced: &Synced) -> usize {
debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
synced.available_cores.len()
}
/// Try to acquire an available core
pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
let ret = synced.available_cores.pop();
if ret.is_some() {
// Decrement the number of idle cores
let num_idle = self.num_idle.load(Acquire) - 1;
debug_assert_eq!(num_idle, synced.available_cores.len());
self.num_idle.store(num_idle, Release);
}
ret
}
/// We need at least one searching worker
pub(super) fn notify_local(&self, shared: &Shared) {
if self.num_searching.load(Acquire) != 0 {
@ -83,15 +106,15 @@ impl Idle {
// Find a sleeping worker
if let Some(worker) = synced.idle.sleepers.pop() {
// Find an available core
if let Some(mut core) = synced.available_cores.pop() {
if let Some(mut core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);
core.is_searching = is_searching;
// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);
let num_idle = self.num_idle.load(Acquire) - 1;
debug_assert_eq!(num_idle, synced.available_cores.len());
let num_idle = synced.idle.available_cores.len();
debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);
// Update the number of sleeping workers
self.num_idle.store(num_idle, Release);
@ -127,17 +150,15 @@ impl Idle {
workers: &mut Vec<usize>,
num: usize,
) {
let mut num_idle = self.num_idle.load(Acquire);
let mut did_notify = false;
for _ in 0..num {
if let Some(worker) = synced.idle.sleepers.pop() {
if let Some(core) = synced.available_cores.pop() {
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);
synced.assigned_cores[worker] = Some(core);
num_idle -= 1;
workers.push(worker);
did_notify = true;
@ -151,14 +172,21 @@ impl Idle {
}
if did_notify {
let num_idle = synced.idle.available_cores.len();
self.num_idle.store(num_idle, Release);
} else {
debug_assert_eq!(
synced.idle.available_cores.len(),
self.num_idle.load(Acquire)
);
self.needs_searching.store(true, Release);
}
}
pub(super) fn notify_shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
while let Some(core) = synced.available_cores.pop() {
pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
while let Some(mut core) = synced.idle.available_cores.pop() {
core.is_shutdown = true;
let worker = synced.idle.sleepers.pop().unwrap();
synced.assigned_cores[worker] = Some(core);
@ -176,13 +204,13 @@ impl Idle {
// Check that this isn't the final worker to go idle *and*
// `needs_searching` is set.
debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced) > 1);
debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced.idle) > 1);
let num_idle = synced.available_cores.len();
let num_idle = synced.idle.available_cores.len();
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
// Store the core in the list of available cores
synced.available_cores.push(core);
synced.idle.available_cores.push(core);
// Update `num_idle`
self.num_idle.store(num_idle + 1, Release);
@ -236,19 +264,15 @@ impl Idle {
/// **must** notify a new worker.
pub(super) fn transition_worker_from_searching(&self, core: &mut Core) -> bool {
debug_assert!(core.is_searching);
core.is_searching = false;
let prev = self.num_searching.fetch_sub(1, AcqRel);
debug_assert!(prev > 0);
if prev == 1 {
false
} else {
core.is_searching = false;
false
}
prev == 1
}
}
fn num_active_workers(synced: &worker::Synced) -> usize {
fn num_active_workers(synced: &Synced) -> usize {
synced.available_cores.capacity() - synced.available_cores.len()
}

View File

@ -92,7 +92,7 @@ pub(super) struct Worker {
/// Reference to scheduler's handle
handle: Arc<Handle>,
/// This worker's index in `available_cores` and `condvars`.
/// This worker's index in `assigned_cores` and `condvars`.
index: usize,
/// Used to collect a list of workers to notify
@ -126,7 +126,7 @@ pub(super) struct Core {
pub(super) is_searching: bool,
/// True if the scheduler is being shutdown
is_shutdown: bool,
pub(super) is_shutdown: bool,
/// True if the scheduler is being traced
is_traced: bool,
@ -188,9 +188,6 @@ pub(crate) struct Shared {
/// Data synchronized by the scheduler mutex
pub(crate) struct Synced {
/// Cores not currently assigned to workers
pub(super) available_cores: Vec<Box<Core>>,
/// When worker is notified, it is assigned a core. The core is placed here
/// until the worker wakes up to take it.
pub(super) assigned_cores: Vec<Option<Box<Core>>>,
@ -288,7 +285,7 @@ pub(super) fn create(
// Allocate num-cores + 1 workers, so one worker can handle the I/O driver,
// if needed.
let (idle, idle_synced) = Idle::new(num_cores, num_workers);
let (idle, idle_synced) = Idle::new(cores, num_workers);
let (inject, inject_synced) = inject::Shared::new();
let handle = Arc::new(Handle {
@ -298,7 +295,6 @@ pub(super) fn create(
idle,
owned: OwnedTasks::new(),
synced: Mutex::new(Synced {
available_cores: cores,
assigned_cores: Vec::with_capacity(num_cores),
shutdown_cores: Vec::with_capacity(num_cores),
idle: idle_synced,
@ -555,7 +551,11 @@ impl Worker {
// Try to acquire an available core, but do not block the thread
fn try_acquire_available_core(&self, cx: &Context, synced: &mut Synced) -> Option<Box<Core>> {
if let Some(mut core) = synced.available_cores.pop() {
if let Some(mut core) = cx
.shared()
.idle
.try_acquire_available_core(&mut synced.idle)
{
self.reset_acquired_core(cx, synced, &mut core);
Some(core)
} else {
@ -881,7 +881,7 @@ impl Worker {
let mut synced = synced();
// Number of tasks we want to try to spread across idle workers
let num_fanout = cmp::min(defer.len(), synced.available_cores.len());
let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle));
if num_fanout > 0 {
cx.shared()
@ -1224,11 +1224,7 @@ impl Shared {
if self.inject.close(&mut synced.inject) {
// Set the shutdown flag on all available cores
for core in &mut synced.available_cores {
core.is_shutdown = true;
}
self.idle.notify_shutdown(&mut synced, self);
self.idle.shutdown(&mut synced, self);
}
}