metrics: add MetricAtomicU64 and use in metrics (#6574)

This commit is contained in:
Russell Cohen 2024-05-23 12:41:01 -04:00 committed by GitHub
parent 16ef7b1fd5
commit cba86cf1b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 722 additions and 633 deletions

View File

@ -1,4 +1,4 @@
284
285
&
+
<
@ -34,6 +34,7 @@ amongst
api
APIs
async
atomics
awaitable
backend
backpressure

View File

@ -218,19 +218,37 @@ macro_rules! cfg_macros {
macro_rules! cfg_metrics {
($($item:item)*) => {
$(
// For now, metrics is only disabled in loom tests.
// When stabilized, it might have a dedicated feature flag.
#[cfg(all(tokio_unstable, not(loom)))]
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
$item
)*
}
}
/// Some metrics require 64-bit atomics.
macro_rules! cfg_64bit_metrics {
($($item:item)*) => {
$(
#[cfg(target_has_atomic = "64")]
#[cfg_attr(docsrs, doc(cfg(target_has_atomic = "64")))]
$item
)*
}
}
macro_rules! cfg_no_64bit_metrics {
($($item:item)*) => {
$(
#[cfg(not(target_has_atomic = "64"))]
$item
)*
}
}
macro_rules! cfg_not_metrics {
($($item:item)*) => {
$(
#[cfg(not(all(tokio_unstable, not(loom))))]
#[cfg(not(tokio_unstable))]
$item
)*
}
@ -238,7 +256,7 @@ macro_rules! cfg_not_metrics {
macro_rules! cfg_not_rt_and_metrics_and_net {
($($item:item)*) => {
$( #[cfg(not(all(feature = "net", feature = "rt", all(tokio_unstable, not(loom)))))]$item )*
$( #[cfg(not(all(feature = "net", feature = "rt", tokio_unstable)))]$item )*
}
}

View File

@ -1,4 +1,5 @@
use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::util::metric_atomics::MetricAtomicU64;
use std::cmp;
use std::ops::Range;
@ -6,7 +7,7 @@ use std::ops::Range;
#[derive(Debug)]
pub(crate) struct Histogram {
/// The histogram buckets
buckets: Box<[AtomicU64]>,
buckets: Box<[MetricAtomicU64]>,
/// Bucket scale, linear or log
scale: HistogramScale,
@ -53,8 +54,10 @@ impl Histogram {
self.buckets.len()
}
pub(crate) fn get(&self, bucket: usize) -> u64 {
self.buckets[bucket].load(Relaxed)
cfg_64bit_metrics! {
pub(crate) fn get(&self, bucket: usize) -> u64 {
self.buckets[bucket].load(Relaxed)
}
}
pub(crate) fn bucket_range(&self, bucket: usize) -> Range<u64> {
@ -150,7 +153,7 @@ impl HistogramBuilder {
Histogram {
buckets: (0..self.num_buckets)
.map(|_| AtomicU64::new(0))
.map(|_| MetricAtomicU64::new(0))
.collect::<Vec<_>>()
.into_boxed_slice(),
resolution,
@ -165,7 +168,7 @@ impl Default for HistogramBuilder {
}
}
#[cfg(test)]
#[cfg(all(test, target_has_atomic = "64"))]
mod test {
use super::*;

View File

@ -1,24 +1,24 @@
#![cfg_attr(not(feature = "net"), allow(dead_code))]
use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
use crate::{loom::sync::atomic::Ordering::Relaxed, util::metric_atomics::MetricAtomicU64};
#[derive(Default)]
pub(crate) struct IoDriverMetrics {
pub(super) fd_registered_count: AtomicU64,
pub(super) fd_deregistered_count: AtomicU64,
pub(super) ready_count: AtomicU64,
pub(super) fd_registered_count: MetricAtomicU64,
pub(super) fd_deregistered_count: MetricAtomicU64,
pub(super) ready_count: MetricAtomicU64,
}
impl IoDriverMetrics {
pub(crate) fn incr_fd_count(&self) {
self.fd_registered_count.fetch_add(1, Relaxed);
self.fd_registered_count.add(1, Relaxed);
}
pub(crate) fn dec_fd_count(&self) {
self.fd_deregistered_count.fetch_add(1, Relaxed);
self.fd_deregistered_count.add(1, Relaxed);
}
pub(crate) fn incr_ready_count_by(&self, amt: u64) {
self.ready_count.fetch_add(amt, Relaxed);
self.ready_count.add(amt, Relaxed);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,5 @@
use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::util::metric_atomics::MetricAtomicU64;
/// Retrieves metrics from the Tokio runtime.
///
@ -10,25 +11,25 @@ use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
#[derive(Debug)]
pub(crate) struct SchedulerMetrics {
/// Number of tasks that are scheduled from outside the runtime.
pub(super) remote_schedule_count: AtomicU64,
pub(super) budget_forced_yield_count: AtomicU64,
pub(super) remote_schedule_count: MetricAtomicU64,
pub(super) budget_forced_yield_count: MetricAtomicU64,
}
impl SchedulerMetrics {
pub(crate) fn new() -> SchedulerMetrics {
SchedulerMetrics {
remote_schedule_count: AtomicU64::new(0),
budget_forced_yield_count: AtomicU64::new(0),
remote_schedule_count: MetricAtomicU64::new(0),
budget_forced_yield_count: MetricAtomicU64::new(0),
}
}
/// Increment the number of tasks scheduled externally
pub(crate) fn inc_remote_schedule_count(&self) {
self.remote_schedule_count.fetch_add(1, Relaxed);
self.remote_schedule_count.add(1, Relaxed);
}
/// Increment the number of tasks forced to yield due to budget exhaustion
pub(crate) fn inc_budget_forced_yield_count(&self) {
self.budget_forced_yield_count.fetch_add(1, Relaxed);
self.budget_forced_yield_count.add(1, Relaxed);
}
}

View File

@ -1,7 +1,8 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
use crate::runtime::metrics::Histogram;
use crate::runtime::Config;
use crate::util::metric_atomics::MetricAtomicU64;
/// Retrieve runtime worker metrics.
///
@ -14,31 +15,31 @@ use crate::runtime::Config;
#[repr(align(128))]
pub(crate) struct WorkerMetrics {
/// Number of times the worker parked.
pub(crate) park_count: AtomicU64,
pub(crate) park_count: MetricAtomicU64,
/// Number of times the worker woke then parked again without doing work.
pub(crate) noop_count: AtomicU64,
pub(crate) noop_count: MetricAtomicU64,
/// Number of tasks the worker stole.
pub(crate) steal_count: AtomicU64,
pub(crate) steal_count: MetricAtomicU64,
/// Number of times the worker stole
pub(crate) steal_operations: AtomicU64,
pub(crate) steal_operations: MetricAtomicU64,
/// Number of tasks the worker polled.
pub(crate) poll_count: AtomicU64,
pub(crate) poll_count: MetricAtomicU64,
/// EWMA task poll time, in nanoseconds.
pub(crate) mean_poll_time: AtomicU64,
pub(crate) mean_poll_time: MetricAtomicU64,
/// Amount of time the worker spent doing work vs. parking.
pub(crate) busy_duration_total: AtomicU64,
pub(crate) busy_duration_total: MetricAtomicU64,
/// Number of tasks scheduled for execution on the worker's local queue.
pub(crate) local_schedule_count: AtomicU64,
pub(crate) local_schedule_count: MetricAtomicU64,
/// Number of tasks moved from the local queue to the global queue to free space.
pub(crate) overflow_count: AtomicU64,
pub(crate) overflow_count: MetricAtomicU64,
/// Number of tasks currently in the local queue. Used only by the
/// current-thread scheduler.
@ -60,15 +61,15 @@ impl WorkerMetrics {
pub(crate) fn new() -> WorkerMetrics {
WorkerMetrics {
park_count: AtomicU64::new(0),
noop_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
steal_operations: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
mean_poll_time: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
busy_duration_total: AtomicU64::new(0),
local_schedule_count: AtomicU64::new(0),
park_count: MetricAtomicU64::new(0),
noop_count: MetricAtomicU64::new(0),
steal_count: MetricAtomicU64::new(0),
steal_operations: MetricAtomicU64::new(0),
poll_count: MetricAtomicU64::new(0),
mean_poll_time: MetricAtomicU64::new(0),
overflow_count: MetricAtomicU64::new(0),
busy_duration_total: MetricAtomicU64::new(0),
local_schedule_count: MetricAtomicU64::new(0),
queue_depth: AtomicUsize::new(0),
poll_count_histogram: None,
}

View File

@ -7,18 +7,21 @@ use std::time::Duration;
#[allow(unused)]
macro_rules! assert_metrics {
($stats:ident, $field:ident == $v:expr) => {{
use crate::runtime::WorkerMetrics;
use std::sync::atomic::Ordering::Relaxed;
($stats:ident, $field:ident == $v:expr) => {
#[cfg(target_has_atomic = "64")]
{
use crate::runtime::WorkerMetrics;
use std::sync::atomic::Ordering::Relaxed;
let worker = WorkerMetrics::new();
$stats.submit(&worker);
let worker = WorkerMetrics::new();
$stats.submit(&worker);
let expect = $v;
let actual = worker.$field.load(Relaxed);
let expect = $v;
let actual = worker.$field.load(Relaxed);
assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
}};
assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
}
};
}
fn new_stats() -> Stats {

View File

@ -0,0 +1,47 @@
use std::sync::atomic::Ordering;
cfg_64bit_metrics! {
use std::sync::atomic::AtomicU64;
}
/// `AtomicU64` that is is a no-op on platforms without 64-bit atomics
///
/// When used on platforms without 64-bit atomics, writes to this are no-ops.
/// The `load` method is only defined when 64-bit atomics are available.
#[derive(Debug, Default)]
pub(crate) struct MetricAtomicU64 {
#[cfg(target_has_atomic = "64")]
value: AtomicU64,
}
// some of these are currently only used behind cfg_unstable
#[allow(dead_code)]
impl MetricAtomicU64 {
// Load is only defined when supported
cfg_64bit_metrics! {
pub(crate) fn load(&self, ordering: Ordering) -> u64 {
self.value.load(ordering)
}
}
cfg_64bit_metrics! {
pub(crate) fn store(&self, val: u64, ordering: Ordering) {
self.value.store(val, ordering)
}
pub(crate) fn new(value: u64) -> Self {
Self { value: AtomicU64::new(value) }
}
pub(crate) fn add(&self, value: u64, ordering: Ordering) {
self.value.fetch_add(value, ordering);
}
}
cfg_no_64bit_metrics! {
pub(crate) fn store(&self, _val: u64, _ordering: Ordering) { }
// on platforms without 64-bit atomics, fetch-add returns unit
pub(crate) fn add(&self, _value: u64, _ordering: Ordering) { }
pub(crate) fn new(_value: u64) -> Self { Self { } }
}
}

View File

@ -5,6 +5,8 @@ cfg_io_driver! {
#[cfg(feature = "rt")]
pub(crate) mod atomic_cell;
pub(crate) mod metric_atomics;
#[cfg(any(feature = "rt", feature = "signal", feature = "process"))]
pub(crate) mod once_cell;

View File

@ -19,7 +19,7 @@ mod support {
macro_rules! cfg_metrics {
($($t:tt)*) => {
#[cfg(tokio_unstable)]
#[cfg(all(tokio_unstable, target_has_atomic = "64"))]
{
$( $t )*
}

View File

@ -1,6 +1,11 @@
#![allow(unknown_lints, unexpected_cfgs)]
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))]
#![cfg(all(
feature = "full",
tokio_unstable,
not(target_os = "wasi"),
target_has_atomic = "64"
))]
use std::future::Future;
use std::sync::{Arc, Barrier, Mutex};

View File

@ -18,7 +18,7 @@ use std::task::{Context, Poll, Waker};
macro_rules! cfg_metrics {
($($t:tt)*) => {
#[cfg(tokio_unstable)]
#[cfg(all(tokio_unstable, target_has_atomic = "64"))]
{
$( $t )*
}

View File

@ -19,7 +19,7 @@ use std::task::{Context, Poll, Waker};
macro_rules! cfg_metrics {
($($t:tt)*) => {
#[cfg(tokio_unstable)]
#[cfg(all(tokio_unstable, target_has_atomic = "64"))]
{
$( $t )*
}