mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
rt: fix panic in task abort when off rt (#3159)
A call to `JoinHandle::abort` releases a task. When called from outside of the runtime, this panics due to the current implementation checking for a thread-local worker context. This change makes accessing the thread-local context optional under release, by falling back to remotely marking a task remotely as dropped. Behaving the same as if the core was stolen by another worker. Fixes #3157
This commit is contained in:
parent
00500d1b35
commit
a125ebd745
@ -629,10 +629,21 @@ impl task::Schedule for Arc<Worker> {
|
|||||||
fn release(&self, task: &Task) -> Option<Task> {
|
fn release(&self, task: &Task) -> Option<Task> {
|
||||||
use std::ptr::NonNull;
|
use std::ptr::NonNull;
|
||||||
|
|
||||||
CURRENT.with(|maybe_cx| {
|
enum Immediate {
|
||||||
let cx = maybe_cx.expect("scheduler context missing");
|
Removed(Option<Task>),
|
||||||
|
Core(bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
let immediate = CURRENT.with(|maybe_cx| {
|
||||||
|
let cx = match maybe_cx {
|
||||||
|
Some(cx) => cx,
|
||||||
|
None => return Immediate::Core(false),
|
||||||
|
};
|
||||||
|
|
||||||
|
if !self.eq(&cx.worker) {
|
||||||
|
return Immediate::Core(cx.core.borrow().is_some());
|
||||||
|
}
|
||||||
|
|
||||||
if self.eq(&cx.worker) {
|
|
||||||
let mut maybe_core = cx.core.borrow_mut();
|
let mut maybe_core = cx.core.borrow_mut();
|
||||||
|
|
||||||
if let Some(core) = &mut *maybe_core {
|
if let Some(core) = &mut *maybe_core {
|
||||||
@ -641,11 +652,21 @@ impl task::Schedule for Arc<Worker> {
|
|||||||
// safety: the task is inserted in the list in `bind`.
|
// safety: the task is inserted in the list in `bind`.
|
||||||
unsafe {
|
unsafe {
|
||||||
let ptr = NonNull::from(task.header());
|
let ptr = NonNull::from(task.header());
|
||||||
return core.tasks.remove(ptr);
|
return Immediate::Removed(core.tasks.remove(ptr));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Immediate::Core(false)
|
||||||
|
});
|
||||||
|
|
||||||
|
// Checks if we were called from within a worker, allowing for immediate
|
||||||
|
// removal of a scheduled task. Else we have to go through the slower
|
||||||
|
// process below where we remotely mark a task as dropped.
|
||||||
|
let worker_has_core = match immediate {
|
||||||
|
Immediate::Removed(task) => return task,
|
||||||
|
Immediate::Core(worker_has_core) => worker_has_core,
|
||||||
|
};
|
||||||
|
|
||||||
// Track the task to be released by the worker that owns it
|
// Track the task to be released by the worker that owns it
|
||||||
//
|
//
|
||||||
// Safety: We get a new handle without incrementing the ref-count.
|
// Safety: We get a new handle without incrementing the ref-count.
|
||||||
@ -661,7 +682,7 @@ impl task::Schedule for Arc<Worker> {
|
|||||||
|
|
||||||
self.remote().pending_drop.push(task);
|
self.remote().pending_drop.push(task);
|
||||||
|
|
||||||
if cx.core.borrow().is_some() {
|
if worker_has_core {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -674,7 +695,6 @@ impl task::Schedule for Arc<Worker> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn schedule(&self, task: Notified) {
|
fn schedule(&self, task: Notified) {
|
||||||
|
26
tokio/tests/task_abort.rs
Normal file
26
tokio/tests/task_abort.rs
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
#![warn(rust_2018_idioms)]
|
||||||
|
#![cfg(feature = "full")]
|
||||||
|
|
||||||
|
/// Checks that a suspended task can be aborted without panicking as reported in
|
||||||
|
/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
|
||||||
|
#[test]
|
||||||
|
fn test_abort_without_panic_3157() {
|
||||||
|
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.enable_time()
|
||||||
|
.worker_threads(1)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
rt.block_on(async move {
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
println!("task started");
|
||||||
|
tokio::time::sleep(std::time::Duration::new(100, 0)).await
|
||||||
|
});
|
||||||
|
|
||||||
|
// wait for task to sleep.
|
||||||
|
tokio::time::sleep(std::time::Duration::new(1, 0)).await;
|
||||||
|
|
||||||
|
handle.abort();
|
||||||
|
let _ = handle.await;
|
||||||
|
});
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user