rt: export metrics about the blocking thread pool (#5161)

Publish the blocking thread pool metrics as thread-safe values: they are
written while the blocking thread pool's lock is held and can be read
lock-free by any reader.

Fixes #5156
Duarte Nunes 2022-11-04 15:05:36 -03:00 committed by GitHub
parent 5b46395a1e
commit 23fdd32b01
6 changed files with 260 additions and 17 deletions
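
For orientation, a minimal usage sketch of the accessors this commit adds. It is illustrative only: the metrics API sits behind `cfg_metrics!`, so reading these values requires building with the `tokio_unstable` cfg, as in the test file at the end of this diff.

use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    // Run something on the blocking pool so the counters have data to report.
    let _ = tokio::task::spawn_blocking(|| 1 + 1).await;

    let metrics = Handle::current().metrics();
    // The three accessors introduced by this change.
    println!("blocking threads:      {}", metrics.num_blocking_threads());
    println!("idle blocking threads: {}", metrics.num_idle_blocking_threads());
    println!("blocking queue depth:  {}", metrics.blocking_queue_depth());
}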


@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, Handle};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
pub(crate) struct BlockingPool {
@ -23,6 +24,53 @@ pub(crate) struct Spawner {
inner: Arc<Inner>,
}
#[derive(Default)]
pub(crate) struct SpawnerMetrics {
num_threads: AtomicUsize,
num_idle_threads: AtomicUsize,
queue_depth: AtomicUsize,
}
impl SpawnerMetrics {
fn num_threads(&self) -> usize {
self.num_threads.load(Ordering::Relaxed)
}
fn num_idle_threads(&self) -> usize {
self.num_idle_threads.load(Ordering::Relaxed)
}
cfg_metrics! {
fn queue_depth(&self) -> usize {
self.queue_depth.load(Ordering::Relaxed)
}
}
fn inc_num_threads(&self) {
self.num_threads.fetch_add(1, Ordering::Relaxed);
}
fn dec_num_threads(&self) {
self.num_threads.fetch_sub(1, Ordering::Relaxed);
}
fn inc_num_idle_threads(&self) {
self.num_idle_threads.fetch_add(1, Ordering::Relaxed);
}
fn dec_num_idle_threads(&self) -> usize {
self.num_idle_threads.fetch_sub(1, Ordering::Relaxed)
}
fn inc_queue_depth(&self) {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
}
fn dec_queue_depth(&self) {
self.queue_depth.fetch_sub(1, Ordering::Relaxed);
}
}
struct Inner {
/// State shared between worker threads.
shared: Mutex<Shared>,
@ -47,12 +95,13 @@ struct Inner {
// Customizable wait timeout.
keep_alive: Duration,
// Metrics about the pool.
metrics: SpawnerMetrics,
}
struct Shared {
queue: VecDeque<Task>,
num_th: usize,
num_idle: u32,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
@ -165,8 +214,6 @@ impl BlockingPool {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_th: 0,
num_idle: 0,
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
@ -181,6 +228,7 @@ impl BlockingPool {
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
metrics: Default::default(),
}),
},
shutdown_rx,
@ -350,11 +398,12 @@ impl Spawner {
}
shared.queue.push_back(task);
self.inner.metrics.inc_queue_depth();
if shared.num_idle == 0 {
if self.inner.metrics.num_idle_threads() == 0 {
// No threads are able to process the task.
if shared.num_th == self.inner.thread_cap {
if self.inner.metrics.num_threads() == self.inner.thread_cap {
// At max number of threads
} else {
assert!(shared.shutdown_tx.is_some());
@ -365,11 +414,14 @@ impl Spawner {
match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
shared.num_th += 1;
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => {
Err(ref e)
if is_temporary_os_thread_error(e)
&& self.inner.metrics.num_threads() > 0 =>
{
// OS temporarily failed to spawn a new thread.
// The task will be picked up eventually by a currently
// busy thread.
@ -388,7 +440,7 @@ impl Spawner {
// exactly. Thread libraries may generate spurious
// wakeups, this counter is used to keep us in a
// consistent state.
shared.num_idle -= 1;
self.inner.metrics.dec_num_idle_threads();
shared.num_notify += 1;
self.inner.condvar.notify_one();
}
@ -419,6 +471,22 @@ impl Spawner {
}
}
cfg_metrics! {
impl Spawner {
pub(crate) fn num_threads(&self) -> usize {
self.inner.metrics.num_threads()
}
pub(crate) fn num_idle_threads(&self) -> usize {
self.inner.metrics.num_idle_threads()
}
pub(crate) fn queue_depth(&self) -> usize {
self.inner.metrics.queue_depth()
}
}
}
// Tells whether the error when spawning a thread is temporary.
#[inline]
fn is_temporary_os_thread_error(error: &std::io::Error) -> bool {
@ -437,6 +505,7 @@ impl Inner {
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
@ -444,7 +513,7 @@ impl Inner {
}
// IDLE
shared.num_idle += 1;
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
@ -478,6 +547,7 @@ impl Inner {
if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.shutdown_or_run_if_mandatory();
@ -488,7 +558,7 @@ impl Inner {
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
shared.num_idle += 1;
self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
@ -496,17 +566,17 @@ impl Inner {
}
// Thread exit
shared.num_th -= 1;
self.metrics.dec_num_threads();
// num_idle should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
shared.num_idle = shared
.num_idle
.checked_sub(1)
.expect("num_idle underflowed on thread exit");
let prev_idle = self.metrics.dec_num_idle_threads();
if prev_idle < self.metrics.num_idle_threads() {
panic!("num_idle_threads underflowed on thread exit")
}
if shared.shutdown && shared.num_th == 0 {
if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}
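
The counters above follow the write-under-lock / read-lock-free pattern described in the commit message: the `SpawnerMetrics` fields are only mutated while the pool's `Mutex<Shared>` is held, and metric readers load them with `Ordering::Relaxed` without touching the lock. A minimal standalone sketch of that pattern, using illustrative types rather than Tokio's actual structs:

use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct Pool {
    // Guarded state: only mutated while the lock is held.
    queue: Mutex<VecDeque<String>>,
    // Mirrors queue.len(); kept in sync under the lock, readable lock-free.
    queue_depth: AtomicUsize,
}

impl Pool {
    fn push(&self, task: String) {
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(task);
        // Updated while the lock is held; Relaxed suffices because readers
        // only need a point-in-time count, not ordering with other memory.
        self.queue_depth.fetch_add(1, Ordering::Relaxed);
    }

    fn queue_depth(&self) -> usize {
        // Lock-free read path used by metric consumers.
        self.queue_depth.load(Ordering::Relaxed)
    }
}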


@ -42,6 +42,56 @@ impl RuntimeMetrics {
self.handle.inner.num_workers()
}
/// Returns the number of additional threads spawned by the runtime.
///
/// The maximum number of these threads is set by configuring
/// `max_blocking_threads` on `runtime::Builder`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let _ = tokio::task::spawn_blocking(move || {
/// // Stand-in for compute-heavy work or using synchronous APIs
/// 1 + 1
/// }).await;
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.num_blocking_threads();
/// println!("Runtime has created {} threads", n);
/// }
/// ```
pub fn num_blocking_threads(&self) -> usize {
self.handle.inner.num_blocking_threads()
}
/// Returns the number of idle threads, which have been spawned by the
/// runtime for `spawn_blocking` calls.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let _ = tokio::task::spawn_blocking(move || {
/// // Stand-in for compute-heavy work or using synchronous APIs
/// 1 + 1
/// }).await;
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.num_idle_blocking_threads();
/// println!("Runtime has {} idle blocking thread pool threads", n);
/// }
/// ```
pub fn num_idle_blocking_threads(&self) -> usize {
self.handle.inner.num_idle_blocking_threads()
}
/// Returns the number of tasks scheduled from **outside** of the runtime.
///
/// The remote schedule count starts at zero when the runtime is created and
@ -446,6 +496,30 @@ impl RuntimeMetrics {
pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.handle.inner.worker_local_queue_depth(worker)
}
/// Returns the number of tasks currently scheduled in the blocking
/// thread pool, spawned using `spawn_blocking`.
///
/// This metric returns the **current** number of tasks pending in the
/// blocking thread pool. As such, the returned value may increase
/// or decrease as new tasks are scheduled and processed.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.blocking_queue_depth();
/// println!("{} tasks currently pending in the blocking thread pool", n);
/// }
/// ```
pub fn blocking_queue_depth(&self) -> usize {
self.handle.inner.blocking_queue_depth()
}
}
cfg_net! {


@ -428,6 +428,18 @@ cfg_metrics! {
assert_eq!(0, worker);
&self.shared.worker_metrics
}
pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}
pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}


@ -111,6 +111,22 @@ cfg_rt! {
}
}
pub(crate) fn num_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_blocking_threads(),
}
}
pub(crate) fn num_idle_blocking_threads(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.num_idle_blocking_threads(),
}
}
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
@ -142,6 +158,14 @@ cfg_rt! {
Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker),
}
}
pub(crate) fn blocking_queue_depth(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.blocking_queue_depth(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.blocking_queue_depth(),
}
}
}
}
}


@ -61,6 +61,14 @@ cfg_metrics! {
self.shared.worker_metrics.len()
}
pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}
pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
@ -76,6 +84,10 @@ cfg_metrics! {
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}


@ -1,6 +1,8 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))]
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::time::{self, Duration};
@ -13,6 +15,55 @@ fn num_workers() {
assert_eq!(2, rt.metrics().num_workers());
}
#[test]
fn num_blocking_threads() {
let rt = current_thread();
assert_eq!(0, rt.metrics().num_blocking_threads());
let _ = rt.block_on(rt.spawn_blocking(move || {}));
assert_eq!(1, rt.metrics().num_blocking_threads());
}
#[test]
fn num_idle_blocking_threads() {
let rt = current_thread();
assert_eq!(0, rt.metrics().num_idle_blocking_threads());
let _ = rt.block_on(rt.spawn_blocking(move || {}));
rt.block_on(async {
time::sleep(Duration::from_millis(5)).await;
});
assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}
#[test]
fn blocking_queue_depth() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(1)
.build()
.unwrap();
assert_eq!(0, rt.metrics().blocking_queue_depth());
let ready = Arc::new(Mutex::new(()));
let guard = ready.lock().unwrap();
let ready_cloned = ready.clone();
let wait_until_ready = move || {
let _unused = ready_cloned.lock().unwrap();
};
let h1 = rt.spawn_blocking(wait_until_ready.clone());
let h2 = rt.spawn_blocking(wait_until_ready);
assert!(rt.metrics().blocking_queue_depth() > 0);
drop(guard);
let _ = rt.block_on(h1);
let _ = rt.block_on(h2);
assert_eq!(0, rt.metrics().blocking_queue_depth());
}
#[test]
fn remote_schedule_count() {
use std::thread;