mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
metrics: add worker_park_unpark_count
(#6696)
This commit is contained in:
parent
6e845b794d
commit
b69f16aa21
@ -7,6 +7,9 @@ pub(crate) struct MetricsBatch {
|
||||
/// Number of times the worker parked.
|
||||
park_count: u64,
|
||||
|
||||
/// Number of times the worker parked and unparked.
|
||||
park_unpark_count: u64,
|
||||
|
||||
/// Number of times the worker woke w/o doing work.
|
||||
noop_count: u64,
|
||||
|
||||
@ -54,6 +57,7 @@ impl MetricsBatch {
|
||||
|
||||
MetricsBatch {
|
||||
park_count: 0,
|
||||
park_unpark_count: 0,
|
||||
noop_count: 0,
|
||||
steal_count: 0,
|
||||
steal_operations: 0,
|
||||
@ -76,6 +80,9 @@ impl MetricsBatch {
|
||||
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
|
||||
worker.mean_poll_time.store(mean_poll_time, Relaxed);
|
||||
worker.park_count.store(self.park_count, Relaxed);
|
||||
worker
|
||||
.park_unpark_count
|
||||
.store(self.park_unpark_count, Relaxed);
|
||||
worker.noop_count.store(self.noop_count, Relaxed);
|
||||
worker.steal_count.store(self.steal_count, Relaxed);
|
||||
worker
|
||||
@ -101,6 +108,7 @@ impl MetricsBatch {
|
||||
/// The worker is about to park.
|
||||
pub(crate) fn about_to_park(&mut self) {
|
||||
self.park_count += 1;
|
||||
self.park_unpark_count += 1;
|
||||
|
||||
if self.poll_count_on_last_park == self.poll_count {
|
||||
self.noop_count += 1;
|
||||
@ -109,6 +117,11 @@ impl MetricsBatch {
|
||||
}
|
||||
}
|
||||
|
||||
/// The worker was unparked.
|
||||
pub(crate) fn unparked(&mut self) {
|
||||
self.park_unpark_count += 1;
|
||||
}
|
||||
|
||||
/// Start processing a batch of tasks
|
||||
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
|
||||
self.processing_scheduled_tasks_started_at = Instant::now();
|
||||
|
@ -39,6 +39,7 @@ impl MetricsBatch {
|
||||
|
||||
pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
|
||||
pub(crate) fn about_to_park(&mut self) {}
|
||||
pub(crate) fn unparked(&mut self) {}
|
||||
pub(crate) fn inc_local_schedule_count(&mut self) {}
|
||||
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
|
||||
pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
|
||||
|
@ -242,6 +242,61 @@ impl RuntimeMetrics {
|
||||
.load(Relaxed)
|
||||
}
|
||||
|
||||
/// Returns the total number of times the given worker thread has parked
|
||||
/// and unparked.
|
||||
///
|
||||
/// The worker park/unpark count starts at zero when the runtime is created
|
||||
/// and increases by one each time the worker parks the thread waiting for
|
||||
/// new inbound events to process. This usually means the worker has processed
|
||||
/// all pending work and is currently idle. When new work becomes available,
|
||||
/// the worker is unparked and the park/unpark count is again increased by one.
|
||||
///
|
||||
/// An odd count means that the worker is currently parked.
|
||||
/// An even count means that the worker is currently active.
|
||||
///
|
||||
/// The counter is monotonically increasing. It is never decremented or
|
||||
/// reset to zero.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// `worker` is the index of the worker being queried. The given value must
|
||||
/// be between 0 and `num_workers()`. The index uniquely identifies a single
|
||||
/// worker and will continue to identify the worker throughout the lifetime
|
||||
/// of the runtime instance.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The method panics when `worker` represents an invalid worker, i.e. is
|
||||
/// greater than or equal to `num_workers()`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::runtime::Handle;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let metrics = Handle::current().metrics();
|
||||
/// let n = metrics.worker_park_unpark_count(0);
|
||||
///
|
||||
/// println!("worker 0 parked and unparked {} times", n);
|
||||
///
|
||||
/// if n % 2 == 0 {
|
||||
/// println!("worker 0 is active");
|
||||
/// } else {
|
||||
/// println!("worker 0 is parked");
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub fn worker_park_unpark_count(&self, worker: usize) -> u64 {
|
||||
self.handle
|
||||
.inner
|
||||
.worker_metrics(worker)
|
||||
.park_unpark_count
|
||||
.load(Relaxed)
|
||||
}
|
||||
|
||||
|
||||
/// Returns the number of times the given worker thread unparked but
|
||||
/// performed no work before parking again.
|
||||
///
|
||||
|
@ -16,6 +16,9 @@ pub(crate) struct WorkerMetrics {
|
||||
/// Number of times the worker parked.
|
||||
pub(crate) park_count: MetricAtomicU64,
|
||||
|
||||
/// Number of times the worker parked and unparked.
|
||||
pub(crate) park_unpark_count: MetricAtomicU64,
|
||||
|
||||
/// Number of times the worker woke then parked again without doing work.
|
||||
pub(crate) noop_count: MetricAtomicU64,
|
||||
|
||||
|
@ -368,6 +368,9 @@ impl Context {
|
||||
});
|
||||
|
||||
core = c;
|
||||
|
||||
core.metrics.unparked();
|
||||
core.submit_metrics(handle);
|
||||
}
|
||||
|
||||
if let Some(f) = &handle.shared.config.after_unpark {
|
||||
|
@ -74,6 +74,10 @@ impl Stats {
|
||||
self.batch.about_to_park();
|
||||
}
|
||||
|
||||
pub(crate) fn unparked(&mut self) {
|
||||
self.batch.unparked();
|
||||
}
|
||||
|
||||
pub(crate) fn inc_local_schedule_count(&mut self) {
|
||||
self.batch.inc_local_schedule_count();
|
||||
}
|
||||
|
@ -699,8 +699,13 @@ impl Context {
|
||||
if core.transition_to_parked(&self.worker) {
|
||||
while !core.is_shutdown && !core.is_traced {
|
||||
core.stats.about_to_park();
|
||||
core.stats
|
||||
.submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);
|
||||
|
||||
core = self.park_timeout(core, None);
|
||||
|
||||
core.stats.unparked();
|
||||
|
||||
// Run regularly scheduled maintenance
|
||||
core.maintenance(&self.worker);
|
||||
|
||||
|
@ -100,6 +100,10 @@ impl Stats {
|
||||
self.batch.about_to_park();
|
||||
}
|
||||
|
||||
pub(crate) fn unparked(&mut self) {
|
||||
self.batch.unparked();
|
||||
}
|
||||
|
||||
pub(crate) fn inc_local_schedule_count(&mut self) {
|
||||
self.batch.inc_local_schedule_count();
|
||||
}
|
||||
|
@ -658,6 +658,9 @@ impl Worker {
|
||||
let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
|
||||
let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);
|
||||
|
||||
core.stats.unparked();
|
||||
self.flush_metrics(cx, &mut core);
|
||||
|
||||
Ok((maybe_task, core))
|
||||
}
|
||||
|
||||
|
@ -170,6 +170,44 @@ fn worker_park_count() {
|
||||
assert!(1 <= metrics.worker_park_count(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn worker_park_unpark_count() {
|
||||
let rt = current_thread();
|
||||
let metrics = rt.metrics();
|
||||
rt.block_on(rt.spawn(async {})).unwrap();
|
||||
drop(rt);
|
||||
assert!(2 <= metrics.worker_park_unpark_count(0));
|
||||
|
||||
let rt = threaded();
|
||||
let metrics = rt.metrics();
|
||||
|
||||
// Wait for workers to be parked after runtime startup.
|
||||
for _ in 0..100 {
|
||||
if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) {
|
||||
break;
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
assert_eq!(1, metrics.worker_park_unpark_count(0));
|
||||
assert_eq!(1, metrics.worker_park_unpark_count(1));
|
||||
|
||||
// Spawn a task to unpark and then park a worker.
|
||||
rt.block_on(rt.spawn(async {})).unwrap();
|
||||
for _ in 0..100 {
|
||||
if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) {
|
||||
break;
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1));
|
||||
|
||||
// Both threads unpark for runtime shutdown.
|
||||
drop(rt);
|
||||
assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
|
||||
assert_eq!(0, metrics.worker_park_unpark_count(1) % 2);
|
||||
assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn worker_noop_count() {
|
||||
// There isn't really a great way to generate no-op parks as they happen as
|
||||
|
Loading…
x
Reference in New Issue
Block a user