rt: small current_thread scheduler cleanup (#5701)

There should be no functional changes.
Carl Lerche, 2023-05-19 08:15:31 -07:00
parent 29a6f468a6, commit c88f9bc930

@@ -46,7 +46,7 @@ pub(crate) struct Handle {
 /// a function that will perform the scheduling work and acts as a capability token.
 struct Core {
     /// Scheduler run queue
-    tasks: VecDeque<task::Notified<Arc<Handle>>>,
+    tasks: VecDeque<Notified>,

     /// Current tick
     tick: u32,
@@ -67,7 +67,7 @@ struct Core {
 /// Scheduler state shared between threads.
 struct Shared {
     /// Remote run queue. None if the `Runtime` has been dropped.
-    queue: Mutex<Option<VecDeque<task::Notified<Arc<Handle>>>>>,
+    queue: Mutex<Option<VecDeque<Notified>>>,

     /// Collection of all active tasks spawned onto this executor.
     owned: OwnedTasks<Arc<Handle>>,
@@ -95,6 +95,8 @@ struct Context {
     core: RefCell<Option<Box<Core>>>,
 }

+type Notified = task::Notified<Arc<Handle>>;
+
 /// Initial queue capacity.
 const INITIAL_CAPACITY: usize = 64;

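For context: the `Notified` alias introduced here shortens the repeated
`task::Notified<Arc<Handle>>` spelling used throughout this file. A minimal
sketch of the pattern, using stand-in types since tokio's internals are
private (these names are illustrative, not tokio's API):

    use std::collections::VecDeque;
    use std::marker::PhantomData;
    use std::sync::Arc;

    // Stand-ins for the scheduler's private types, for illustration only.
    struct Handle;
    struct TaskNotified<S>(PhantomData<S>);

    // One crate-local alias replaces the repeated generic spelling.
    type Notified = TaskNotified<Arc<Handle>>;

    struct Core {
        tasks: VecDeque<Notified>, // was: VecDeque<TaskNotified<Arc<Handle>>>
    }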
@@ -211,7 +213,7 @@ impl CurrentThread {

         // Drain local queue
         // We already shut down every task, so we just need to drop the task.
-        while let Some(task) = core.pop_task(handle) {
+        while let Some(task) = core.next_local_task(handle) {
             drop(task);
         }
@@ -229,7 +231,7 @@ impl CurrentThread {
         assert!(handle.shared.owned.is_empty());

         // Submit metrics
-        core.metrics.submit(&handle.shared.worker_metrics);
+        core.submit_metrics(handle);

         // Shutdown the resource drivers
         if let Some(driver) = core.driver.as_mut() {
@@ -250,7 +252,20 @@ impl fmt::Debug for CurrentThread {
 // ===== impl Core =====

 impl Core {
-    fn pop_task(&mut self, handle: &Handle) -> Option<task::Notified<Arc<Handle>>> {
+    /// Get and increment the current tick
+    fn tick(&mut self) {
+        self.tick = self.tick.wrapping_add(1);
+    }
+
+    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
+        if self.tick % handle.shared.config.global_queue_interval == 0 {
+            handle.pop().or_else(|| self.next_local_task(handle))
+        } else {
+            self.next_local_task(handle).or_else(|| handle.pop())
+        }
+    }
+
+    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
         let ret = self.tasks.pop_front();
         handle
             .shared
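The new `Core::next_task` centralizes the scheduler's anti-starvation policy:
on every `global_queue_interval`-th tick the remote (injection) queue is
polled before the local queue, so tasks woken from other threads cannot be
starved by an unbroken run of locally scheduled tasks; on all other ticks the
cheaper local queue wins. A runnable sketch of the same policy over plain
queues (names are stand-ins, not tokio's API; the tick increment is folded in
for brevity, whereas the commit keeps it in the separate `Core::tick`):

    use std::collections::VecDeque;

    struct Sched {
        tick: u32,
        global_queue_interval: u32,
        local: VecDeque<&'static str>,
        global: VecDeque<&'static str>,
    }

    impl Sched {
        fn next_task(&mut self) -> Option<&'static str> {
            self.tick = self.tick.wrapping_add(1);
            if self.tick % self.global_queue_interval == 0 {
                // Fairness tick: give remotely woken tasks a turn first.
                self.global.pop_front().or_else(|| self.local.pop_front())
            } else {
                // Common case: prefer the cheap local queue.
                self.local.pop_front().or_else(|| self.global.pop_front())
            }
        }
    }

    fn main() {
        let mut s = Sched {
            tick: 0,
            global_queue_interval: 3,
            local: ["a", "b", "c", "d"].into(),
            global: ["remote"].into(),
        };
        // Prints a, b, remote, c, d: "remote" runs on the third tick
        // even though local work is still pending.
        while let Some(task) = s.next_task() {
            println!("{task}");
        }
    }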
@@ -259,7 +274,7 @@ impl Core {
         ret
     }

-    fn push_task(&mut self, handle: &Handle, task: task::Notified<Arc<Handle>>) {
+    fn push_task(&mut self, handle: &Handle, task: Notified) {
         self.tasks.push_back(task);
         self.metrics.inc_local_schedule_count();
         handle
@@ -267,6 +282,10 @@ impl Core {
             .worker_metrics
             .set_queue_depth(self.tasks.len());
     }
+
+    fn submit_metrics(&mut self, handle: &Handle) {
+        self.metrics.submit(&handle.shared.worker_metrics);
+    }
 }

 fn did_defer_tasks() -> bool {
@@ -317,7 +336,7 @@ impl Context {
         if core.tasks.is_empty() {
             // Park until the thread is signaled
             core.metrics.about_to_park();
-            core.metrics.submit(&handle.shared.worker_metrics);
+            core.submit_metrics(handle);

             let (c, _) = self.enter(core, || {
                 driver.park(&handle.driver);
@@ -344,7 +363,8 @@
     fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
         let mut driver = core.driver.take().expect("driver missing");

-        core.metrics.submit(&handle.shared.worker_metrics);
+        core.submit_metrics(handle);
+
         let (mut core, _) = self.enter(core, || {
             driver.park_timeout(&handle.driver, Duration::from_millis(0));
             wake_deferred_tasks();
@@ -441,7 +461,7 @@ impl Handle {
         dump::Dump::new(traces)
     }

-    fn pop(&self) -> Option<task::Notified<Arc<Handle>>> {
+    fn pop(&self) -> Option<Notified> {
         match self.shared.queue.lock().as_mut() {
             Some(queue) => queue.pop_front(),
             None => None,
@@ -622,15 +642,9 @@ impl CoreGuard<'_> {
                         return (core, None);
                     }

-                    // Get and increment the current tick
-                    let tick = core.tick;
-                    core.tick = core.tick.wrapping_add(1);
+                    core.tick();

-                    let entry = if tick % handle.shared.config.global_queue_interval == 0 {
-                        handle.pop().or_else(|| core.tasks.pop_front())
-                    } else {
-                        core.tasks.pop_front().or_else(|| handle.pop())
-                    };
+                    let entry = core.next_task(handle);

                     let task = match entry {
                         Some(entry) => entry,
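
Net effect, consistent with the commit message's "no functional changes": the
tick bookkeeping and the global-versus-local queue choice move out of the
`block_on` loop in `CoreGuard` into `Core::tick` and `Core::next_task`,
`pop_task` is renamed `next_local_task` to mirror `next_task`, and the
repeated `core.metrics.submit(&handle.shared.worker_metrics)` call sites
collapse into `Core::submit_metrics`.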