mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
rt: start work to unify MT scheduler mutexes (#5747)
In order to reduce the number of mutex operations in the multi-threaded scheduler hot path, we need to unify the various mutexes into a single one. To start this work, this commit splits up `Idle` into `Idle` and `Synced`. The `Synced` component is stored separately in the scheduler's `Shared` structure.
This commit is contained in:
parent
79a7e78c0d
commit
a96dab1089
@ -1,7 +1,7 @@
|
||||
//! Coordinates idling workers
|
||||
|
||||
use crate::loom::sync::atomic::AtomicUsize;
|
||||
use crate::loom::sync::Mutex;
|
||||
use crate::runtime::scheduler::multi_thread::Shared;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::atomic::Ordering::{self, SeqCst};
|
||||
@ -13,13 +13,16 @@ pub(super) struct Idle {
|
||||
/// Used as a fast-path to avoid acquiring the lock when needed.
|
||||
state: AtomicUsize,
|
||||
|
||||
/// Sleeping workers
|
||||
sleepers: Mutex<Vec<usize>>,
|
||||
|
||||
/// Total number of workers.
|
||||
num_workers: usize,
|
||||
}
|
||||
|
||||
/// Data synchronized by the scheduler mutex
|
||||
pub(super) struct Synced {
|
||||
/// Sleeping workers
|
||||
sleepers: Vec<usize>,
|
||||
}
|
||||
|
||||
const UNPARK_SHIFT: usize = 16;
|
||||
const UNPARK_MASK: usize = !SEARCH_MASK;
|
||||
const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
|
||||
@ -28,19 +31,24 @@ const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
|
||||
struct State(usize);
|
||||
|
||||
impl Idle {
|
||||
pub(super) fn new(num_workers: usize) -> Idle {
|
||||
pub(super) fn new(num_workers: usize) -> (Idle, Synced) {
|
||||
let init = State::new(num_workers);
|
||||
|
||||
Idle {
|
||||
let idle = Idle {
|
||||
state: AtomicUsize::new(init.into()),
|
||||
sleepers: Mutex::new(Vec::with_capacity(num_workers)),
|
||||
num_workers,
|
||||
}
|
||||
};
|
||||
|
||||
let synced = Synced {
|
||||
sleepers: Vec::with_capacity(num_workers),
|
||||
};
|
||||
|
||||
(idle, synced)
|
||||
}
|
||||
|
||||
/// If there are no workers actively searching, returns the index of a
|
||||
/// worker currently sleeping.
|
||||
pub(super) fn worker_to_notify(&self) -> Option<usize> {
|
||||
pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
|
||||
// If at least one worker is spinning, work being notified will
|
||||
// eventually be found. A searching thread will find **some** work and
|
||||
// notify another worker, eventually leading to our work being found.
|
||||
@ -55,7 +63,7 @@ impl Idle {
|
||||
}
|
||||
|
||||
// Acquire the lock
|
||||
let mut sleepers = self.sleepers.lock();
|
||||
let mut lock = shared.synced.lock();
|
||||
|
||||
// Check again, now that the lock is acquired
|
||||
if !self.notify_should_wakeup() {
|
||||
@ -67,7 +75,7 @@ impl Idle {
|
||||
State::unpark_one(&self.state, 1);
|
||||
|
||||
// Get the worker to unpark
|
||||
let ret = sleepers.pop();
|
||||
let ret = lock.idle.sleepers.pop();
|
||||
debug_assert!(ret.is_some());
|
||||
|
||||
ret
|
||||
@ -75,15 +83,20 @@ impl Idle {
|
||||
|
||||
/// Returns `true` if the worker needs to do a final check for submitted
|
||||
/// work.
|
||||
pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
|
||||
pub(super) fn transition_worker_to_parked(
|
||||
&self,
|
||||
shared: &Shared,
|
||||
worker: usize,
|
||||
is_searching: bool,
|
||||
) -> bool {
|
||||
// Acquire the lock
|
||||
let mut sleepers = self.sleepers.lock();
|
||||
let mut lock = shared.synced.lock();
|
||||
|
||||
// Decrement the number of unparked threads
|
||||
let ret = State::dec_num_unparked(&self.state, is_searching);
|
||||
|
||||
// Track the sleeping worker
|
||||
sleepers.push(worker);
|
||||
lock.idle.sleepers.push(worker);
|
||||
|
||||
ret
|
||||
}
|
||||
@ -113,8 +126,9 @@ impl Idle {
|
||||
/// within the worker's park routine.
|
||||
///
|
||||
/// Returns `true` if the worker was parked before calling the method.
|
||||
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool {
|
||||
let mut sleepers = self.sleepers.lock();
|
||||
pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool {
|
||||
let mut lock = shared.synced.lock();
|
||||
let sleepers = &mut lock.idle.sleepers;
|
||||
|
||||
for index in 0..sleepers.len() {
|
||||
if sleepers[index] == worker_id {
|
||||
@ -131,9 +145,9 @@ impl Idle {
|
||||
}
|
||||
|
||||
/// Returns `true` if `worker_id` is contained in the sleep set.
|
||||
pub(super) fn is_parked(&self, worker_id: usize) -> bool {
|
||||
let sleepers = self.sleepers.lock();
|
||||
sleepers.contains(&worker_id)
|
||||
pub(super) fn is_parked(&self, shared: &Shared, worker_id: usize) -> bool {
|
||||
let lock = shared.synced.lock();
|
||||
lock.idle.sleepers.contains(&worker_id)
|
||||
}
|
||||
|
||||
fn notify_should_wakeup(&self) -> bool {
|
||||
|
@ -18,6 +18,7 @@ pub(crate) use park::{Parker, Unparker};
|
||||
pub(crate) mod queue;
|
||||
|
||||
mod worker;
|
||||
use worker::Shared;
|
||||
pub(crate) use worker::{Context, Launch};
|
||||
|
||||
pub(crate) use worker::block_in_place;
|
||||
|
@ -60,7 +60,7 @@ use crate::loom::sync::{Arc, Mutex};
|
||||
use crate::runtime;
|
||||
use crate::runtime::context;
|
||||
use crate::runtime::scheduler::multi_thread::{
|
||||
queue, Counters, Handle, Idle, Parker, Stats, Unparker,
|
||||
idle, queue, Counters, Handle, Idle, Parker, Stats, Unparker,
|
||||
};
|
||||
use crate::runtime::task::{Inject, OwnedTasks};
|
||||
use crate::runtime::{
|
||||
@ -143,6 +143,9 @@ pub(super) struct Shared {
|
||||
/// Collection of all active tasks spawned onto this executor.
|
||||
pub(super) owned: OwnedTasks<Arc<Handle>>,
|
||||
|
||||
/// Data synchronized by the scheduler mutex
|
||||
pub(super) synced: Mutex<Synced>,
|
||||
|
||||
/// Cores that have observed the shutdown signal
|
||||
///
|
||||
/// The core is **not** placed back in the worker to avoid it from being
|
||||
@ -165,6 +168,11 @@ pub(super) struct Shared {
|
||||
_counters: Counters,
|
||||
}
|
||||
|
||||
/// Data synchronized by the scheduler mutex
|
||||
pub(super) struct Synced {
|
||||
pub(super) idle: idle::Synced,
|
||||
}
|
||||
|
||||
/// Used to communicate with a worker from other threads.
|
||||
struct Remote {
|
||||
/// Steals tasks from this worker.
|
||||
@ -241,12 +249,15 @@ pub(super) fn create(
|
||||
worker_metrics.push(metrics);
|
||||
}
|
||||
|
||||
let (idle, idle_synced) = Idle::new(size);
|
||||
|
||||
let handle = Arc::new(Handle {
|
||||
shared: Shared {
|
||||
remotes: remotes.into_boxed_slice(),
|
||||
inject: Inject::new(),
|
||||
idle: Idle::new(size),
|
||||
idle,
|
||||
owned: OwnedTasks::new(),
|
||||
synced: Mutex::new(Synced { idle: idle_synced }),
|
||||
shutdown_cores: Mutex::new(vec![]),
|
||||
config,
|
||||
scheduler_metrics: SchedulerMetrics::new(),
|
||||
@ -793,11 +804,11 @@ impl Core {
|
||||
// When the final worker transitions **out** of searching to parked, it
|
||||
// must check all the queues one last time in case work materialized
|
||||
// between the last work scan and transitioning out of searching.
|
||||
let is_last_searcher = worker
|
||||
.handle
|
||||
.shared
|
||||
.idle
|
||||
.transition_worker_to_parked(worker.index, self.is_searching);
|
||||
let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
|
||||
&worker.handle.shared,
|
||||
worker.index,
|
||||
self.is_searching,
|
||||
);
|
||||
|
||||
// The worker is no longer searching. Setting this is the local cache
|
||||
// only.
|
||||
@ -819,11 +830,20 @@ impl Core {
|
||||
// state when the wake originates from another worker *or* a new task
|
||||
// is pushed. We do *not* want the worker to transition to "searching"
|
||||
// when it wakes when the I/O driver receives new events.
|
||||
self.is_searching = !worker.handle.shared.idle.unpark_worker_by_id(worker.index);
|
||||
self.is_searching = !worker
|
||||
.handle
|
||||
.shared
|
||||
.idle
|
||||
.unpark_worker_by_id(&worker.handle.shared, worker.index);
|
||||
return true;
|
||||
}
|
||||
|
||||
if worker.handle.shared.idle.is_parked(worker.index) {
|
||||
if worker
|
||||
.handle
|
||||
.shared
|
||||
.idle
|
||||
.is_parked(&worker.handle.shared, worker.index)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -964,14 +984,14 @@ impl Handle {
|
||||
fn notify_parked_local(&self) {
|
||||
super::counters::inc_num_inc_notify_local();
|
||||
|
||||
if let Some(index) = self.shared.idle.worker_to_notify() {
|
||||
if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
|
||||
super::counters::inc_num_unparks_local();
|
||||
self.shared.remotes[index].unpark.unpark(&self.driver);
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_parked_remote(&self) {
|
||||
if let Some(index) = self.shared.idle.worker_to_notify() {
|
||||
if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
|
||||
self.shared.remotes[index].unpark.unpark(&self.driver);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user