rt: do not trace tasks while locking OwnedTasks (#6036)

This commit is contained in:
Jack Wrenn 2023-10-06 15:48:47 -04:00 committed by GitHub
parent f306bd02c3
commit 2bd43765d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 115 additions and 33 deletions

View File

@ -543,6 +543,14 @@ cfg_taskdump! {
scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"),
}
}
/// Produces `true` if the current task is being traced for a dump;
/// otherwise false. This function is only public for integration
/// testing purposes. Do not rely on it.
#[doc(hidden)]
pub fn is_tracing() -> bool {
super::task::trace::Context::is_tracing()
}
}
cfg_rt_multi_thread! {

View File

@ -361,6 +361,14 @@ impl<S: 'static> Task<S> {
fn header_ptr(&self) -> NonNull<Header> {
self.raw.header_ptr()
}
cfg_taskdump! {
pub(super) fn notify_for_tracing(&self) -> Notified<S> {
self.as_raw().state().transition_to_notified_for_tracing();
// SAFETY: `transition_to_notified_for_tracing` increments the refcount.
unsafe { Notified(Task::new(self.raw)) }
}
}
}
impl<S: 'static> Notified<S> {

View File

@ -18,7 +18,7 @@ mod tree;
use symbol::Symbol;
use tree::Tree;
use super::{Notified, OwnedTasks};
use super::{Notified, OwnedTasks, Schedule};
type Backtrace = Vec<BacktraceFrame>;
type SymbolTrace = Vec<Symbol>;
@ -100,6 +100,16 @@ impl Context {
Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
}
}
/// Produces `true` if the current task is being traced; otherwise false.
pub(crate) fn is_tracing() -> bool {
Self::with_current_collector(|maybe_collector| {
let collector = maybe_collector.take();
let result = collector.is_some();
maybe_collector.set(collector);
result
})
}
}
impl Trace {
@ -268,22 +278,8 @@ pub(in crate::runtime) fn trace_current_thread(
drop(task);
}
// notify each task
let mut tasks = vec![];
owned.for_each(|task| {
// set the notified bit
task.as_raw().state().transition_to_notified_for_tracing();
// store the raw tasks into a vec
tasks.push(task.as_raw());
});
tasks
.into_iter()
.map(|task| {
let ((), trace) = Trace::capture(|| task.poll());
trace
})
.collect()
// precondition: We have drained the tasks from the injection queue.
trace_owned(owned)
}
cfg_rt_multi_thread! {
@ -316,21 +312,34 @@ cfg_rt_multi_thread! {
drop(synced);
// notify each task
let mut traces = vec![];
owned.for_each(|task| {
// set the notified bit
task.as_raw().state().transition_to_notified_for_tracing();
// trace the task
let ((), trace) = Trace::capture(|| task.as_raw().poll());
traces.push(trace);
// reschedule the task
let _ = task.as_raw().state().transition_to_notified_by_ref();
task.as_raw().schedule();
});
traces
// precondition: we have drained the tasks from the local and injection
// queues.
trace_owned(owned)
}
}
/// Trace the `OwnedTasks`.
///
/// # Preconditions
///
/// This helper presumes exclusive access to each task. The tasks must not exist
/// in any other queue.
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>) -> Vec<Trace> {
// notify each task
let mut tasks = vec![];
owned.for_each(|task| {
// notify the task (and thus make it poll-able) and stash it
tasks.push(task.notify_for_tracing());
// we do not poll it here since we hold a lock on `owned` and the task
// may complete and need to remove itself from `owned`.
});
tasks
.into_iter()
.map(|task| {
let local_notified = owned.assert_owner(task);
let ((), trace) = Trace::capture(|| local_notified.run());
trace
})
.collect()
}

View File

@ -97,3 +97,60 @@ fn multi_thread() {
);
});
}
/// Regression tests for #6035.
///
/// These tests ensure that dumping will not deadlock if a future completes
/// during a trace.
mod future_completes_during_trace {
use super::*;
use core::future::{poll_fn, Future};
/// A future that completes only during a trace.
fn complete_during_trace() -> impl Future<Output = ()> + Send {
use std::task::Poll;
poll_fn(|cx| {
if Handle::is_tracing() {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
})
}
#[test]
fn current_thread() {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
async fn dump() {
let handle = Handle::current();
let _dump = handle.dump().await;
}
rt.block_on(async {
let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump());
});
}
#[test]
fn multi_thread() {
let rt = runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
async fn dump() {
let handle = Handle::current();
let _dump = handle.dump().await;
}
rt.block_on(async {
let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump());
});
}
}