diff --git a/tokio/src/runtime/scheduler/multi_thread/idle.rs b/tokio/src/runtime/scheduler/multi_thread/idle.rs index a57bf6a0b..834bc2b66 100644 --- a/tokio/src/runtime/scheduler/multi_thread/idle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/idle.rs @@ -1,7 +1,7 @@ //! Coordinates idling workers use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::Mutex; +use crate::runtime::scheduler::multi_thread::Shared; use std::fmt; use std::sync::atomic::Ordering::{self, SeqCst}; @@ -13,13 +13,16 @@ pub(super) struct Idle { /// Used as a fast-path to avoid acquiring the lock when needed. state: AtomicUsize, - /// Sleeping workers - sleepers: Mutex>, - /// Total number of workers. num_workers: usize, } +/// Data synchronized by the scheduler mutex +pub(super) struct Synced { + /// Sleeping workers + sleepers: Vec, +} + const UNPARK_SHIFT: usize = 16; const UNPARK_MASK: usize = !SEARCH_MASK; const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1; @@ -28,19 +31,24 @@ const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1; struct State(usize); impl Idle { - pub(super) fn new(num_workers: usize) -> Idle { + pub(super) fn new(num_workers: usize) -> (Idle, Synced) { let init = State::new(num_workers); - Idle { + let idle = Idle { state: AtomicUsize::new(init.into()), - sleepers: Mutex::new(Vec::with_capacity(num_workers)), num_workers, - } + }; + + let synced = Synced { + sleepers: Vec::with_capacity(num_workers), + }; + + (idle, synced) } /// If there are no workers actively searching, returns the index of a /// worker currently sleeping. - pub(super) fn worker_to_notify(&self) -> Option { + pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option { // If at least one worker is spinning, work being notified will // eventually be found. A searching thread will find **some** work and // notify another worker, eventually leading to our work being found. @@ -55,7 +63,7 @@ impl Idle { } // Acquire the lock - let mut sleepers = self.sleepers.lock(); + let mut lock = shared.synced.lock(); // Check again, now that the lock is acquired if !self.notify_should_wakeup() { @@ -67,7 +75,7 @@ impl Idle { State::unpark_one(&self.state, 1); // Get the worker to unpark - let ret = sleepers.pop(); + let ret = lock.idle.sleepers.pop(); debug_assert!(ret.is_some()); ret @@ -75,15 +83,20 @@ impl Idle { /// Returns `true` if the worker needs to do a final check for submitted /// work. - pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool { + pub(super) fn transition_worker_to_parked( + &self, + shared: &Shared, + worker: usize, + is_searching: bool, + ) -> bool { // Acquire the lock - let mut sleepers = self.sleepers.lock(); + let mut lock = shared.synced.lock(); // Decrement the number of unparked threads let ret = State::dec_num_unparked(&self.state, is_searching); // Track the sleeping worker - sleepers.push(worker); + lock.idle.sleepers.push(worker); ret } @@ -113,8 +126,9 @@ impl Idle { /// within the worker's park routine. /// /// Returns `true` if the worker was parked before calling the method. - pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool { - let mut sleepers = self.sleepers.lock(); + pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool { + let mut lock = shared.synced.lock(); + let sleepers = &mut lock.idle.sleepers; for index in 0..sleepers.len() { if sleepers[index] == worker_id { @@ -131,9 +145,9 @@ impl Idle { } /// Returns `true` if `worker_id` is contained in the sleep set. - pub(super) fn is_parked(&self, worker_id: usize) -> bool { - let sleepers = self.sleepers.lock(); - sleepers.contains(&worker_id) + pub(super) fn is_parked(&self, shared: &Shared, worker_id: usize) -> bool { + let lock = shared.synced.lock(); + lock.idle.sleepers.contains(&worker_id) } fn notify_should_wakeup(&self) -> bool { diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 67a7890a1..de39a93fd 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -18,6 +18,7 @@ pub(crate) use park::{Parker, Unparker}; pub(crate) mod queue; mod worker; +use worker::Shared; pub(crate) use worker::{Context, Launch}; pub(crate) use worker::block_in_place; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 02d7fce65..5cc5bd1dc 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -60,7 +60,7 @@ use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::context; use crate::runtime::scheduler::multi_thread::{ - queue, Counters, Handle, Idle, Parker, Stats, Unparker, + idle, queue, Counters, Handle, Idle, Parker, Stats, Unparker, }; use crate::runtime::task::{Inject, OwnedTasks}; use crate::runtime::{ @@ -143,6 +143,9 @@ pub(super) struct Shared { /// Collection of all active tasks spawned onto this executor. pub(super) owned: OwnedTasks>, + /// Data synchronized by the scheduler mutex + pub(super) synced: Mutex, + /// Cores that have observed the shutdown signal /// /// The core is **not** placed back in the worker to avoid it from being @@ -165,6 +168,11 @@ pub(super) struct Shared { _counters: Counters, } +/// Data synchronized by the scheduler mutex +pub(super) struct Synced { + pub(super) idle: idle::Synced, +} + /// Used to communicate with a worker from other threads. struct Remote { /// Steals tasks from this worker. @@ -241,12 +249,15 @@ pub(super) fn create( worker_metrics.push(metrics); } + let (idle, idle_synced) = Idle::new(size); + let handle = Arc::new(Handle { shared: Shared { remotes: remotes.into_boxed_slice(), inject: Inject::new(), - idle: Idle::new(size), + idle, owned: OwnedTasks::new(), + synced: Mutex::new(Synced { idle: idle_synced }), shutdown_cores: Mutex::new(vec![]), config, scheduler_metrics: SchedulerMetrics::new(), @@ -793,11 +804,11 @@ impl Core { // When the final worker transitions **out** of searching to parked, it // must check all the queues one last time in case work materialized // between the last work scan and transitioning out of searching. - let is_last_searcher = worker - .handle - .shared - .idle - .transition_worker_to_parked(worker.index, self.is_searching); + let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked( + &worker.handle.shared, + worker.index, + self.is_searching, + ); // The worker is no longer searching. Setting this is the local cache // only. @@ -819,11 +830,20 @@ impl Core { // state when the wake originates from another worker *or* a new task // is pushed. We do *not* want the worker to transition to "searching" // when it wakes when the I/O driver receives new events. - self.is_searching = !worker.handle.shared.idle.unpark_worker_by_id(worker.index); + self.is_searching = !worker + .handle + .shared + .idle + .unpark_worker_by_id(&worker.handle.shared, worker.index); return true; } - if worker.handle.shared.idle.is_parked(worker.index) { + if worker + .handle + .shared + .idle + .is_parked(&worker.handle.shared, worker.index) + { return false; } @@ -964,14 +984,14 @@ impl Handle { fn notify_parked_local(&self) { super::counters::inc_num_inc_notify_local(); - if let Some(index) = self.shared.idle.worker_to_notify() { + if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) { super::counters::inc_num_unparks_local(); self.shared.remotes[index].unpark.unpark(&self.driver); } } fn notify_parked_remote(&self) { - if let Some(index) = self.shared.idle.worker_to_notify() { + if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) { self.shared.remotes[index].unpark.unpark(&self.driver); } }