metrics: add a new metric for budget exhaustion yields (#5517)

This commit is contained in:
Noah Kennedy 2023-03-01 14:25:35 -06:00 committed by GitHub
parent ee1c940709
commit 52da177dea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 140 additions and 5 deletions

View File

@ -36,6 +36,11 @@ use crate::runtime::context;
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);
pub(crate) struct BudgetDecrement {
success: bool,
hit_zero: bool,
}
impl Budget {
/// Budget assigned to a task on each poll.
///
@ -172,9 +177,17 @@ cfg_coop! {
context::budget(|cell| {
let mut budget = cell.get();
if budget.decrement() {
let decrement = budget.decrement();
if decrement.success {
let restore = RestoreOnPending(Cell::new(cell.get()));
cell.set(budget);
// avoid double counting
if decrement.hit_zero {
inc_budget_forced_yield_count();
}
Poll::Ready(restore)
} else {
cx.waker().wake_by_ref();
@ -183,19 +196,43 @@ cfg_coop! {
}).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
}
cfg_rt! {
cfg_metrics! {
#[inline(always)]
fn inc_budget_forced_yield_count() {
if let Ok(handle) = context::try_current() {
handle.scheduler_metrics().inc_budget_forced_yield_count();
}
}
}
cfg_not_metrics! {
#[inline(always)]
fn inc_budget_forced_yield_count() {}
}
}
cfg_not_rt! {
#[inline(always)]
fn inc_budget_forced_yield_count() {}
}
impl Budget {
/// Decrements the budget. Returns `true` if successful. Decrementing fails
/// when there is not enough remaining budget.
fn decrement(&mut self) -> bool {
fn decrement(&mut self) -> BudgetDecrement {
if let Some(num) = &mut self.0 {
if *num > 0 {
*num -= 1;
true
let hit_zero = *num == 0;
BudgetDecrement { success: true, hit_zero }
} else {
false
BudgetDecrement { success: false, hit_zero: false }
}
} else {
true
BudgetDecrement { success: true, hit_zero: false }
}
}

View File

@ -124,6 +124,21 @@ impl RuntimeMetrics {
.load(Relaxed)
}
/// Returns the number of times that tasks have been forced to yield back to the scheduler
/// after exhausting their task budgets.
///
/// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
pub fn budget_forced_yield_count(&self) -> u64 {
self.handle
.inner
.scheduler_metrics()
.budget_forced_yield_count
.load(Relaxed)
}
/// Returns the total number of times the given worker thread has parked.
///
/// The worker park count starts at zero when the runtime is created and

View File

@ -11,12 +11,14 @@ use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
pub(crate) struct SchedulerMetrics {
/// Number of tasks that are scheduled from outside the runtime.
pub(super) remote_schedule_count: AtomicU64,
pub(super) budget_forced_yield_count: AtomicU64,
}
impl SchedulerMetrics {
pub(crate) fn new() -> SchedulerMetrics {
SchedulerMetrics {
remote_schedule_count: AtomicU64::new(0),
budget_forced_yield_count: AtomicU64::new(0),
}
}
@ -24,4 +26,9 @@ impl SchedulerMetrics {
pub(crate) fn inc_remote_schedule_count(&self) {
self.remote_schedule_count.fetch_add(1, Relaxed);
}
/// Increment the number of tasks forced to yield due to budget exhaustion
pub(crate) fn inc_budget_forced_yield_count(&self) {
self.budget_forced_yield_count.fetch_add(1, Relaxed);
}
}

View File

@ -1,9 +1,13 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))]
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;
use tokio::runtime::Runtime;
use tokio::task::consume_budget;
use tokio::time::{self, Duration};
#[test]
@ -433,6 +437,78 @@ fn worker_local_queue_depth() {
});
}
#[test]
fn budget_exhaustion_yield() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.budget_forced_yield_count());
let mut did_yield = false;
// block on a task which consumes budget until it yields
rt.block_on(poll_fn(|cx| loop {
if did_yield {
return Poll::Ready(());
}
let fut = consume_budget();
tokio::pin!(fut);
if fut.poll(cx).is_pending() {
did_yield = true;
return Poll::Pending;
}
}));
assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
#[test]
fn budget_exhaustion_yield_with_joins() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.budget_forced_yield_count());
let mut did_yield_1 = false;
let mut did_yield_2 = false;
// block on a task which consumes budget until it yields
rt.block_on(async {
tokio::join!(
poll_fn(|cx| loop {
if did_yield_1 {
return Poll::Ready(());
}
let fut = consume_budget();
tokio::pin!(fut);
if fut.poll(cx).is_pending() {
did_yield_1 = true;
return Poll::Pending;
}
}),
poll_fn(|cx| loop {
if did_yield_2 {
return Poll::Ready(());
}
let fut = consume_budget();
tokio::pin!(fut);
if fut.poll(cx).is_pending() {
did_yield_2 = true;
return Poll::Pending;
}
})
)
});
assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {