mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
executor: switch from log
to tracing
(#1454)
## Motivation The `tracing` crate implements scoped, structured, context-aware diagnostics, which can add significant debugging value over unstructured log messages. `tracing` is part of the Tokio project. As part of the `tokio` 0.2 changes, I thought it would be good to move over from `log` to `tracing` in the tokio runtime. Updating the executor crate is an obvious starting point. ## Solution This branch replaces the use of `log` in `tokio-executor` with `tracing`. I've tried to leave all the instrumentation points more or less the same, but modified to use structured fields instead of string interpolation. I've also added a few `tracing` spans, primarily in places where a variable is added to all the log messages in a scope. ## Notes For users who are using the legacy `log` output, there is a feature flag to enable `log` support in `tracing`. I thought about making this on by default, but that would also enable the `tracing` dependency by default, and it is only pulled in when the `threadpool` feature flag is enabled. The `tokio` crate could enable the log feature in its default features instead, since the threadpool feature is on by default in `tokio`. If this isn't the right approach, I can change how `log` back-compatibility is enabled. We might want to consider adding more `tracing` spans in the threadpool later. This could be useful for profiling, and for helping users debug the way their applications interact with the executor. This branch is just intended as a starting point so that we can begin emitting `tracing` data from the executor; we should revisit what instrumentation should be exposed, as well. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
2d56312b89
commit
7e7a5147a3
@ -30,7 +30,6 @@ threadpool = [
|
||||
"futures-core-preview",
|
||||
"futures-util-preview",
|
||||
"num_cpus",
|
||||
"log",
|
||||
"lazy_static",
|
||||
"slab",
|
||||
]
|
||||
@ -38,6 +37,8 @@ threadpool = [
|
||||
[dependencies]
|
||||
tokio-sync = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-sync" }
|
||||
|
||||
tracing = { version = "0.1.5", optional = true }
|
||||
|
||||
# current-thread dependencies
|
||||
crossbeam-channel = { version = "0.3.8", optional = true }
|
||||
|
||||
@ -48,7 +49,6 @@ crossbeam-utils = { version = "0.6.4", optional = true }
|
||||
futures-core-preview = { version = "=0.3.0-alpha.18", optional = true }
|
||||
futures-util-preview = { version = "=0.3.0-alpha.18", optional = true }
|
||||
num_cpus = { version = "1.2", optional = true }
|
||||
log = { version = "0.4", optional = true }
|
||||
lazy_static = { version = "1", optional = true }
|
||||
slab = { version = "0.4.1", optional = true }
|
||||
|
||||
|
@ -56,6 +56,9 @@
|
||||
//! [`DefaultExecutor`]: struct.DefaultExecutor.html
|
||||
//! [`Park`]: park/index.html
|
||||
//! [`Future::poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
|
||||
#[cfg(any(feature = "current-thread", feature = "threadpool"))]
|
||||
#[macro_use]
|
||||
mod tracing;
|
||||
|
||||
mod enter;
|
||||
mod error;
|
||||
|
@ -8,7 +8,6 @@ use super::worker::{self, Worker, WorkerId};
|
||||
use crate::park::Park;
|
||||
|
||||
use crossbeam_deque::Injector;
|
||||
use log::trace;
|
||||
use num_cpus;
|
||||
use std::any::Any;
|
||||
use std::cmp::max;
|
||||
@ -374,7 +373,7 @@ impl Builder {
|
||||
/// .build();
|
||||
/// ```
|
||||
pub fn build(&self) -> ThreadPool {
|
||||
trace!("build; num-workers={}", self.pool_size);
|
||||
trace!(message = "build;", num_workers = self.pool_size);
|
||||
|
||||
// Create the worker entry list
|
||||
let workers: Arc<[worker::Entry]> = {
|
||||
|
@ -1,6 +1,5 @@
|
||||
use crate::park::{Park, Unpark};
|
||||
|
||||
use log::warn;
|
||||
use std::error::Error;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -27,19 +26,20 @@ where
|
||||
}
|
||||
|
||||
fn park(&mut self) -> Result<(), Self::Error> {
|
||||
self.0.park().map_err(|e| {
|
||||
self.0.park().map_err(|_e| {
|
||||
// if tracing is disabled, the compiler will flag this as unused.
|
||||
warn!(
|
||||
"calling `park` on worker thread errored -- shutting down thread: {}",
|
||||
e
|
||||
message = "calling `park` on worker thread errored -- shutting down thread",
|
||||
error = %_e
|
||||
);
|
||||
})
|
||||
}
|
||||
|
||||
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
|
||||
self.0.park_timeout(duration).map_err(|e| {
|
||||
self.0.park_timeout(duration).map_err(|_e| {
|
||||
warn!(
|
||||
"calling `park` on worker thread errored -- shutting down thread: {}",
|
||||
e
|
||||
message = "calling `park` on worker thread errored -- shutting down thread",
|
||||
error = %_e,
|
||||
);
|
||||
})
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ use super::BlockingError;
|
||||
use crossbeam_deque::Injector;
|
||||
use crossbeam_utils::CachePadded;
|
||||
use lazy_static::lazy_static;
|
||||
use log::{debug, error, trace};
|
||||
use std::cell::Cell;
|
||||
use std::collections::hash_map::RandomState;
|
||||
use std::hash::{BuildHasher, Hash, Hasher};
|
||||
@ -133,10 +132,10 @@ impl Pool {
|
||||
|
||||
/// Start shutting down the pool. This means that no new futures will be
|
||||
/// accepted.
|
||||
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace"))]
|
||||
pub(crate) fn shutdown(&self, now: bool, purge_queue: bool) {
|
||||
let mut state: State = self.state.load(Acquire).into();
|
||||
|
||||
trace!("shutdown; state={:?}", state);
|
||||
trace!(?state);
|
||||
|
||||
// For now, this must be true
|
||||
debug_assert!(!purge_queue || now);
|
||||
@ -184,7 +183,7 @@ impl Pool {
|
||||
state = actual;
|
||||
}
|
||||
|
||||
trace!(" -> transitioned to shutdown");
|
||||
trace!("transitioned to shutdown");
|
||||
|
||||
// Only transition to terminate if there are no futures currently on the
|
||||
// pool
|
||||
@ -205,7 +204,7 @@ impl Pool {
|
||||
pub(crate) fn terminate_sleeping_workers(&self) {
|
||||
use super::worker::Lifecycle::Signaled;
|
||||
|
||||
trace!(" -> shutting down workers");
|
||||
trace!("shutting down workers");
|
||||
// Wakeup all sleeping workers. They will wake up, see the state
|
||||
// transition, and terminate.
|
||||
while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) {
|
||||
@ -249,7 +248,7 @@ impl Pool {
|
||||
if !worker.is_blocking() && *self == *worker.pool {
|
||||
let idx = worker.id.0;
|
||||
|
||||
trace!(" -> submit internal; idx={}", idx);
|
||||
trace!(message = "submit internal;", idx);
|
||||
|
||||
worker.pool.workers[idx].submit_internal(task);
|
||||
worker.pool.signal_work(pool);
|
||||
@ -268,7 +267,7 @@ impl Pool {
|
||||
pub(crate) fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
|
||||
debug_assert_eq!(*self, **pool);
|
||||
|
||||
trace!(" -> submit external");
|
||||
trace!("submit external");
|
||||
|
||||
self.queue.push(task);
|
||||
self.signal_work(pool);
|
||||
@ -388,9 +387,9 @@ impl Pool {
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = res {
|
||||
error!("failed to spawn worker thread; err={:?}", e);
|
||||
panic!("failed to spawn worker thread: {:?}", e);
|
||||
if let Err(err) = res {
|
||||
error!(message = "failed to spawn worker thread;", ?err);
|
||||
panic!("failed to spawn worker thread: {:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
@ -402,6 +401,9 @@ impl Pool {
|
||||
use super::worker::Lifecycle::Signaled;
|
||||
|
||||
if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
|
||||
let span = trace_span!("signal_work", idx);
|
||||
let _enter = span.enter();
|
||||
|
||||
let entry = &self.workers[idx];
|
||||
|
||||
debug_assert!(
|
||||
@ -410,10 +412,10 @@ impl Pool {
|
||||
worker_state.lifecycle(),
|
||||
);
|
||||
|
||||
trace!("signal_work -- notify; idx={}", idx);
|
||||
trace!("notify");
|
||||
|
||||
if !entry.notify(worker_state) {
|
||||
trace!("signal_work -- spawn; idx={}", idx);
|
||||
trace!("spawn;");
|
||||
self.spawn_thread(WorkerId(idx), pool);
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ use super::task::Task;
|
||||
|
||||
use crate::{Executor, SpawnError, TypedExecutor};
|
||||
|
||||
use log::trace;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::Ordering::{AcqRel, Acquire};
|
||||
@ -110,7 +109,7 @@ impl Sender {
|
||||
.into();
|
||||
|
||||
if actual == state {
|
||||
trace!("execute; count={:?}", next.num_futures());
|
||||
trace!(message = "execute;", count = next.num_futures());
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -9,11 +9,10 @@ use super::pool::Pool;
|
||||
use super::waker::Waker;
|
||||
|
||||
use futures_util::task;
|
||||
use log::trace;
|
||||
use std::cell::{Cell, UnsafeCell};
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
|
||||
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
|
||||
use std::sync::atomic::{AtomicPtr, AtomicUsize};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
@ -95,8 +94,13 @@ impl Task {
|
||||
|
||||
/// Execute the task returning `Run::Schedule` if the task needs to be
|
||||
/// scheduled again.
|
||||
///
|
||||
// tracing macro expansion adds enough branches to make clippy angry here.
|
||||
#[allow(clippy::cognitive_complexity)]
|
||||
pub(crate) fn run(me: &Arc<Task>, pool: &Arc<Pool>) -> Run {
|
||||
use self::State::*;
|
||||
#[cfg(feature = "tracing")]
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
|
||||
// Transition task to running state. At this point, the task must be
|
||||
// scheduled.
|
||||
@ -109,8 +113,10 @@ impl Task {
|
||||
Scheduled => {}
|
||||
_ => panic!("unexpected task state; {:?}", actual),
|
||||
}
|
||||
let span = trace_span!("Task::run");
|
||||
let _enter = span.enter();
|
||||
|
||||
trace!("Task::run; state={:?}", State::from(me.state.load(Relaxed)));
|
||||
trace!(state = ?State::from(me.state.load(Relaxed)));
|
||||
|
||||
// The transition to `Running` done above ensures that a lock on the
|
||||
// future has been obtained.
|
||||
@ -151,7 +157,7 @@ impl Task {
|
||||
|
||||
match res {
|
||||
Ok(Poll::Ready(_)) | Err(_) => {
|
||||
trace!(" -> task complete");
|
||||
trace!("task complete");
|
||||
|
||||
// The future has completed. Drop it immediately to free
|
||||
// resources and run drop handlers.
|
||||
@ -172,7 +178,7 @@ impl Task {
|
||||
Run::Complete
|
||||
}
|
||||
Ok(Poll::Pending) => {
|
||||
trace!(" -> not ready");
|
||||
trace!("not ready");
|
||||
|
||||
// Attempt to transition from Running -> Idle, if successful,
|
||||
// then the task does not need to be scheduled again. If the CAS
|
||||
|
@ -12,7 +12,6 @@ use super::shutdown::ShutdownTrigger;
|
||||
use super::task::{self, CanBlock, Task};
|
||||
use super::BlockingError;
|
||||
|
||||
use log::trace;
|
||||
use std::cell::Cell;
|
||||
use std::marker::PhantomData;
|
||||
use std::ptr;
|
||||
@ -410,9 +409,9 @@ impl Worker {
|
||||
self.run_task(task, pool);
|
||||
|
||||
trace!(
|
||||
"try_steal_task -- signal_work; self={}; from={}",
|
||||
self.id.0,
|
||||
idx
|
||||
message = "try_steal_task -- signal_work;",
|
||||
self = self.id.0,
|
||||
from = idx,
|
||||
);
|
||||
|
||||
// Signal other workers that work is available
|
||||
@ -487,7 +486,7 @@ impl Worker {
|
||||
.into();
|
||||
|
||||
if actual == state {
|
||||
trace!("task complete; state={:?}", next);
|
||||
trace!(message = "task complete;", state = ?next);
|
||||
|
||||
if state.num_futures() == 1 {
|
||||
// If the thread pool has been flagged as shutdown,
|
||||
@ -564,14 +563,16 @@ impl Worker {
|
||||
/// Put the worker to sleep
|
||||
///
|
||||
/// Returns `true` if woken up due to new work arriving.
|
||||
// tracing macro expansion adds enough branches to make clippy angry here.
|
||||
#[cfg_attr(feature = "tracing", allow(clippy::cognitive_complexity))]
|
||||
fn sleep(&self) -> bool {
|
||||
use self::Lifecycle::*;
|
||||
|
||||
// Putting a worker to sleep is a multipart operation. This is, in part,
|
||||
// due to the fact that a worker can be notified without it being popped
|
||||
// from the sleep stack. Extra care is needed to deal with this.
|
||||
|
||||
trace!("Worker::sleep; worker={:?}", self.id);
|
||||
let span = trace_span!("Worker::sleep", idx = self.id.0, id = ?self.id);
|
||||
let _e = span.enter();
|
||||
|
||||
let mut state: State = self.entry().state.load(Acquire).into();
|
||||
|
||||
@ -617,12 +618,12 @@ impl Worker {
|
||||
if !state.is_pushed() {
|
||||
debug_assert!(next.is_pushed());
|
||||
|
||||
trace!(" sleeping -- push to stack; idx={}", self.id.0);
|
||||
trace!("push to stack");
|
||||
|
||||
// We obtained permission to push the worker into the
|
||||
// sleeper queue.
|
||||
if self.pool.push_sleeper(self.id.0).is_err() {
|
||||
trace!(" sleeping -- push to stack failed; idx={}", self.id.0);
|
||||
trace!("push to stack failed");
|
||||
// The push failed due to the pool being terminated.
|
||||
//
|
||||
// This is true because the "work" being woken up for is
|
||||
@ -637,7 +638,7 @@ impl Worker {
|
||||
state = actual;
|
||||
}
|
||||
|
||||
trace!(" -> starting to sleep; idx={}", self.id.0);
|
||||
trace!("starting to sleep");
|
||||
|
||||
// Do a quick check to see if there are any notifications in the
|
||||
// reactor or new tasks in the global queue. Since this call will
|
||||
@ -686,7 +687,7 @@ impl Worker {
|
||||
|
||||
self.entry().park();
|
||||
|
||||
trace!(" -> wakeup; idx={}", self.id.0);
|
||||
trace!("wakeup");
|
||||
}
|
||||
}
|
||||
|
||||
@ -719,7 +720,7 @@ impl Worker {
|
||||
|
||||
impl Drop for Worker {
|
||||
fn drop(&mut self) {
|
||||
trace!("shutting down thread; idx={}", self.id.0);
|
||||
trace!(message = "shutting down thread", idx = self.id.0);
|
||||
|
||||
if self.should_finalize.get() {
|
||||
// Drain the work queue
|
||||
|
82
tokio-executor/src/tracing.rs
Normal file
82
tokio-executor/src/tracing.rs
Normal file
@ -0,0 +1,82 @@
|
||||
//! This module provides a small facade that wraps the `tracing` APIs we use, so
|
||||
//! that when the `tracing` dependency is disabled, `tracing`'s macros expand to
|
||||
//! no-ops.
|
||||
//!
|
||||
//! This means we don't have to put a `#[cfg(feature = "tracing")]` on every
|
||||
//! individual use of a `tracing` macro.
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct Span {}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
macro_rules! trace {
|
||||
($($arg:tt)+) => {
|
||||
tracing::trace!($($arg)+)
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
macro_rules! trace {
|
||||
($($arg:tt)+) => {};
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
macro_rules! debug {
|
||||
($($arg:tt)+) => {
|
||||
tracing::debug!($($arg)+)
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
macro_rules! debug {
|
||||
($($arg:tt)+) => {};
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
macro_rules! warn {
|
||||
($($arg:tt)+) => {
|
||||
tracing::warn!($($arg)+)
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
macro_rules! warn {
|
||||
($($arg:tt)+) => {};
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
macro_rules! error {
|
||||
($($arg:tt)+) => {
|
||||
tracing::error!($($arg)+)
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
macro_rules! error {
|
||||
($($arg:tt)+) => {};
|
||||
}
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
macro_rules! trace_span {
|
||||
($($arg:tt)+) => {
|
||||
tracing::trace_span!($($arg)+)
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
macro_rules! trace_span {
|
||||
($($arg:tt)+) => {
|
||||
crate::tracing::Span::new()
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tracing"))]
|
||||
impl Span {
|
||||
pub(crate) fn new() -> Self {
|
||||
Span {}
|
||||
}
|
||||
|
||||
pub(crate) fn enter(&self) -> Span {
|
||||
Span {}
|
||||
}
|
||||
}
|
@ -51,6 +51,7 @@ rt-full = [
|
||||
sync = ["tokio-sync"]
|
||||
tcp = ["io", "tokio-net/tcp"]
|
||||
timer = ["tokio-timer"]
|
||||
tracing = ["tracing-core"]
|
||||
udp = ["io", "tokio-net/udp"]
|
||||
uds = ["io", "tokio-net/uds"]
|
||||
|
||||
@ -72,6 +73,9 @@ tokio-sync = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-syn
|
||||
tokio-timer = { version = "=0.3.0-alpha.2", optional = true, path = "../tokio-timer", features = ["async-traits"] }
|
||||
tracing-core = { version = "0.1", optional = true }
|
||||
|
||||
[target.'cfg(feature = "tracing")'.dependencies]
|
||||
tokio-executor = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-executor", features = ["tracing"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-test = { version = "0.2.0-alpha.1", path = "../tokio-test" }
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user