diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index eca1554ad..966bfe3ea 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -67,7 +67,7 @@ process = [ "winapi/winerror", ] # Includes basic task execution capabilities -rt-core = [] +rt-core = ["slab"] rt-util = [] rt-threaded = [ "num_cpus", @@ -129,7 +129,7 @@ proptest = "0.9.4" tempfile = "3.1.0" [target.'cfg(loom)'.dev-dependencies] -loom = { version = "0.3.4", features = ["futures", "checkpoint"] } +loom = { version = "0.3.5", features = ["futures", "checkpoint"] } [package.metadata.docs.rs] all-features = true diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index dbfb6e16e..d8d17f887 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -181,6 +181,8 @@ impl Park for Driver { self.turn(Some(duration))?; Ok(()) } + + fn shutdown(&mut self) {} } impl fmt::Debug for Driver { diff --git a/tokio/src/park/either.rs b/tokio/src/park/either.rs index 67f1e1727..c66d12131 100644 --- a/tokio/src/park/either.rs +++ b/tokio/src/park/either.rs @@ -36,6 +36,13 @@ where Either::B(b) => b.park_timeout(duration).map_err(Either::B), } } + + fn shutdown(&mut self) { + match self { + Either::A(a) => a.shutdown(), + Either::B(b) => b.shutdown(), + } + } } impl Unpark for Either diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs index 04d3051d8..2cfef8c2d 100644 --- a/tokio/src/park/mod.rs +++ b/tokio/src/park/mod.rs @@ -88,6 +88,9 @@ pub(crate) trait Park { /// an implementation detail. Refer to the documentation for the specific /// `Park` implementation fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>; + + /// Release all resources holded by the parker for proper leak-free shutdown + fn shutdown(&mut self); } /// Unblock a thread blocked by the associated `Park` instance. diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index 2e2397c72..44174d351 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -65,6 +65,10 @@ impl Park for ParkThread { self.inner.park_timeout(duration); Ok(()) } + + fn shutdown(&mut self) { + self.inner.shutdown(); + } } // ==== impl Inner ==== @@ -188,6 +192,10 @@ impl Inner { self.condvar.notify_one() } + + fn shutdown(&self) { + self.condvar.notify_all(); + } } impl Default for ParkThread { @@ -259,6 +267,10 @@ cfg_block_on! { self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; Ok(()) } + + fn shutdown(&mut self) { + let _ = self.with_current(|park_thread| park_thread.inner.shutdown()); + } } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 40d417b19..c5d464c85 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,6 +8,8 @@ use crate::runtime::blocking::task::BlockingTask; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; +use slab::Slab; + use std::collections::VecDeque; use std::fmt; use std::time::Duration; @@ -41,6 +43,7 @@ struct Inner { /// Call before a thread stops before_stop: Option, + // Maximum number of threads thread_cap: usize, } @@ -51,6 +54,7 @@ struct Shared { num_notify: u32, shutdown: bool, shutdown_tx: Option, + worker_threads: Slab>, } type Task = task::Notified; @@ -96,6 +100,7 @@ impl BlockingPool { num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), + worker_threads: Slab::new(), }), condvar: Condvar::new(), thread_name: builder.thread_name.clone(), @@ -126,10 +131,15 @@ impl BlockingPool { shared.shutdown = true; shared.shutdown_tx = None; self.spawner.inner.condvar.notify_all(); + let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new()); drop(shared); - self.shutdown_rx.wait(timeout); + if self.shutdown_rx.wait(timeout) { + for handle in workers.drain() { + let _ = handle.join(); + } + } } } @@ -187,13 +197,23 @@ impl Spawner { }; if let Some(shutdown_tx) = shutdown_tx { - self.spawn_thread(shutdown_tx, rt); + let mut shared = self.inner.shared.lock().unwrap(); + let entry = shared.worker_threads.vacant_entry(); + + let handle = self.spawn_thread(shutdown_tx, rt, entry.key()); + + entry.insert(handle); } Ok(()) } - fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { + fn spawn_thread( + &self, + shutdown_tx: shutdown::Sender, + rt: &Handle, + worker_id: usize, + ) -> thread::JoinHandle<()> { let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); if let Some(stack_size) = self.inner.stack_size { @@ -207,16 +227,16 @@ impl Spawner { // Only the reference should be moved into the closure let rt = &rt; rt.enter(move || { - rt.blocking_spawner.inner.run(); + rt.blocking_spawner.inner.run(worker_id); drop(shutdown_tx); }) }) - .unwrap(); + .unwrap() } } impl Inner { - fn run(&self) { + fn run(&self, worker_id: usize) { if let Some(f) = &self.after_start { f() } @@ -252,6 +272,8 @@ impl Inner { // Even if the condvar "timed out", if the pool is entering the // shutdown phase, we want to perform the cleanup logic. if !shared.shutdown && timeout_result.timed_out() { + shared.worker_threads.remove(worker_id); + break 'main; } diff --git a/tokio/src/runtime/blocking/shutdown.rs b/tokio/src/runtime/blocking/shutdown.rs index e76a70135..3b6cc5930 100644 --- a/tokio/src/runtime/blocking/shutdown.rs +++ b/tokio/src/runtime/blocking/shutdown.rs @@ -32,11 +32,13 @@ impl Receiver { /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout` /// duration. If `timeout` is `None`, then the thread is blocked until the /// shutdown signal is received. - pub(crate) fn wait(&mut self, timeout: Option) { + /// + /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`. + pub(crate) fn wait(&mut self, timeout: Option) -> bool { use crate::runtime::enter::try_enter; if timeout == Some(Duration::from_nanos(0)) { - return; + return true; } let mut e = match try_enter(false) { @@ -44,7 +46,7 @@ impl Receiver { _ => { if std::thread::panicking() { // Don't panic in a panic - return; + return false; } else { panic!( "Cannot drop a runtime in a context where blocking is not allowed. \ @@ -60,9 +62,10 @@ impl Receiver { // current thread (usually, shutting down a runtime stored in a // thread-local). if let Some(timeout) = timeout { - let _ = e.block_on_timeout(&mut self.rx, timeout); + e.block_on_timeout(&mut self.rx, timeout).is_ok() } else { let _ = e.block_on(&mut self.rx); + true } } } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 300a14657..637f38cab 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -542,11 +542,10 @@ impl Runtime { /// runtime.shutdown_timeout(Duration::from_millis(100)); /// } /// ``` - pub fn shutdown_timeout(self, duration: Duration) { - let Runtime { - mut blocking_pool, .. - } = self; - blocking_pool.shutdown(Some(duration)); + pub fn shutdown_timeout(mut self, duration: Duration) { + // Wakeup and shutdown all the worker threads + self.handle.spawner.shutdown(); + self.blocking_pool.shutdown(Some(duration)); } /// Shutdown the runtime, without waiting for any spawned tasks to shutdown. diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs index ee437d1d9..1dcf65af9 100644 --- a/tokio/src/runtime/park.rs +++ b/tokio/src/runtime/park.rs @@ -104,6 +104,10 @@ impl Park for Parker { Ok(()) } } + + fn shutdown(&mut self) { + self.inner.shutdown(); + } } impl Unpark for Unparker { @@ -242,4 +246,12 @@ impl Inner { fn unpark_driver(&self) { self.shared.handle.unpark(); } + + fn shutdown(&self) { + if let Some(mut driver) = self.shared.driver.try_lock() { + driver.shutdown(); + } + + self.condvar.notify_all(); + } } diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index d136945cd..c5f2d17cd 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -18,6 +18,17 @@ pub(crate) enum Spawner { ThreadPool(thread_pool::Spawner), } +impl Spawner { + pub(crate) fn shutdown(&mut self) { + #[cfg(feature = "rt-threaded")] + { + if let Spawner::ThreadPool(spawner) = self { + spawner.shutdown(); + } + } + } +} + cfg_rt_core! { impl Spawner { pub(crate) fn spawn(&self, future: F) -> JoinHandle diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index ced9712d9..d30e8d456 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -91,7 +91,7 @@ impl fmt::Debug for ThreadPool { impl Drop for ThreadPool { fn drop(&mut self) { - self.spawner.shared.close(); + self.spawner.shutdown(); } } @@ -108,6 +108,10 @@ impl Spawner { self.shared.schedule(task, false); handle } + + pub(crate) fn shutdown(&mut self) { + self.shared.close(); + } } impl fmt::Debug for Spawner { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index c53c9384b..ac0528547 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -572,6 +572,8 @@ impl Core { // Drain the queue while self.next_local_task().is_some() {} + + park.shutdown(); } fn drain_pending_drop(&mut self, worker: &Worker) { diff --git a/tokio/src/time/driver/atomic_stack.rs b/tokio/src/time/driver/atomic_stack.rs index d27579f92..c1972a76c 100644 --- a/tokio/src/time/driver/atomic_stack.rs +++ b/tokio/src/time/driver/atomic_stack.rs @@ -95,7 +95,7 @@ impl Iterator for AtomicStackEntries { type Item = Arc; fn next(&mut self) -> Option { - if self.ptr.is_null() { + if self.ptr.is_null() || self.ptr == SHUTDOWN { return None; } diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 92a8474a7..bb6c28b34 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -82,7 +82,7 @@ use std::{cmp, fmt}; /// [timeout]: crate::time::Timeout /// [interval]: crate::time::Interval #[derive(Debug)] -pub(crate) struct Driver { +pub(crate) struct Driver { /// Shared state inner: Arc, @@ -94,6 +94,9 @@ pub(crate) struct Driver { /// Source of "now" instances clock: Clock, + + /// True if the driver is being shutdown + is_shutdown: bool, } /// Timer state shared between `Driver`, `Handle`, and `Registration`. @@ -135,6 +138,7 @@ where wheel: wheel::Wheel::new(), park, clock, + is_shutdown: false, } } @@ -303,10 +307,12 @@ where Ok(()) } -} -impl Drop for Driver { - fn drop(&mut self) { + fn shutdown(&mut self) { + if self.is_shutdown { + return; + } + use std::u64; // Shutdown the stack of entries to process, preventing any new entries @@ -319,6 +325,19 @@ impl Drop for Driver { while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { entry.error(Error::shutdown()); } + + self.park.shutdown(); + + self.is_shutdown = true; + } +} + +impl Drop for Driver +where + T: Park, +{ + fn drop(&mut self) { + self.shutdown(); } } diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs index b708f6fc0..b732e4584 100644 --- a/tokio/src/time/tests/test_delay.rs +++ b/tokio/src/time/tests/test_delay.rs @@ -351,6 +351,8 @@ fn unpark_is_delayed() { self.0.advance(ms(436)); Ok(()) } + + fn shutdown(&mut self) {} } impl Unpark for MockUnpark { @@ -434,6 +436,8 @@ impl Park for MockPark { self.0.advance(duration); Ok(()) } + + fn shutdown(&mut self) {} } impl Unpark for MockUnpark { diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 71101d46c..4211a6670 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -2,7 +2,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -// Tests to run on both current-thread & therad-pool runtime variants. +// Tests to run on both current-thread & thread-pool runtime variants. macro_rules! rt_test { ($($t:tt)*) => { @@ -869,6 +869,21 @@ rt_test! { } #[test] + fn shutdown_wakeup_time() { + let mut runtime = rt(); + + runtime.block_on(async move { + tokio::time::delay_for(std::time::Duration::from_millis(100)).await; + }); + + runtime.shutdown_timeout(Duration::from_secs(10_000)); + } + + // This test is currently ignored on Windows because of a + // rust-lang issue in thread local storage destructors. + // See https://github.com/rust-lang/rust/issues/74875 + #[test] + #[cfg(not(windows))] fn runtime_in_thread_local() { use std::cell::RefCell; use std::thread;