fix more bugz

Carl Lerche 2023-06-12 16:14:33 -07:00
parent fab5adc17c
commit e6a1444fac
3 changed files with 69 additions and 11 deletions

View File

@@ -14,6 +14,12 @@ pub(crate) struct Driver {
     inner: TimeDriver,
 }
 
+impl Drop for Driver {
+    fn drop(&mut self) {
+        println!(" + DROP DRIVER");
+    }
+}
+
 #[derive(Debug)]
 pub(crate) struct Handle {
     /// IO driver handle

View File

@@ -126,8 +126,7 @@ impl Idle {
                 shared.condvars[worker].notify_one();
 
                 return;
             } else {
-                // synced.idle.sleepers.push(worker);
-                panic!("[tokio] unexpected condition");
+                synced.idle.sleepers.push(worker);
             }
         }
@@ -150,7 +149,7 @@ impl Idle {
         workers: &mut Vec<usize>,
         num: usize,
     ) {
-        let mut did_notify = false;
+        debug_assert!(workers.is_empty());
 
         for _ in 0..num {
             if let Some(worker) = synced.idle.sleepers.pop() {
@@ -160,18 +159,17 @@ impl Idle {
                     synced.assigned_cores[worker] = Some(core);
 
                     workers.push(worker);
-                    did_notify = true;
 
                     continue;
                 } else {
-                    panic!("[tokio] unexpected condition");
+                    synced.idle.sleepers.push(worker);
                 }
             }
 
             break;
         }
 
-        if did_notify {
+        if !workers.is_empty() {
             let num_idle = synced.idle.available_cores.len();
             self.num_idle.store(num_idle, Release);
         } else {
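
The two idle.rs hunks above change how a failed hand-off is handled: a popped sleeper is pushed back onto `synced.idle.sleepers` when no core is available (instead of panicking), and the `did_notify` flag is dropped in favor of checking whether anything landed in `workers`. Below is a minimal sketch of that pattern using made-up, trimmed-down types rather than the real scheduler structs:

    // Hypothetical stand-ins for the real `worker::Synced` state.
    struct Core;

    #[derive(Default)]
    struct Synced {
        sleepers: Vec<usize>,
        available_cores: Vec<Core>,
        assigned_cores: Vec<Option<Core>>,
    }

    fn notify_many(synced: &mut Synced, workers: &mut Vec<usize>, num: usize) {
        debug_assert!(workers.is_empty());

        for _ in 0..num {
            if let Some(worker) = synced.sleepers.pop() {
                if let Some(core) = synced.available_cores.pop() {
                    synced.assigned_cores[worker] = Some(core);
                    workers.push(worker);
                    continue;
                } else {
                    // No core to hand off: put the sleeper back instead of panicking.
                    synced.sleepers.push(worker);
                }
            }
            break;
        }

        // `!workers.is_empty()` now plays the role of the removed `did_notify`
        // flag; the caller notifies the matching condvars outside this function.
    }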
@@ -184,6 +182,7 @@ impl Idle {
     }
 
     pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
+        println!(" + start shutdown");
         // First, set the shutdown flag on each core
         for core in &mut synced.idle.available_cores {
             core.is_shutdown = true;
@@ -199,6 +198,8 @@ impl Idle {
             synced.assigned_cores[worker] = Some(core);
             shared.condvars[worker].notify_one();
 
+            println!(" + notify worker shutdown w/ core");
+
             self.num_idle
                 .store(synced.idle.available_cores.len(), Release);
         }
@@ -206,6 +207,7 @@ impl Idle {
         // Wake up any other workers
         while let Some(index) = synced.idle.sleepers.pop() {
             shared.condvars[index].notify_one();
+            println!(" + notify worker shutdown NO core");
         }
     }
 }

View File

@@ -107,6 +107,10 @@ pub(super) struct Core {
     /// Used to schedule bookkeeping tasks every so often.
     tick: u32,
 
+    /// Counter used to track when to poll from the local queue vs. the
+    /// injection queue
+    num_seq_local_queue_polls: u32,
+
     /// When a task is scheduled from a worker, it is stored in this slot. The
     /// worker will check this slot for a task **before** checking the run
     /// queue. This effectively results in the **last** scheduled task to be run
@@ -141,6 +145,12 @@ pub(super) struct Core {
     rand: FastRand,
 }
 
+impl Drop for Core {
+    fn drop(&mut self) {
+        println!(" DROPPED CORE");
+    }
+}
+
 /// State shared across all workers
 pub(crate) struct Shared {
     /// Per-core remote state.
@@ -268,6 +278,7 @@ pub(super) fn create(
         cores.push(Box::new(Core {
             index: i,
             tick: 0,
+            num_seq_local_queue_polls: 0,
             lifo_slot: None,
             lifo_enabled: !config.disable_lifo_slot,
             run_queue,
@@ -560,6 +571,7 @@ impl Worker {
             .idle
             .try_acquire_available_core(&mut synced.idle)
         {
+            println!(" + acquired_core; {}", self.index);
             self.reset_acquired_core(cx, synced, &mut core);
             Some(core)
         } else {
@@ -569,6 +581,7 @@ impl Worker {
 
     // Block the current thread, waiting for an available core
     fn wait_for_core(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) -> NextTaskResult {
+        println!(" + wait_for_core; {}", self.index);
         cx.shared()
             .idle
             .transition_worker_to_parked(&mut synced, self.index);
@@ -582,6 +595,7 @@ impl Worker {
 
         // If shutting down, abort
         if cx.shared().inject.is_closed(&synced.inject) {
             self.shutdown_clear_defer(cx);
+            println!(" + wait_for_core; shutdown; {}", self.index);
             return Err(());
         }
@@ -591,19 +605,22 @@ impl Worker {
         self.reset_acquired_core(cx, &mut synced, &mut core);
 
         if core.is_shutdown {
+            println!(" + wait_for_core; CORE(shutdown) {}", self.index);
             // Currently shutting down, don't do any more work
             return Ok((None, core));
         }
 
         // The core was notified to search for work, don't try to take tasks from the injection queue
         if core.is_searching {
+            println!(" + wait_for_core; SEARCHING");
             return Ok((None, core));
         }
 
-        // TODO: don't hardcode 128
         let n = core.run_queue.max_capacity() / 2;
         let maybe_task = self.next_remote_task_batch(cx, &mut synced, &mut core, n);
 
+        println!(" + wait_for_core; task={:?}", maybe_task.is_some());
+
         Ok((maybe_task, core))
     }
@@ -666,7 +683,12 @@ impl Worker {
     }
 
     fn next_notified_task(&self, cx: &Context, core: &mut Core) -> Option<Notified> {
-        if core.tick % core.global_queue_interval == 0 {
+        core.num_seq_local_queue_polls += 1;
+
+        if core.num_seq_local_queue_polls % core.global_queue_interval == 0 {
+            core.num_seq_local_queue_polls = 0;
+            println!(" + next_notified_task; REMOTE FIRST");
+
             // Update the global queue interval, if needed
             self.tune_global_queue_interval(cx, core);
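
The hunks touching `num_seq_local_queue_polls` replace the old `core.tick % core.global_queue_interval` check: the worker now counts consecutive local-queue polls and, every `global_queue_interval` polls, resets the counter and checks the injection (global) queue first. A rough sketch of just that gating logic, with a hypothetical trimmed-down `Core` holding only the two fields involved:

    // Hypothetical, simplified `Core`; not the real scheduler struct.
    struct Core {
        num_seq_local_queue_polls: u32,
        global_queue_interval: u32,
    }

    impl Core {
        // Returns true when the injection (global) queue should be polled first,
        // so remote tasks cannot be starved by a busy local queue.
        fn should_poll_injection_first(&mut self) -> bool {
            self.num_seq_local_queue_polls += 1;

            if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
                self.num_seq_local_queue_polls = 0;
                true
            } else {
                false
            }
        }
    }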
@@ -873,7 +895,7 @@ impl Worker {
     fn schedule_deferred_with_core<'a>(
         &mut self,
         cx: &'a Context,
-        core: Box<Core>,
+        mut core: Box<Core>,
         synced: impl FnOnce() -> MutexGuard<'a, Synced>,
     ) -> NextTaskResult {
         let mut defer = cx.defer.borrow_mut();
@@ -909,6 +931,16 @@ impl Worker {
             cx.shared().condvars[worker].notify_one()
         }
 
+        if !defer.is_empty() {
+            // Push the rest of the tasks on the local queue
+            for task in defer.drain(..) {
+                core.run_queue
+                    .push_back_or_overflow(task, cx.shared(), &mut core.stats);
+            }
+
+            cx.shared().notify_parked_local();
+        }
+
         Ok((task, core))
     }
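
The new `!defer.is_empty()` block keeps `schedule_deferred_with_core` from leaving woken tasks behind: after one task is handed back to the caller, the remaining deferred tasks are pushed onto the local run queue (spilling over when it is full) and a parked peer is notified. A sketch of the same shape with made-up queue types (the real `push_back_or_overflow` and `notify_parked_local` live on tokio's internal types):

    // Made-up stand-ins; tasks are plain u32 ids here.
    struct RunQueue { local: Vec<u32>, capacity: usize }
    struct Shared { overflow: Vec<u32> }

    impl RunQueue {
        // Push onto the local queue, spilling to the shared overflow queue when full.
        fn push_back_or_overflow(&mut self, task: u32, shared: &mut Shared) {
            if self.local.len() < self.capacity {
                self.local.push(task);
            } else {
                shared.overflow.push(task);
            }
        }
    }

    fn flush_deferred(defer: &mut Vec<u32>, queue: &mut RunQueue, shared: &mut Shared) {
        if defer.is_empty() {
            return;
        }
        for task in defer.drain(..) {
            queue.push_back_or_overflow(task, shared);
        }
        // This is where the real scheduler would call `notify_parked_local()`.
    }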
@@ -962,18 +994,27 @@ impl Worker {
     }
 
     fn park_yield(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
+        println!(" + park_yield; tick={}", core.tick);
+
+        let mut maybe_task = None;
+
         // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
         // to run without actually putting the thread to sleep.
         if let Some(mut driver) = cx.shared().driver.take() {
             driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
+            cx.shared().driver.set(driver);
+
             // If there are more I/O events, schedule them.
-            core = n!(self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock()));
+            let res = self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())?;
+            maybe_task = res.0;
+            core = res.1;
         }
 
         self.flush_metrics(cx, &mut core);
         self.update_global_flags(cx, &mut cx.shared().synced.lock(), &mut core);
 
-        Ok((None, core))
+        Ok((maybe_task, core))
     }
 
     fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
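
Two fixes show up in the `park_yield` hunk: the driver taken for the zero-timeout park is now put back with `cx.shared().driver.set(driver)`, and the task returned by `schedule_deferred_with_core` is propagated out instead of being discarded in favor of `Ok((None, core))`. A toy sketch of threading that `(Option<Task>, Box<Core>)` result through, with made-up types:

    // Made-up types; only the control flow mirrors the hunk above.
    struct Core;
    type Task = u32;
    type NextTaskResult = Result<(Option<Task>, Box<Core>), ()>;

    fn schedule_deferred_with_core(core: Box<Core>) -> NextTaskResult {
        // Pretend a deferred wakeup produced a task.
        Ok((Some(7), core))
    }

    fn park_yield(mut core: Box<Core>) -> NextTaskResult {
        let maybe_task;

        // Previously the task half of the tuple was discarded and the function
        // always returned `Ok((None, core))`, losing the woken task.
        let res = schedule_deferred_with_core(core)?;
        maybe_task = res.0;
        core = res.1;

        Ok((maybe_task, core))
    }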
@@ -1050,8 +1091,11 @@ impl Worker {
         // Drop the lock before parking on the driver
         drop(synced);
 
+        println!(" + driver::park");
+
         // Wait for driver events
         driver.park(&self.handle.driver);
 
+        println!(" + driver::park / done");
+
         synced = cx.shared().synced.lock();
@@ -1123,16 +1167,22 @@ impl Worker {
     }
 
     fn shutdown_finalize(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) {
+        println!(" + shutdown core");
+
         // Wait for all cores
         if synced.shutdown_cores.len() != cx.shared().remotes.len() {
             return;
         }
 
+        println!(" + shutdown_finalize; all cores");
+
         // Wait for driver
         if cx.shared().driver.is_none() {
             return;
         }
 
+        println!(" + shutdown_finalize; have driver");
+
         debug_assert!(cx.shared().owned.is_empty());
 
         for mut core in synced.shutdown_cores.drain(..) {