rt: fix potential leak during runtime shutdown (#2649)

JoinHandle of threads created by the pool are now tracked and properly joined at
shutdown. If the thread does not return within the timeout, then it's not joined and
left to the OS for cleanup.

Also, break a cycle between wakers held by the timer and the runtime.

Fixes #2641, #2535
This commit is contained in:
Émile Grégoire 2020-07-28 23:43:19 -04:00 committed by GitHub
parent 1562bb3144
commit 646fbae765
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 139 additions and 24 deletions

View File

@ -67,7 +67,7 @@ process = [
"winapi/winerror", "winapi/winerror",
] ]
# Includes basic task execution capabilities # Includes basic task execution capabilities
rt-core = [] rt-core = ["slab"]
rt-util = [] rt-util = []
rt-threaded = [ rt-threaded = [
"num_cpus", "num_cpus",
@ -129,7 +129,7 @@ proptest = "0.9.4"
tempfile = "3.1.0" tempfile = "3.1.0"
[target.'cfg(loom)'.dev-dependencies] [target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.3.4", features = ["futures", "checkpoint"] } loom = { version = "0.3.5", features = ["futures", "checkpoint"] }
[package.metadata.docs.rs] [package.metadata.docs.rs]
all-features = true all-features = true

View File

@ -181,6 +181,8 @@ impl Park for Driver {
self.turn(Some(duration))?; self.turn(Some(duration))?;
Ok(()) Ok(())
} }
fn shutdown(&mut self) {}
} }
impl fmt::Debug for Driver { impl fmt::Debug for Driver {

View File

@ -36,6 +36,13 @@ where
Either::B(b) => b.park_timeout(duration).map_err(Either::B), Either::B(b) => b.park_timeout(duration).map_err(Either::B),
} }
} }
fn shutdown(&mut self) {
match self {
Either::A(a) => a.shutdown(),
Either::B(b) => b.shutdown(),
}
}
} }
impl<A, B> Unpark for Either<A, B> impl<A, B> Unpark for Either<A, B>

View File

@ -88,6 +88,9 @@ pub(crate) trait Park {
/// an implementation detail. Refer to the documentation for the specific /// an implementation detail. Refer to the documentation for the specific
/// `Park` implementation /// `Park` implementation
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>; fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
/// Release all resources holded by the parker for proper leak-free shutdown
fn shutdown(&mut self);
} }
/// Unblock a thread blocked by the associated `Park` instance. /// Unblock a thread blocked by the associated `Park` instance.

View File

@ -65,6 +65,10 @@ impl Park for ParkThread {
self.inner.park_timeout(duration); self.inner.park_timeout(duration);
Ok(()) Ok(())
} }
fn shutdown(&mut self) {
self.inner.shutdown();
}
} }
// ==== impl Inner ==== // ==== impl Inner ====
@ -188,6 +192,10 @@ impl Inner {
self.condvar.notify_one() self.condvar.notify_one()
} }
fn shutdown(&self) {
self.condvar.notify_all();
}
} }
impl Default for ParkThread { impl Default for ParkThread {
@ -259,6 +267,10 @@ cfg_block_on! {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(()) Ok(())
} }
fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
} }

View File

@ -8,6 +8,8 @@ use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle}; use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle}; use crate::runtime::{Builder, Callback, Handle};
use slab::Slab;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt; use std::fmt;
use std::time::Duration; use std::time::Duration;
@ -41,6 +43,7 @@ struct Inner {
/// Call before a thread stops /// Call before a thread stops
before_stop: Option<Callback>, before_stop: Option<Callback>,
// Maximum number of threads
thread_cap: usize, thread_cap: usize,
} }
@ -51,6 +54,7 @@ struct Shared {
num_notify: u32, num_notify: u32,
shutdown: bool, shutdown: bool,
shutdown_tx: Option<shutdown::Sender>, shutdown_tx: Option<shutdown::Sender>,
worker_threads: Slab<thread::JoinHandle<()>>,
} }
type Task = task::Notified<NoopSchedule>; type Task = task::Notified<NoopSchedule>;
@ -96,6 +100,7 @@ impl BlockingPool {
num_notify: 0, num_notify: 0,
shutdown: false, shutdown: false,
shutdown_tx: Some(shutdown_tx), shutdown_tx: Some(shutdown_tx),
worker_threads: Slab::new(),
}), }),
condvar: Condvar::new(), condvar: Condvar::new(),
thread_name: builder.thread_name.clone(), thread_name: builder.thread_name.clone(),
@ -126,10 +131,15 @@ impl BlockingPool {
shared.shutdown = true; shared.shutdown = true;
shared.shutdown_tx = None; shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all(); self.spawner.inner.condvar.notify_all();
let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());
drop(shared); drop(shared);
self.shutdown_rx.wait(timeout); if self.shutdown_rx.wait(timeout) {
for handle in workers.drain() {
let _ = handle.join();
}
}
} }
} }
@ -187,13 +197,23 @@ impl Spawner {
}; };
if let Some(shutdown_tx) = shutdown_tx { if let Some(shutdown_tx) = shutdown_tx {
self.spawn_thread(shutdown_tx, rt); let mut shared = self.inner.shared.lock().unwrap();
let entry = shared.worker_threads.vacant_entry();
let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
entry.insert(handle);
} }
Ok(()) Ok(())
} }
fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { fn spawn_thread(
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
worker_id: usize,
) -> thread::JoinHandle<()> {
let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
if let Some(stack_size) = self.inner.stack_size { if let Some(stack_size) = self.inner.stack_size {
@ -207,16 +227,16 @@ impl Spawner {
// Only the reference should be moved into the closure // Only the reference should be moved into the closure
let rt = &rt; let rt = &rt;
rt.enter(move || { rt.enter(move || {
rt.blocking_spawner.inner.run(); rt.blocking_spawner.inner.run(worker_id);
drop(shutdown_tx); drop(shutdown_tx);
}) })
}) })
.unwrap(); .unwrap()
} }
} }
impl Inner { impl Inner {
fn run(&self) { fn run(&self, worker_id: usize) {
if let Some(f) = &self.after_start { if let Some(f) = &self.after_start {
f() f()
} }
@ -252,6 +272,8 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the // Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic. // shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() { if !shared.shutdown && timeout_result.timed_out() {
shared.worker_threads.remove(worker_id);
break 'main; break 'main;
} }

View File

@ -32,11 +32,13 @@ impl Receiver {
/// If `timeout` is `Some`, the thread is blocked for **at most** `timeout` /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
/// duration. If `timeout` is `None`, then the thread is blocked until the /// duration. If `timeout` is `None`, then the thread is blocked until the
/// shutdown signal is received. /// shutdown signal is received.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) { ///
/// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
use crate::runtime::enter::try_enter; use crate::runtime::enter::try_enter;
if timeout == Some(Duration::from_nanos(0)) { if timeout == Some(Duration::from_nanos(0)) {
return; return true;
} }
let mut e = match try_enter(false) { let mut e = match try_enter(false) {
@ -44,7 +46,7 @@ impl Receiver {
_ => { _ => {
if std::thread::panicking() { if std::thread::panicking() {
// Don't panic in a panic // Don't panic in a panic
return; return false;
} else { } else {
panic!( panic!(
"Cannot drop a runtime in a context where blocking is not allowed. \ "Cannot drop a runtime in a context where blocking is not allowed. \
@ -60,9 +62,10 @@ impl Receiver {
// current thread (usually, shutting down a runtime stored in a // current thread (usually, shutting down a runtime stored in a
// thread-local). // thread-local).
if let Some(timeout) = timeout { if let Some(timeout) = timeout {
let _ = e.block_on_timeout(&mut self.rx, timeout); e.block_on_timeout(&mut self.rx, timeout).is_ok()
} else { } else {
let _ = e.block_on(&mut self.rx); let _ = e.block_on(&mut self.rx);
true
} }
} }
} }

View File

@ -542,11 +542,10 @@ impl Runtime {
/// runtime.shutdown_timeout(Duration::from_millis(100)); /// runtime.shutdown_timeout(Duration::from_millis(100));
/// } /// }
/// ``` /// ```
pub fn shutdown_timeout(self, duration: Duration) { pub fn shutdown_timeout(mut self, duration: Duration) {
let Runtime { // Wakeup and shutdown all the worker threads
mut blocking_pool, .. self.handle.spawner.shutdown();
} = self; self.blocking_pool.shutdown(Some(duration));
blocking_pool.shutdown(Some(duration));
} }
/// Shutdown the runtime, without waiting for any spawned tasks to shutdown. /// Shutdown the runtime, without waiting for any spawned tasks to shutdown.

View File

@ -104,6 +104,10 @@ impl Park for Parker {
Ok(()) Ok(())
} }
} }
fn shutdown(&mut self) {
self.inner.shutdown();
}
} }
impl Unpark for Unparker { impl Unpark for Unparker {
@ -242,4 +246,12 @@ impl Inner {
fn unpark_driver(&self) { fn unpark_driver(&self) {
self.shared.handle.unpark(); self.shared.handle.unpark();
} }
fn shutdown(&self) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
}
self.condvar.notify_all();
}
} }

View File

@ -18,6 +18,17 @@ pub(crate) enum Spawner {
ThreadPool(thread_pool::Spawner), ThreadPool(thread_pool::Spawner),
} }
impl Spawner {
pub(crate) fn shutdown(&mut self) {
#[cfg(feature = "rt-threaded")]
{
if let Spawner::ThreadPool(spawner) = self {
spawner.shutdown();
}
}
}
}
cfg_rt_core! { cfg_rt_core! {
impl Spawner { impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>

View File

@ -91,7 +91,7 @@ impl fmt::Debug for ThreadPool {
impl Drop for ThreadPool { impl Drop for ThreadPool {
fn drop(&mut self) { fn drop(&mut self) {
self.spawner.shared.close(); self.spawner.shutdown();
} }
} }
@ -108,6 +108,10 @@ impl Spawner {
self.shared.schedule(task, false); self.shared.schedule(task, false);
handle handle
} }
pub(crate) fn shutdown(&mut self) {
self.shared.close();
}
} }
impl fmt::Debug for Spawner { impl fmt::Debug for Spawner {

View File

@ -572,6 +572,8 @@ impl Core {
// Drain the queue // Drain the queue
while self.next_local_task().is_some() {} while self.next_local_task().is_some() {}
park.shutdown();
} }
fn drain_pending_drop(&mut self, worker: &Worker) { fn drain_pending_drop(&mut self, worker: &Worker) {

View File

@ -95,7 +95,7 @@ impl Iterator for AtomicStackEntries {
type Item = Arc<Entry>; type Item = Arc<Entry>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if self.ptr.is_null() { if self.ptr.is_null() || self.ptr == SHUTDOWN {
return None; return None;
} }

View File

@ -82,7 +82,7 @@ use std::{cmp, fmt};
/// [timeout]: crate::time::Timeout /// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval /// [interval]: crate::time::Interval
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Driver<T> { pub(crate) struct Driver<T: Park> {
/// Shared state /// Shared state
inner: Arc<Inner>, inner: Arc<Inner>,
@ -94,6 +94,9 @@ pub(crate) struct Driver<T> {
/// Source of "now" instances /// Source of "now" instances
clock: Clock, clock: Clock,
/// True if the driver is being shutdown
is_shutdown: bool,
} }
/// Timer state shared between `Driver`, `Handle`, and `Registration`. /// Timer state shared between `Driver`, `Handle`, and `Registration`.
@ -135,6 +138,7 @@ where
wheel: wheel::Wheel::new(), wheel: wheel::Wheel::new(),
park, park,
clock, clock,
is_shutdown: false,
} }
} }
@ -303,10 +307,12 @@ where
Ok(()) Ok(())
} }
}
impl<T> Drop for Driver<T> { fn shutdown(&mut self) {
fn drop(&mut self) { if self.is_shutdown {
return;
}
use std::u64; use std::u64;
// Shutdown the stack of entries to process, preventing any new entries // Shutdown the stack of entries to process, preventing any new entries
@ -319,6 +325,19 @@ impl<T> Drop for Driver<T> {
while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
entry.error(Error::shutdown()); entry.error(Error::shutdown());
} }
self.park.shutdown();
self.is_shutdown = true;
}
}
impl<T> Drop for Driver<T>
where
T: Park,
{
fn drop(&mut self) {
self.shutdown();
} }
} }

View File

@ -351,6 +351,8 @@ fn unpark_is_delayed() {
self.0.advance(ms(436)); self.0.advance(ms(436));
Ok(()) Ok(())
} }
fn shutdown(&mut self) {}
} }
impl Unpark for MockUnpark { impl Unpark for MockUnpark {
@ -434,6 +436,8 @@ impl Park for MockPark {
self.0.advance(duration); self.0.advance(duration);
Ok(()) Ok(())
} }
fn shutdown(&mut self) {}
} }
impl Unpark for MockUnpark { impl Unpark for MockUnpark {

View File

@ -2,7 +2,7 @@
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
#![cfg(feature = "full")] #![cfg(feature = "full")]
// Tests to run on both current-thread & therad-pool runtime variants. // Tests to run on both current-thread & thread-pool runtime variants.
macro_rules! rt_test { macro_rules! rt_test {
($($t:tt)*) => { ($($t:tt)*) => {
@ -869,6 +869,21 @@ rt_test! {
} }
#[test] #[test]
fn shutdown_wakeup_time() {
let mut runtime = rt();
runtime.block_on(async move {
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
});
runtime.shutdown_timeout(Duration::from_secs(10_000));
}
// This test is currently ignored on Windows because of a
// rust-lang issue in thread local storage destructors.
// See https://github.com/rust-lang/rust/issues/74875
#[test]
#[cfg(not(windows))]
fn runtime_in_thread_local() { fn runtime_in_thread_local() {
use std::cell::RefCell; use std::cell::RefCell;
use std::thread; use std::thread;