diff --git a/tokio/src/runtime/scheduler/multi_thread/idle.rs b/tokio/src/runtime/scheduler/multi_thread/idle.rs
index 08951fd8d..440f315e3 100644
--- a/tokio/src/runtime/scheduler/multi_thread/idle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/idle.rs
@@ -13,6 +13,9 @@ pub(super) struct Idle {
     /// Number of idle cores
     num_idle: AtomicUsize,
 
+    /// Map of idle cores
+    idle_map: IdleMap,
+
     /// Used to catch false-negatives when waking workers
     needs_searching: AtomicBool,
 
@@ -20,6 +23,14 @@ pub(super) struct Idle {
     num_cores: usize,
 }
 
+pub(super) struct IdleMap {
+    chunks: Vec<AtomicUsize>,
+}
+
+pub(super) struct Snapshot {
+    chunks: Vec<usize>,
+}
+
 /// Data synchronized by the scheduler mutex
 pub(super) struct Synced {
     /// Worker IDs that are currently sleeping
@@ -34,6 +45,7 @@ impl Idle {
         let idle = Idle {
            num_searching: AtomicUsize::new(0),
            num_idle: AtomicUsize::new(cores.len()),
+           idle_map: IdleMap::new(&cores),
            needs_searching: AtomicBool::new(false),
            num_cores: cores.len(),
         };
@@ -51,15 +63,22 @@ impl Idle {
         synced.available_cores.len()
     }
 
+    pub(super) fn is_idle(&self, index: usize) -> bool {
+        self.idle_map.get(index)
+    }
+
     /// Try to acquire an available core
     pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
         let ret = synced.available_cores.pop();
 
-        if ret.is_some() {
+        if let Some(core) = &ret {
             // Decrement the number of idle cores
             let num_idle = self.num_idle.load(Acquire) - 1;
             debug_assert_eq!(num_idle, synced.available_cores.len());
             self.num_idle.store(num_idle, Release);
+
+            self.idle_map.unset(core.index);
+            debug_assert!(self.idle_map.matches(&synced.available_cores));
         }
 
         ret
@@ -110,6 +129,9 @@ impl Idle {
                 debug_assert!(!core.is_searching);
                 core.is_searching = is_searching;
 
+                self.idle_map.unset(core.index);
+                debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
+
                 // Assign the core to the worker
                 synced.assigned_cores[worker] = Some(core);
 
@@ -156,6 +178,8 @@ impl Idle {
             if let Some(core) = synced.idle.available_cores.pop() {
                 debug_assert!(!core.is_searching);
 
+                self.idle_map.unset(core.index);
+
                 synced.assigned_cores[worker] = Some(core);
                 workers.push(worker);
 
@@ -170,6 +194,7 @@ impl Idle {
         }
 
         if !workers.is_empty() {
+            debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
             let num_idle = synced.idle.available_cores.len();
             self.num_idle.store(num_idle, Release);
         } else {
@@ -195,6 +220,8 @@ impl Idle {
             let worker = synced.idle.sleepers.pop().unwrap();
             let core = synced.idle.available_cores.pop().unwrap();
 
+            self.idle_map.unset(core.index);
+
             synced.assigned_cores[worker] = Some(core);
             shared.condvars[worker].notify_one();
 
@@ -204,6 +231,8 @@ impl Idle {
                 .store(synced.idle.available_cores.len(), Release);
         }
 
+        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
+
         // Wake up any other workers
         while let Some(index) = synced.idle.sleepers.pop() {
             shared.condvars[index].notify_one();
@@ -224,9 +253,13 @@ impl Idle {
         let num_idle = synced.idle.available_cores.len();
         debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
 
+        self.idle_map.set(core.index);
+
         // Store the core in the list of available cores
         synced.idle.available_cores.push(core);
 
+        debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
+
         // Update `num_idle`
         self.num_idle.store(num_idle + 1, Release);
     }
@@ -288,6 +321,84 @@ impl Idle {
     }
 }
 
+const BITS: usize = usize::BITS as usize;
+const BIT_MASK: usize = (usize::BITS - 1) as usize;
+
+impl IdleMap {
+    fn new(cores: &[Box<Core>]) -> IdleMap {
+        let chunks = (0..num_chunks(cores.len()))
+            .map(|_| AtomicUsize::new(0))
+            .collect();
+        let ret = IdleMap { chunks };
+
+        for core in cores {
+            ret.set(core.index);
+        }
+
+        ret
+    }
+
+    fn get(&self, index: usize) -> bool {
+        let (chunk, mask) = index_to_mask(index);
+        self.chunks[chunk].load(Acquire) & mask == mask
+    }
+
+    fn set(&self, index: usize) {
+        let (chunk, mask) = index_to_mask(index);
+        let prev = self.chunks[chunk].load(Acquire);
+        let next = prev | mask;
+        self.chunks[chunk].store(next, Release);
+    }
+
+    fn unset(&self, index: usize) {
+        let (chunk, mask) = index_to_mask(index);
+        let prev = self.chunks[chunk].load(Acquire);
+        let next = prev & !mask;
+        self.chunks[chunk].store(next, Release);
+    }
+
+    fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
+        let expect = IdleMap::new(idle_cores);
+        for (i, chunk) in expect.chunks.iter().enumerate() {
+            if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
+                return false;
+            }
+        }
+        true
+    }
+}
+
+impl Snapshot {
+    pub(crate) fn new(idle: &Idle) -> Snapshot {
+        let chunks = vec![0; num_chunks(idle.idle_map.chunks.len())];
+        let mut ret = Snapshot { chunks };
+        ret.update(&idle.idle_map);
+        ret
+    }
+
+    fn update(&mut self, idle_map: &IdleMap) {
+        for i in 0..self.chunks.len() {
+            self.chunks[i] = idle_map.chunks[i].load(Acquire);
+        }
+    }
+
+    fn get(&self, index: usize) -> bool {
+        let (chunk, mask) = index_to_mask(index);
+        self.chunks[chunk] & mask == mask
+    }
+}
+
+fn num_chunks(max_cores: usize) -> usize {
+    (max_cores / BITS) + 1
+}
+
+fn index_to_mask(index: usize) -> (usize, usize) {
+    let mask = 1 << (index & BIT_MASK);
+    let chunk = index / BITS;
+
+    (chunk, mask)
+}
+
 fn num_active_workers(synced: &Synced) -> usize {
     synced.available_cores.capacity() - synced.available_cores.len()
 }
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 4b949378f..ab02a66ff 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -97,12 +97,15 @@ pub(super) struct Worker {
 
     /// Used to collect a list of workers to notify
     workers_to_notify: Vec<usize>,
+
+    /// Snapshot of idle core list. This helps speed up stealing
+    idle_snapshot: idle::Snapshot,
 }
 
 /// Core data
 pub(super) struct Core {
     /// Index holding this core's remote/shared state.
-    index: usize,
+    pub(super) index: usize,
 
     /// Used to schedule bookkeeping tasks every so often.
     tick: u32,
@@ -340,6 +343,7 @@ pub(super) fn create(
            handle: handle.clone(),
            index,
            workers_to_notify: Vec::with_capacity(num_workers - 1),
+           idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
         };
 
         handle
@@ -823,8 +827,19 @@ impl Worker {
                 continue;
             }
 
+            // If the core is currently idle, then there is nothing to steal.
+            if cx.shared().idle.is_idle(i) {
+                continue;
+            }
+
             let target = &cx.shared().remotes[i];
 
+            if lifo {
+                if let Some(task) = target.lifo_slot.take_remote() {
+                    return Some(task);
+                }
+            }
+
             if let Some(task) = target
                 .steal
                 .steal_into(&mut core.run_queue, &mut core.stats)
@@ -1061,7 +1076,6 @@ impl Worker {
         } else {
             Ok((None, core))
         }
-
     }
 
     fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
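
Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the bit-per-core bookkeeping that `IdleMap` introduces, i.e. how `num_chunks` and `index_to_mask` split a core index into a word index plus a bit mask within that word. The `IdleBitmap` type and the `main` driver are hypothetical names used only for this example; the real type lives in `idle.rs` above and, as far as this diff shows, can use plain load/store pairs instead of atomic read-modify-write operations because every writer holds the scheduler's `Synced` lock.

// Illustrative only -- `IdleBitmap` is a hypothetical stand-in for the
// patch's `IdleMap`, shown to clarify the chunk/bit indexing scheme.
use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Release}};

const BITS: usize = usize::BITS as usize; // bits per chunk (64 on most targets)
const BIT_MASK: usize = BITS - 1;         // selects the bit within a chunk

struct IdleBitmap {
    chunks: Vec<AtomicUsize>, // one bit per core index, packed into words
}

impl IdleBitmap {
    fn new(num_cores: usize) -> IdleBitmap {
        // Same shape as the patch's `num_chunks`: enough words to cover every index.
        let chunks = (0..(num_cores / BITS) + 1)
            .map(|_| AtomicUsize::new(0))
            .collect();
        IdleBitmap { chunks }
    }

    // Mirrors `index_to_mask`: which word, and which bit inside that word.
    fn index_to_mask(index: usize) -> (usize, usize) {
        (index / BITS, 1 << (index & BIT_MASK))
    }

    fn set(&self, index: usize) {
        let (chunk, mask) = Self::index_to_mask(index);
        // Plain load/store rather than fetch_or: in the patch, writers are
        // serialized by the scheduler lock, so the race-free RMW is unnecessary.
        let prev = self.chunks[chunk].load(Acquire);
        self.chunks[chunk].store(prev | mask, Release);
    }

    fn unset(&self, index: usize) {
        let (chunk, mask) = Self::index_to_mask(index);
        let prev = self.chunks[chunk].load(Acquire);
        self.chunks[chunk].store(prev & !mask, Release);
    }

    fn get(&self, index: usize) -> bool {
        let (chunk, mask) = Self::index_to_mask(index);
        self.chunks[chunk].load(Acquire) & mask == mask
    }
}

fn main() {
    let map = IdleBitmap::new(70); // 70 cores -> two 64-bit chunks
    map.set(3);
    map.set(65); // lands in chunk 1, bit 1
    assert!(map.get(3) && map.get(65));
    map.unset(3);
    assert!(!map.get(3));
}

A worker's stealing loop can then consult such a bitmap (the patch calls `idle.is_idle(i)` in `worker.rs`) to skip peers whose cores are parked in the idle list, avoiding a pointless steal attempt against an empty queue.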