Carl Lerche 2023-06-13 13:08:00 -07:00
parent 0866ee376b
commit 464e59caab
3 changed files with 81 additions and 86 deletions


@@ -14,12 +14,6 @@ pub(crate) struct Driver {
inner: TimeDriver,
}
impl Drop for Driver {
fn drop(&mut self) {
println!(" + DROP DRIVER");
}
}
#[derive(Debug)]
pub(crate) struct Handle {
/// IO driver handle


@@ -67,6 +67,10 @@ impl Idle {
self.idle_map.get(index)
}
pub(super) fn snapshot(&self, snapshot: &mut Snapshot) {
snapshot.update(&self.idle_map)
}
/// Try to acquire an available core
pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
let ret = synced.available_cores.pop();
@@ -207,7 +211,6 @@ impl Idle {
}
pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
println!(" + start shutdown");
// First, set the shutdown flag on each core
for core in &mut synced.idle.available_cores {
core.is_shutdown = true;
@@ -225,8 +228,6 @@ impl Idle {
synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();
println!(" + notify worker shutdown w/ core");
self.num_idle
.store(synced.idle.available_cores.len(), Release);
}
@@ -236,7 +237,6 @@ impl Idle {
// Wake up any other workers
while let Some(index) = synced.idle.sleepers.pop() {
shared.condvars[index].notify_one();
println!(" + notify worker shutdown NO core");
}
}
@@ -382,7 +382,7 @@ impl Snapshot {
}
}
fn get(&self, index: usize) -> bool {
pub(super) fn is_idle(&self, index: usize) -> bool {
let (chunk, mask) = index_to_mask(index);
self.chunks[chunk] & mask == mask
}
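
The renamed `is_idle` accessor tests one bit per worker in a chunked bitmask that `snapshot` copies out of the shared `idle_map`. A minimal self-contained sketch of that indexing scheme, assuming one bit per worker packed into `usize` chunks (the actual layout in `idle.rs` may differ):

// Sketch: map a worker index to a (chunk, bit-mask) pair within a
// bitmask stored as a vector of usize chunks.
const BITS: usize = usize::BITS as usize;

fn index_to_mask(index: usize) -> (usize, usize) {
    (index / BITS, 1 << (index % BITS))
}

struct Snapshot {
    chunks: Vec<usize>,
}

impl Snapshot {
    // Test the worker's bit; a set bit means "idle".
    fn is_idle(&self, index: usize) -> bool {
        let (chunk, mask) = index_to_mask(index);
        self.chunks[chunk] & mask == mask
    }

    // Flip the worker's bit when it parks or unparks.
    fn set_idle(&mut self, index: usize, idle: bool) {
        let (chunk, mask) = index_to_mask(index);
        if idle {
            self.chunks[chunk] |= mask;
        } else {
            self.chunks[chunk] &= !mask;
        }
    }
}

Packing the flags this way keeps the whole snapshot a few machine words long, so copying it per steal attempt stays cheap.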


@@ -89,12 +89,13 @@ cfg_not_taskdump! {
/// A scheduler worker
pub(super) struct Worker {
/*
/// Reference to scheduler's handle
handle: Arc<Handle>,
/// This worker's index in `assigned_cores` and `condvars`.
index: usize,
*/
/// Used to collect a list of workers to notify
workers_to_notify: Vec<usize>,
@@ -141,12 +142,6 @@ pub(super) struct Core {
rand: FastRand,
}
impl Drop for Core {
fn drop(&mut self) {
println!(" DROPPED CORE");
}
}
/// State shared across all workers
pub(crate) struct Shared {
/// Per-core remote state.
@@ -229,9 +224,15 @@ pub(crate) struct Context {
/// Current scheduler's handle
handle: Arc<Handle>,
/// Worker index
index: usize,
/// Core data
core: RefCell<Option<Box<Core>>>,
/// Used to pass cores to other threads when `block_in_place` is called
handoff_core: Arc<AtomicCell<Core>>,
/// Tasks to wake after resource drivers are polled. This is mostly to
/// handle yielded tasks.
pub(crate) defer: RefCell<Vec<Notified>>,
@@ -339,27 +340,22 @@ pub(super) fn create(
// Eagerly start worker threads
for index in 0..num_workers {
let handle = rt_handle.inner.expect_multi_thread();
let worker = Worker {
handle: handle.clone(),
index,
workers_to_notify: Vec::with_capacity(num_workers - 1),
idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
};
let h2 = handle.clone();
let handoff_core = Arc::new(AtomicCell::new(None));
handle
.blocking_spawner
.spawn_blocking(&rt_handle, move || run(worker));
.spawn_blocking(&rt_handle, move || run(index, h2, handoff_core, false));
}
rt_handle
}
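
Each worker thread is now spawned with only its index, a clone of the handle, and an `Arc<AtomicCell<Core>>` handoff slot, instead of a prebuilt `Worker` value. A sketch of what such a cell can look like, assuming it amounts to an atomic pointer swap over an optional boxed value (the runtime's actual cell is built on its loom abstractions, which this sketch omits):

use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering::AcqRel};

// Hypothetical simplified handoff cell: holds at most one boxed value
// and swaps it atomically between threads.
struct AtomicCell<T> {
    data: AtomicPtr<T>,
}

impl<T> AtomicCell<T> {
    fn new(data: Option<Box<T>>) -> AtomicCell<T> {
        AtomicCell {
            data: AtomicPtr::new(to_raw(data)),
        }
    }

    // Store a value, dropping whatever was there before.
    fn set(&self, val: Box<T>) {
        let old = self.data.swap(Box::into_raw(val), AcqRel);
        drop(from_raw(old));
    }

    // Remove and return the value, leaving the cell empty.
    fn take(&self) -> Option<Box<T>> {
        from_raw(self.data.swap(ptr::null_mut(), AcqRel))
    }
}

impl<T> Drop for AtomicCell<T> {
    fn drop(&mut self) {
        // Free any value still left in the cell.
        drop(self.take());
    }
}

fn to_raw<T>(data: Option<Box<T>>) -> *mut T {
    data.map(Box::into_raw).unwrap_or(ptr::null_mut())
}

fn from_raw<T>(val: *mut T) -> Option<Box<T>> {
    if val.is_null() {
        None
    } else {
        // Safety: a non-null pointer here came from Box::into_raw.
        Some(unsafe { Box::from_raw(val) })
    }
}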
#[track_caller]
pub(crate) fn block_in_place<F, R>(_f: F) -> R
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
/*
// Try to steal the worker core back
struct Reset(coop::Budget);
@@ -367,7 +363,7 @@ where
fn drop(&mut self) {
with_current(|maybe_cx| {
if let Some(cx) = maybe_cx {
let core = cx.worker.core.take();
let core = cx.handoff_core.take();
let mut cx_core = cx.core.borrow_mut();
assert!(cx_core.is_none());
*cx_core = core;
@@ -432,22 +428,21 @@ where
None => return Ok(()),
};
// The parker should be set here
assert!(core.park.is_some());
// In order to block, the core must be sent to another thread for
// execution.
//
// First, move the core back into the worker's shared core slot.
cx.worker.core.set(core);
cx.handoff_core.set(core);
// Next, clone the worker handle and send it to a new thread for
// processing.
//
// Once the blocking task is done executing, we will attempt to
// steal the core back.
let worker = cx.worker.clone();
runtime::spawn_blocking(move || run(worker));
let index = cx.index;
let handle = cx.handle.clone();
let handoff_core = cx.handoff_core.clone();
runtime::spawn_blocking(move || run(index, handle, handoff_core, true));
Ok(())
});
@@ -464,11 +459,14 @@ where
} else {
f()
}
*/
todo!()
}
fn run(mut worker: Worker) {
fn run(
index: usize,
handle: Arc<Handle>,
handoff_core: Arc<AtomicCell<Core>>,
blocking_in_place: bool,
) {
struct AbortOnPanic;
impl Drop for AbortOnPanic {
@@ -485,13 +483,22 @@ fn run(mut worker: Worker) {
#[cfg(debug_assertions)]
let _abort_on_panic = AbortOnPanic;
let handle = scheduler::Handle::MultiThread(worker.handle.clone());
let num_workers = handle.shared.condvars.len();
crate::runtime::context::enter_runtime(&handle, true, |_| {
let mut worker = Worker {
workers_to_notify: Vec::with_capacity(num_workers - 1),
idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
};
let sched_handle = scheduler::Handle::MultiThread(handle.clone());
crate::runtime::context::enter_runtime(&sched_handle, true, |_| {
// Set the worker context.
let cx = scheduler::Context::MultiThread(Context {
handle: worker.handle.clone(),
index,
handle,
core: RefCell::new(None),
handoff_core,
defer: RefCell::new(Vec::with_capacity(64)),
});
@@ -499,7 +506,7 @@ fn run(mut worker: Worker) {
let cx = cx.expect_multi_thread();
// Run the worker
let res = worker.run(&cx);
let res = worker.run(&cx, blocking_in_place);
// `err` here signifies the core was lost; this is an expected end
// state for a worker.
debug_assert!(res.is_err());
@@ -526,18 +533,27 @@ macro_rules! n {
}
impl Worker {
fn run(&mut self, cx: &Context) -> RunResult {
fn run(&mut self, cx: &Context, blocking_in_place: bool) -> RunResult {
let (maybe_task, mut core) = {
let mut synced = cx.shared().synced.lock();
// First try to acquire an available core
if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
// Try to poll a task from the global queue
let maybe_task = self.next_remote_task_synced(cx, &mut synced);
(maybe_task, core)
if blocking_in_place {
if let Some(core) = cx.handoff_core.take() {
(None, core)
} else {
// Just shutdown
return Err(());
}
} else {
// block the thread to wait for a core to be assigned to us
self.wait_for_core(cx, synced)?
let mut synced = cx.shared().synced.lock();
// First try to acquire an available core
if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
// Try to poll a task from the global queue
let maybe_task = self.next_remote_task_synced(cx, &mut synced);
(maybe_task, core)
} else {
// block the thread to wait for a core to be assigned to us
self.wait_for_core(cx, synced)?
}
}
};
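
The new `blocking_in_place` branch is the consumer side of the core handoff that the (presently commented-out) `block_in_place` path sets up: the replacement thread claims its core straight from `handoff_core`, and if the core is already gone it simply shuts down. A self-contained model of both sides, with a `Mutex` standing in for the atomic cell:

use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the handoff cell; the core type is left generic.
type Handoff<T> = Arc<Mutex<Option<Box<T>>>>;

fn hand_off<T: Send + 'static>(cell: &Handoff<T>, core: Box<T>) {
    // Producer (block_in_place): park the core where the replacement
    // thread can claim it, then spawn that thread.
    *cell.lock().unwrap() = Some(core);
    let cell = Arc::clone(cell);
    thread::spawn(move || {
        // Consumer (run with blocking_in_place == true): claim the
        // core, or exit if the original thread stole it back first.
        match cell.lock().unwrap().take() {
            Some(_core) => { /* resume the scheduler loop */ }
            None => { /* nothing to do; just shut down */ }
        }
    });
}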
@@ -582,7 +598,6 @@ impl Worker {
.idle
.try_acquire_available_core(&mut synced.idle)
{
println!(" + acquired_core; {}", self.index);
self.reset_acquired_core(cx, synced, &mut core);
Some(core)
} else {
@@ -592,46 +607,40 @@ impl Worker {
// Block the current thread, waiting for an available core
fn wait_for_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> NextTaskResult {
println!(" + wait_for_core; {}", self.index);
cx.shared()
.idle
.transition_worker_to_parked(&mut synced, self.index);
.transition_worker_to_parked(&mut synced, cx.index);
// Wait until a core is available, then exit the loop.
let mut core = loop {
if let Some(core) = synced.assigned_cores[self.index].take() {
if let Some(core) = synced.assigned_cores[cx.index].take() {
break core;
}
// If shutting down, abort
if cx.shared().inject.is_closed(&synced.inject) {
self.shutdown_clear_defer(cx);
println!(" + wait_for_core; shutdown; {}", self.index);
return Err(());
}
synced = cx.shared().condvars[self.index].wait(synced).unwrap();
synced = cx.shared().condvars[cx.index].wait(synced).unwrap();
};
self.reset_acquired_core(cx, &mut synced, &mut core);
if core.is_shutdown {
println!(" + wait_for_core; CORE(shutdown) {}", self.index);
// Currently shutting down, don't do any more work
return Ok((None, core));
}
// The core was notified to search for work; don't try to take tasks from the injection queue
if core.is_searching {
println!(" + wait_for_core; SEARCHING");
return Ok((None, core));
}
let n = core.run_queue.max_capacity() / 2;
let maybe_task = self.next_remote_task_batch(cx, &mut synced, &mut core, n);
println!(" + wait_for_core; task={:?}", maybe_task.is_some());
Ok((maybe_task, core))
}
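
`wait_for_core` is the classic condition-variable loop: the worker re-checks its assigned-core slot on every wakeup, since a notification (or a spurious wakeup) does not by itself prove the condition holds. A minimal sketch of that parking protocol, with illustrative names:

use std::sync::{Condvar, Mutex};

// One slot per worker: a value is assigned under the lock, then the
// sleeping worker is woken via its condvar.
struct Slot<T> {
    value: Mutex<Option<T>>,
    condvar: Condvar,
}

impl<T> Slot<T> {
    fn wait(&self) -> T {
        let mut guard = self.value.lock().unwrap();
        loop {
            // The condition drives the loop, not the notification:
            // re-check the slot on every wakeup.
            if let Some(value) = guard.take() {
                return value;
            }
            guard = self.condvar.wait(guard).unwrap();
        }
    }

    fn assign(&self, value: T) {
        *self.value.lock().unwrap() = Some(value);
        // Wake exactly one sleeping worker.
        self.condvar.notify_one();
    }
}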
@@ -639,7 +648,7 @@ impl Worker {
fn reset_acquired_core(&self, cx: &Context, synced: &mut Synced, core: &mut Core) {
// Reset `lifo_enabled` here in case the core was previously stolen from
// a task that had the LIFO slot disabled.
self.reset_lifo_enabled(core);
self.reset_lifo_enabled(cx, core);
// At this point, the local queue should be empty
debug_assert!(core.run_queue.is_empty());
@@ -652,10 +661,10 @@ impl Worker {
/// none are available, the thread sleeps and tries again.
fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
while !core.is_shutdown {
self.assert_lifo_enabled_is_correct(&core);
self.assert_lifo_enabled_is_correct(cx, &core);
if core.is_traced {
core = self.handle.trace_core(core);
core = cx.handle.trace_core(core);
}
// Increment the tick
@@ -783,7 +792,7 @@ impl Worker {
/// Note: a worker will only try to steal if fewer than half of the
/// workers are already searching for tasks to steal. The idea is to
/// make sure not all workers will be trying to steal at the same time.
fn steal_work(&self, cx: &Context, core: &mut Core) -> Option<Notified> {
fn steal_work(&mut self, cx: &Context, core: &mut Core) -> Option<Notified> {
#[cfg(not(loom))]
const ROUNDS: usize = 4;
@@ -794,6 +803,9 @@ impl Worker {
return None;
}
// Get a snapshot of which workers are idle
cx.shared().idle.snapshot(&mut self.idle_snapshot);
let num = cx.shared().remotes.len();
for i in 0..ROUNDS {
@@ -828,7 +840,7 @@ impl Worker {
}
// If the core is currently idle, then there is nothing to steal.
if cx.shared().idle.is_idle(i) {
if self.idle_snapshot.is_idle(i) {
continue;
}
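
Because the snapshot is taken once per steal attempt, the per-candidate check above reads worker-local memory rather than shared atomics. An illustrative steal round over the `Snapshot` sketched earlier, with the actual queue-stealing call elided:

// Hypothetical round: skip yourself and any peer the snapshot marks
// idle, since idle workers have nothing worth stealing.
fn steal_round(snapshot: &Snapshot, num_workers: usize, me: usize) {
    for i in 0..num_workers {
        if i == me || snapshot.is_idle(i) {
            continue;
        }
        // try to steal half of worker i's run queue here
    }
}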
@@ -860,7 +872,7 @@ impl Worker {
cx.shared().notify_parked_local();
}
self.assert_lifo_enabled_is_correct(&core);
self.assert_lifo_enabled_is_correct(cx, &core);
// Measure the poll start time. Note that we may end up polling other
// tasks under this measurement. In this case, the tasks came from the
@@ -895,7 +907,7 @@ impl Worker {
let task = match cx.shared().remotes[core.index].lifo_slot.take_local() {
Some(task) => task,
None => {
self.reset_lifo_enabled(&mut core);
self.reset_lifo_enabled(cx, &mut core);
core.stats.end_poll();
return Ok(core);
}
@@ -1012,7 +1024,6 @@ impl Worker {
fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
if core.tick % cx.shared().config.event_interval == 0 {
println!(" + MAINT {}", core.tick);
super::counters::inc_num_maintenance();
core.stats.end_processing_scheduled_tasks();
@@ -1044,7 +1055,6 @@ impl Worker {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
if let Some(mut driver) = cx.shared().driver.take() {
println!(" + DRIVER POLL");
driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
cx.shared().driver.set(driver);
@@ -1066,7 +1076,6 @@ impl Worker {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
if let Some(mut driver) = cx.shared().driver.take() {
println!(" + DRIVER POLL");
driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
cx.shared().driver.set(driver);
@@ -1107,6 +1116,7 @@ impl Worker {
// Before we park, if we are searching, we need to transition away from searching
if self.transition_from_searching(cx, &mut core) {
cx.shared().idle.snapshot(&mut self.idle_snapshot);
// We were the last searching worker, we need to do one last check
if let Some(task) = self.steal_one_round(cx, &mut core, 0, true) {
cx.shared().notify_parked_local();
@@ -1152,11 +1162,8 @@ impl Worker {
// Drop the lock before parking on the driver
drop(synced);
println!(" + driver::park");
// Wait for driver events
driver.park(&self.handle.driver);
println!(" + driver::park / done");
driver.park(&cx.handle.driver);
synced = cx.shared().synced.lock();
@@ -1230,22 +1237,16 @@ impl Worker {
}
fn shutdown_finalize(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) {
println!(" + shutdown core");
// Wait for all cores
if synced.shutdown_cores.len() != cx.shared().remotes.len() {
return;
}
println!(" + shutdown_finalize; all cores");
// Wait for driver
if cx.shared().driver.is_none() {
return;
}
println!(" + shutdown_finalize; have driver");
debug_assert!(cx.shared().owned.is_empty());
for mut core in synced.shutdown_cores.drain(..) {
@@ -1255,7 +1256,7 @@ impl Worker {
// Shutdown the driver
let mut driver = cx.shared().driver.take().expect("driver missing");
driver.shutdown(&self.handle.driver);
driver.shutdown(&cx.handle.driver);
// Drain the injection queue
//
@@ -1268,14 +1269,14 @@ impl Worker {
}
}
fn reset_lifo_enabled(&self, core: &mut Core) {
core.lifo_enabled = !self.handle.shared.config.disable_lifo_slot;
fn reset_lifo_enabled(&self, cx: &Context, core: &mut Core) {
core.lifo_enabled = !cx.handle.shared.config.disable_lifo_slot;
}
fn assert_lifo_enabled_is_correct(&self, core: &Core) {
fn assert_lifo_enabled_is_correct(&self, cx: &Context, core: &Core) {
debug_assert_eq!(
core.lifo_enabled,
!self.handle.shared.config.disable_lifo_slot
!cx.handle.shared.config.disable_lifo_slot
);
}