Carl Lerche 2023-06-22 17:13:45 +00:00
parent ec3570ecf5
commit 1a2d2df92d
3 changed files with 75 additions and 29 deletions


@@ -16,6 +16,9 @@ mod imp {
static NUM_POLLS: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_POLLS: AtomicUsize = AtomicUsize::new(0);
static NUM_REMOTE_BATCH: AtomicUsize = AtomicUsize::new(0);
static NUM_GLOBAL_QUEUE_INTERVAL: AtomicUsize = AtomicUsize::new(0);
static NUM_NO_AVAIL_CORE: AtomicUsize = AtomicUsize::new(0);
static NUM_RELAY_SEARCH: AtomicUsize = AtomicUsize::new(0);
impl Drop for super::Counters {
fn drop(&mut self) {
@@ -32,12 +35,16 @@ mod imp {
let num_polls = NUM_POLLS.load(Relaxed);
let num_lifo_polls = NUM_LIFO_POLLS.load(Relaxed);
let num_remote_batch = NUM_REMOTE_BATCH.load(Relaxed);
let num_global_queue_interval = NUM_GLOBAL_QUEUE_INTERVAL.load(Relaxed);
let num_no_avail_core = NUM_NO_AVAIL_CORE.load(Relaxed);
let num_relay_search = NUM_RELAY_SEARCH.load(Relaxed);
println!("---");
println!("notifies (remote): {}", notifies_remote);
println!(" notifies (local): {}", notifies_local);
println!(" unparks (local): {}", unparks_local);
println!(" unparks (remote): {}", unparks_remote);
println!(" notify, no core: {}", num_no_avail_core);
println!(" maintenance: {}", maintenance);
println!(" LIFO schedules: {}", lifo_scheds);
println!(" LIFO capped: {}", lifo_capped);
@@ -47,6 +54,8 @@ mod imp {
println!(" polls: {}", num_polls);
println!(" polls (LIFO): {}", num_lifo_polls);
println!("remote task batch: {}", num_remote_batch);
println!("global Q interval: {}", num_global_queue_interval);
println!(" relay search: {}", num_relay_search);
}
}
@@ -101,6 +110,18 @@ mod imp {
pub(crate) fn inc_num_remote_batch() {
NUM_REMOTE_BATCH.fetch_add(1, Relaxed);
}
pub(crate) fn inc_global_queue_interval() {
NUM_GLOBAL_QUEUE_INTERVAL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_notify_no_core() {
NUM_NO_AVAIL_CORE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_relay_search() {
NUM_RELAY_SEARCH.fetch_add(1, Relaxed);
}
}
#[cfg(not(tokio_internal_mt_counters))]
@@ -118,6 +139,9 @@ mod imp {
pub(crate) fn inc_num_polls() {}
pub(crate) fn inc_num_lifo_polls() {}
pub(crate) fn inc_num_remote_batch() {}
pub(crate) fn inc_global_queue_interval() {}
pub(crate) fn inc_notify_no_core() {}
pub(crate) fn inc_num_relay_search() {}
}
#[derive(Debug)]
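The new counters follow the same pattern as the existing ones in this module: a static AtomicUsize gated behind the tokio_internal_mt_counters cfg flag, bumped with a Relaxed fetch_add from a small inc_* helper, and printed from the Drop impl of Counters when the runtime shuts down. A minimal standalone sketch of that pattern, using a hypothetical NUM_FOO counter rather than the actual Tokio internals:

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Hypothetical counter, mirroring the cfg-gated statics above.
static NUM_FOO: AtomicUsize = AtomicUsize::new(0);

// Increment helper; Relaxed is enough because the value is only read once, at shutdown.
pub(crate) fn inc_num_foo() {
    NUM_FOO.fetch_add(1, Relaxed);
}

// Stand-in for the scheduler-owned handle whose Drop dumps the totals.
pub(crate) struct Counters;

impl Drop for Counters {
    fn drop(&mut self) {
        let num_foo = NUM_FOO.load(Relaxed);
        println!("---");
        println!("              foo: {}", num_foo);
    }
}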


@@ -63,6 +63,10 @@ impl Idle {
synced.available_cores.len()
}
pub(super) fn num_searching(&self) -> usize {
self.num_searching.load(Acquire)
}
pub(super) fn is_idle(&self, index: usize) -> bool {
self.idle_map.get(index)
}
@@ -174,6 +178,8 @@ impl Idle {
}
}
super::counters::inc_notify_no_core();
// Set the `needs_searching` flag; this happens *while* the lock is held.
self.needs_searching.store(true, Release);
self.num_searching.fetch_sub(1, Release);
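The inc_notify_no_core counter records the case where a notification found no available core to hand work to. The comment above carries the ordering argument: needs_searching is set while the synced lock is still held, so a worker that is concurrently returning its core (which also takes that lock) cannot miss the request and park anyway. A rough standalone sketch of that argument, with simplified stand-in types rather than the actual Idle internals:

use std::sync::atomic::{AtomicBool, Ordering::{Acquire, Release}};
use std::sync::Mutex;

struct Idle {
    // Guards the list of available cores (stand-in for the real `Synced` state).
    synced: Mutex<Vec<usize>>,
    // Tells the next worker that returns its core to start searching.
    needs_searching: AtomicBool,
}

impl Idle {
    fn notify(&self) {
        let synced = self.synced.lock().unwrap();
        if !synced.is_empty() {
            // A core is available: hand it off / unpark a worker (omitted).
            return;
        }
        // No available core: set the flag *while* the lock is held. Any worker
        // about to release its core also takes this lock, so it either runs
        // before us (and we find its core) or after us (and sees the flag).
        self.needs_searching.store(true, Release);
    }

    fn release_core(&self, core: usize) {
        let mut synced = self.synced.lock().unwrap();
        synced.push(core);
        if self.needs_searching.swap(false, Acquire) {
            // A notification arrived while no core was available:
            // re-notify now that a core exists again (omitted).
        }
    }
}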


@@ -659,7 +659,7 @@ impl Worker {
}
let n = core.run_queue.max_capacity() / 2;
let maybe_task = self.next_remote_task_batch(cx, &mut synced, &mut core, n);
let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);
Ok((maybe_task, core))
}
@@ -703,11 +703,11 @@ impl Worker {
// We consumed all work in the queues and will start searching for work.
core.stats.end_processing_scheduled_tasks(&mut self.stats);
core = try_task_new_batch!(self, self.poll_driver(cx, core));
while !self.is_shutdown {
// Try to steal a task from other workers
core = try_task_new_batch!(self, self.steal_work(cx, core));
// Search for more work; this involves trying to poll the resource
// driver, steal from other workers, and check the global queue
// again.
core = try_task_new_batch!(self, self.search_for_work(cx, core));
if !cx.defer.borrow().is_empty() {
core = try_task_new_batch!(self, self.park_yield(cx, core));
@@ -726,6 +726,8 @@ impl Worker {
self.num_seq_local_queue_polls += 1;
if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
super::counters::inc_global_queue_interval();
self.num_seq_local_queue_polls = 0;
// Update the global queue interval, if needed
@@ -740,22 +742,7 @@ impl Worker {
return Ok((Some(task), core));
}
if cx.shared().inject.is_empty() {
return Ok((None, core));
}
// Other threads can only **remove** tasks from the current worker's
// `run_queue`. So, we can be confident that by the time we call
// `run_queue.push_back` below, there will be *at least* `cap`
// available slots in the queue.
let cap = usize::min(
core.run_queue.remaining_slots(),
core.run_queue.max_capacity() / 2,
);
let mut synced = cx.shared().synced.lock();
let maybe_task = self.next_remote_task_batch(cx, &mut synced, &mut core, cap);
Ok((maybe_task, core))
self.next_remote_task_batch(cx, core)
}
fn next_remote_task(&self, cx: &Context) -> Option<Notified> {
@@ -772,7 +759,26 @@ impl Worker {
unsafe { cx.shared().inject.pop(&mut synced.inject) }
}
fn next_remote_task_batch(
fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
if cx.shared().inject.is_empty() {
return Ok((None, core));
}
// Other threads can only **remove** tasks from the current worker's
// `run_queue`. So, we can be confident that by the time we call
// `run_queue.push_back` below, there will be *at least* `cap`
// available slots in the queue.
let cap = usize::min(
core.run_queue.remaining_slots(),
core.run_queue.max_capacity() / 2,
);
let mut synced = cx.shared().synced.lock();
let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, cap);
Ok((maybe_task, core))
}
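Splitting the helper keeps the cheap inject.is_empty() check and the cap computation outside the synced lock; only the actual batch pop runs under it. The cap reasoning in the comment relies on other workers only ever removing tasks from this worker's run_queue, so remaining_slots() can only grow between the check and the push_back. A worked example with made-up numbers (the real local queue capacity may differ):

fn main() {
    // Assume a local run queue with 256 slots that currently holds 200 tasks.
    let remaining_slots: usize = 256 - 200; // 56 slots free right now
    let half_capacity: usize = 256 / 2;     // never pull more than half the queue

    // cap = min(remaining_slots, max_capacity() / 2)
    let cap = usize::min(remaining_slots, half_capacity);
    assert_eq!(cap, 56);

    // Other workers can only *remove* tasks from this worker's queue, so by the
    // time the batch is pushed there are still at least `cap` free slots.
}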
fn next_remote_task_batch_synced(
&self,
cx: &Context,
synced: &mut Synced,
@@ -784,10 +790,13 @@ impl Worker {
// The worker is currently idle; pull a batch of work from the
// injection queue. We don't want to pull *all* the work, so other
// workers can also get some.
let n = usize::min(
cx.shared().inject.len() / cx.shared().remotes.len() + 1,
max,
);
let n = if core.is_searching {
cx.shared().inject.len() / cx.shared().idle.num_searching() + 1
} else {
cx.shared().inject.len() / cx.shared().remotes.len() + 1
};
let n = usize::min(n, max);
// safety: passing in the correct `inject::Synced`.
let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };
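The batch size pulled from the injection queue now depends on whether the worker is in the searching state: a searching worker splits the backlog among the (typically few) searching workers, while a non-searching worker keeps the old split across all remotes; either way the result is capped at max. A worked example with assumed numbers, not taken from the commit:

fn main() {
    let inject_len = 64_usize;   // tasks waiting in the injection queue
    let num_remotes = 8_usize;   // total workers
    let num_searching = 2_usize; // workers currently in the searching state
    let max = 32_usize;          // cap computed by the caller

    // Searching worker: split the backlog among the searching workers.
    let n_searching = inject_len / num_searching + 1; // 33
    // Non-searching worker: the previous behaviour, split across all workers.
    let n_not_searching = inject_len / num_remotes + 1; // 9

    assert_eq!(usize::min(n_searching, max), 32);
    assert_eq!(usize::min(n_not_searching, max), 9);
}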
@@ -821,9 +830,9 @@ impl Worker {
/// Note: a worker will only try to steal if fewer than half of the workers
/// are already searching for tasks. The idea is to make sure not all
/// workers are trying to steal at the same time.
fn steal_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
#[cfg(not(loom))]
const ROUNDS: usize = 1;
const ROUNDS: usize = 4;
#[cfg(loom)]
const ROUNDS: usize = 1;
@@ -835,6 +844,8 @@ impl Worker {
return Ok((None, core));
}
core = try_task_new_batch!(self, self.poll_driver(cx, core));
// Get a snapshot of which workers are idle
cx.shared().idle.snapshot(&mut self.idle_snapshot);
@@ -849,6 +860,10 @@ impl Worker {
if let Some(task) = self.steal_one_round(cx, &mut core, start, last) {
return Ok((Some(task), core));
}
std::thread::sleep(std::time::Duration::from_micros(3));
core = try_task_new_batch!(self, self.next_remote_task_batch(cx, core));
}
Ok((None, core))
@@ -903,6 +918,7 @@ impl Worker {
// Make sure the worker is not in the **searching** state. This enables
// another idle worker to try to steal work.
if self.transition_from_searching(cx, &mut core) {
super::counters::inc_num_relay_search();
cx.shared().notify_parked_local();
}
@@ -1162,7 +1178,7 @@ impl Worker {
// Try one last time to get tasks
let n = core.run_queue.max_capacity() / 2;
if let Some(task) = self.next_remote_task_batch(cx, &mut synced, &mut core, n) {
if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
return Ok((Some(task), core));
}