Only lock once to wake a task

This commit is contained in:
Dániel Buga 2024-12-13 20:35:40 +01:00
parent 2c3bc75da6
commit f389ba3721
No known key found for this signature in database
6 changed files with 73 additions and 40 deletions

View File

@ -386,11 +386,11 @@ impl SyncExecutor {
/// - `task` must be set up to run in this executor.
/// - `task` must NOT be already enqueued (in this executor or another one).
#[inline(always)]
unsafe fn enqueue(&self, task: TaskRef) {
unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
#[cfg(feature = "trace")]
trace::task_ready_begin(self, &task);
if self.run_queue.enqueue(task) {
if self.run_queue.enqueue(task, l) {
self.pender.pend();
}
}
@ -401,7 +401,9 @@ impl SyncExecutor {
#[cfg(feature = "trace")]
trace::task_new(self, &task);
self.enqueue(task);
state::locked(|l| {
self.enqueue(task, l);
})
}
/// # Safety
@ -544,13 +546,13 @@ impl Executor {
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
header.state.run_enqueue(|l| {
// We have just marked the task as scheduled, so enqueue it.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.enqueue(task);
executor.enqueue(task, l);
}
}
});
}
/// Wake a task by `TaskRef` without calling pend.
@ -558,11 +560,11 @@ pub fn wake_task(task: TaskRef) {
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task_no_pend(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
header.state.run_enqueue(|l| {
// We have just marked the task as scheduled, so enqueue it.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.run_queue.enqueue(task);
executor.run_queue.enqueue(task, l);
}
}
});
}

View File

@ -45,7 +45,7 @@ impl RunQueue {
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
let mut was_empty = false;
self.head

View File

@ -44,13 +44,11 @@ impl RunQueue {
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
critical_section::with(|cs| {
let prev = self.head.borrow(cs).replace(Some(task));
task.header().run_queue_item.next.borrow(cs).set(prev);
pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
let prev = self.head.borrow(cs).replace(Some(task));
task.header().run_queue_item.next.borrow(cs).set(prev);
prev.is_none()
})
prev.is_none()
}
/// Empty the queue, then call `on_task` for each task that was in the queue.

View File

@ -2,6 +2,15 @@ use core::sync::atomic::{AtomicU32, Ordering};
use super::timer_queue::TimerEnqueueOperation;
pub(crate) struct Token(());
/// Creates a token and passes it to the closure.
///
/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
pub(crate) fn locked(f: impl FnOnce(Token)) {
f(Token(()));
}
/// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
@ -34,10 +43,12 @@ impl State {
self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
}
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
self.state
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
if self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
// If already scheduled, or if not started,
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
@ -48,6 +59,9 @@ impl State {
}
})
.is_ok()
{
locked(f);
}
}
/// Unmark the task as run-queued. Return whether the task is spawned.

View File

@ -3,6 +3,15 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};
use super::timer_queue::TimerEnqueueOperation;
pub(crate) struct Token(());
/// Creates a token and passes it to the closure.
///
/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
pub(crate) fn locked(f: impl FnOnce(Token)) {
f(Token(()));
}
// Must be kept in sync with the layout of `State`!
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
@ -57,9 +66,10 @@ impl State {
self.spawned.store(false, Ordering::Relaxed);
}
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
unsafe {
loop {
let state: u32;
@ -67,14 +77,15 @@ impl State {
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
asm!("clrex", options(nomem, nostack));
return false;
return;
}
let outcome: usize;
let new_state = state | STATE_RUN_QUEUED;
asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
if outcome == 0 {
return true;
locked(f);
return;
}
}
}

View File

@ -1,6 +1,7 @@
use core::cell::Cell;
use critical_section::Mutex;
pub(crate) use critical_section::{with as locked, CriticalSection as Token};
use critical_section::{CriticalSection, Mutex};
use super::timer_queue::TimerEnqueueOperation;
@ -23,13 +24,15 @@ impl State {
}
fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
critical_section::with(|cs| {
let s = self.state.borrow(cs);
let mut val = s.get();
let r = f(&mut val);
s.set(val);
r
})
critical_section::with(|cs| self.update_with_cs(cs, f))
}
fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R {
let s = self.state.borrow(cs);
let mut val = s.get();
let r = f(&mut val);
s.set(val);
r
}
/// If task is idle, mark it as spawned + run_queued and return true.
@ -51,17 +54,22 @@ impl State {
self.update(|s| *s &= !STATE_SPAWNED);
}
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
self.update(|s| {
if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
false
} else {
*s |= STATE_RUN_QUEUED;
true
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
critical_section::with(|cs| {
if self.update_with_cs(cs, |s| {
if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
false
} else {
*s |= STATE_RUN_QUEUED;
true
}
}) {
f(cs);
}
})
});
}
/// Unmark the task as run-queued. Return whether the task is spawned.