mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
parent
3a4aef17b2
commit
d561b5850a
@ -364,10 +364,17 @@ impl<S: 'static> Task<S> {
|
||||
}
|
||||
|
||||
cfg_taskdump! {
|
||||
pub(super) fn notify_for_tracing(&self) -> Notified<S> {
|
||||
self.as_raw().state().transition_to_notified_for_tracing();
|
||||
// SAFETY: `transition_to_notified_for_tracing` increments the refcount.
|
||||
unsafe { Notified(Task::new(self.raw)) }
|
||||
/// Notify the task for task dumping.
|
||||
///
|
||||
/// Returns `None` if the task has already been notified.
|
||||
pub(super) fn notify_for_tracing(&self) -> Option<Notified<S>> {
|
||||
if self.as_raw().state().transition_to_notified_for_tracing() {
|
||||
// SAFETY: `transition_to_notified_for_tracing` increments the
|
||||
// refcount.
|
||||
Some(unsafe { Notified(Task::new(self.raw)) })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -270,7 +270,11 @@ impl State {
|
||||
})
|
||||
}
|
||||
|
||||
/// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count.
|
||||
/// Transitions the state to `NOTIFIED`, unconditionally increasing the ref
|
||||
/// count.
|
||||
///
|
||||
/// Returns `true` if the notified bit was transitioned from `0` to `1`;
|
||||
/// otherwise `false.`
|
||||
#[cfg(all(
|
||||
tokio_unstable,
|
||||
tokio_taskdump,
|
||||
@ -278,12 +282,16 @@ impl State {
|
||||
target_os = "linux",
|
||||
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
|
||||
))]
|
||||
pub(super) fn transition_to_notified_for_tracing(&self) {
|
||||
pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
|
||||
self.fetch_update_action(|mut snapshot| {
|
||||
snapshot.set_notified();
|
||||
snapshot.ref_inc();
|
||||
((), Some(snapshot))
|
||||
});
|
||||
if snapshot.is_notified() {
|
||||
(false, None)
|
||||
} else {
|
||||
snapshot.set_notified();
|
||||
snapshot.ref_inc();
|
||||
(true, Some(snapshot))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
|
||||
|
@ -272,14 +272,19 @@ pub(in crate::runtime) fn trace_current_thread(
|
||||
injection: &Inject<Arc<current_thread::Handle>>,
|
||||
) -> Vec<Trace> {
|
||||
// clear the local and injection queues
|
||||
local.clear();
|
||||
|
||||
let mut dequeued = Vec::new();
|
||||
|
||||
while let Some(task) = local.pop_back() {
|
||||
dequeued.push(task);
|
||||
}
|
||||
|
||||
while let Some(task) = injection.pop() {
|
||||
drop(task);
|
||||
dequeued.push(task);
|
||||
}
|
||||
|
||||
// precondition: We have drained the tasks from the injection queue.
|
||||
trace_owned(owned)
|
||||
trace_owned(owned, dequeued)
|
||||
}
|
||||
|
||||
cfg_rt_multi_thread! {
|
||||
@ -299,22 +304,24 @@ cfg_rt_multi_thread! {
|
||||
synced: &Mutex<Synced>,
|
||||
injection: &Shared<Arc<multi_thread::Handle>>,
|
||||
) -> Vec<Trace> {
|
||||
let mut dequeued = Vec::new();
|
||||
|
||||
// clear the local queue
|
||||
while let Some(notified) = local.pop() {
|
||||
drop(notified);
|
||||
dequeued.push(notified);
|
||||
}
|
||||
|
||||
// clear the injection queue
|
||||
let mut synced = synced.lock();
|
||||
while let Some(notified) = injection.pop(&mut synced.inject) {
|
||||
drop(notified);
|
||||
dequeued.push(notified);
|
||||
}
|
||||
|
||||
drop(synced);
|
||||
|
||||
// precondition: we have drained the tasks from the local and injection
|
||||
// queues.
|
||||
trace_owned(owned)
|
||||
trace_owned(owned, dequeued)
|
||||
}
|
||||
}
|
||||
|
||||
@ -324,14 +331,20 @@ cfg_rt_multi_thread! {
|
||||
///
|
||||
/// This helper presumes exclusive access to each task. The tasks must not exist
|
||||
/// in any other queue.
|
||||
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>) -> Vec<Trace> {
|
||||
// notify each task
|
||||
let mut tasks = vec![];
|
||||
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<Trace> {
|
||||
let mut tasks = dequeued;
|
||||
// Notify and trace all un-notified tasks. The dequeued tasks are already
|
||||
// notified and so do not need to be re-notified.
|
||||
owned.for_each(|task| {
|
||||
// notify the task (and thus make it poll-able) and stash it
|
||||
tasks.push(task.notify_for_tracing());
|
||||
// we do not poll it here since we hold a lock on `owned` and the task
|
||||
// may complete and need to remove itself from `owned`.
|
||||
// Notify the task (and thus make it poll-able) and stash it. This fails
|
||||
// if the task is already notified. In these cases, we skip tracing the
|
||||
// task.
|
||||
if let Some(notified) = task.notify_for_tracing() {
|
||||
tasks.push(notified);
|
||||
}
|
||||
// We do not poll tasks here, since we hold a lock on `owned` and the
|
||||
// task may complete and need to remove itself from `owned`. Polling
|
||||
// such a task here would result in a deadlock.
|
||||
});
|
||||
|
||||
tasks
|
||||
|
@ -147,6 +147,7 @@ mod future_completes_during_trace {
|
||||
async fn dump() {
|
||||
let handle = Handle::current();
|
||||
let _dump = handle.dump().await;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
|
||||
rt.block_on(async {
|
||||
@ -154,3 +155,43 @@ mod future_completes_during_trace {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Regression test for #6051.
|
||||
///
|
||||
/// This test ensures that tasks notified outside of a worker will not be
|
||||
/// traced, since doing so will un-set their notified bit prior to them being
|
||||
/// run and panic.
|
||||
#[test]
|
||||
fn notified_during_tracing() {
|
||||
let rt = runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.worker_threads(3)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let timeout = async {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
};
|
||||
|
||||
let timer = rt.spawn(async {
|
||||
loop {
|
||||
tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await;
|
||||
}
|
||||
});
|
||||
|
||||
let dump = async {
|
||||
loop {
|
||||
let handle = Handle::current();
|
||||
let _dump = handle.dump().await;
|
||||
}
|
||||
};
|
||||
|
||||
rt.block_on(async {
|
||||
tokio::select!(
|
||||
biased;
|
||||
_ = timeout => {},
|
||||
_ = timer => {},
|
||||
_ = dump => {},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user