mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
rt: remove last slab dependency (#2917)
This removes the last slab dependency by replacing the current slab-based JoinHandle tracking with one based on HashMap instead. Co-authored-by: Bryan Donlan <bdonlan@amazon.com>
This commit is contained in:
parent
0b3918bce9
commit
d7e3fcb9ee
@ -10,9 +10,7 @@ use crate::runtime::context;
|
||||
use crate::runtime::task::{self, JoinHandle};
|
||||
use crate::runtime::{Builder, Callback, Handle};
|
||||
|
||||
use slab::Slab;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -59,7 +57,18 @@ struct Shared {
|
||||
num_notify: u32,
|
||||
shutdown: bool,
|
||||
shutdown_tx: Option<shutdown::Sender>,
|
||||
worker_threads: Slab<thread::JoinHandle<()>>,
|
||||
/// Prior to shutdown, we clean up JoinHandles by having each timed-out
|
||||
/// thread join on the previous timed-out thread. This is not strictly
|
||||
/// necessary but helps avoid Valgrind false positives, see
|
||||
/// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666
|
||||
/// for more information.
|
||||
last_exiting_thread: Option<thread::JoinHandle<()>>,
|
||||
/// This holds the JoinHandles for all running threads; on shutdown, the thread
|
||||
/// calling shutdown handles joining on these.
|
||||
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
|
||||
/// This is a counter used to iterate worker_threads in a consistent order (for loom's
|
||||
/// benefit)
|
||||
worker_thread_index: usize,
|
||||
}
|
||||
|
||||
type Task = task::Notified<NoopSchedule>;
|
||||
@ -105,7 +114,9 @@ impl BlockingPool {
|
||||
num_notify: 0,
|
||||
shutdown: false,
|
||||
shutdown_tx: Some(shutdown_tx),
|
||||
worker_threads: Slab::new(),
|
||||
last_exiting_thread: None,
|
||||
worker_threads: HashMap::new(),
|
||||
worker_thread_index: 0,
|
||||
}),
|
||||
condvar: Condvar::new(),
|
||||
thread_name: builder.thread_name.clone(),
|
||||
@ -137,12 +148,21 @@ impl BlockingPool {
|
||||
shared.shutdown = true;
|
||||
shared.shutdown_tx = None;
|
||||
self.spawner.inner.condvar.notify_all();
|
||||
let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());
|
||||
|
||||
let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
|
||||
let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new());
|
||||
|
||||
drop(shared);
|
||||
|
||||
if self.shutdown_rx.wait(timeout) {
|
||||
for handle in workers.drain() {
|
||||
let _ = last_exited_thread.map(|th| th.join());
|
||||
|
||||
// Loom requires that execution be deterministic, so sort by thread ID before joining.
|
||||
// (HashMaps use a randomly-seeded hash function, so the order is nondeterministic)
|
||||
let mut workers: Vec<(usize, thread::JoinHandle<()>)> = workers.into_iter().collect();
|
||||
workers.sort_by_key(|(id, _)| *id);
|
||||
|
||||
for (_id, handle) in workers.into_iter() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
}
|
||||
@ -204,11 +224,13 @@ impl Spawner {
|
||||
|
||||
if let Some(shutdown_tx) = shutdown_tx {
|
||||
let mut shared = self.inner.shared.lock();
|
||||
let entry = shared.worker_threads.vacant_entry();
|
||||
|
||||
let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
|
||||
let id = shared.worker_thread_index;
|
||||
shared.worker_thread_index += 1;
|
||||
|
||||
entry.insert(handle);
|
||||
let handle = self.spawn_thread(shutdown_tx, rt, id);
|
||||
|
||||
shared.worker_threads.insert(id, handle);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -218,7 +240,7 @@ impl Spawner {
|
||||
&self,
|
||||
shutdown_tx: shutdown::Sender,
|
||||
rt: &Handle,
|
||||
worker_id: usize,
|
||||
id: usize,
|
||||
) -> thread::JoinHandle<()> {
|
||||
let mut builder = thread::Builder::new().name((self.inner.thread_name)());
|
||||
|
||||
@ -232,7 +254,7 @@ impl Spawner {
|
||||
.spawn(move || {
|
||||
// Only the reference should be moved into the closure
|
||||
let _enter = crate::runtime::context::enter(rt.clone());
|
||||
rt.blocking_spawner.inner.run(worker_id);
|
||||
rt.blocking_spawner.inner.run(id);
|
||||
drop(shutdown_tx);
|
||||
})
|
||||
.unwrap()
|
||||
@ -240,12 +262,13 @@ impl Spawner {
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn run(&self, worker_id: usize) {
|
||||
fn run(&self, worker_thread_id: usize) {
|
||||
if let Some(f) = &self.after_start {
|
||||
f()
|
||||
}
|
||||
|
||||
let mut shared = self.shared.lock();
|
||||
let mut join_on_thread = None;
|
||||
|
||||
'main: loop {
|
||||
// BUSY
|
||||
@ -276,7 +299,11 @@ impl Inner {
|
||||
// Even if the condvar "timed out", if the pool is entering the
|
||||
// shutdown phase, we want to perform the cleanup logic.
|
||||
if !shared.shutdown && timeout_result.timed_out() {
|
||||
shared.worker_threads.remove(worker_id);
|
||||
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
|
||||
// This isn't done when shutting down, because the thread calling shutdown will
|
||||
// handle joining everything.
|
||||
let my_handle = shared.worker_threads.remove(&worker_thread_id);
|
||||
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
|
||||
|
||||
break 'main;
|
||||
}
|
||||
@ -323,6 +350,10 @@ impl Inner {
|
||||
if let Some(f) = &self.before_stop {
|
||||
f()
|
||||
}
|
||||
|
||||
if let Some(handle) = join_on_thread {
|
||||
let _ = handle.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user