rt: unstable EWMA poll time metric (#5927)

Because the runtime uses the EWMA task poll time as a tuning heuristic, it
can be useful to read its value. This patch exposes that value as an
unstable metric.
This commit is contained in:
Carl Lerche 2023-08-10 12:43:44 -07:00 committed by GitHub
parent dd23f08c3a
commit 6cb106c353
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 67 additions and 8 deletions

View File

@ -73,7 +73,8 @@ impl MetricsBatch {
}
}
pub(crate) fn submit(&mut self, worker: &WorkerMetrics) {
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
worker.mean_poll_time.store(mean_poll_time, Relaxed);
worker.park_count.store(self.park_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);

View File

@ -37,7 +37,7 @@ impl MetricsBatch {
Self {}
}
pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {}
/// No-op: the body is empty, so nothing is written to the worker metrics and
/// the mean poll time argument is ignored.
///
/// NOTE(review): presumably the stand-in used when metrics collection is
/// disabled — confirm.
pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
/// No-op: the body is empty, so nothing is recorded when this is called.
pub(crate) fn about_to_park(&mut self) {}
/// No-op: the body is empty, so no counter is incremented.
pub(crate) fn inc_local_schedule_count(&mut self) {}
/// No-op: the body is empty, so nothing is recorded when this is called.
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}

View File

@ -769,6 +769,47 @@ impl RuntimeMetrics {
.unwrap_or_default()
}
/// Returns the mean time the given worker spends polling a task.
///
/// The value is an exponentially weighted moving average (EWMA). It is
/// tracked in nanoseconds and handed back as a [`Duration`]. Currently, this
/// metric is only provided by the multi-threaded runtime; the current-thread
/// runtime reports a zero duration.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be at least 0 and strictly less than `num_workers()`. The index uniquely
/// identifies a single worker and will continue to identify the worker
/// throughout the lifetime of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
///     let metrics = Handle::current().metrics();
///
///     let n = metrics.worker_mean_poll_time(0);
///     println!("worker 0 has a mean poll time of {:?}", n);
/// }
/// ```
#[track_caller]
pub fn worker_mean_poll_time(&self, worker: usize) -> Duration {
    // Look up the per-worker metrics first; this is where an invalid index
    // is rejected.
    let worker_metrics = self.handle.inner.worker_metrics(worker);
    let mean_nanos = worker_metrics.mean_poll_time.load(Relaxed);

    Duration::from_nanos(mean_nanos)
}
/// Returns the number of tasks currently scheduled in the blocking
/// thread pool, spawned using `spawn_blocking`.
///

View File

@ -28,6 +28,9 @@ pub(crate) struct WorkerMetrics {
/// Number of tasks the worker polled.
pub(crate) poll_count: AtomicU64,
/// EWMA task poll time, in nanoseconds.
pub(crate) mean_poll_time: AtomicU64,
/// Amount of time the worker spent doing work vs. parking.
pub(crate) busy_duration_total: AtomicU64,
@ -62,6 +65,7 @@ impl WorkerMetrics {
steal_count: AtomicU64::new(0),
steal_operations: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
mean_poll_time: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
busy_duration_total: AtomicU64::new(0),
local_schedule_count: AtomicU64::new(0),

View File

@ -321,7 +321,7 @@ impl Core {
}
/// Submits this core's batched metrics to the handle's shared worker metrics.
///
/// The two-argument call passes `0` as the mean poll time.
///
/// NOTE(review): presumably because this scheduler does not compute an EWMA
/// poll time — confirm.
fn submit_metrics(&mut self, handle: &Handle) {
self.metrics.submit(&handle.shared.worker_metrics);
self.metrics.submit(&handle.shared.worker_metrics, 0);
}
}

View File

@ -74,7 +74,7 @@ impl Stats {
}
/// Submits the batched metrics to `to`.
///
/// The two-argument call also passes the current task poll time EWMA,
/// converted to `u64` with an `as` cast.
///
/// NOTE(review): the EWMA is presumably expressed in nanoseconds — confirm.
pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to);
self.batch.submit(to, self.task_poll_time_ewma as u64);
}
pub(crate) fn about_to_park(&mut self) {

View File

@ -93,7 +93,7 @@ impl Stats {
}
/// Submits the batched metrics to `to`.
///
/// The two-argument call also passes the current task poll time EWMA,
/// converted to `u64` with an `as` cast.
///
/// NOTE(review): the EWMA is presumably expressed in nanoseconds — confirm.
pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to);
self.batch.submit(to, self.task_poll_time_ewma as u64);
}
pub(crate) fn about_to_park(&mut self) {

View File

@ -215,18 +215,25 @@ fn worker_steal_count() {
}
#[test]
fn worker_poll_count() {
fn worker_poll_count_and_time() {
const N: u64 = 5;
async fn task() {
// Sync sleep
std::thread::sleep(std::time::Duration::from_micros(10));
}
let rt = current_thread();
let metrics = rt.metrics();
rt.block_on(async {
for _ in 0..N {
tokio::spawn(async {}).await.unwrap();
tokio::spawn(task()).await.unwrap();
}
});
drop(rt);
assert_eq!(N, metrics.worker_poll_count(0));
// Not currently supported for current-thread runtime
assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));
// Does not populate the histogram
assert!(!metrics.poll_count_histogram_enabled());
@ -238,7 +245,7 @@ fn worker_poll_count() {
let metrics = rt.metrics();
rt.block_on(async {
for _ in 0..N {
tokio::spawn(async {}).await.unwrap();
tokio::spawn(task()).await.unwrap();
}
});
drop(rt);
@ -249,6 +256,12 @@ fn worker_poll_count() {
assert_eq!(N, n);
let n: Duration = (0..metrics.num_workers())
.map(|i| metrics.worker_mean_poll_time(i))
.sum();
assert!(n > Duration::default());
// Does not populate the histogram
assert!(!metrics.poll_count_histogram_enabled());
for n in 0..metrics.num_workers() {