mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
rt: avoid dropping a task in calls to wake() (#1972)
Calls to tasks should not be nested. Currently, while a task is being executed and the runtime is shutting down, a call to wake() can result in the wake target to be dropped. This, in turn, results in the drop handler being called. If the user holds a ref cell borrow, a mutex guard, or any such value, dropping the task inline can result in a deadlock. The fix is to permit tasks to be scheduled during the shutdown process and dropping the tasks once they are popped from the queue. Fixes #1929, #1886
This commit is contained in:
parent
8add90210b
commit
41d15ea212
@ -84,7 +84,7 @@ where
|
||||
F::Output: Send + 'static,
|
||||
{
|
||||
let (task, handle) = task::joinable(future);
|
||||
self.scheduler.schedule(task);
|
||||
self.scheduler.schedule(task, true);
|
||||
handle
|
||||
}
|
||||
|
||||
@ -161,7 +161,7 @@ impl Spawner {
|
||||
F::Output: Send + 'static,
|
||||
{
|
||||
let (task, handle) = task::joinable(future);
|
||||
self.scheduler.schedule(task);
|
||||
self.scheduler.schedule(task, true);
|
||||
handle
|
||||
}
|
||||
|
||||
@ -230,6 +230,27 @@ impl SchedulerPriv {
|
||||
self.queues.push_local(task);
|
||||
handle
|
||||
}
|
||||
|
||||
fn schedule(&self, task: Task<Self>, spawn: bool) {
|
||||
let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
|
||||
|
||||
if is_current {
|
||||
unsafe {
|
||||
// safety: this function is safe to call only from the
|
||||
// thread the basic scheduler is running on. If `is_current` is
|
||||
// then we are on that thread.
|
||||
self.queues.push_local(task)
|
||||
};
|
||||
} else {
|
||||
let mut lock = self.queues.remote();
|
||||
lock.schedule(task, spawn);
|
||||
|
||||
// while locked, call unpark
|
||||
self.unpark.unpark();
|
||||
|
||||
drop(lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Schedule for SchedulerPriv {
|
||||
@ -260,24 +281,7 @@ impl Schedule for SchedulerPriv {
|
||||
}
|
||||
|
||||
fn schedule(&self, task: Task<Self>) {
|
||||
let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
|
||||
|
||||
if is_current {
|
||||
unsafe {
|
||||
// safety: this function is safe to call only from the
|
||||
// thread the basic scheduler is running on. If `is_current` is
|
||||
// then we are on that thread.
|
||||
self.queues.push_local(task)
|
||||
};
|
||||
} else {
|
||||
let mut lock = self.queues.remote();
|
||||
lock.schedule(task);
|
||||
|
||||
// while locked, call unpark
|
||||
self.unpark.unpark();
|
||||
|
||||
drop(lock);
|
||||
}
|
||||
SchedulerPriv::schedule(self, task, false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -298,10 +302,16 @@ where
|
||||
}
|
||||
|
||||
// Wait until all tasks have been released.
|
||||
while unsafe { self.scheduler.queues.has_tasks_remaining() } {
|
||||
self.local.park.park().ok().expect("park failed");
|
||||
loop {
|
||||
unsafe {
|
||||
self.scheduler.queues.drain_pending_drop();
|
||||
self.scheduler.queues.drain_queues();
|
||||
|
||||
if !self.scheduler.queues.has_tasks_remaining() {
|
||||
break;
|
||||
}
|
||||
|
||||
self.local.park.park().ok().expect("park failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -345,7 +345,7 @@ impl Schedule for Scheduler {
|
||||
unsafe { self.queues.push_local(task) };
|
||||
} else {
|
||||
let mut lock = self.queues.remote();
|
||||
lock.schedule(task);
|
||||
lock.schedule(task, false);
|
||||
|
||||
self.waker.wake();
|
||||
|
||||
@ -437,8 +437,15 @@ impl Drop for Scheduler {
|
||||
// Wait until all tasks have been released.
|
||||
// XXX: this is a busy loop, but we don't really have any way to park
|
||||
// the thread here?
|
||||
while self.queues.has_tasks_remaining() {
|
||||
loop {
|
||||
self.queues.drain_pending_drop();
|
||||
self.queues.drain_queues();
|
||||
|
||||
if !self.queues.has_tasks_remaining() {
|
||||
break;
|
||||
}
|
||||
|
||||
std::thread::yield_now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -233,6 +233,16 @@ where
|
||||
self.drain_pending_drop();
|
||||
}
|
||||
|
||||
/// Drain both the local and remote run queues, shutting down any tasks.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This *must* be called only from the thread that owns the scheduler.
|
||||
pub(crate) unsafe fn drain_queues(&self) {
|
||||
self.close_local();
|
||||
self.close_remote();
|
||||
}
|
||||
|
||||
/// Shut down the scheduler's owned task list.
|
||||
///
|
||||
/// # Safety
|
||||
@ -300,8 +310,10 @@ where
|
||||
/// If the queue is open to accept new tasks, the task is pushed to the back
|
||||
/// of the queue. Otherwise, if the queue is closed (the scheduler is
|
||||
/// shutting down), the new task will be shut down immediately.
|
||||
pub(crate) fn schedule(&mut self, task: Task<S>) {
|
||||
if self.open {
|
||||
///
|
||||
/// `spawn` should be set if the caller is spawning a new task.
|
||||
pub(crate) fn schedule(&mut self, task: Task<S>, spawn: bool) {
|
||||
if !spawn || self.open {
|
||||
self.queue.push_back(task);
|
||||
} else {
|
||||
task.shutdown();
|
||||
|
@ -27,6 +27,42 @@ fn spawned_task_does_not_progress_without_block_on() {
|
||||
assert_eq!(out, "hello");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acquire_mutex_in_drop() {
|
||||
use futures::future::pending;
|
||||
use tokio::task;
|
||||
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
|
||||
let mut rt = rt();
|
||||
|
||||
rt.spawn(async move {
|
||||
let _ = rx2.await;
|
||||
unreachable!();
|
||||
});
|
||||
|
||||
rt.spawn(async move {
|
||||
let _ = rx1.await;
|
||||
let _ = tx2.send(()).unwrap();
|
||||
unreachable!();
|
||||
});
|
||||
|
||||
// Spawn a task that will never notify
|
||||
rt.spawn(async move {
|
||||
pending::<()>().await;
|
||||
tx1.send(()).unwrap();
|
||||
});
|
||||
|
||||
// Tick the loop
|
||||
rt.block_on(async {
|
||||
task::yield_now().await;
|
||||
});
|
||||
|
||||
// Drop the rt
|
||||
drop(rt);
|
||||
}
|
||||
|
||||
fn rt() -> Runtime {
|
||||
tokio::runtime::Builder::new()
|
||||
.basic_scheduler()
|
||||
|
50
tokio/tests/task_local_set.rs
Normal file
50
tokio/tests/task_local_set.rs
Normal file
@ -0,0 +1,50 @@
|
||||
#![warn(rust_2018_idioms)]
|
||||
#![cfg(feature = "full")]
|
||||
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::{self, LocalSet};
|
||||
|
||||
#[test]
|
||||
fn acquire_mutex_in_drop() {
|
||||
use futures::future::pending;
|
||||
|
||||
let (tx1, rx1) = oneshot::channel();
|
||||
let (tx2, rx2) = oneshot::channel();
|
||||
|
||||
let mut rt = rt();
|
||||
let local = LocalSet::new();
|
||||
|
||||
local.spawn_local(async move {
|
||||
let _ = rx2.await;
|
||||
unreachable!();
|
||||
});
|
||||
|
||||
local.spawn_local(async move {
|
||||
let _ = rx1.await;
|
||||
let _ = tx2.send(()).unwrap();
|
||||
unreachable!();
|
||||
});
|
||||
|
||||
// Spawn a task that will never notify
|
||||
local.spawn_local(async move {
|
||||
pending::<()>().await;
|
||||
tx1.send(()).unwrap();
|
||||
});
|
||||
|
||||
// Tick the loop
|
||||
local.block_on(&mut rt, async {
|
||||
task::yield_now().await;
|
||||
});
|
||||
|
||||
// Drop the LocalSet
|
||||
drop(local);
|
||||
}
|
||||
|
||||
fn rt() -> Runtime {
|
||||
tokio::runtime::Builder::new()
|
||||
.basic_scheduler()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user