diff --git a/tokio/src/runtime/scheduler/multi_thread/counters.rs b/tokio/src/runtime/scheduler/multi_thread/counters.rs
index 8a74882ef..276894b44 100644
--- a/tokio/src/runtime/scheduler/multi_thread/counters.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/counters.rs
@@ -16,6 +16,9 @@ mod imp {
     static NUM_POLLS: AtomicUsize = AtomicUsize::new(0);
     static NUM_LIFO_POLLS: AtomicUsize = AtomicUsize::new(0);
     static NUM_REMOTE_BATCH: AtomicUsize = AtomicUsize::new(0);
+    static NUM_GLOBAL_QUEUE_INTERVAL: AtomicUsize = AtomicUsize::new(0);
+    static NUM_NO_AVAIL_CORE: AtomicUsize = AtomicUsize::new(0);
+    static NUM_RELAY_SEARCH: AtomicUsize = AtomicUsize::new(0);

     impl Drop for super::Counters {
         fn drop(&mut self) {
@@ -32,12 +35,16 @@ mod imp {
             let num_polls = NUM_POLLS.load(Relaxed);
             let num_lifo_polls = NUM_LIFO_POLLS.load(Relaxed);
             let num_remote_batch = NUM_REMOTE_BATCH.load(Relaxed);
+            let num_global_queue_interval = NUM_GLOBAL_QUEUE_INTERVAL.load(Relaxed);
+            let num_no_avail_core = NUM_NO_AVAIL_CORE.load(Relaxed);
+            let num_relay_search = NUM_RELAY_SEARCH.load(Relaxed);

             println!("---");
             println!("notifies (remote): {}", notifies_remote);
             println!(" notifies (local): {}", notifies_local);
             println!("  unparks (local): {}", unparks_local);
             println!(" unparks (remote): {}", unparks_remote);
+            println!("  notify, no core: {}", num_no_avail_core);
             println!("      maintenance: {}", maintenance);
             println!("   LIFO schedules: {}", lifo_scheds);
             println!("      LIFO capped: {}", lifo_capped);
@@ -47,6 +54,8 @@ mod imp {
             println!("            polls: {}", num_polls);
             println!("     polls (LIFO): {}", num_lifo_polls);
             println!("remote task batch: {}", num_remote_batch);
+            println!("global Q interval: {}", num_global_queue_interval);
+            println!("     relay search: {}", num_relay_search);
         }
     }

@@ -101,6 +110,18 @@ mod imp {
     pub(crate) fn inc_num_remote_batch() {
         NUM_REMOTE_BATCH.fetch_add(1, Relaxed);
     }
+
+    pub(crate) fn inc_global_queue_interval() {
+        NUM_GLOBAL_QUEUE_INTERVAL.fetch_add(1, Relaxed);
+    }
+
+    pub(crate) fn inc_notify_no_core() {
+        NUM_NO_AVAIL_CORE.fetch_add(1, Relaxed);
+    }
+
+    pub(crate) fn inc_num_relay_search() {
+        NUM_RELAY_SEARCH.fetch_add(1, Relaxed);
+    }
 }

 #[cfg(not(tokio_internal_mt_counters))]
@@ -118,6 +139,9 @@ mod imp {
     pub(crate) fn inc_num_polls() {}
     pub(crate) fn inc_num_lifo_polls() {}
     pub(crate) fn inc_num_remote_batch() {}
+    pub(crate) fn inc_global_queue_interval() {}
+    pub(crate) fn inc_notify_no_core() {}
+    pub(crate) fn inc_num_relay_search() {}
 }

 #[derive(Debug)]
diff --git a/tokio/src/runtime/scheduler/multi_thread/idle.rs b/tokio/src/runtime/scheduler/multi_thread/idle.rs
index 664878998..aa3bc76ae 100644
--- a/tokio/src/runtime/scheduler/multi_thread/idle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/idle.rs
@@ -63,6 +63,10 @@ impl Idle {
         synced.available_cores.len()
     }

+    pub(super) fn num_searching(&self) -> usize {
+        self.num_searching.load(Acquire)
+    }
+
     pub(super) fn is_idle(&self, index: usize) -> bool {
         self.idle_map.get(index)
     }
@@ -174,6 +178,8 @@ impl Idle {
             }
         }

+        super::counters::inc_notify_no_core();
+
         // Set the `needs_searching` flag, this happens *while* the lock is held.
         self.needs_searching.store(true, Release);
         self.num_searching.fetch_sub(1, Release);
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 8a0b206da..dc0bd98b1 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -659,7 +659,7 @@ impl Worker {
         }

         let n = core.run_queue.max_capacity() / 2;
-        let maybe_task = self.next_remote_task_batch(cx, &mut synced, &mut core, n);
+        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);

         Ok((maybe_task, core))
     }
@@ -703,11 +703,11 @@ impl Worker {
         // We consumed all work in the queues and will start searching for work.
         core.stats.end_processing_scheduled_tasks(&mut self.stats);

-        core = try_task_new_batch!(self, self.poll_driver(cx, core));
-
         while !self.is_shutdown {
-            // Try to steal a task from other workers
-            core = try_task_new_batch!(self, self.steal_work(cx, core));
+            // Search for more work, this involves trying to poll the resource
+            // driver, steal from other workers, and check the global queue
+            // again.
+            core = try_task_new_batch!(self, self.search_for_work(cx, core));

             if !cx.defer.borrow().is_empty() {
                 core = try_task_new_batch!(self, self.park_yield(cx, core));
@@ -726,6 +726,8 @@
         self.num_seq_local_queue_polls += 1;

         if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
+            super::counters::inc_global_queue_interval();
+
             self.num_seq_local_queue_polls = 0;

             // Update the global queue interval, if needed
@@ -740,22 +742,7 @@
             return Ok((Some(task), core));
         }

-        if cx.shared().inject.is_empty() {
-            return Ok((None, core));
-        }
-
-        // Other threads can only **remove** tasks from the current worker's
-        // `run_queue`. So, we can be confident that by the time we call
-        // `run_queue.push_back` below, there will be *at least* `cap`
-        // available slots in the queue.
-        let cap = usize::min(
-            core.run_queue.remaining_slots(),
-            core.run_queue.max_capacity() / 2,
-        );
-
-        let mut synced = cx.shared().synced.lock();
-        let maybe_task = self.next_remote_task_batch(cx, &mut synced, &mut core, cap);
-        Ok((maybe_task, core))
+        self.next_remote_task_batch(cx, core)
     }

     fn next_remote_task(&self, cx: &Context) -> Option<Notified> {
@@ -772,7 +759,26 @@
         unsafe { cx.shared().inject.pop(&mut synced.inject) }
     }

-    fn next_remote_task_batch(
+    fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
+        if cx.shared().inject.is_empty() {
+            return Ok((None, core));
+        }
+
+        // Other threads can only **remove** tasks from the current worker's
+        // `run_queue`. So, we can be confident that by the time we call
+        // `run_queue.push_back` below, there will be *at least* `cap`
+        // available slots in the queue.
+        let cap = usize::min(
+            core.run_queue.remaining_slots(),
+            core.run_queue.max_capacity() / 2,
+        );
+
+        let mut synced = cx.shared().synced.lock();
+        let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, cap);
+        Ok((maybe_task, core))
+    }
+
+    fn next_remote_task_batch_synced(
         &self,
         cx: &Context,
         synced: &mut Synced,
@@ -784,10 +790,13 @@
         // The worker is currently idle, pull a batch of work from the
         // injection queue. We don't want to pull *all* the work so other
         // workers can also get some.
-        let n = usize::min(
-            cx.shared().inject.len() / cx.shared().remotes.len() + 1,
-            max,
-        );
+        let n = if core.is_searching {
+            cx.shared().inject.len() / cx.shared().idle.num_searching() + 1
+        } else {
+            cx.shared().inject.len() / cx.shared().remotes.len() + 1
+        };
+
+        let n = usize::min(n, max);

         // safety: passing in the correct `inject::Synced`.
         let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };
@@ -821,9 +830,9 @@
     /// Note: Only if less than half the workers are searching for tasks to steal
     /// a new worker will actually try to steal. The idea is to make sure not all
     /// workers will be trying to steal at the same time.
-    fn steal_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
+    fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
         #[cfg(not(loom))]
-        const ROUNDS: usize = 1;
+        const ROUNDS: usize = 4;

         #[cfg(loom)]
         const ROUNDS: usize = 1;
@@ -835,6 +844,8 @@
             return Ok((None, core));
         }

+        core = try_task_new_batch!(self, self.poll_driver(cx, core));
+
         // Get a snapshot of which workers are idle
         cx.shared().idle.snapshot(&mut self.idle_snapshot);

@@ -849,6 +860,10 @@
             if let Some(task) = self.steal_one_round(cx, &mut core, start, last) {
                 return Ok((Some(task), core));
             }
+
+            std::thread::sleep(std::time::Duration::from_micros(3));
+
+            core = try_task_new_batch!(self, self.next_remote_task_batch(cx, core));
         }

         Ok((None, core))
@@ -903,6 +918,7 @@
         // Make sure the worker is not in the **searching** state. This enables
         // another idle worker to try to steal work.
         if self.transition_from_searching(cx, &mut core) {
+            super::counters::inc_num_relay_search();
             cx.shared().notify_parked_local();
         }

@@ -1162,7 +1178,7 @@
         // Try one last time to get tasks
         let n = core.run_queue.max_capacity() / 2;

-        if let Some(task) = self.next_remote_task_batch(cx, &mut synced, &mut core, n) {
+        if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
             return Ok((Some(task), core));
         }
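
Reviewer note: the behavioral core of this patch is the batch-size change in `next_remote_task_batch_synced`. While a worker is in the searching state, the injection queue is now split across the workers that are actively searching (`idle.num_searching()`) rather than across all workers (`remotes.len()`), so each searcher pulls a proportionally larger batch, still capped at `max` (half the local queue's capacity). As with the existing counters, the new ones compile to no-ops unless the `tokio_internal_mt_counters` cfg is set. Below is a minimal standalone sketch of that arithmetic; the `batch_size` helper and its `.max(1)` guard are illustrative assumptions, not part of the patch (in the patched code the caller is itself searching, so `num_searching()` is at least 1).

// Hypothetical standalone restatement of the new batch-size policy.
fn batch_size(
    inject_len: usize,    // tasks currently in the global injection queue
    is_searching: bool,   // is this worker in the searching state?
    num_searching: usize, // number of workers currently searching
    num_workers: usize,   // total number of workers (remotes.len())
    max: usize,           // cap: half the local run queue's capacity
) -> usize {
    let n = if is_searching {
        // New: split the global queue among the active searchers only.
        inject_len / num_searching.max(1) + 1
    } else {
        // Old behavior: split it evenly among all workers.
        inject_len / num_workers + 1
    };
    usize::min(n, max)
}

fn main() {
    // 64 queued tasks, 8 workers, 2 of them searching:
    assert_eq!(batch_size(64, false, 2, 8, 128), 9); // 64 / 8 + 1
    assert_eq!(batch_size(64, true, 2, 8, 128), 33); // 64 / 2 + 1
}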