This commit is contained in:
Carl Lerche 2023-05-18 12:57:58 -07:00
parent 4c524eaf56
commit 6abee1a896
No known key found for this signature in database
GPG Key ID: FC5ADF3A4B2E5977
3 changed files with 45 additions and 20 deletions

View File

@ -25,10 +25,14 @@ impl Defer {
self.deferred.is_empty()
}
pub(crate) fn wake(&mut self) {
pub(crate) fn wake(&mut self) -> usize {
let ret = self.deferred.len();
for waker in self.deferred.drain(..) {
waker.wake();
}
ret
}
#[cfg(tokio_taskdump)]

View File

@ -3,30 +3,30 @@ mod imp {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_NEED_SEARCHERS: AtomicUsize = AtomicUsize::new(0);
static NUM_WAKE_DEFERS: AtomicUsize = AtomicUsize::new(0);
static NUM_WAKE_DEFERS_MULT: AtomicUsize = AtomicUsize::new(0);
impl Drop for super::Counters {
fn drop(&mut self) {
println!("---");
println!(
"NUM_NOTIFY: {:>10}",
NUM_NOTIFY_LOCAL.load(Relaxed)
);
println!(
"NUM_UNPARKS: {:>10}",
NUM_UNPARKS_LOCAL.load(Relaxed)
);
println!(
"NUM_NEED_SEARCHERS: {:>10}",
NUM_NEED_SEARCHERS.load(Relaxed)
);
}
}
let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
let maintenance = NUM_MAINTENANCE.load(Relaxed);
let need_searchers = NUM_NEED_SEARCHERS.load(Relaxed);
let defers = NUM_WAKE_DEFERS.load(Relaxed);
let defers_mult = NUM_WAKE_DEFERS_MULT.load(Relaxed);
pub(crate) fn inc_num_need_searchers() {
NUM_NEED_SEARCHERS.fetch_add(1, Relaxed);
println!("---");
println!("notifies (local): {}", notifies_local);
println!(" unparks (local): {}", unparks_local);
println!(" maintenance: {}", maintenance);
println!(" need_searchers: {}", need_searchers);
println!(" waking defers: {}", defers);
println!(" (mult): {}", defers_mult);
}
}
pub(crate) fn inc_num_inc_notify_local() {
@ -36,13 +36,31 @@ mod imp {
pub(crate) fn inc_num_unparks_local() {
NUM_UNPARKS_LOCAL.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_maintenance() {
NUM_MAINTENANCE.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_need_searchers() {
NUM_NEED_SEARCHERS.fetch_add(1, Relaxed);
}
pub(crate) fn inc_num_defers(batch: usize) {
NUM_WAKE_DEFERS.fetch_add(1, Relaxed);
if batch > 1 {
NUM_WAKE_DEFERS_MULT.fetch_add(1, Relaxed);
}
}
}
#[cfg(not(tokio_internal_mt_counters))]
mod imp {
pub(crate) fn inc_num_need_searchers() {}
pub(crate) fn inc_num_inc_notify_local() {}
pub(crate) fn inc_num_unparks_local() {}
pub(crate) fn inc_num_maintenance() {}
pub(crate) fn inc_num_need_searchers() {}
pub(crate) fn inc_num_defers(_batch: usize) {}
}
#[derive(Debug)]

View File

@ -528,6 +528,7 @@ impl Context {
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % self.worker.handle.shared.config.event_interval == 0 {
super::counters::inc_num_maintenance();
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
core = self.park_timeout(core, Some(Duration::from_millis(0)));
@ -986,8 +987,10 @@ fn did_defer_tasks() -> bool {
context::with_defer(|deferred| !deferred.is_empty()).unwrap()
}
/// Returns the number of deferred tasks that were woken
fn wake_deferred_tasks() {
context::with_defer(|deferred| deferred.wake());
let n = context::with_defer(|deferred| deferred.wake()).unwrap_or(0);
super::counters::inc_num_defers(n);
}
cfg_metrics! {