mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
taskdump: implement task dumps for current-thread runtime (#5608)
Task dumps are snapshots of runtime state. Taskdumps are collected by instrumenting Tokio's leaves to conditionally collect backtraces, which are then coalesced per-task into execution tree traces. This initial implementation only supports collecting taskdumps from within the context of a current-thread runtime, and only `yield_now()` is instrumented.
This commit is contained in:
parent
1d785fd66f
commit
660eac71f0
52
.github/workflows/ci.yml
vendored
52
.github/workflows/ci.yml
vendored
@ -186,10 +186,10 @@ jobs:
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
os:
|
||||
- windows-latest
|
||||
- ubuntu-latest
|
||||
- macos-latest
|
||||
include:
|
||||
- os: windows-latest
|
||||
- os: ubuntu-latest
|
||||
- os: macos-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Install Rust ${{ env.rust_stable }}
|
||||
@ -207,6 +207,31 @@ jobs:
|
||||
# the unstable cfg to RustDoc
|
||||
RUSTDOCFLAGS: --cfg tokio_unstable
|
||||
|
||||
test-unstable-taskdump:
|
||||
name: test tokio full --unstable --taskdump
|
||||
needs: basics
|
||||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
- os: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Install Rust ${{ env.rust_stable }}
|
||||
uses: dtolnay/rust-toolchain@master
|
||||
with:
|
||||
toolchain: ${{ env.rust_stable }}
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
# Run `tokio` with "unstable" and "taskdump" cfg flags.
|
||||
- name: test tokio full --cfg unstable --cfg taskdump
|
||||
run: cargo test --all-features
|
||||
working-directory: tokio
|
||||
env:
|
||||
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings
|
||||
# in order to run doctests for unstable features, we must also pass
|
||||
# the unstable cfg to RustDoc
|
||||
RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_taskdump
|
||||
|
||||
miri:
|
||||
name: miri
|
||||
needs: basics
|
||||
@ -293,9 +318,11 @@ jobs:
|
||||
matrix:
|
||||
include:
|
||||
- target: i686-unknown-linux-gnu
|
||||
rustflags: --cfg tokio_taskdump
|
||||
- target: arm-unknown-linux-gnueabihf
|
||||
- target: armv7-unknown-linux-gnueabihf
|
||||
- target: aarch64-unknown-linux-gnu
|
||||
rustflags: --cfg tokio_taskdump
|
||||
|
||||
# Run a platform without AtomicU64 and no const Mutex::new
|
||||
- target: arm-unknown-linux-gnueabihf
|
||||
@ -341,15 +368,15 @@ jobs:
|
||||
target: i686-unknown-linux-gnu
|
||||
- run: cargo test -Zbuild-std --target target-specs/i686-unknown-linux-gnu.json -p tokio --all-features
|
||||
env:
|
||||
RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_atomic_u64
|
||||
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings --cfg tokio_no_atomic_u64
|
||||
# https://github.com/tokio-rs/tokio/pull/5356
|
||||
# https://github.com/tokio-rs/tokio/issues/5373
|
||||
- run: cargo hack build -p tokio --feature-powerset --depth 2 -Z avoid-dev-deps --keep-going
|
||||
env:
|
||||
RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_atomic_u64 --cfg tokio_no_const_mutex_new
|
||||
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings --cfg tokio_no_atomic_u64 --cfg tokio_no_const_mutex_new
|
||||
- run: cargo hack build -p tokio --feature-powerset --depth 2 -Z avoid-dev-deps --keep-going
|
||||
env:
|
||||
RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_atomic_u64
|
||||
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings --cfg tokio_no_atomic_u64
|
||||
|
||||
features:
|
||||
name: features
|
||||
@ -372,6 +399,11 @@ jobs:
|
||||
run: cargo hack check --all --feature-powerset --depth 2 -Z avoid-dev-deps --keep-going
|
||||
env:
|
||||
RUSTFLAGS: --cfg tokio_unstable -Dwarnings
|
||||
# Try with unstable and taskdump feature flags
|
||||
- name: check --feature-powerset --unstable --taskdump
|
||||
run: cargo hack check --all --feature-powerset --depth 2 -Z avoid-dev-deps --keep-going
|
||||
env:
|
||||
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings
|
||||
|
||||
minrust:
|
||||
name: minrust
|
||||
@ -424,7 +456,7 @@ jobs:
|
||||
cargo hack check --all-features --ignore-private
|
||||
- name: "check --all-features --unstable -Z minimal-versions"
|
||||
env:
|
||||
RUSTFLAGS: --cfg tokio_unstable -Dwarnings
|
||||
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings
|
||||
run: |
|
||||
# Remove dev-dependencies from Cargo.toml to prevent the next `cargo update`
|
||||
# from determining minimal versions based on dev-dependencies.
|
||||
@ -481,8 +513,8 @@ jobs:
|
||||
- name: "doc --lib --all-features"
|
||||
run: cargo doc --lib --no-deps --all-features --document-private-items
|
||||
env:
|
||||
RUSTFLAGS: --cfg docsrs --cfg tokio_unstable
|
||||
RUSTDOCFLAGS: --cfg docsrs --cfg tokio_unstable -Dwarnings
|
||||
RUSTFLAGS: --cfg docsrs --cfg tokio_unstable --cfg tokio_taskdump
|
||||
RUSTDOCFLAGS: --cfg docsrs --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings
|
||||
|
||||
loom-compile:
|
||||
name: build loom tests
|
||||
|
@ -90,3 +90,7 @@ path = "named-pipe-ready.rs"
|
||||
[[example]]
|
||||
name = "named-pipe-multi-client"
|
||||
path = "named-pipe-multi-client.rs"
|
||||
|
||||
[[example]]
|
||||
name = "dump"
|
||||
path = "dump.rs"
|
||||
|
50
examples/dump.rs
Normal file
50
examples/dump.rs
Normal file
@ -0,0 +1,50 @@
|
||||
//! This example demonstrates tokio's experimental taskdumping functionality.
|
||||
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() {
|
||||
use std::hint::black_box;
|
||||
|
||||
#[inline(never)]
|
||||
async fn a() {
|
||||
black_box(b()).await
|
||||
}
|
||||
|
||||
#[inline(never)]
|
||||
async fn b() {
|
||||
black_box(c()).await
|
||||
}
|
||||
|
||||
#[inline(never)]
|
||||
async fn c() {
|
||||
black_box(tokio::task::yield_now()).await
|
||||
}
|
||||
|
||||
tokio::spawn(a());
|
||||
tokio::spawn(b());
|
||||
tokio::spawn(c());
|
||||
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
let dump = handle.dump();
|
||||
|
||||
for (i, task) in dump.tasks().iter().enumerate() {
|
||||
let trace = task.trace();
|
||||
println!("task {i} trace:");
|
||||
println!("{trace}");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
)))]
|
||||
fn main() {
|
||||
println!("task dumps are not available")
|
||||
}
|
@ -115,6 +115,11 @@ socket2 = { version = "0.4.9", optional = true, features = [ "all" ] }
|
||||
[target.'cfg(tokio_unstable)'.dependencies]
|
||||
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full
|
||||
|
||||
# Currently unstable. The API exposed by these features may be broken at any time.
|
||||
# Requires `--cfg tokio_unstable` to enable.
|
||||
[target.'cfg(tokio_taskdump)'.dependencies]
|
||||
backtrace = { version = "0.3.58" }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
libc = { version = "0.2.42", optional = true }
|
||||
signal-hook-registry = { version = "1.1.1", optional = true }
|
||||
|
@ -487,6 +487,21 @@ compile_error!("Tokio's build script has incorrectly detected wasm.");
|
||||
))]
|
||||
compile_error!("Only features sync,macros,io-util,rt,time are supported on wasm.");
|
||||
|
||||
#[cfg(all(not(tokio_unstable), tokio_taskdump))]
|
||||
compile_error!("The `tokio_taskdump` feature requires `--cfg tokio_unstable`.");
|
||||
|
||||
#[cfg(all(
|
||||
tokio_taskdump,
|
||||
not(all(
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))
|
||||
))]
|
||||
compile_error!(
|
||||
"The `tokio_taskdump` feature is only currently supported on \
|
||||
linux, on `aarch64`, `x86` and `x86_64`."
|
||||
);
|
||||
|
||||
// Includes re-exports used by macros.
|
||||
//
|
||||
// This module is not intended to be part of the public API. In general, any
|
||||
|
@ -373,6 +373,36 @@ macro_rules! cfg_not_rt_multi_thread {
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! cfg_taskdump {
|
||||
($($item:item)*) => {
|
||||
$(
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(
|
||||
target_arch = "aarch64",
|
||||
target_arch = "x86",
|
||||
target_arch = "x86_64"
|
||||
)
|
||||
))]
|
||||
#[cfg_attr(docsrs, doc(cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(
|
||||
target_arch = "aarch64",
|
||||
target_arch = "x86",
|
||||
target_arch = "x86_64"
|
||||
)
|
||||
))))]
|
||||
$item
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! cfg_test_util {
|
||||
($($item:item)*) => {
|
||||
$(
|
||||
|
@ -12,6 +12,10 @@ cfg_rt! {
|
||||
use std::cell::RefCell;
|
||||
use std::marker::PhantomData;
|
||||
use std::time::Duration;
|
||||
|
||||
cfg_taskdump! {
|
||||
use crate::runtime::task::trace;
|
||||
}
|
||||
}
|
||||
|
||||
struct Context {
|
||||
@ -45,6 +49,15 @@ struct Context {
|
||||
/// Tracks the amount of "work" a task may still do before yielding back to
|
||||
/// the sheduler
|
||||
budget: Cell<coop::Budget>,
|
||||
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
trace: trace::Context,
|
||||
}
|
||||
|
||||
tokio_thread_local! {
|
||||
@ -75,6 +88,19 @@ tokio_thread_local! {
|
||||
rng: FastRand::new(RngSeed::new()),
|
||||
|
||||
budget: Cell::new(coop::Budget::unconstrained()),
|
||||
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(
|
||||
target_arch = "aarch64",
|
||||
target_arch = "x86",
|
||||
target_arch = "x86_64"
|
||||
)
|
||||
))]
|
||||
trace: trace::Context::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -378,6 +404,14 @@ cfg_rt! {
|
||||
matches!(self, EnterRuntime::Entered { .. })
|
||||
}
|
||||
}
|
||||
|
||||
cfg_taskdump! {
|
||||
/// SAFETY: Callers of this function must ensure that trace frames always
|
||||
/// form a valid linked list.
|
||||
pub(crate) unsafe fn with_trace<R>(f: impl FnOnce(&trace::Context) -> R) -> R {
|
||||
CONTEXT.with(|c| f(&c.trace))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Forces the current "entered" state to be cleared while the closure
|
||||
|
66
tokio/src/runtime/dump.rs
Normal file
66
tokio/src/runtime/dump.rs
Normal file
@ -0,0 +1,66 @@
|
||||
//! Snapshots of runtime state.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
/// A snapshot of a runtime's state.
|
||||
#[derive(Debug)]
|
||||
pub struct Dump {
|
||||
tasks: Tasks,
|
||||
}
|
||||
|
||||
/// Snapshots of tasks.
|
||||
#[derive(Debug)]
|
||||
pub struct Tasks {
|
||||
tasks: Vec<Task>,
|
||||
}
|
||||
|
||||
/// A snapshot of a task.
|
||||
#[derive(Debug)]
|
||||
pub struct Task {
|
||||
trace: Trace,
|
||||
}
|
||||
|
||||
/// An execution trace of a task's last poll.
|
||||
#[derive(Debug)]
|
||||
pub struct Trace {
|
||||
inner: super::task::trace::Trace,
|
||||
}
|
||||
|
||||
impl Dump {
|
||||
pub(crate) fn new(tasks: Vec<Task>) -> Self {
|
||||
Self {
|
||||
tasks: Tasks { tasks },
|
||||
}
|
||||
}
|
||||
|
||||
/// Tasks in this snapshot.
|
||||
pub fn tasks(&self) -> &Tasks {
|
||||
&self.tasks
|
||||
}
|
||||
}
|
||||
|
||||
impl Tasks {
|
||||
/// Iterate over tasks.
|
||||
pub fn iter(&self) -> impl Iterator<Item = &Task> {
|
||||
self.tasks.iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl Task {
|
||||
pub(crate) fn new(trace: super::task::trace::Trace) -> Self {
|
||||
Self {
|
||||
trace: Trace { inner: trace },
|
||||
}
|
||||
}
|
||||
|
||||
/// A trace of this task's state.
|
||||
pub fn trace(&self) -> &Trace {
|
||||
&self.trace
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Trace {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.inner.fmt(f)
|
||||
}
|
||||
}
|
@ -252,6 +252,15 @@ impl Handle {
|
||||
/// [`tokio::time`]: crate::time
|
||||
#[track_caller]
|
||||
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
let future = super::task::trace::Trace::root(future);
|
||||
|
||||
#[cfg(all(tokio_unstable, feature = "tracing"))]
|
||||
let future =
|
||||
crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64());
|
||||
@ -274,6 +283,14 @@ impl Handle {
|
||||
F::Output: Send + 'static,
|
||||
{
|
||||
let id = crate::runtime::task::Id::next();
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
let future = super::task::trace::Trace::root(future);
|
||||
#[cfg(all(tokio_unstable, feature = "tracing"))]
|
||||
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
|
||||
self.inner.spawn(future, id)
|
||||
@ -321,6 +338,20 @@ cfg_metrics! {
|
||||
}
|
||||
}
|
||||
|
||||
cfg_taskdump! {
|
||||
impl Handle {
|
||||
/// Capture a snapshot of this runtime's state.
|
||||
pub fn dump(&self) -> crate::runtime::Dump {
|
||||
match &self.inner {
|
||||
scheduler::Handle::CurrentThread(handle) => handle.dump(),
|
||||
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
|
||||
scheduler::Handle::MultiThread(_) =>
|
||||
unimplemented!("taskdumps are unsupported on the multi-thread runtime"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Error returned by `try_current` when no Runtime has been started
|
||||
#[derive(Debug)]
|
||||
pub struct TryCurrentError {
|
||||
|
@ -233,6 +233,11 @@ cfg_rt! {
|
||||
mod defer;
|
||||
pub(crate) use defer::Defer;
|
||||
|
||||
cfg_taskdump! {
|
||||
pub mod dump;
|
||||
pub use dump::Dump;
|
||||
}
|
||||
|
||||
mod handle;
|
||||
pub use handle::{EnterGuard, Handle, TryCurrentError};
|
||||
|
||||
|
@ -288,6 +288,15 @@ impl Runtime {
|
||||
/// [handle]: fn@Handle::block_on
|
||||
#[track_caller]
|
||||
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
let future = super::task::trace::Trace::root(future);
|
||||
|
||||
#[cfg(all(tokio_unstable, feature = "tracing"))]
|
||||
let future = crate::util::trace::task(
|
||||
future,
|
||||
|
@ -377,6 +377,51 @@ impl Handle {
|
||||
handle
|
||||
}
|
||||
|
||||
/// Capture a snapshot of this runtime's state.
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
pub(crate) fn dump(&self) -> crate::runtime::Dump {
|
||||
use crate::runtime::dump;
|
||||
use task::trace::trace_current_thread;
|
||||
|
||||
let mut traces = vec![];
|
||||
|
||||
// todo: how to make this work outside of a runtime context?
|
||||
CURRENT.with(|maybe_context| {
|
||||
// drain the local queue
|
||||
let context = if let Some(context) = maybe_context {
|
||||
context
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
let mut maybe_core = context.core.borrow_mut();
|
||||
let core = if let Some(core) = maybe_core.as_mut() {
|
||||
core
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
let local = &mut core.tasks;
|
||||
|
||||
let mut injection = self.shared.queue.lock();
|
||||
let injection = if let Some(injection) = injection.as_mut() {
|
||||
injection
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
|
||||
traces = trace_current_thread(&self.shared.owned, local, injection)
|
||||
.into_iter()
|
||||
.map(dump::Task::new)
|
||||
.collect();
|
||||
});
|
||||
|
||||
dump::Dump::new(traces)
|
||||
}
|
||||
|
||||
fn pop(&self) -> Option<task::Notified<Arc<Handle>>> {
|
||||
match self.shared.queue.lock().as_mut() {
|
||||
Some(queue) => queue.pop_front(),
|
||||
|
@ -180,6 +180,18 @@ impl<S: 'static> OwnedTasks<S> {
|
||||
}
|
||||
}
|
||||
|
||||
cfg_taskdump! {
|
||||
impl<S: 'static> OwnedTasks<S> {
|
||||
/// Locks the tasks, and calls `f` on an iterator over them.
|
||||
pub(crate) fn for_each<F>(&self, f: F)
|
||||
where
|
||||
F: FnMut(&Task<S>)
|
||||
{
|
||||
self.inner.lock().list.for_each(f)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: 'static> LocalOwnedTasks<S> {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
|
@ -207,6 +207,10 @@ use self::state::State;
|
||||
|
||||
mod waker;
|
||||
|
||||
cfg_taskdump! {
|
||||
pub(crate) mod trace;
|
||||
}
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::util::linked_list;
|
||||
|
||||
@ -340,6 +344,17 @@ impl<S: 'static> Task<S> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
pub(super) fn as_raw(&self) -> RawTask {
|
||||
self.raw
|
||||
}
|
||||
|
||||
fn header(&self) -> &Header {
|
||||
self.raw.header()
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ use std::ptr::NonNull;
|
||||
use std::task::{Poll, Waker};
|
||||
|
||||
/// Raw task handle
|
||||
pub(super) struct RawTask {
|
||||
pub(in crate::runtime) struct RawTask {
|
||||
ptr: NonNull<Header>,
|
||||
}
|
||||
|
||||
@ -195,7 +195,7 @@ impl RawTask {
|
||||
}
|
||||
|
||||
/// Safety: mutual exclusion is required to call this function.
|
||||
pub(super) fn poll(self) {
|
||||
pub(crate) fn poll(self) {
|
||||
let vtable = self.header().vtable;
|
||||
unsafe { (vtable.poll)(self.ptr) }
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ pub(super) enum TransitionToNotifiedByVal {
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub(super) enum TransitionToNotifiedByRef {
|
||||
pub(crate) enum TransitionToNotifiedByRef {
|
||||
DoNothing,
|
||||
Submit,
|
||||
}
|
||||
@ -270,6 +270,22 @@ impl State {
|
||||
})
|
||||
}
|
||||
|
||||
/// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count.
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
pub(super) fn transition_to_notified_for_tracing(&self) {
|
||||
self.fetch_update_action(|mut snapshot| {
|
||||
snapshot.set_notified();
|
||||
snapshot.ref_inc();
|
||||
((), Some(snapshot))
|
||||
});
|
||||
}
|
||||
|
||||
/// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
|
||||
///
|
||||
/// Returns `true` if the task needs to be submitted to the pool for
|
||||
|
246
tokio/src/runtime/task/trace/mod.rs
Normal file
246
tokio/src/runtime/task/trace/mod.rs
Normal file
@ -0,0 +1,246 @@
|
||||
use crate::loom::sync::Arc;
|
||||
use crate::runtime::scheduler::current_thread;
|
||||
use backtrace::BacktraceFrame;
|
||||
use std::cell::Cell;
|
||||
use std::collections::VecDeque;
|
||||
use std::ffi::c_void;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::ptr::{self, NonNull};
|
||||
use std::task::{self, Poll};
|
||||
|
||||
mod symbol;
|
||||
mod tree;
|
||||
|
||||
use symbol::Symbol;
|
||||
use tree::Tree;
|
||||
|
||||
use super::{Notified, OwnedTasks};
|
||||
|
||||
type Backtrace = Vec<BacktraceFrame>;
|
||||
type SymbolTrace = Vec<Symbol>;
|
||||
|
||||
/// The ambiant backtracing context.
|
||||
pub(crate) struct Context {
|
||||
/// The address of [`Trace::root`] establishes an upper unwinding bound on
|
||||
/// the backtraces in `Trace`.
|
||||
active_frame: Cell<Option<NonNull<Frame>>>,
|
||||
/// The place to stash backtraces.
|
||||
collector: Cell<Option<Trace>>,
|
||||
}
|
||||
|
||||
/// A [`Frame`] in an intrusive, doubly-linked tree of [`Frame`]s.
|
||||
struct Frame {
|
||||
/// The location associated with this frame.
|
||||
inner_addr: *const c_void,
|
||||
|
||||
/// The parent frame, if any.
|
||||
parent: Option<NonNull<Frame>>,
|
||||
}
|
||||
|
||||
/// An tree execution trace.
|
||||
///
|
||||
/// Traces are captured with [`Trace::capture`], rooted with [`Trace::root`]
|
||||
/// and leaved with [`Trace::leaf`].
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct Trace {
|
||||
// The linear backtraces that comprise this trace. These linear traces can
|
||||
// be re-knitted into a tree.
|
||||
backtraces: Vec<Backtrace>,
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub(crate) struct Root<T> {
|
||||
#[pin]
|
||||
future: T,
|
||||
}
|
||||
}
|
||||
|
||||
impl Context {
|
||||
pub(crate) const fn new() -> Self {
|
||||
Context {
|
||||
active_frame: Cell::new(None),
|
||||
collector: Cell::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// SAFETY: Callers of this function must ensure that trace frames always
|
||||
/// form a valid linked list.
|
||||
unsafe fn with_current<F, R>(f: F) -> R
|
||||
where
|
||||
F: FnOnce(&Self) -> R,
|
||||
{
|
||||
crate::runtime::context::with_trace(f)
|
||||
}
|
||||
|
||||
unsafe fn with_current_frame<F, R>(f: F) -> R
|
||||
where
|
||||
F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
|
||||
{
|
||||
Self::with_current(|context| f(&context.active_frame))
|
||||
}
|
||||
|
||||
fn with_current_collector<F, R>(f: F) -> R
|
||||
where
|
||||
F: FnOnce(&Cell<Option<Trace>>) -> R,
|
||||
{
|
||||
unsafe { Self::with_current(|context| f(&context.collector)) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Trace {
|
||||
/// Invokes `f`, returning both its result and the collection of backtraces
|
||||
/// captured at each sub-invocation of [`Trace::leaf`].
|
||||
#[inline(never)]
|
||||
pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
|
||||
where
|
||||
F: FnOnce() -> R,
|
||||
{
|
||||
let collector = Trace { backtraces: vec![] };
|
||||
|
||||
let previous = Context::with_current_collector(|current| current.replace(Some(collector)));
|
||||
|
||||
let result = f();
|
||||
|
||||
let collector =
|
||||
Context::with_current_collector(|current| current.replace(previous)).unwrap();
|
||||
|
||||
(result, collector)
|
||||
}
|
||||
|
||||
/// The root of a trace.
|
||||
#[inline(never)]
|
||||
pub(crate) fn root<F>(future: F) -> Root<F> {
|
||||
Root { future }
|
||||
}
|
||||
|
||||
/// If this is a sub-invocation of [`Trace::capture`], capture a backtrace.
|
||||
///
|
||||
/// The captured backtrace will be returned by [`Trace::capture`].
|
||||
///
|
||||
/// Invoking this function does nothing when it is not a sub-invocation
|
||||
/// [`Trace::capture`].
|
||||
// This function is marked `#[inline(never)]` to ensure that it gets a distinct `Frame` in the
|
||||
// backtrace, below which frames should not be included in the backtrace (since they reflect the
|
||||
// internal implementation details of this crate).
|
||||
#[inline(never)]
|
||||
pub(crate) fn leaf() {
|
||||
// Safety: We don't manipulate the current context's active frame.
|
||||
unsafe {
|
||||
Context::with_current(|context_cell| {
|
||||
if let Some(mut collector) = context_cell.collector.take() {
|
||||
let mut frames = vec![];
|
||||
let mut above_leaf = false;
|
||||
|
||||
if let Some(active_frame) = context_cell.active_frame.get() {
|
||||
let active_frame = active_frame.as_ref();
|
||||
|
||||
backtrace::trace(|frame| {
|
||||
let below_root =
|
||||
!ptr::eq(frame.symbol_address(), active_frame.inner_addr);
|
||||
|
||||
// only capture frames above `Trace::leaf` and below
|
||||
// `Trace::root`.
|
||||
if above_leaf && below_root {
|
||||
frames.push(frame.to_owned().into());
|
||||
}
|
||||
|
||||
if ptr::eq(frame.symbol_address(), Self::leaf as *const _) {
|
||||
above_leaf = true;
|
||||
}
|
||||
|
||||
// only continue unwinding if we're below `Trace::root`
|
||||
below_root
|
||||
});
|
||||
}
|
||||
collector.backtraces.push(frames);
|
||||
context_cell.collector.set(Some(collector));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Trace {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
Tree::from_trace(self.clone()).fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
|
||||
use std::mem::ManuallyDrop;
|
||||
|
||||
struct Defer<F: FnOnce() -> R, R>(ManuallyDrop<F>);
|
||||
|
||||
impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
|
||||
#[inline(always)]
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
ManuallyDrop::take(&mut self.0)();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Defer(ManuallyDrop::new(f))
|
||||
}
|
||||
|
||||
impl<T: Future> Future for Root<T> {
|
||||
type Output = T::Output;
|
||||
|
||||
#[inline(never)]
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
// SAFETY: The context's current frame is restored to its original state
|
||||
// before `frame` is dropped.
|
||||
unsafe {
|
||||
let mut frame = Frame {
|
||||
inner_addr: Self::poll as *const c_void,
|
||||
parent: None,
|
||||
};
|
||||
|
||||
Context::with_current_frame(|current| {
|
||||
frame.parent = current.take();
|
||||
current.set(Some(NonNull::from(&frame)));
|
||||
});
|
||||
|
||||
let _restore = defer(|| {
|
||||
Context::with_current_frame(|current| {
|
||||
current.set(frame.parent);
|
||||
});
|
||||
});
|
||||
|
||||
let this = self.project();
|
||||
this.future.poll(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trace and poll all tasks of the current_thread runtime.
|
||||
pub(in crate::runtime) fn trace_current_thread(
|
||||
owned: &OwnedTasks<Arc<current_thread::Handle>>,
|
||||
local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
|
||||
injection: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
|
||||
) -> Vec<Trace> {
|
||||
// clear the local and injection queues
|
||||
local.clear();
|
||||
injection.clear();
|
||||
|
||||
// notify each task
|
||||
let mut tasks = vec![];
|
||||
owned.for_each(|task| {
|
||||
// set the notified bit
|
||||
task.as_raw().state().transition_to_notified_for_tracing();
|
||||
// store the raw tasks into a vec
|
||||
tasks.push(task.as_raw());
|
||||
});
|
||||
|
||||
tasks
|
||||
.into_iter()
|
||||
.map(|task| {
|
||||
let ((), trace) = Trace::capture(|| task.poll());
|
||||
trace
|
||||
})
|
||||
.collect()
|
||||
}
|
92
tokio/src/runtime/task/trace/symbol.rs
Normal file
92
tokio/src/runtime/task/trace/symbol.rs
Normal file
@ -0,0 +1,92 @@
|
||||
use backtrace::BacktraceSymbol;
|
||||
use std::fmt;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::ptr;
|
||||
|
||||
/// A symbol in a backtrace.
|
||||
///
|
||||
/// This wrapper type serves two purposes. The first is that it provides a
|
||||
/// representation of a symbol that can be inserted into hashmaps and hashsets;
|
||||
/// the [`backtrace`] crate does not define [`Hash`], [`PartialEq`], or [`Eq`]
|
||||
/// on [`BacktraceSymbol`], and recommends that users define their own wrapper
|
||||
/// which implements these traits.
|
||||
///
|
||||
/// Second, this wrapper includes a `parent_hash` field that uniquely
|
||||
/// identifies this symbol's position in its trace. Otherwise, e.g., our code
|
||||
/// would not be able to distinguish between recursive calls of a function at
|
||||
/// different depths.
|
||||
#[derive(Clone)]
|
||||
pub(super) struct Symbol {
|
||||
pub(super) symbol: BacktraceSymbol,
|
||||
pub(super) parent_hash: u64,
|
||||
}
|
||||
|
||||
impl Hash for Symbol {
|
||||
fn hash<H>(&self, state: &mut H)
|
||||
where
|
||||
H: Hasher,
|
||||
{
|
||||
if let Some(name) = self.symbol.name() {
|
||||
name.as_bytes().hash(state);
|
||||
}
|
||||
|
||||
if let Some(addr) = self.symbol.addr() {
|
||||
ptr::hash(addr, state);
|
||||
}
|
||||
|
||||
self.symbol.filename().hash(state);
|
||||
self.symbol.lineno().hash(state);
|
||||
self.symbol.colno().hash(state);
|
||||
self.parent_hash.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Symbol {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
(self.parent_hash == other.parent_hash)
|
||||
&& match (self.symbol.name(), other.symbol.name()) {
|
||||
(None, None) => true,
|
||||
(Some(lhs_name), Some(rhs_name)) => lhs_name.as_bytes() == rhs_name.as_bytes(),
|
||||
_ => false,
|
||||
}
|
||||
&& match (self.symbol.addr(), other.symbol.addr()) {
|
||||
(None, None) => true,
|
||||
(Some(lhs_addr), Some(rhs_addr)) => ptr::eq(lhs_addr, rhs_addr),
|
||||
_ => false,
|
||||
}
|
||||
&& (self.symbol.filename() == other.symbol.filename())
|
||||
&& (self.symbol.lineno() == other.symbol.lineno())
|
||||
&& (self.symbol.colno() == other.symbol.colno())
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for Symbol {}
|
||||
|
||||
impl fmt::Display for Symbol {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if let Some(name) = self.symbol.name() {
|
||||
let name = name.to_string();
|
||||
let name = if let Some((name, _)) = name.rsplit_once("::") {
|
||||
name
|
||||
} else {
|
||||
&name
|
||||
};
|
||||
fmt::Display::fmt(&name, f)?;
|
||||
}
|
||||
|
||||
if let Some(filename) = self.symbol.filename() {
|
||||
f.write_str(" at ")?;
|
||||
filename.to_string_lossy().fmt(f)?;
|
||||
if let Some(lineno) = self.symbol.lineno() {
|
||||
f.write_str(":")?;
|
||||
fmt::Display::fmt(&lineno, f)?;
|
||||
if let Some(colno) = self.symbol.colno() {
|
||||
f.write_str(":")?;
|
||||
fmt::Display::fmt(&colno, f)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
126
tokio/src/runtime/task/trace/tree.rs
Normal file
126
tokio/src/runtime/task/trace/tree.rs
Normal file
@ -0,0 +1,126 @@
|
||||
use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
use super::{Backtrace, Symbol, SymbolTrace, Trace};
|
||||
|
||||
/// An adjacency list representation of an execution tree.
|
||||
///
|
||||
/// This tree provides a convenient intermediate representation for formatting
|
||||
/// [`Trace`] as a tree.
|
||||
pub(super) struct Tree {
|
||||
/// The roots of the trees.
|
||||
///
|
||||
/// There should only be one root, but the code is robust to multiple roots.
|
||||
roots: HashSet<Symbol>,
|
||||
|
||||
/// The adjacency list of symbols in the execution tree(s).
|
||||
edges: HashMap<Symbol, HashSet<Symbol>>,
|
||||
}
|
||||
|
||||
impl Tree {
|
||||
/// Constructs a [`Tree`] from [`Trace`]
|
||||
pub(super) fn from_trace(trace: Trace) -> Self {
|
||||
let mut roots: HashSet<Symbol> = HashSet::default();
|
||||
let mut edges: HashMap<Symbol, HashSet<Symbol>> = HashMap::default();
|
||||
|
||||
for trace in trace.backtraces {
|
||||
let trace = to_symboltrace(trace);
|
||||
|
||||
if let Some(first) = trace.first() {
|
||||
roots.insert(first.to_owned());
|
||||
}
|
||||
|
||||
let mut trace = trace.into_iter().peekable();
|
||||
while let Some(frame) = trace.next() {
|
||||
let subframes = edges.entry(frame).or_default();
|
||||
if let Some(subframe) = trace.peek() {
|
||||
subframes.insert(subframe.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Tree { roots, edges }
|
||||
}
|
||||
|
||||
/// Produces the sub-symbols of a given symbol.
|
||||
fn consequences(&self, frame: &Symbol) -> Option<impl ExactSizeIterator<Item = &Symbol>> {
|
||||
Some(self.edges.get(frame)?.iter())
|
||||
}
|
||||
|
||||
/// Format this [`Tree`] as a textual tree.
|
||||
fn display<W: fmt::Write>(
|
||||
&self,
|
||||
f: &mut W,
|
||||
root: &Symbol,
|
||||
is_last: bool,
|
||||
prefix: &str,
|
||||
) -> fmt::Result {
|
||||
let root_fmt = format!("{}", root);
|
||||
|
||||
let current;
|
||||
let next;
|
||||
|
||||
if is_last {
|
||||
current = format!("{prefix}└╼\u{a0}{root_fmt}");
|
||||
next = format!("{}\u{a0}\u{a0}\u{a0}", prefix);
|
||||
} else {
|
||||
current = format!("{prefix}├╼\u{a0}{root_fmt}");
|
||||
next = format!("{}│\u{a0}\u{a0}", prefix);
|
||||
}
|
||||
|
||||
write!(f, "{}", {
|
||||
let mut current = current.chars();
|
||||
current.next().unwrap();
|
||||
current.next().unwrap();
|
||||
¤t.as_str()
|
||||
})?;
|
||||
|
||||
if let Some(consequences) = self.consequences(root) {
|
||||
let len = consequences.len();
|
||||
for (i, consequence) in consequences.enumerate() {
|
||||
let is_last = i == len - 1;
|
||||
writeln!(f)?;
|
||||
self.display(f, consequence, is_last, &next)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Tree {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
for root in &self.roots {
|
||||
self.display(f, root, true, " ")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve a sequence of [`backtrace::BacktraceFrame`]s into a sequence of
|
||||
/// [`Symbol`]s.
|
||||
fn to_symboltrace(backtrace: Backtrace) -> SymbolTrace {
|
||||
// Resolve the backtrace frames to symbols.
|
||||
let backtrace: Backtrace = {
|
||||
let mut backtrace = backtrace::Backtrace::from(backtrace);
|
||||
backtrace.resolve();
|
||||
backtrace.into()
|
||||
};
|
||||
|
||||
// Accumulate the symbols in descending order into `symboltrace`.
|
||||
let mut symboltrace: SymbolTrace = vec![];
|
||||
let mut state = DefaultHasher::new();
|
||||
for frame in backtrace.into_iter().rev() {
|
||||
for symbol in frame.symbols().iter().rev() {
|
||||
let symbol = Symbol {
|
||||
symbol: symbol.clone(),
|
||||
parent_hash: state.finish(),
|
||||
};
|
||||
symbol.hash(&mut state);
|
||||
symboltrace.push(symbol);
|
||||
}
|
||||
}
|
||||
|
||||
symboltrace
|
||||
}
|
@ -179,6 +179,18 @@ cfg_rt! {
|
||||
T::Output: Send + 'static,
|
||||
{
|
||||
use crate::runtime::task;
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(
|
||||
target_arch = "aarch64",
|
||||
target_arch = "x86",
|
||||
target_arch = "x86_64"
|
||||
)
|
||||
))]
|
||||
let future = task::trace::Trace::root(future);
|
||||
let id = task::Id::next();
|
||||
let task = crate::util::trace::task(future, "task", name, id.as_u64());
|
||||
let handle = Handle::current();
|
||||
|
@ -46,6 +46,15 @@ pub async fn yield_now() {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
feature = "rt",
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
crate::runtime::task::trace::Trace::leaf();
|
||||
|
||||
if self.yielded {
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
@ -341,6 +341,27 @@ cfg_io_readiness! {
|
||||
}
|
||||
}
|
||||
|
||||
cfg_taskdump! {
|
||||
impl<T: Link> LinkedList<T, T::Target> {
|
||||
pub(crate) fn for_each<F>(&mut self, mut f: F)
|
||||
where
|
||||
F: FnMut(&T::Handle),
|
||||
{
|
||||
use std::mem::ManuallyDrop;
|
||||
|
||||
let mut next = self.head;
|
||||
|
||||
while let Some(curr) = next {
|
||||
unsafe {
|
||||
let handle = ManuallyDrop::new(T::from_raw(curr));
|
||||
f(&handle);
|
||||
next = T::pointers(curr).as_ref().get_next();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl GuardedLinkedList =====
|
||||
|
||||
feature! {
|
||||
|
55
tokio/tests/dump_current_thread.rs
Normal file
55
tokio/tests/dump_current_thread.rs
Normal file
@ -0,0 +1,55 @@
|
||||
#![cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
|
||||
use std::hint::black_box;
|
||||
use tokio::runtime;
|
||||
|
||||
#[inline(never)]
|
||||
async fn a() {
|
||||
black_box(b()).await
|
||||
}
|
||||
|
||||
#[inline(never)]
|
||||
async fn b() {
|
||||
black_box(c()).await
|
||||
}
|
||||
|
||||
#[inline(never)]
|
||||
async fn c() {
|
||||
black_box(tokio::task::yield_now()).await
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test() {
|
||||
let rt = runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
rt.spawn(a());
|
||||
|
||||
let handle = rt.handle();
|
||||
|
||||
assert_eq!(handle.dump().tasks().iter().count(), 0);
|
||||
|
||||
let dump = rt.block_on(async {
|
||||
handle.spawn(a());
|
||||
handle.dump()
|
||||
});
|
||||
|
||||
let tasks: Vec<_> = dump.tasks().iter().collect();
|
||||
|
||||
assert_eq!(tasks.len(), 2);
|
||||
|
||||
for task in tasks {
|
||||
let trace = task.trace().to_string();
|
||||
assert!(trace.contains("dump_current_thread::a"));
|
||||
assert!(trace.contains("dump_current_thread::b"));
|
||||
assert!(trace.contains("dump_current_thread::c"));
|
||||
assert!(trace.contains("tokio::task::yield_now"));
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user