This commit is contained in:
Carl Lerche 2023-06-13 12:08:36 -07:00
parent e28b1e59ef
commit 0866ee376b
No known key found for this signature in database
GPG Key ID: FC5ADF3A4B2E5977
2 changed files with 128 additions and 3 deletions

View File

@ -13,6 +13,9 @@ pub(super) struct Idle {
/// Number of idle cores
num_idle: AtomicUsize,
/// Map of idle cores
idle_map: IdleMap,
/// Used to catch false-negatives when waking workers
needs_searching: AtomicBool,
@ -20,6 +23,14 @@ pub(super) struct Idle {
num_cores: usize,
}
pub(super) struct IdleMap {
chunks: Vec<AtomicUsize>,
}
pub(super) struct Snapshot {
chunks: Vec<usize>,
}
/// Data synchronized by the scheduler mutex
pub(super) struct Synced {
/// Worker IDs that are currently sleeping
@ -34,6 +45,7 @@ impl Idle {
let idle = Idle {
num_searching: AtomicUsize::new(0),
num_idle: AtomicUsize::new(cores.len()),
idle_map: IdleMap::new(&cores),
needs_searching: AtomicBool::new(false),
num_cores: cores.len(),
};
@ -51,15 +63,22 @@ impl Idle {
synced.available_cores.len()
}
pub(super) fn is_idle(&self, index: usize) -> bool {
self.idle_map.get(index)
}
/// Try to acquire an available core
pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
let ret = synced.available_cores.pop();
if ret.is_some() {
if let Some(core) = &ret {
// Decrement the number of idle cores
let num_idle = self.num_idle.load(Acquire) - 1;
debug_assert_eq!(num_idle, synced.available_cores.len());
self.num_idle.store(num_idle, Release);
self.idle_map.unset(core.index);
debug_assert!(self.idle_map.matches(&synced.available_cores));
}
ret
@ -110,6 +129,9 @@ impl Idle {
debug_assert!(!core.is_searching);
core.is_searching = is_searching;
self.idle_map.unset(core.index);
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);
@ -156,6 +178,8 @@ impl Idle {
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);
self.idle_map.unset(core.index);
synced.assigned_cores[worker] = Some(core);
workers.push(worker);
@ -170,6 +194,7 @@ impl Idle {
}
if !workers.is_empty() {
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
let num_idle = synced.idle.available_cores.len();
self.num_idle.store(num_idle, Release);
} else {
@ -195,6 +220,8 @@ impl Idle {
let worker = synced.idle.sleepers.pop().unwrap();
let core = synced.idle.available_cores.pop().unwrap();
self.idle_map.unset(core.index);
synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();
@ -204,6 +231,8 @@ impl Idle {
.store(synced.idle.available_cores.len(), Release);
}
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
// Wake up any other workers
while let Some(index) = synced.idle.sleepers.pop() {
shared.condvars[index].notify_one();
@ -224,9 +253,13 @@ impl Idle {
let num_idle = synced.idle.available_cores.len();
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
self.idle_map.set(core.index);
// Store the core in the list of available cores
synced.idle.available_cores.push(core);
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
// Update `num_idle`
self.num_idle.store(num_idle + 1, Release);
}
@ -288,6 +321,84 @@ impl Idle {
}
}
const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;
impl IdleMap {
fn new(cores: &[Box<Core>]) -> IdleMap {
let chunks = (0..num_chunks(cores.len()))
.map(|_| AtomicUsize::new(0))
.collect();
let ret = IdleMap { chunks };
for core in cores {
ret.set(core.index);
}
ret
}
fn get(&self, index: usize) -> bool {
let (chunk, mask) = index_to_mask(index);
self.chunks[chunk].load(Acquire) & mask == mask
}
fn set(&self, index: usize) {
let (chunk, mask) = index_to_mask(index);
let prev = self.chunks[chunk].load(Acquire);
let next = prev | mask;
self.chunks[chunk].store(next, Release);
}
fn unset(&self, index: usize) {
let (chunk, mask) = index_to_mask(index);
let prev = self.chunks[chunk].load(Acquire);
let next = prev & !mask;
self.chunks[chunk].store(next, Release);
}
fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
let expect = IdleMap::new(idle_cores);
for (i, chunk) in expect.chunks.iter().enumerate() {
if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
return false;
}
}
true
}
}
impl Snapshot {
pub(crate) fn new(idle: &Idle) -> Snapshot {
let chunks = vec![0; num_chunks(idle.idle_map.chunks.len())];
let mut ret = Snapshot { chunks };
ret.update(&idle.idle_map);
ret
}
fn update(&mut self, idle_map: &IdleMap) {
for i in 0..self.chunks.len() {
self.chunks[i] = idle_map.chunks[i].load(Acquire);
}
}
fn get(&self, index: usize) -> bool {
let (chunk, mask) = index_to_mask(index);
self.chunks[chunk] & mask == mask
}
}
fn num_chunks(max_cores: usize) -> usize {
(max_cores / BITS) + 1
}
fn index_to_mask(index: usize) -> (usize, usize) {
let mask = 1 << (index & BIT_MASK);
let chunk = index / BITS;
(chunk, mask)
}
fn num_active_workers(synced: &Synced) -> usize {
synced.available_cores.capacity() - synced.available_cores.len()
}

View File

@ -97,12 +97,15 @@ pub(super) struct Worker {
/// Used to collect a list of workers to notify
workers_to_notify: Vec<usize>,
/// Snapshot of idle core list. This helps speedup stealing
idle_snapshot: idle::Snapshot,
}
/// Core data
pub(super) struct Core {
/// Index holding this core's remote/shared state.
index: usize,
pub(super) index: usize,
/// Used to schedule bookkeeping tasks every so often.
tick: u32,
@ -340,6 +343,7 @@ pub(super) fn create(
handle: handle.clone(),
index,
workers_to_notify: Vec::with_capacity(num_workers - 1),
idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
};
handle
@ -823,8 +827,19 @@ impl Worker {
continue;
}
// If the core is currently idle, then there is nothing to steal.
if cx.shared().idle.is_idle(i) {
continue;
}
let target = &cx.shared().remotes[i];
if lifo {
if let Some(task) = target.lifo_slot.take_remote() {
return Some(task);
}
}
if let Some(task) = target
.steal
.steal_into(&mut core.run_queue, &mut core.stats)
@ -1061,7 +1076,6 @@ impl Worker {
} else {
Ok((None, core))
}
}
fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {