more work

This commit is contained in:
Carl Lerche 2023-06-22 20:03:51 +00:00
parent 3339103ff5
commit 444615f245
No known key found for this signature in database
GPG Key ID: DF9E42ECB951A75D
2 changed files with 25 additions and 2 deletions

View File

@ -19,6 +19,8 @@ mod imp {
static NUM_GLOBAL_QUEUE_INTERVAL: AtomicUsize = AtomicUsize::new(0);
static NUM_NO_AVAIL_CORE: AtomicUsize = AtomicUsize::new(0);
static NUM_RELAY_SEARCH: AtomicUsize = AtomicUsize::new(0);
static NUM_SPIN_STALL: AtomicUsize = AtomicUsize::new(0);
static NUM_NO_LOCAL_WORK: AtomicUsize = AtomicUsize::new(0);
impl Drop for super::Counters {
fn drop(&mut self) {
@ -38,6 +40,8 @@ mod imp {
let num_global_queue_interval = NUM_GLOBAL_QUEUE_INTERVAL.load(Relaxed);
let num_no_avail_core = NUM_NO_AVAIL_CORE.load(Relaxed);
let num_relay_search = NUM_RELAY_SEARCH.load(Relaxed);
let num_spin_stall = NUM_SPIN_STALL.load(Relaxed);
let num_no_local_work = NUM_NO_LOCAL_WORK.load(Relaxed);
println!("---");
println!("notifies (remote): {}", notifies_remote);
@ -56,6 +60,8 @@ mod imp {
println!("remote task batch: {}", num_remote_batch);
println!("global Q interval: {}", num_global_queue_interval);
println!(" relay search: {}", num_relay_search);
println!(" spin stall: {}", num_spin_stall);
println!(" no local work: {}", num_no_local_work);
}
}
@ -122,6 +128,14 @@ mod imp {
pub(crate) fn inc_num_relay_search() {
NUM_RELAY_SEARCH.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_spin_stall() {
NUM_SPIN_STALL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_no_local_work() {
NUM_NO_LOCAL_WORK.fetch_add(1, Relaxed);
}
}
#[cfg(not(tokio_internal_mt_counters))]
@ -142,6 +156,8 @@ mod imp {
pub(crate) fn inc_global_queue_interval() {}
pub(crate) fn inc_notify_no_core() {}
pub(crate) fn inc_num_relay_search() {}
pub(crate) fn inc_num_spin_stall() {}
pub(crate) fn inc_num_no_local_work() {}
}
#[derive(Debug)]

View File

@ -700,6 +700,8 @@ impl Worker {
// a notified task.
core = try_task!(self.next_notified_task(cx, core));
super::counters::inc_num_no_local_work();
// We consumed all work in the queues and will start searching for work.
core.stats.end_processing_scheduled_tasks(&mut self.stats);
@ -837,7 +839,7 @@ impl Worker {
/// workers will be trying to steal at the same time.
fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
#[cfg(not(loom))]
const ROUNDS: usize = 4;
const ROUNDS: usize = 1;
#[cfg(loom)]
const ROUNDS: usize = 1;
@ -866,7 +868,10 @@ impl Worker {
return Ok((Some(task), core));
}
std::thread::sleep(std::time::Duration::from_micros(3));
if i > 0 {
super::counters::inc_num_spin_stall();
std::thread::sleep(std::time::Duration::from_micros(i as u64));
}
core = try_task_new_batch!(self, self.next_remote_task_batch(cx, core));
}
@ -1403,6 +1408,8 @@ impl Shared {
}
fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
core.stats.inc_local_schedule_count();
if cx.lifo_enabled.get() {
// Push to the LIFO slot
let prev = std::mem::replace(&mut core.lifo_slot, Some(task));