rt: update MultiThread to use its own Handle (#5025)

This is the equivalent of tokio-rs/tokio#5023, but for the MultiThread
scheduler. This patch updates `MultiThread` to use `Arc<Handle>` for all
of its cross-thread needs instead of `Arc<Shared>`.

The effect of this change is that the multi-thread scheduler only has a
single `Arc` type, which includes the driver handles, keeping all
"shared state" together.
Authored by Carl Lerche on 2022-09-16 19:57:45 -07:00; committed by GitHub
parent 2effa7ff8a
commit cdd6eeaf70
5 changed files with 151 additions and 143 deletions
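
Sketched roughly, the change leaves the multi-thread scheduler with a single shared allocation. The placeholder types in the sketch below stand in for tokio's internal `worker::Shared`, `driver::Handle`, `blocking::Spawner`, and `RngSeedGenerator`; this is a simplified, self-contained illustration, not the real internals — see the `multi_thread::Handle` and `worker::create` hunks further down for the actual definitions.

use std::sync::Arc;

// Placeholder stand-ins for tokio's internal types (hypothetical, for illustration).
struct Shared;            // worker::Shared: run queues, owned tasks, metrics, config
struct DriverHandle;      // driver::Handle: I/O and time driver handle
struct BlockingSpawner;   // blocking::Spawner
struct RngSeedGenerator;

// The single shared allocation: everything workers and the runtime-level
// handle need now lives behind one Arc.
struct Handle {
    shared: Shared,
    driver: DriverHandle,
    blocking_spawner: BlockingSpawner,
    seed_generator: RngSeedGenerator,
}

// The scheduler holds a clone of the same Arc<Handle> and hands out
// references to it, instead of a separate `Spawner` wrapping Arc<Shared>.
struct MultiThread {
    handle: Arc<Handle>,
}

impl MultiThread {
    fn handle(&self) -> &Arc<Handle> {
        &self.handle
    }
}

fn main() {
    let handle = Arc::new(Handle {
        shared: Shared,
        driver: DriverHandle,
        blocking_spawner: BlockingSpawner,
        seed_generator: RngSeedGenerator,
    });
    let scheduler = MultiThread { handle: handle.clone() };
    // Workers and the runtime-level handle each clone the same Arc.
    let _runtime_side: Arc<Handle> = scheduler.handle().clone();
    let _worker_side: Arc<Handle> = handle;
}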


@@ -993,10 +993,9 @@ cfg_test_util! {
cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sync::Arc;
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, Scheduler};
use crate::runtime::scheduler::{self, multi_thread, MultiThread};
use crate::runtime::scheduler::{self, MultiThread};
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
@@ -1007,9 +1006,16 @@ cfg_rt_multi_thread! {
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
let (scheduler, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
@@ -1018,20 +1024,12 @@ cfg_rt_multi_thread! {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: self.seed_generator.next_generator(),
seed_generator: seed_generator_1,
},
);
let inner = Arc::new(multi_thread::Handle {
spawner: scheduler.spawner().clone(),
driver: driver_handle,
blocking_spawner,
seed_generator: self.seed_generator.next_generator(),
});
let inner = scheduler::Handle::MultiThread(inner);
// Create the runtime handle
let handle = Handle { inner };
let handle = scheduler::Handle::MultiThread(scheduler.handle().clone());
let handle = Handle { inner: handle };
// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());


@@ -83,7 +83,7 @@ cfg_rt! {
Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(h) => h.spawner.spawn(future, id),
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
}
}
@@ -92,7 +92,7 @@ cfg_rt! {
Handle::CurrentThread(_) => {},
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(ref h) => h.spawner.shutdown(),
Handle::MultiThread(ref h) => h.shutdown(),
}
}


@@ -1,12 +1,18 @@
use crate::runtime::scheduler::multi_thread::Spawner;
use crate::runtime::{blocking, driver};
use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::worker;
use crate::runtime::{
blocking, driver,
task::{self, JoinHandle},
};
use crate::util::RngSeedGenerator;
use std::fmt;
/// Handle to the multi thread scheduler
#[derive(Debug)]
pub(crate) struct Handle {
/// Task spawner
pub(crate) spawner: Spawner,
pub(super) shared: worker::Shared,
/// Resource driver handles
pub(crate) driver: driver::Handle,
@@ -18,28 +24,63 @@ pub(crate) struct Handle {
pub(crate) seed_generator: RngSeedGenerator,
}
impl Handle {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id)
}
pub(crate) fn shutdown(&self) {
self.shared.close();
}
pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
if let Some(notified) = notified {
me.shared.schedule(notified, false);
}
handle
}
}
cfg_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
impl Handle {
pub(crate) fn num_workers(&self) -> usize {
self.spawner.shared.worker_metrics.len()
self.shared.worker_metrics.len()
}
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.spawner.shared.scheduler_metrics
&self.shared.scheduler_metrics
}
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.spawner.shared.worker_metrics[worker]
&self.shared.worker_metrics[worker]
}
pub(crate) fn injection_queue_depth(&self) -> usize {
self.spawner.shared.injection_queue_depth()
self.shared.injection_queue_depth()
}
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.spawner.shared.worker_local_queue_depth(worker)
self.shared.worker_local_queue_depth(worker)
}
}
}
impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("multi_thread::Handle { ... }").finish()
}
}


@@ -17,42 +17,38 @@ pub(crate) use worker::Launch;
pub(crate) use worker::block_in_place;
use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Config, Driver};
use crate::runtime::{blocking, driver, Config, Driver};
use crate::util::RngSeedGenerator;
use std::fmt;
use std::future::Future;
/// Work-stealing based thread pool for executing futures.
pub(crate) struct MultiThread {
spawner: Spawner,
}
/// Submits futures to the associated thread pool for execution.
///
/// A `Spawner` instance is a handle to a single thread pool that allows the owner
/// of the handle to spawn futures onto the thread pool.
///
/// The `Spawner` handle is *only* used for spawning new futures. It does not
/// impact the lifecycle of the thread pool in any way. The thread pool may
/// shut down while there are outstanding `Spawner` instances.
///
/// `Spawner` instances are obtained by calling [`MultiThread::spawner`].
///
/// [`MultiThread::spawner`]: method@MultiThread::spawner
#[derive(Clone)]
pub(crate) struct Spawner {
shared: Arc<worker::Shared>,
handle: Arc<Handle>,
}
// ===== impl MultiThread =====
impl MultiThread {
pub(crate) fn new(size: usize, driver: Driver, config: Config) -> (MultiThread, Launch) {
pub(crate) fn new(
size: usize,
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(size, parker, config);
let spawner = Spawner { shared };
let multi_thread = MultiThread { spawner };
let (handle, launch) = worker::create(
size,
parker,
driver_handle,
blocking_spawner,
seed_generator,
config,
);
let multi_thread = MultiThread { handle };
(multi_thread, launch)
}
@@ -61,8 +57,8 @@ impl MultiThread {
///
/// The `Spawner` handle can be cloned and enables spawning tasks from other
/// threads.
pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
pub(crate) fn handle(&self) -> &Arc<Handle> {
&self.handle
}
/// Blocks the current thread waiting for the future to complete.
@@ -86,29 +82,6 @@ impl fmt::Debug for MultiThread {
impl Drop for MultiThread {
fn drop(&mut self) {
self.spawner.shutdown();
}
}
// ==== impl Spawner =====
impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
worker::Shared::bind_new_task(&self.shared, future, id)
}
pub(crate) fn shutdown(&self) {
self.shared.close();
}
}
impl fmt::Debug for Spawner {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Spawner").finish()
self.handle.shutdown();
}
}


@@ -57,23 +57,24 @@
//! leak.
use crate::coop;
use crate::future::Future;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::scheduler::multi_thread::{queue, Idle, Parker, Unparker};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::{task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::runtime::scheduler::multi_thread::{queue, Handle, Idle, Parker, Unparker};
use crate::runtime::task::{Inject, OwnedTasks};
use crate::runtime::{
blocking, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;
use crate::util::{FastRand, RngSeedGenerator};
use std::cell::RefCell;
use std::time::Duration;
/// A scheduler worker
pub(super) struct Worker {
/// Reference to shared state
shared: Arc<Shared>,
/// Reference to scheduler's handle
handle: Arc<Handle>,
/// Index holding this worker's remote state
index: usize,
@@ -95,7 +96,7 @@ struct Core {
lifo_slot: Option<Notified>,
/// The worker-local run queue.
run_queue: queue::Local<Arc<Shared>>,
run_queue: queue::Local<Arc<Handle>>,
/// True if the worker is currently searching for more work. Searching
/// involves attempting to steal from other workers.
@@ -126,13 +127,13 @@ pub(super) struct Shared {
/// Global task queue used for:
/// 1. Submit work to the scheduler while **not** currently on a worker thread.
/// 2. Submit work to the scheduler when a worker run queue is saturated
inject: Inject<Arc<Shared>>,
inject: Inject<Arc<Handle>>,
/// Coordinates idle workers
idle: Idle,
/// Collection of all active tasks spawned onto this executor.
owned: OwnedTasks<Arc<Shared>>,
pub(super) owned: OwnedTasks<Arc<Handle>>,
/// Cores that have observed the shutdown signal
///
@@ -153,7 +154,7 @@ pub(super) struct Shared {
/// Used to communicate with a worker from other threads.
struct Remote {
/// Steals tasks from this worker.
steal: queue::Steal<Arc<Shared>>,
steal: queue::Steal<Arc<Handle>>,
/// Unparks the associated worker thread
unpark: Unparker,
@@ -177,15 +178,22 @@ pub(crate) struct Launch(Vec<Arc<Worker>>);
type RunResult = Result<Box<Core>, ()>;
/// A task handle
type Task = task::Task<Arc<Shared>>;
type Task = task::Task<Arc<Handle>>;
/// A notified task handle
type Notified = task::Notified<Arc<Shared>>;
type Notified = task::Notified<Arc<Handle>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);
pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc<Shared>, Launch) {
pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
@@ -212,28 +220,33 @@ pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc<Shared>,
worker_metrics.push(WorkerMetrics::new());
}
let shared = Arc::new(Shared {
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
},
driver: driver_handle,
blocking_spawner,
seed_generator,
});
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
shared: shared.clone(),
handle: handle.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(shared, launch)
(handle, launch)
}
pub(crate) fn block_in_place<F, R>(f: F) -> R
@@ -390,12 +403,12 @@ impl Context {
core.pre_shutdown(&self.worker);
// Signal shutdown
self.worker.shared.shutdown(core);
self.worker.handle.shared.shutdown(core);
Err(())
}
fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
let task = self.worker.shared.owned.assert_owner(task);
let task = self.worker.handle.shared.owned.assert_owner(task);
// Make sure the worker is not in the **searching** state. This enables
// another idle worker to try to steal work.
@@ -429,7 +442,7 @@ impl Context {
// Run the LIFO task, then loop
core.metrics.incr_poll_count();
*self.core.borrow_mut() = Some(core);
let task = self.worker.shared.owned.assert_owner(task);
let task = self.worker.handle.shared.owned.assert_owner(task);
task.run();
} else {
// Not enough budget left to run the LIFO task, push it to
@@ -443,7 +456,7 @@ impl Context {
}
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % self.worker.shared.config.event_interval == 0 {
if core.tick % self.worker.handle.shared.config.event_interval == 0 {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
core = self.park_timeout(core, Some(Duration::from_millis(0)));
@@ -467,7 +480,7 @@ impl Context {
/// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
/// after all the IOs get dispatched
fn park(&self, mut core: Box<Core>) -> Box<Core> {
if let Some(f) = &self.worker.shared.config.before_park {
if let Some(f) = &self.worker.handle.shared.config.before_park {
f();
}
@@ -486,7 +499,7 @@ impl Context {
}
}
if let Some(f) = &self.worker.shared.config.after_unpark {
if let Some(f) = &self.worker.handle.shared.config.after_unpark {
f();
}
core
@@ -515,7 +528,7 @@ impl Context {
// If there are tasks available to steal, but this worker is not
// looking for tasks to steal, notify another worker.
if !core.is_searching && core.run_queue.is_stealable() {
self.worker.shared.notify_parked();
self.worker.handle.shared.notify_parked();
}
core
@@ -530,7 +543,7 @@ impl Core {
/// Return the next notified task available to this worker.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % worker.shared.config.global_queue_interval == 0 {
if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| worker.inject().pop())
@@ -551,7 +564,7 @@ impl Core {
return None;
}
let num = worker.shared.remotes.len();
let num = worker.handle.shared.remotes.len();
// Start from a random worker
let start = self.rand.fastrand_n(num as u32) as usize;
@@ -563,7 +576,7 @@ impl Core {
continue;
}
let target = &worker.shared.remotes[i];
let target = &worker.handle.shared.remotes[i];
if let Some(task) = target
.steal
.steal_into(&mut self.run_queue, &mut self.metrics)
@@ -573,12 +586,12 @@ impl Core {
}
// Fallback on checking the global queue
worker.shared.inject.pop()
worker.handle.shared.inject.pop()
}
fn transition_to_searching(&mut self, worker: &Worker) -> bool {
if !self.is_searching {
self.is_searching = worker.shared.idle.transition_worker_to_searching();
self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
}
self.is_searching
@@ -590,7 +603,7 @@ impl Core {
}
self.is_searching = false;
worker.shared.transition_worker_from_searching();
worker.handle.shared.transition_worker_from_searching();
}
/// Prepares the worker state for parking.
@@ -606,6 +619,7 @@ impl Core {
// must check all the queues one last time in case work materialized
// between the last work scan and transitioning out of searching.
let is_last_searcher = worker
.handle
.shared
.idle
.transition_worker_to_parked(worker.index, self.is_searching);
@@ -615,7 +629,7 @@ impl Core {
self.is_searching = false;
if is_last_searcher {
worker.shared.notify_if_work_pending();
worker.handle.shared.notify_if_work_pending();
}
true
@@ -630,11 +644,11 @@ impl Core {
// state when the wake originates from another worker *or* a new task
// is pushed. We do *not* want the worker to transition to "searching"
// when it wakes when the I/O driver receives new events.
self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = !worker.handle.shared.idle.unpark_worker_by_id(worker.index);
return true;
}
if worker.shared.idle.is_parked(worker.index) {
if worker.handle.shared.idle.is_parked(worker.index) {
return false;
}
@@ -646,7 +660,7 @@ impl Core {
/// Runs maintenance work such as checking the pool's state.
fn maintenance(&mut self, worker: &Worker) {
self.metrics
.submit(&worker.shared.worker_metrics[worker.index]);
.submit(&worker.handle.shared.worker_metrics[worker.index]);
if !self.is_shutdown {
// Check if the scheduler has been shutdown
@@ -658,10 +672,10 @@ impl Core {
/// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&mut self, worker: &Worker) {
// Signal to all tasks to shut down.
worker.shared.owned.close_and_shutdown_all();
worker.handle.shared.owned.close_and_shutdown_all();
self.metrics
.submit(&worker.shared.worker_metrics[worker.index]);
.submit(&worker.handle.shared.worker_metrics[worker.index]);
}
/// Shuts down the core.
@@ -678,49 +692,31 @@ impl Core {
impl Worker {
/// Returns a reference to the scheduler's injection queue.
fn inject(&self) -> &Inject<Arc<Shared>> {
&self.shared.inject
fn inject(&self) -> &Inject<Arc<Handle>> {
&self.handle.shared.inject
}
}
impl task::Schedule for Arc<Shared> {
impl task::Schedule for Arc<Handle> {
fn release(&self, task: &Task) -> Option<Task> {
self.owned.remove(task)
self.shared.owned.remove(task)
}
fn schedule(&self, task: Notified) {
(**self).schedule(task, false);
self.shared.schedule(task, false);
}
fn yield_now(&self, task: Notified) {
(**self).schedule(task, true);
self.shared.schedule(task, true);
}
}
impl Shared {
pub(super) fn bind_new_task<T>(
me: &Arc<Self>,
future: T,
id: crate::runtime::task::Id,
) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.owned.bind(future, me.clone(), id);
if let Some(notified) = notified {
me.schedule(notified, false);
}
handle
}
pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
CURRENT.with(|maybe_cx| {
if let Some(cx) = maybe_cx {
// Make sure the task is part of the **current** scheduler.
if self.ptr_eq(&cx.worker.shared) {
if self.ptr_eq(&cx.worker.handle.shared) {
// And the current thread still holds a core
if let Some(core) = cx.core.borrow_mut().as_mut() {
self.schedule_local(core, task, is_yield);