metrics: stabilize injection_queue_depth metric (#6854)

This commit is contained in:
Owen Leung 2024-09-23 00:38:37 +08:00 committed by GitHub
parent a302367b8f
commit 542197cdb9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 93 additions and 92 deletions

View File

@ -70,6 +70,32 @@ impl RuntimeMetrics {
self.handle.inner.num_alive_tasks()
}
/// Returns the number of tasks currently scheduled in the runtime's
/// injection queue.
///
/// Tasks that are spawned or notified from a non-runtime thread are
/// scheduled using the runtime's injection queue. This metric returns the
/// **current** number of tasks pending in the injection queue. As such, the
/// returned value may increase or decrease as new tasks are scheduled and
/// processed.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.injection_queue_depth();
/// println!("{} tasks currently pending in the runtime's injection queue", n);
/// }
/// ```
pub fn injection_queue_depth(&self) -> usize {
self.handle.inner.injection_queue_depth()
}
cfg_unstable_metrics! {
/// Returns the number of additional threads spawned by the runtime.
@ -655,32 +681,6 @@ impl RuntimeMetrics {
}
}
/// Returns the number of tasks currently scheduled in the runtime's
/// injection queue.
///
/// Tasks that are spawned or notified from a non-runtime thread are
/// scheduled using the runtime's injection queue. This metric returns the
/// **current** number of tasks pending in the injection queue. As such, the
/// returned value may increase or decrease as new tasks are scheduled and
/// processed.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.injection_queue_depth();
/// println!("{} tasks currently pending in the runtime's injection queue", n);
/// }
/// ```
pub fn injection_queue_depth(&self) -> usize {
self.handle.inner.injection_queue_depth()
}
/// Returns the number of tasks currently scheduled in the given worker's
/// local queue.
///

View File

@ -528,6 +528,10 @@ impl Handle {
pub(crate) fn num_alive_tasks(&self) -> usize {
self.shared.owned.num_alive_tasks()
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.inject.len()
}
}
cfg_unstable_metrics! {
@ -536,10 +540,6 @@ cfg_unstable_metrics! {
&self.shared.scheduler_metrics
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.inject.len()
}
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics

View File

@ -16,9 +16,7 @@ cfg_rt_multi_thread! {
mod rt_multi_thread;
}
cfg_unstable_metrics! {
mod metrics;
}
mod metrics;
/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
/// overflow queue when the local, fixed-size, array queue overflows.

View File

@ -189,6 +189,10 @@ cfg_rt! {
pub(crate) fn num_alive_tasks(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
}
pub(crate) fn injection_queue_depth(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
}
}
cfg_unstable_metrics! {
@ -217,10 +221,6 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
}
pub(crate) fn injection_queue_depth(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
}

View File

@ -13,6 +13,10 @@ impl Handle {
self.shared.owned.num_alive_tasks()
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}
cfg_unstable_metrics! {
cfg_64bit_metrics! {
pub(crate) fn spawned_tasks_count(&self) -> u64 {
@ -39,10 +43,6 @@ impl Handle {
&self.shared.worker_metrics[worker]
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}

View File

@ -75,9 +75,7 @@ use std::task::Waker;
use std::thread;
use std::time::Duration;
cfg_unstable_metrics! {
mod metrics;
}
mod metrics;
cfg_taskdump! {
mod taskdump;

View File

@ -4,8 +4,12 @@ impl Shared {
pub(crate) fn injection_queue_depth(&self) -> usize {
self.inject.len()
}
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.remotes[worker].steal.len()
cfg_unstable_metrics! {
impl Shared {
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.remotes[worker].steal.len()
}
}
}

View File

@ -2,6 +2,7 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi"), target_has_atomic = "64"))]
use std::sync::{Arc, Barrier};
use tokio::runtime::Runtime;
#[test]
@ -45,6 +46,51 @@ fn num_alive_tasks() {
assert_eq!(0, rt.metrics().num_alive_tasks());
}
#[test]
fn injection_queue_depth_current_thread() {
use std::thread;
let rt = current_thread();
let handle = rt.handle().clone();
let metrics = rt.metrics();
thread::spawn(move || {
handle.spawn(async {});
})
.join()
.unwrap();
assert_eq!(1, metrics.injection_queue_depth());
}
#[test]
fn injection_queue_depth_multi_thread() {
let rt = threaded();
let metrics = rt.metrics();
let barrier1 = Arc::new(Barrier::new(3));
let barrier2 = Arc::new(Barrier::new(3));
// Spawn a task per runtime worker to block it.
for _ in 0..2 {
let barrier1 = barrier1.clone();
let barrier2 = barrier2.clone();
rt.spawn(async move {
barrier1.wait();
barrier2.wait();
});
}
barrier1.wait();
for i in 0..10 {
assert_eq!(i, metrics.injection_queue_depth());
rt.spawn(async {});
}
barrier2.wait();
}
fn current_thread() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()

View File

@ -8,7 +8,7 @@
))]
use std::future::Future;
use std::sync::{Arc, Barrier, Mutex};
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::thread;
use tokio::macros::support::poll_fn;
@ -622,51 +622,6 @@ fn worker_overflow_count() {
assert_eq!(1, n);
}
#[test]
fn injection_queue_depth_current_thread() {
use std::thread;
let rt = current_thread();
let handle = rt.handle().clone();
let metrics = rt.metrics();
thread::spawn(move || {
handle.spawn(async {});
})
.join()
.unwrap();
assert_eq!(1, metrics.injection_queue_depth());
}
#[test]
fn injection_queue_depth_multi_thread() {
let rt = threaded();
let metrics = rt.metrics();
let barrier1 = Arc::new(Barrier::new(3));
let barrier2 = Arc::new(Barrier::new(3));
// Spawn a task per runtime worker to block it.
for _ in 0..2 {
let barrier1 = barrier1.clone();
let barrier2 = barrier2.clone();
rt.spawn(async move {
barrier1.wait();
barrier2.wait();
});
}
barrier1.wait();
for i in 0..10 {
assert_eq!(i, metrics.injection_queue_depth());
rt.spawn(async {});
}
barrier2.wait();
}
#[test]
fn worker_local_queue_depth() {
const N: usize = 100;