commit 5b4225b13c
parent 1c8d22c18b
Carl Lerche 2023-06-07 15:37:58 -07:00
3 changed files with 441 additions and 127 deletions

diff --git a/… b/…

@@ -43,7 +43,8 @@ impl Handle {
     }
 
     pub(crate) fn shutdown(&self) {
-        self.close();
+        // self.close();
+        todo!()
     }
 
     pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
@@ -54,7 +55,8 @@ impl Handle {
         let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
 
         if let Some(notified) = notified {
-            me.schedule_task(notified, false);
+            // me.schedule_task(notified, false);
+            todo!()
         }
 
         handle

diff --git a/… b/…

@@ -1,20 +1,20 @@
 //! Coordinates idling workers
 
-use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
 use crate::runtime::scheduler::multi_thread::Shared;
 
 use std::fmt;
 use std::sync::atomic::Ordering::{self, SeqCst};
 
 pub(super) struct Idle {
-    /// Tracks both the number of searching workers and the number of unparked
-    /// workers.
-    ///
-    /// Used as a fast-path to avoid acquiring the lock when needed.
-    state: AtomicUsize,
+    /// Number of searching workers
+    num_searching: AtomicUsize,
 
-    /// Total number of workers.
-    num_workers: usize,
+    /// Number of sleeping workers
+    num_sleeping: AtomicUsize,
+
+    /// Used to catch false-negatives when waking workers
+    needs_searching: AtomicBool,
 }
 
 /// Data synchronized by the scheduler mutex
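
The new fields split the old packed `state` word into separate atomics and add `needs_searching` to catch the race where work arrives just as the last searching worker gives up. The wake-up logic itself is still `todo!()` at this point in the commit; the following is a minimal sketch, not the commit's code, of how the three fields could interact (the helper name mirrors the `notify_should_wakeup` kept commented out further down):

use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicUsize};

struct IdleSketch {
    num_searching: AtomicUsize,
    num_sleeping: AtomicUsize,
    needs_searching: AtomicBool,
}

impl IdleSketch {
    /// Called when new work is submitted. Returns `true` if a sleeping
    /// worker should be woken to search for it.
    fn notify_should_wakeup(&self) -> bool {
        // Set the flag first so a searcher that is concurrently giving up
        // re-checks for work instead of sleeping (the "false-negative").
        self.needs_searching.store(true, SeqCst);

        // If someone is already searching, it will find the work.
        self.num_searching.load(SeqCst) == 0 && self.num_sleeping.load(SeqCst) > 0
    }
}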
@@ -23,15 +23,9 @@ pub(super) struct Synced {
     sleepers: Vec<usize>,
 }
 
-const UNPARK_SHIFT: usize = 16;
-const UNPARK_MASK: usize = !SEARCH_MASK;
-const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
-
-#[derive(Copy, Clone)]
-struct State(usize);
-
 impl Idle {
     pub(super) fn new(num_workers: usize) -> (Idle, Synced) {
+        /*
         let init = State::new(num_workers);
 
         let idle = Idle {
@@ -44,11 +38,14 @@ impl Idle {
         };
 
         (idle, synced)
+        */
+        todo!()
     }
 
     /// If there are no workers actively searching, returns the index of a
     /// worker currently sleeping.
     pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
+        /*
         // If at least one worker is spinning, work being notified will
         // eventually be found. A searching thread will find **some** work and
         // notify another worker, eventually leading to our work being found.
@@ -79,6 +76,8 @@ impl Idle {
         debug_assert!(ret.is_some());
 
         ret
+        */
+        todo!()
     }
 
     /// Returns `true` if the worker needs to do a final check for submitted
@@ -89,6 +88,7 @@ impl Idle {
         worker: usize,
         is_searching: bool,
     ) -> bool {
+        /*
         // Acquire the lock
         let mut lock = shared.synced.lock();
@@ -99,9 +99,12 @@ impl Idle {
         lock.idle.sleepers.push(worker);
 
         ret
+        */
+        todo!()
     }
 
     pub(super) fn transition_worker_to_searching(&self) -> bool {
+        /*
         let state = State::load(&self.state, SeqCst);
         if 2 * state.num_searching() >= self.num_workers {
             return false;
@@ -112,6 +115,8 @@ impl Idle {
         // prevent too much contention.
         State::inc_num_searching(&self.state, SeqCst);
         true
+        */
+        todo!()
     }
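
The `2 * state.num_searching() >= self.num_workers` check above throttles stealing: at most half the pool searches at once, so under load most workers keep running tasks instead of contending over each other's queues. The same predicate in isolation:

#[test]
fn searching_throttle() {
    // Mirrors `2 * state.num_searching() >= self.num_workers` above.
    fn should_start_searching(num_searching: usize, num_workers: usize) -> bool {
        2 * num_searching < num_workers
    }

    assert!(should_start_searching(3, 8)); // under half searching: go search
    assert!(!should_start_searching(4, 8)); // half already searching: skip
}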
 
     /// A lightweight transition from searching -> running.
@@ -119,7 +124,8 @@ impl Idle {
     /// Returns `true` if this is the final searching worker. The caller
     /// **must** notify a new worker.
     pub(super) fn transition_worker_from_searching(&self) -> bool {
-        State::dec_num_searching(&self.state)
+        // State::dec_num_searching(&self.state)
+        todo!()
     }
 
     /// Unpark a specific worker. This happens if tasks are submitted from
@@ -127,6 +133,7 @@ impl Idle {
     ///
     /// Returns `true` if the worker was parked before calling the method.
     pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool {
+        /*
         let mut lock = shared.synced.lock();
         let sleepers = &mut lock.idle.sleepers;
@@ -142,6 +149,8 @@ impl Idle {
         }
 
         false
+        */
+        todo!()
     }
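
The loop body elided between these hunks scans the sleep set for `worker_id`. A minimal sketch of that scan, assuming only the `sleepers: Vec<usize>` field shown in `Synced` above (`swap_remove` keeps removal O(1), as the order of the sleep set does not matter):

fn remove_sleeper(sleepers: &mut Vec<usize>, worker_id: usize) -> bool {
    for index in 0..sleepers.len() {
        if sleepers[index] == worker_id {
            sleepers.swap_remove(index);
            // The real method would also unpark the worker's thread here.
            return true;
        }
    }

    // Not in the sleep set: the worker was not parked.
    false
}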
 
     /// Returns `true` if `worker_id` is contained in the sleep set.
@@ -150,91 +159,11 @@ impl Idle {
         lock.idle.sleepers.contains(&worker_id)
     }
 
+    /*
     fn notify_should_wakeup(&self) -> bool {
         let state = State(self.state.fetch_add(0, SeqCst));
        state.num_searching() == 0 && state.num_unparked() < self.num_workers
     }
+    */
 }
 
-impl State {
-    fn new(num_workers: usize) -> State {
-        // All workers start in the unparked state
-        let ret = State(num_workers << UNPARK_SHIFT);
-        debug_assert_eq!(num_workers, ret.num_unparked());
-        debug_assert_eq!(0, ret.num_searching());
-        ret
-    }
-
-    fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
-        State(cell.load(ordering))
-    }
-
-    fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
-        cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
-    }
-
-    fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
-        cell.fetch_add(1, ordering);
-    }
-
-    /// Returns `true` if this is the final searching worker
-    fn dec_num_searching(cell: &AtomicUsize) -> bool {
-        let state = State(cell.fetch_sub(1, SeqCst));
-        state.num_searching() == 1
-    }
-
-    /// Track a sleeping worker
-    ///
-    /// Returns `true` if this is the final searching worker.
-    fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
-        let mut dec = 1 << UNPARK_SHIFT;
-
-        if is_searching {
-            dec += 1;
-        }
-
-        let prev = State(cell.fetch_sub(dec, SeqCst));
-        is_searching && prev.num_searching() == 1
-    }
-
-    /// Number of workers currently searching
-    fn num_searching(self) -> usize {
-        self.0 & SEARCH_MASK
-    }
-
-    /// Number of workers currently unparked
-    fn num_unparked(self) -> usize {
-        (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
-    }
-}
-
-impl From<usize> for State {
-    fn from(src: usize) -> State {
-        State(src)
-    }
-}
-
-impl From<State> for usize {
-    fn from(src: State) -> usize {
-        src.0
-    }
-}
-
-impl fmt::Debug for State {
-    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
-        fmt.debug_struct("worker::State")
-            .field("num_unparked", &self.num_unparked())
-            .field("num_searching", &self.num_searching())
-            .finish()
-    }
-}
-
-#[test]
-fn test_state() {
-    assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
-    assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
-
-    let state = State::new(10);
-    assert_eq!(10, state.num_unparked());
-    assert_eq!(0, state.num_searching());
-}
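
The deleted `State` packed two counters into a single word: the low 16 bits count searching workers, the bits above `UNPARK_SHIFT` count unparked workers. Packing both into one `AtomicUsize` lets `dec_num_unparked` update both counters with a single `fetch_sub`, and the `fetch_add(0, SeqCst)` read in `notify_should_wakeup` is a read-modify-write, presumably chosen for its stronger ordering guarantees over a plain load. A worked example of the layout:

#[test]
fn packed_state_layout() {
    const UNPARK_SHIFT: usize = 16;
    const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
    const UNPARK_MASK: usize = !SEARCH_MASK;

    // 10 unparked workers, 3 of them searching, in one word.
    let state = (10 << UNPARK_SHIFT) | 3;
    assert_eq!(3, state & SEARCH_MASK);
    assert_eq!(10, (state & UNPARK_MASK) >> UNPARK_SHIFT);

    // A searching worker going to sleep decrements both counters with one
    // subtraction, i.e. one atomic `fetch_sub` (see `dec_num_unparked`).
    let state = state - ((1 << UNPARK_SHIFT) + 1);
    assert_eq!(2, state & SEARCH_MASK);
    assert_eq!(9, (state & UNPARK_MASK) >> UNPARK_SHIFT);
}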

diff --git a/… b/…

@@ -56,7 +56,7 @@
 //! the inject queue indefinitely. This would be a ref-count cycle and a memory
 //! leak.
 
-use crate::loom::sync::{Arc, Mutex};
+use crate::loom::sync::{Arc, Condvar, Mutex};
 use crate::runtime;
 use crate::runtime::context;
 use crate::runtime::scheduler::multi_thread::{
@@ -90,16 +90,13 @@ cfg_not_taskdump! {
 pub(super) struct Worker {
     /// Reference to scheduler's handle
     handle: Arc<Handle>,
-
-    /// Index holding this worker's remote state
-    index: usize,
-
-    /// Used to hand-off a worker's core to another thread.
-    core: AtomicCell<Core>,
 }
 
 /// Core data
 struct Core {
+    /// Index holding this core's remote/shared state.
+    index: usize,
+
     /// Used to schedule bookkeeping tasks every so often.
     tick: u32,
@@ -127,12 +124,6 @@ struct Core {
     /// True if the scheduler is being traced
     is_traced: bool,
 
-    /// Parker
-    ///
-    /// Stored in an `Option` as the parker is added / removed to make the
-    /// borrow checker happy.
-    park: Option<Parker>,
-
     /// Per-worker runtime stats
     stats: Stats,
@@ -163,6 +154,9 @@ pub(crate) struct Shared {
     /// Data synchronized by the scheduler mutex
     pub(super) synced: Mutex<Synced>,
 
+    /// Condition variable used to unblock waiting workers
+    condvar: Condvar,
+
     /// Cores that have observed the shutdown signal
     ///
     /// The core is **not** placed back in the worker to avoid it from being
@@ -190,6 +184,9 @@ pub(crate) struct Shared {
 /// Data synchronized by the scheduler mutex
 pub(crate) struct Synced {
+    /// Cores not currently assigned to workers
+    cores: Vec<Box<Core>>,
+
     /// Synchronized state for `Idle`.
     pub(super) idle: idle::Synced,
@@ -208,8 +205,8 @@ struct Remote {
 /// Thread-local context
 pub(crate) struct Context {
-    /// Worker
-    worker: Arc<Worker>,
+    // /// Worker
+    // worker: Arc<Worker>,
 
     /// Core data
     core: RefCell<Option<Box<Core>>>,
@@ -247,12 +244,14 @@ pub(super) fn create(
     seed_generator: RngSeedGenerator,
     config: Config,
 ) -> (Arc<Handle>, Launch) {
+    /*
     let mut cores = Vec::with_capacity(size);
     let mut remotes = Vec::with_capacity(size);
+    let mut parkers = Vec::with_capacity(size);
     let mut worker_metrics = Vec::with_capacity(size);
 
     // Create the local queues
-    for _ in 0..size {
+    for i in 0..size {
         let (steal, run_queue) = queue::local();
 
         let park = park.clone();
@@ -261,6 +260,7 @@ pub(super) fn create(
         let stats = Stats::new(&metrics);
 
         cores.push(Box::new(Core {
+            index: i,
             tick: 0,
             lifo_slot: None,
             lifo_enabled: !config.disable_lifo_slot,
@@ -268,12 +268,12 @@ pub(super) fn create(
             is_searching: false,
             is_shutdown: false,
             is_traced: false,
-            park: Some(park),
             global_queue_interval: stats.tuned_global_queue_interval(&config),
             stats,
             rand: FastRand::from_seed(config.seed_generator.next_seed()),
         }));
 
+        parkers.push(park);
         remotes.push(Remote { steal, unpark });
         worker_metrics.push(metrics);
     }
@@ -306,15 +306,17 @@ pub(super) fn create(
     let mut launch = Launch(vec![]);
 
-    for (index, core) in cores.drain(..).enumerate() {
+    for (core, park) in cores.drain(..).zip(parkers.drain(..)) {
         launch.0.push(Arc::new(Worker {
             handle: handle.clone(),
-            index,
+            park,
             core: AtomicCell::new(Some(core)),
         }));
     }
 
     (handle, launch)
+    */
+    todo!()
 }
 
 #[track_caller]
@@ -322,6 +324,7 @@ pub(crate) fn block_in_place<F, R>(f: F) -> R
 where
     F: FnOnce() -> R,
 {
+    /*
     // Try to steal the worker core back
     struct Reset(coop::Budget);
@@ -426,17 +429,22 @@ where
     } else {
         f()
     }
+    */
+    todo!()
 }
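
For context, this function backs `tokio::task::block_in_place`, whose contract this commit does not change: the worker hands off its core so sibling workers keep making progress while the closure blocks. Typical usage:

fn example() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        let contents = tokio::task::block_in_place(|| {
            // Blocking here is fine: the worker's core has been handed off,
            // so the other worker threads continue running tasks.
            std::fs::read_to_string("Cargo.toml")
        });
        drop(contents);
    });
}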
 
 impl Launch {
     pub(crate) fn launch(mut self) {
+        /*
         for worker in self.0.drain(..) {
             runtime::spawn_blocking(move || run(worker));
         }
+        */
+        todo!();
     }
 }
 
-fn run(worker: Arc<Worker>) {
+fn run(mut worker: Worker) {
     struct AbortOnPanic;
 
     impl Drop for AbortOnPanic {
@@ -453,19 +461,20 @@ fn run(worker: Arc<Worker>) {
     #[cfg(debug_assertions)]
     let _abort_on_panic = AbortOnPanic;
 
+    /*
     // Acquire a core. If this fails, then another thread is running this
     // worker and there is nothing further to do.
     let core = match worker.core.take() {
         Some(core) => core,
         None => return,
     };
+    */
 
     let handle = scheduler::Handle::MultiThread(worker.handle.clone());
 
     crate::runtime::context::enter_runtime(&handle, true, |_| {
         // Set the worker context.
         let cx = scheduler::Context::MultiThread(Context {
-            worker,
             core: RefCell::new(None),
             defer: Defer::new(),
         });
@@ -473,9 +482,14 @@ fn run(worker: Arc<Worker>) {
         context::set_scheduler(&cx, || {
             let cx = cx.expect_multi_thread();
 
+            // Run the worker
+            worker.run(&cx);
+
+            /*
             // This should always be an error. It only returns a `Result` to support
             // using `?` to short circuit.
             assert!(cx.run(core).is_err());
+            */
 
             // Check if there are any deferred tasks to notify. This can happen when
             // the worker core is lost due to `block_in_place()` being called from
@@ -485,6 +499,374 @@ fn run(worker: Arc<Worker>) {
     });
 }
+
+impl Worker {
+    fn run(&mut self, cx: &Context) {
+        // First, acquire a core. If no cores are available, the thread will
+        // block until one becomes available.
+        let mut core = self.acquire_core();
+
+        // Start in the "processing tasks" state, as polling tasks from the
+        // local queue will be one of the first things we do.
+        core.stats.start_processing_scheduled_tasks();
+
+        while !core.is_shutdown {
+            self.assert_lifo_enabled_is_correct(&core);
+
+            if core.is_traced {
+                core = self.handle.trace_core(core);
+            }
+
+            // Increment the tick
+            core.tick();
+
+            // Run maintenance, if needed
+            core = self.maintenance(core);
+
+            // First, check work available to the current worker.
+            if let Some(task) = self.next_task(&mut core) {
+                core = match self.run_task(task, core) {
+                    Ok(core) => core,
+                    Err(_) => return,
+                };
+
+                continue;
+            }
+
+            // We consumed all work in the queues and will start searching for work.
+            core.stats.end_processing_scheduled_tasks();
+
+            // There is no more **local** work to process, try to steal work
+            // from other workers.
+            if let Some(task) = self.steal_work(&mut core) {
+                // Found work, switch back to processing
+                core.stats.start_processing_scheduled_tasks();
+
+                core = match self.run_task(task, core) {
+                    Ok(core) => core,
+                    Err(_) => return,
+                };
+            } else {
+                /*
+                // Wait for work
+                core = if !self.defer.is_empty() {
+                    self.park_timeout(core, Some(Duration::from_millis(0)))
+                } else {
+                    self.park(core)
+                };
+                */
+                todo!()
+            }
+        }
+
+        self.pre_shutdown(&mut core);
+
+        // Signal shutdown
+        self.shutdown_core(core);
+        todo!()
+    }
+
+    fn acquire_core(&mut self) -> Box<Core> {
+        let mut core = todo!();
+
+        // Reset `lifo_enabled` here in case the core was previously stolen from
+        // a task that had the LIFO slot disabled.
+        self.reset_lifo_enabled(&mut core);
+        todo!()
+    }
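
`acquire_core` is still a stub. Given the `cores` vec added to `Synced` and the `condvar` added to `Shared` earlier in this diff, a plausible shape is to block on the condition variable until another worker returns a core. A sketch against plain `std` types (the real code would go through the `loom` `Mutex`/`Condvar` wrappers and the actual `Synced` struct):

use std::sync::{Condvar, Mutex};

struct SharedSketch<Core> {
    cores: Mutex<Vec<Core>>, // stands in for `synced.cores`
    condvar: Condvar,
}

impl<Core> SharedSketch<Core> {
    fn acquire_core(&self) -> Core {
        let mut cores = self.cores.lock().unwrap();

        loop {
            if let Some(core) = cores.pop() {
                return core;
            }

            // Block until a core is returned and the condvar is notified.
            cores = self.condvar.wait(cores).unwrap();
        }
    }
}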
+
+    fn next_task(&self, core: &mut Core) -> Option<Notified> {
+        /*
+        if self.tick % self.global_queue_interval == 0 {
+            // Update the global queue interval, if needed
+            self.tune_global_queue_interval(worker);
+
+            worker
+                .handle
+                .next_remote_task()
+                .or_else(|| self.next_local_task())
+        } else {
+            let maybe_task = self.next_local_task();
+
+            if maybe_task.is_some() {
+                return maybe_task;
+            }
+
+            if worker.inject().is_empty() {
+                return None;
+            }
+
+            // Other threads can only **remove** tasks from the current worker's
+            // `run_queue`. So, we can be confident that by the time we call
+            // `run_queue.push_back` below, there will be *at least* `cap`
+            // available slots in the queue.
+            let cap = usize::min(
+                self.run_queue.remaining_slots(),
+                self.run_queue.max_capacity() / 2,
+            );
+
+            // The worker is currently idle, pull a batch of work from the
+            // injection queue. We don't want to pull *all* the work so other
+            // workers can also get some.
+            let n = usize::min(
+                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
+                cap,
+            );
+
+            let mut synced = worker.handle.shared.synced.lock();
+
+            // safety: passing in the correct `inject::Synced`.
+            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
+
+            // Pop the first task to return immediately
+            let ret = tasks.next();
+
+            // Push the rest of the tasks onto the run queue
+            self.run_queue.push_back(tasks);
+
+            ret
+        }
+        */
+        todo!();
+    }
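
The batch pulled from the injection queue divides the backlog evenly across workers, plus one so a non-empty queue always yields at least one task, clamped by the free slots in the local queue. The same arithmetic with concrete numbers:

#[test]
fn inject_batch_size() {
    // Mirrors `n = min(len / num_workers + 1, cap)` above.
    fn batch_size(inject_len: usize, num_workers: usize, cap: usize) -> usize {
        usize::min(inject_len / num_workers + 1, cap)
    }

    // 64 queued tasks, 8 workers: take 9 and leave the rest for others.
    assert_eq!(9, batch_size(64, 8, 128));

    // A nearly full local queue clamps the batch to the remaining slots.
    assert_eq!(4, batch_size(64, 8, 4));
}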
+
+    /// Function responsible for stealing tasks from another worker
+    ///
+    /// Note: a new worker will only actually try to steal if fewer than half
+    /// the workers are already searching for tasks. The idea is to make sure
+    /// not all workers will be trying to steal at the same time.
+    fn steal_work(&self, core: &mut Core) -> Option<Notified> {
+        /*
+        if !self.transition_to_searching(worker) {
+            return None;
+        }
+
+        let num = worker.handle.shared.remotes.len();
+        // Start from a random worker
+        let start = self.rand.fastrand_n(num as u32) as usize;
+
+        for i in 0..num {
+            let i = (start + i) % num;
+
+            // Don't steal from ourself! We know we don't have work.
+            if i == self.index {
+                continue;
+            }
+
+            let target = &worker.handle.shared.remotes[i];
+
+            if let Some(task) = target
+                .steal
+                .steal_into(&mut self.run_queue, &mut self.stats)
+            {
+                return Some(task);
+            }
+        }
+
+        // Fallback on checking the global queue
+        worker.handle.next_remote_task()
+        */
+        todo!()
+    }
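
Starting the scan at a random victim spreads contention when several workers wake and begin stealing at once; the modular walk still visits every peer exactly once:

#[test]
fn steal_probe_order() {
    // Mirrors `let i = (start + i) % num` above.
    let num = 4;
    let start = 2; // in the real code: `self.rand.fastrand_n(num as u32)`
    let order: Vec<usize> = (0..num).map(|i| (start + i) % num).collect();

    assert_eq!(vec![2, 3, 0, 1], order);
}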
+
+    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
+        /*
+        let task = self.worker.handle.shared.owned.assert_owner(task);
+
+        // Make sure the worker is not in the **searching** state. This enables
+        // another idle worker to try to steal work.
+        core.transition_from_searching(&self.worker);
+
+        self.assert_lifo_enabled_is_correct(&core);
+
+        // Measure the poll start time. Note that we may end up polling other
+        // tasks under this measurement. In this case, the tasks came from the
+        // LIFO slot and are considered part of the current task for scheduling
+        // purposes. These tasks inherit the "parent"'s limits.
+        core.stats.start_poll();
+
+        // Make the core available to the runtime context
+        *self.core.borrow_mut() = Some(core);
+
+        // Run the task
+        coop::budget(|| {
+            task.run();
+            let mut lifo_polls = 0;
+
+            // As long as there is budget remaining and a task exists in the
+            // `lifo_slot`, then keep running.
+            loop {
+                // Check if we still have the core. If not, the core was stolen
+                // by another worker.
+                let mut core = match self.core.borrow_mut().take() {
+                    Some(core) => core,
+                    None => {
+                        // In this case, we cannot call `reset_lifo_enabled()`
+                        // because the core was stolen. The stealer will handle
+                        // that at the top of `Context::run`
+                        return Err(());
+                    }
+                };
+
+                // Check for a task in the LIFO slot
+                let task = match core.lifo_slot.take() {
+                    Some(task) => task,
+                    None => {
+                        self.reset_lifo_enabled(&mut core);
+                        core.stats.end_poll();
+                        return Ok(core);
+                    }
+                };
+
+                if !coop::has_budget_remaining() {
+                    core.stats.end_poll();
+
+                    // Not enough budget left to run the LIFO task, push it to
+                    // the back of the queue and return.
+                    core.run_queue.push_back_or_overflow(
+                        task,
+                        &*self.worker.handle,
+                        &mut core.stats,
+                    );
+                    // If we hit this point, the LIFO slot should be enabled.
+                    // There is no need to reset it.
+                    debug_assert!(core.lifo_enabled);
+                    return Ok(core);
+                }
+
+                // Track that we are about to run a task from the LIFO slot.
+                lifo_polls += 1;
+                super::counters::inc_lifo_schedules();
+
+                // Disable the LIFO slot if we reach our limit
+                //
+                // In ping-pong style workloads where task A notifies task B,
+                // which notifies task A again, continuously prioritizing the
+                // LIFO slot can cause starvation as these two tasks will
+                // repeatedly schedule the other. To mitigate this, we limit the
+                // number of times the LIFO slot is prioritized.
+                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
+                    core.lifo_enabled = false;
+                    super::counters::inc_lifo_capped();
+                }
+
+                // Run the LIFO task, then loop
+                *self.core.borrow_mut() = Some(core);
+                let task = self.worker.handle.shared.owned.assert_owner(task);
+                task.run();
+            }
+        })
+        */
+        todo!()
+    }
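
The cap exists for workloads of this shape: two tasks waking each other through channels, each `send` landing the peer in the LIFO slot. Without `MAX_LIFO_POLLS_PER_TICK`, the pair could monopolize a worker and starve the rest of the run queue. A user-level reproduction (it ping-pongs forever by design):

async fn ping_pong() {
    use tokio::sync::mpsc;

    let (tx_a, mut rx_a) = mpsc::channel::<u32>(1);
    let (tx_b, mut rx_b) = mpsc::channel::<u32>(1);
    let tx_a2 = tx_a.clone();

    // Task A: each message it receives immediately wakes task B.
    tokio::spawn(async move {
        while let Some(n) = rx_a.recv().await {
            if tx_b.send(n + 1).await.is_err() {
                break;
            }
        }
    });

    // Task B: each message it receives immediately wakes task A again.
    tokio::spawn(async move {
        while let Some(n) = rx_b.recv().await {
            if tx_a2.send(n + 1).await.is_err() {
                break;
            }
        }
    });

    tx_a.send(0).await.unwrap(); // kick off the ping-pong
}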
+
+    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
+        /*
+        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
+            super::counters::inc_num_maintenance();
+
+            core.stats.end_processing_scheduled_tasks();
+
+            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
+            // to run without actually putting the thread to sleep.
+            core = self.park_timeout(core, Some(Duration::from_millis(0)));
+
+            // Run regularly scheduled maintenance
+            core.maintenance(&self.worker);
+
+            core.stats.start_processing_scheduled_tasks();
+        }
+
+        core
+        */
+        todo!()
+    }
+
+    /// Signals all tasks to shut down, and waits for them to complete. Must run
+    /// before we enter the single-threaded phase of shutdown processing.
+    fn pre_shutdown(&self, core: &mut Core) {
+        /*
+        // Signal to all tasks to shut down.
+        worker.handle.shared.owned.close_and_shutdown_all();
+
+        self.stats
+            .submit(&worker.handle.shared.worker_metrics[self.index]);
+        */
+        todo!()
+    }
+
+    /// Signals that a worker has observed the shutdown signal and has replaced
+    /// its core back into its handle.
+    ///
+    /// If all workers have reached this point, the final cleanup is performed.
+    fn shutdown_core(&self, core: Box<Core>) {
+        /*
+        let mut cores = self.shared.shutdown_cores.lock();
+        cores.push(core);
+
+        if cores.len() != self.shared.remotes.len() {
+            return;
+        }
+
+        debug_assert!(self.shared.owned.is_empty());
+
+        for mut core in cores.drain(..) {
+            core.shutdown(self);
+        }
+
+        // Drain the injection queue
+        //
+        // We already shut down every task, so we can simply drop the tasks.
+        while let Some(task) = self.next_remote_task() {
+            drop(task);
+        }
+        */
+        todo!()
+    }
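
`shutdown_core` keeps the "last worker out performs the cleanup" pattern from the commented-out body: every worker deposits its core under the lock, and only the worker that completes the set finalizes, exactly once. The pattern in isolation:

use std::sync::Mutex;

struct ShutdownSketch<Core> {
    cores: Mutex<Vec<Core>>,
    num_workers: usize,
}

impl<Core> ShutdownSketch<Core> {
    fn shutdown_core(&self, core: Core) {
        let mut cores = self.cores.lock().unwrap();
        cores.push(core);

        if cores.len() != self.num_workers {
            // Other workers are still running; they will get here later.
            return;
        }

        // All cores are present and the lock is held: it is now safe to
        // run the single-threaded teardown exactly once.
        for _core in cores.drain(..) {
            // per-core shutdown would happen here
        }
    }
}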
+
+    fn reset_lifo_enabled(&self, core: &mut Core) {
+        core.lifo_enabled = !self.handle.shared.config.disable_lifo_slot;
+    }
+
+    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
+        debug_assert_eq!(
+            core.lifo_enabled,
+            !self.handle.shared.config.disable_lifo_slot
+        );
+    }
+}
+
+impl Context {
+    pub(crate) fn defer(&self, waker: &Waker) {
+        // self.defer.defer(waker);
+        todo!();
+    }
+}
+
+impl task::Schedule for Arc<Handle> {
+    fn release(&self, task: &Task) -> Option<Task> {
+        // self.shared.owned.remove(task)
+        todo!()
+    }
+
+    fn schedule(&self, task: Notified) {
+        // self.schedule_task(task, false);
+        todo!()
+    }
+
+    fn yield_now(&self, task: Notified) {
+        // self.schedule_task(task, true);
+        todo!()
+    }
+}
+
+impl Core {
+    /// Increment the tick
+    fn tick(&mut self) {
+        self.tick = self.tick.wrapping_add(1);
+    }
+}
+
+/*
 impl Context {
     fn run(&self, mut core: Box<Core>) -> RunResult {
         // Reset `lifo_enabled` here in case the core was previously stolen from
@@ -812,7 +1194,7 @@ impl Core {
             let i = (start + i) % num;
 
             // Don't steal from ourself! We know we don't have work.
-            if i == worker.index {
+            if i == self.index {
                 continue;
             }
@@ -860,7 +1242,7 @@ impl Core {
         // between the last work scan and transitioning out of searching.
         let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
             &worker.handle.shared,
-            worker.index,
+            self.index,
             self.is_searching,
         );
@@ -888,7 +1270,7 @@ impl Core {
                 .handle
                 .shared
                 .idle
-                .unpark_worker_by_id(&worker.handle.shared, worker.index);
+                .unpark_worker_by_id(&worker.handle.shared, self.index);
             return true;
         }
@@ -896,7 +1278,7 @@ impl Core {
             .handle
             .shared
             .idle
-            .is_parked(&worker.handle.shared, worker.index)
+            .is_parked(&worker.handle.shared, self.index)
         {
             return false;
         }
@@ -909,7 +1291,7 @@ impl Core {
     /// Runs maintenance work such as checking the pool's state.
     fn maintenance(&mut self, worker: &Worker) {
         self.stats
-            .submit(&worker.handle.shared.worker_metrics[worker.index]);
+            .submit(&worker.handle.shared.worker_metrics[self.index]);
 
         if !self.is_shutdown {
             // Check if the scheduler has been shutdown
@@ -930,7 +1312,7 @@ impl Core {
         worker.handle.shared.owned.close_and_shutdown_all();
 
         self.stats
-            .submit(&worker.handle.shared.worker_metrics[worker.index]);
+            .submit(&worker.handle.shared.worker_metrics[self.index]);
     }
 
     /// Shuts down the core.
@@ -1181,6 +1563,7 @@ fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
         _ => f(None),
     })
 }
+*/
 
 // `u32::abs_diff` is not available on Tokio's MSRV.
 fn abs_diff(a: u32, b: u32) -> u32 {