mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
rt: avoid early task shutdown (#3870)
Tokio 1.7.0 introduced a change intended to eagerly shutdown newly spawned tasks if the runtime is in the process of shutting down. However, it introduced a bug where already spawned tasks could be shutdown too early, resulting in the potential introduction of deadlocks if tasks acquired mutexes in drop handlers. Fixes #3869
This commit is contained in:
parent
34c6a26c01
commit
7601dc6d2a
@ -1,3 +1,11 @@
|
||||
# 1.7.1 (June 18, 2021)
|
||||
|
||||
### Fixed
|
||||
|
||||
- runtime: fix early task shutdown during runtime shutdown ([#3870])
|
||||
|
||||
[#3870]: https://github.com/tokio-rs/tokio/pull/3870
|
||||
|
||||
# 1.7.0 (June 15, 2021)
|
||||
|
||||
### Added
|
||||
|
@ -7,12 +7,12 @@ name = "tokio"
|
||||
# - README.md
|
||||
# - Update CHANGELOG.md.
|
||||
# - Create "v1.0.x" git tag.
|
||||
version = "1.7.0"
|
||||
version = "1.7.1"
|
||||
edition = "2018"
|
||||
authors = ["Tokio Contributors <team@tokio.rs>"]
|
||||
license = "MIT"
|
||||
readme = "README.md"
|
||||
documentation = "https://docs.rs/tokio/1.7.0/tokio/"
|
||||
documentation = "https://docs.rs/tokio/1.7.1/tokio/"
|
||||
repository = "https://github.com/tokio-rs/tokio"
|
||||
homepage = "https://tokio.rs"
|
||||
description = """
|
||||
|
@ -9,6 +9,7 @@
|
||||
rust_2018_idioms,
|
||||
unreachable_pub
|
||||
)]
|
||||
#![deny(unused_must_use)]
|
||||
#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
|
||||
#![doc(test(
|
||||
no_crate_inject,
|
||||
|
@ -124,9 +124,14 @@ impl<T> Local<T> {
|
||||
// There is capacity for the task
|
||||
break tail;
|
||||
} else if steal != real {
|
||||
// Concurrently stealing, this will free up capacity, so
|
||||
// only push the new task onto the inject queue
|
||||
inject.push(task);
|
||||
// Concurrently stealing, this will free up capacity, so only
|
||||
// push the new task onto the inject queue
|
||||
//
|
||||
// If the task failes to be pushed on the injection queue, there
|
||||
// is nothing to be done at this point as the task cannot be a
|
||||
// newly spawned task. Shutting down this task is handled by the
|
||||
// worker shutdown process.
|
||||
let _ = inject.push(task);
|
||||
return;
|
||||
} else {
|
||||
// Push the current task and half of the queue into the
|
||||
@ -507,7 +512,11 @@ impl<T: 'static> Inject<T> {
|
||||
}
|
||||
|
||||
/// Pushes a value into the queue.
|
||||
pub(super) fn push(&self, task: task::Notified<T>)
|
||||
///
|
||||
/// Returns `Err(task)` if pushing fails due to the queue being shutdown.
|
||||
/// The caller is expected to call `shutdown()` on the task **if and only
|
||||
/// if** it is a newly spawned task.
|
||||
pub(super) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>>
|
||||
where
|
||||
T: crate::runtime::task::Schedule,
|
||||
{
|
||||
@ -515,11 +524,7 @@ impl<T: 'static> Inject<T> {
|
||||
let mut p = self.pointers.lock();
|
||||
|
||||
if p.is_closed {
|
||||
// Drop the mutex to avoid a potential deadlock when
|
||||
// re-entering.
|
||||
drop(p);
|
||||
task.shutdown();
|
||||
return;
|
||||
return Err(task);
|
||||
}
|
||||
|
||||
// safety: only mutated with the lock held
|
||||
@ -538,6 +543,7 @@ impl<T: 'static> Inject<T> {
|
||||
p.tail = Some(task);
|
||||
|
||||
self.len.store(len + 1, Release);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn push_batch(
|
||||
|
@ -94,7 +94,13 @@ impl Spawner {
|
||||
F::Output: Send + 'static,
|
||||
{
|
||||
let (task, handle) = task::joinable(future);
|
||||
self.shared.schedule(task, false);
|
||||
|
||||
if let Err(task) = self.shared.schedule(task, false) {
|
||||
// The newly spawned task could not be scheduled because the runtime
|
||||
// is shutting down. The task must be explicitly shutdown at this point.
|
||||
task.shutdown();
|
||||
}
|
||||
|
||||
handle
|
||||
}
|
||||
|
||||
|
@ -709,16 +709,22 @@ impl task::Schedule for Arc<Worker> {
|
||||
}
|
||||
|
||||
fn schedule(&self, task: Notified) {
|
||||
self.shared.schedule(task, false);
|
||||
// Because this is not a newly spawned task, if scheduling fails due to
|
||||
// the runtime shutting down, there is no special work that must happen
|
||||
// here.
|
||||
let _ = self.shared.schedule(task, false);
|
||||
}
|
||||
|
||||
fn yield_now(&self, task: Notified) {
|
||||
self.shared.schedule(task, true);
|
||||
// Because this is not a newly spawned task, if scheduling fails due to
|
||||
// the runtime shutting down, there is no special work that must happen
|
||||
// here.
|
||||
let _ = self.shared.schedule(task, true);
|
||||
}
|
||||
}
|
||||
|
||||
impl Shared {
|
||||
pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
|
||||
pub(super) fn schedule(&self, task: Notified, is_yield: bool) -> Result<(), Notified> {
|
||||
CURRENT.with(|maybe_cx| {
|
||||
if let Some(cx) = maybe_cx {
|
||||
// Make sure the task is part of the **current** scheduler.
|
||||
@ -726,15 +732,16 @@ impl Shared {
|
||||
// And the current thread still holds a core
|
||||
if let Some(core) = cx.core.borrow_mut().as_mut() {
|
||||
self.schedule_local(core, task, is_yield);
|
||||
return;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, use the inject queue
|
||||
self.inject.push(task);
|
||||
self.inject.push(task)?;
|
||||
self.notify_parked();
|
||||
});
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
|
||||
|
@ -12,8 +12,8 @@ use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
use std::sync::{mpsc, Arc};
|
||||
use std::task::{Context, Poll};
|
||||
use std::sync::{mpsc, Arc, Mutex};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
|
||||
#[test]
|
||||
fn single_thread() {
|
||||
@ -405,6 +405,74 @@ async fn hang_on_shutdown() {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
|
||||
/// Demonstrates tokio-rs/tokio#3869
|
||||
#[test]
|
||||
fn wake_during_shutdown() {
|
||||
struct Shared {
|
||||
waker: Option<Waker>,
|
||||
}
|
||||
|
||||
struct MyFuture {
|
||||
shared: Arc<Mutex<Shared>>,
|
||||
put_waker: bool,
|
||||
}
|
||||
|
||||
impl MyFuture {
|
||||
fn new() -> (Self, Self) {
|
||||
let shared = Arc::new(Mutex::new(Shared { waker: None }));
|
||||
let f1 = MyFuture {
|
||||
shared: shared.clone(),
|
||||
put_waker: true,
|
||||
};
|
||||
let f2 = MyFuture {
|
||||
shared,
|
||||
put_waker: false,
|
||||
};
|
||||
(f1, f2)
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for MyFuture {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
||||
let me = Pin::into_inner(self);
|
||||
let mut lock = me.shared.lock().unwrap();
|
||||
println!("poll {}", me.put_waker);
|
||||
if me.put_waker {
|
||||
println!("putting");
|
||||
lock.waker = Some(cx.waker().clone());
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MyFuture {
|
||||
fn drop(&mut self) {
|
||||
println!("drop {} start", self.put_waker);
|
||||
let mut lock = self.shared.lock().unwrap();
|
||||
if !self.put_waker {
|
||||
lock.waker.take().unwrap().wake();
|
||||
}
|
||||
drop(lock);
|
||||
println!("drop {} stop", self.put_waker);
|
||||
}
|
||||
}
|
||||
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(1)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let (f1, f2) = MyFuture::new();
|
||||
|
||||
rt.spawn(f1);
|
||||
rt.spawn(f2);
|
||||
|
||||
rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
|
||||
}
|
||||
|
||||
fn rt() -> Runtime {
|
||||
Runtime::new().unwrap()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user