mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
Fix basic_scheduler deadlock when waking during drop (#2062)
This commit is contained in:
parent
798e86821f
commit
855d39f849
@ -261,19 +261,26 @@ where
|
|||||||
/// If the mutex around the remote queue is poisoned _and_ the current
|
/// If the mutex around the remote queue is poisoned _and_ the current
|
||||||
/// thread is not already panicking. This is safe to call in a `Drop` impl.
|
/// thread is not already panicking. This is safe to call in a `Drop` impl.
|
||||||
fn close_remote(&self) {
|
fn close_remote(&self) {
|
||||||
#[allow(clippy::match_wild_err_arm)]
|
loop {
|
||||||
let mut lock = match self.remote_queue.lock() {
|
#[allow(clippy::match_wild_err_arm)]
|
||||||
// If the lock is poisoned, but the thread is already panicking,
|
let mut lock = match self.remote_queue.lock() {
|
||||||
// avoid a double panic. This is necessary since this fn can be
|
// If the lock is poisoned, but the thread is already panicking,
|
||||||
// called in a drop impl.
|
// avoid a double panic. This is necessary since this fn can be
|
||||||
Err(_) if std::thread::panicking() => return,
|
// called in a drop impl.
|
||||||
Err(_) => panic!("mutex poisoned"),
|
Err(_) if std::thread::panicking() => return,
|
||||||
Ok(lock) => lock,
|
Err(_) => panic!("mutex poisoned"),
|
||||||
};
|
Ok(lock) => lock,
|
||||||
lock.open = false;
|
};
|
||||||
|
lock.open = false;
|
||||||
|
|
||||||
while let Some(task) = lock.queue.pop_front() {
|
if let Some(task) = lock.queue.pop_front() {
|
||||||
task.shutdown();
|
// Release lock before dropping task, in case
|
||||||
|
// task tries to re-schedule in its Drop.
|
||||||
|
drop(lock);
|
||||||
|
task.shutdown();
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,6 +63,65 @@ fn acquire_mutex_in_drop() {
|
|||||||
drop(rt);
|
drop(rt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn wake_while_rt_is_dropping() {
|
||||||
|
use tokio::task;
|
||||||
|
|
||||||
|
struct OnDrop<F: FnMut()>(F);
|
||||||
|
|
||||||
|
impl<F: FnMut()> Drop for OnDrop<F> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
(self.0)()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (tx1, rx1) = oneshot::channel();
|
||||||
|
let (tx2, rx2) = oneshot::channel();
|
||||||
|
let (tx3, rx3) = oneshot::channel();
|
||||||
|
|
||||||
|
let mut rt = rt();
|
||||||
|
|
||||||
|
let h1 = rt.handle().clone();
|
||||||
|
|
||||||
|
rt.handle().spawn(async move {
|
||||||
|
// Ensure a waker gets stored in oneshot 1.
|
||||||
|
let _ = rx1.await;
|
||||||
|
tx3.send(()).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
rt.handle().spawn(async move {
|
||||||
|
// When this task is dropped, we'll be "closing remotes".
|
||||||
|
// We spawn a new task that owns the `tx1`, to move its Drop
|
||||||
|
// out of here.
|
||||||
|
//
|
||||||
|
// Importantly, the oneshot 1 has a waker already stored, so
|
||||||
|
// the eventual drop here will try to re-schedule again.
|
||||||
|
let mut opt_tx1 = Some(tx1);
|
||||||
|
let _d = OnDrop(move || {
|
||||||
|
let tx1 = opt_tx1.take().unwrap();
|
||||||
|
h1.spawn(async move {
|
||||||
|
tx1.send(()).unwrap();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
let _ = rx2.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
rt.handle().spawn(async move {
|
||||||
|
let _ = rx3.await;
|
||||||
|
// We'll never get here, but once task 3 drops, this will
|
||||||
|
// force task 2 to re-schedule since it's waiting on oneshot 2.
|
||||||
|
tx2.send(()).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Tick the loop
|
||||||
|
rt.block_on(async {
|
||||||
|
task::yield_now().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Drop the rt
|
||||||
|
drop(rt);
|
||||||
|
}
|
||||||
|
|
||||||
fn rt() -> Runtime {
|
fn rt() -> Runtime {
|
||||||
tokio::runtime::Builder::new()
|
tokio::runtime::Builder::new()
|
||||||
.basic_scheduler()
|
.basic_scheduler()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user