Merge pull request #4608 from diondokter/upstream-drs-2

[embassy-executor]: Upstream "Earliest Deadline First" Scheduler (version 2)
Dario Nieuwenhuis 2025-09-11 13:52:14 +00:00 committed by GitHub
commit 42c68622ee
14 changed files with 612 additions and 167 deletions

ci.sh
View File

@@ -239,6 +239,7 @@ cargo batch \
--- build --release --manifest-path docs/examples/layer-by-layer/blinky-async/Cargo.toml --target thumbv7em-none-eabi \
--- build --release --manifest-path examples/nrf52810/Cargo.toml --target thumbv7em-none-eabi --artifact-dir out/examples/nrf52810 \
--- build --release --manifest-path examples/nrf52840/Cargo.toml --target thumbv7em-none-eabi --artifact-dir out/examples/nrf52840 \
+--- build --release --manifest-path examples/nrf52840-edf/Cargo.toml --target thumbv7em-none-eabi --artifact-dir out/examples/nrf52840-edf \
--- build --release --manifest-path examples/nrf5340/Cargo.toml --target thumbv8m.main-none-eabihf --artifact-dir out/examples/nrf5340 \
--- build --release --manifest-path examples/nrf54l15/Cargo.toml --target thumbv8m.main-none-eabihf --artifact-dir out/examples/nrf54l15 \
--- build --release --manifest-path examples/nrf9160/Cargo.toml --target thumbv8m.main-none-eabihf --artifact-dir out/examples/nrf9160 \

View File

@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- Added new metadata API for tasks.
- Main task automatically gets a name of `main` when the `metadata-name` feature is enabled.
- Upgraded rtos-trace
+- Added optional "earliest deadline first" (EDF) scheduling

## 0.9.1 - 2025-08-31

View File

@@ -24,6 +24,7 @@ build = [
{target = "thumbv7em-none-eabi", features = ["arch-cortex-m", "executor-thread"]},
{target = "thumbv7em-none-eabi", features = ["arch-cortex-m", "executor-interrupt"]},
{target = "thumbv7em-none-eabi", features = ["arch-cortex-m", "executor-interrupt", "executor-thread"]},
+{target = "thumbv7em-none-eabi", features = ["arch-cortex-m", "executor-interrupt", "executor-thread", "scheduler-deadline", "embassy-time-driver"]},
{target = "armv7a-none-eabi", features = ["arch-cortex-ar", "executor-thread"]},
{target = "armv7r-none-eabi", features = ["arch-cortex-ar", "executor-thread"]},
{target = "armv7r-none-eabihf", features = ["arch-cortex-ar", "executor-thread"]},
@@ -35,7 +36,7 @@ build = [

[package.metadata.embassy_docs]
src_base = "https://github.com/embassy-rs/embassy/blob/embassy-executor-v$VERSION/embassy-executor/src/"
src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-executor/src/"
-features = ["defmt"]
+features = ["defmt", "scheduler-deadline"]
flavors = [
{ name = "std", target = "x86_64-unknown-linux-gnu", features = ["arch-std", "executor-thread"] },
{ name = "wasm", target = "wasm32-unknown-unknown", features = ["arch-wasm", "executor-thread"] },
@@ -46,7 +47,7 @@ flavors = [

[package.metadata.docs.rs]
default-target = "thumbv7em-none-eabi"
targets = ["thumbv7em-none-eabi"]
-features = ["defmt", "arch-cortex-m", "executor-thread", "executor-interrupt"]
+features = ["defmt", "arch-cortex-m", "executor-thread", "executor-interrupt", "scheduler-deadline", "embassy-time-driver"]

[dependencies]
defmt = { version = "1.0.1", optional = true }
@@ -76,6 +77,11 @@ js-sys = { version = "0.3", optional = true }

# arch-avr dependencies
avr-device = { version = "0.7.0", optional = true }
+
+[dependencies.cordyceps]
+version = "0.3.4"
+features = ["no-cache-pad"]

[dev-dependencies]
critical-section = { version = "1.1", features = ["std"] }
trybuild = "1.0"
@@ -123,5 +129,13 @@ executor-interrupt = []
## Enable tracing hooks
trace = ["_any_trace"]
## Enable support for rtos-trace framework
-rtos-trace = ["_any_trace", "metadata-name", "dep:rtos-trace", "dep:embassy-time-driver"]
+rtos-trace = ["_any_trace", "metadata-name", "dep:rtos-trace", "embassy-time-driver"]
_any_trace = []
+
+## Enable the "Earliest Deadline First" scheduler, using soft-realtime "deadlines" to prioritize
+## tasks based on the remaining time before their deadline. Adds some overhead.
+scheduler-deadline = []
+
+## Enable the embassy-time-driver dependency.
+## This can unlock extra APIs, for example for the `scheduler-deadline` feature.
+embassy-time-driver = ["dep:embassy-time-driver"]
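
For orientation (an editor's sketch, not part of this diff): a downstream application opts in through its own Cargo.toml. The feature names below are the ones added here; the version, arch, and executor features are placeholders for whatever the application already uses.

# Hypothetical application Cargo.toml snippet enabling the EDF scheduler
[dependencies]
embassy-executor = { version = "0.9", features = [
    "arch-cortex-m",
    "executor-thread",
    "scheduler-deadline",   # sort the run queue by task deadline
    "embassy-time-driver",  # unlocks now()-relative APIs like set_deadline_after()
] }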

View File

@@ -7,11 +7,15 @@ use core::task::Poll;

use critical_section::Mutex;

use crate::raw;
+#[cfg(feature = "scheduler-deadline")]
+use crate::raw::Deadline;

/// Metadata associated with a task.
pub struct Metadata {
#[cfg(feature = "metadata-name")]
name: Mutex<Cell<Option<&'static str>>>,
+#[cfg(feature = "scheduler-deadline")]
+deadline: raw::Deadline,
}
impl Metadata {
@@ -19,6 +23,10 @@ impl Metadata {
Self {
#[cfg(feature = "metadata-name")]
name: Mutex::new(Cell::new(None)),
+// NOTE: The deadline is set to zero to allow the initializer to reside in `.bss`. This
+// will be lazily initialized in `initialize_impl`
+#[cfg(feature = "scheduler-deadline")]
+deadline: raw::Deadline::new_unset(),
}
}
@@ -52,4 +60,63 @@ impl Metadata {
pub fn set_name(&self, name: &'static str) {
critical_section::with(|cs| self.name.borrow(cs).set(Some(name)))
}
/// Get this task's deadline.
#[cfg(feature = "scheduler-deadline")]
pub fn deadline(&self) -> u64 {
self.deadline.instant_ticks()
}
/// Set this task's deadline.
///
/// This method does NOT check whether the deadline has already passed.
#[cfg(feature = "scheduler-deadline")]
pub fn set_deadline(&self, instant_ticks: u64) {
self.deadline.set(instant_ticks);
}
/// Remove this task's deadline.
///
/// This brings it back to the default, where it's not scheduled ahead of other tasks.
#[cfg(feature = "scheduler-deadline")]
pub fn unset_deadline(&self) {
self.deadline.set(Deadline::UNSET_TICKS);
}
/// Set this task's deadline `duration_ticks` in the future from the time this
/// method is called. The deadline is saturated to the max tick value.
///
/// Analogous to `Timer::after`.
#[cfg(all(feature = "scheduler-deadline", feature = "embassy-time-driver"))]
pub fn set_deadline_after(&self, duration_ticks: u64) {
let now = embassy_time_driver::now();
// Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
// it for now, we can probably make this wrapping_add for performance
// reasons later.
let deadline = now.saturating_add(duration_ticks);
self.set_deadline(deadline);
}
/// Set this task's deadline `duration_ticks` past the previous deadline.
///
/// This deadline is saturated to the max tick value.
///
/// Note that by default (unless otherwise set), tasks start life with the deadline
/// not set, which means this method will have no effect.
///
/// Analogous to one increment of `Ticker::every().next()`.
///
/// Returns the deadline that was set.
#[cfg(feature = "scheduler-deadline")]
pub fn increment_deadline(&self, duration_ticks: u64) -> u64 {
let last = self.deadline();
// Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
// it for now, we can probably make this wrapping_add for performance
// reasons later.
let deadline = last.saturating_add(duration_ticks);
self.set_deadline(deadline);
deadline
}
}
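
For context, a minimal sketch (editor's addition, not part of this diff) of how a periodic task might drive this API on itself, in the style of the nrf52840-edf example at the bottom of this PR; `do_work()` is a hypothetical placeholder:

// Keep an EDF deadline one period ahead of the work loop. Requires the
// `scheduler-deadline` feature (and `embassy-time-driver` for set_deadline_after).
#[embassy_executor::task]
async fn periodic_task() {
    // Handle to this task's own metadata; resolves on the first poll.
    let meta = embassy_executor::Metadata::for_current_task().await;
    // First deadline: ~1000 ticks from now.
    meta.set_deadline_after(1000);
    loop {
        do_work().await; // hypothetical payload
        // Next period: push the deadline 1000 ticks past the previous one,
        // analogous to one step of `Ticker::every().next()`.
        meta.increment_deadline(1000);
    }
}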

View File

@@ -0,0 +1,44 @@
use core::sync::atomic::{AtomicU32, Ordering};
/// A type for interacting with the deadline of the current task
///
/// Requires the `scheduler-deadline` feature.
///
/// Note: Interacting with the deadline should be done locally in a task.
/// In theory you could try to set or read the deadline from another task,
/// but that will result in weird (though not unsound) behavior.
pub(crate) struct Deadline {
instant_ticks_hi: AtomicU32,
instant_ticks_lo: AtomicU32,
}
impl Deadline {
pub(crate) const fn new(instant_ticks: u64) -> Self {
Self {
instant_ticks_hi: AtomicU32::new((instant_ticks >> 32) as u32),
instant_ticks_lo: AtomicU32::new(instant_ticks as u32),
}
}
pub(crate) const fn new_unset() -> Self {
Self::new(Self::UNSET_TICKS)
}
pub(crate) fn set(&self, instant_ticks: u64) {
self.instant_ticks_hi
.store((instant_ticks >> 32) as u32, Ordering::Relaxed);
self.instant_ticks_lo.store(instant_ticks as u32, Ordering::Relaxed);
}
/// Deadline value in ticks, same time base and ticks as `embassy-time`
pub(crate) fn instant_ticks(&self) -> u64 {
let hi = self.instant_ticks_hi.load(Ordering::Relaxed) as u64;
let lo = self.instant_ticks_lo.load(Ordering::Relaxed) as u64;
(hi << 32) | lo
}
/// Sentinel value representing an "unset" deadline, which has lower priority
/// than any other set deadline value
pub(crate) const UNSET_TICKS: u64 = u64::MAX;
}
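
An aside on the representation (editor's note): the hi/lo split keeps `Deadline` usable on targets without 64-bit atomics, and since the two halves are stored and loaded with independent Relaxed operations, a reader racing a writer can observe a torn value -- the "weird (though not unsound) behavior" called out above. The shift/mask round-trip itself is lossless, as a standalone check shows:

// Illustrative only: verify the hi/lo split used by `Deadline` round-trips.
fn split(v: u64) -> (u32, u32) {
    ((v >> 32) as u32, v as u32)
}

fn join(hi: u32, lo: u32) -> u64 {
    ((hi as u64) << 32) | (lo as u64)
}

fn main() {
    for v in [0u64, 1, u32::MAX as u64, 0x1234_5678_9ABC_DEF0, u64::MAX] {
        let (hi, lo) = split(v);
        assert_eq!(join(hi, lo), v);
    }
    println!("hi/lo round-trip is lossless");
}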

View File

@@ -7,8 +7,6 @@
//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.

-#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
-#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
mod run_queue;
#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]

@@ -28,6 +26,9 @@ pub(crate) mod util;
#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
mod waker;
+
+#[cfg(feature = "scheduler-deadline")]
+mod deadline;

use core::future::Future;
use core::marker::PhantomData;
use core::mem;
@@ -38,6 +39,8 @@ use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll, Waker};

+#[cfg(feature = "scheduler-deadline")]
+pub(crate) use deadline::Deadline;
use embassy_executor_timer_queue::TimerQueueItem;
#[cfg(feature = "arch-avr")]
use portable_atomic::AtomicPtr;
@@ -96,6 +99,7 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static

pub(crate) struct TaskHeader {
pub(crate) state: State,
pub(crate) run_queue_item: RunQueueItem,
pub(crate) executor: AtomicPtr<SyncExecutor>,
poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
@@ -296,6 +300,11 @@ impl<F: Future + 'static> AvailableTask<F> {
self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
self.task.future.write_in_place(future);
+
+// By default, deadlines are set to the maximum value, so that any task WITH
+// a set deadline will ALWAYS be scheduled BEFORE a task WITHOUT a set deadline
+#[cfg(feature = "scheduler-deadline")]
+self.task.raw.metadata.unset_deadline();

let task = TaskRef::new(self.task);
SpawnToken::new(task)

View File

@@ -0,0 +1,194 @@
use core::ptr::{addr_of_mut, NonNull};
use cordyceps::sorted_list::Links;
use cordyceps::Linked;
#[cfg(feature = "scheduler-deadline")]
use cordyceps::SortedList;
#[cfg(target_has_atomic = "ptr")]
type TransferStack<T> = cordyceps::TransferStack<T>;
#[cfg(not(target_has_atomic = "ptr"))]
type TransferStack<T> = MutexTransferStack<T>;
use super::{TaskHeader, TaskRef};
/// Use `cordyceps::sorted_list::Links` as the singly linked list
/// for RunQueueItems.
pub(crate) type RunQueueItem = Links<TaskHeader>;
/// Implements the `Linked` trait, allowing for singly linked list usage
/// of any of cordyceps' `TransferStack` (used for the atomic runqueue),
/// `SortedList` (used with the DRS scheduler), or `Stack`, which is
/// popped atomically from the `TransferStack`.
unsafe impl Linked<Links<TaskHeader>> for TaskHeader {
type Handle = TaskRef;
// Convert a TaskRef into a TaskHeader ptr
fn into_ptr(r: TaskRef) -> NonNull<TaskHeader> {
r.ptr
}
// Convert a TaskHeader into a TaskRef
unsafe fn from_ptr(ptr: NonNull<TaskHeader>) -> TaskRef {
TaskRef { ptr }
}
// Given a pointer to a TaskHeader, obtain a pointer to the Links structure,
// which can be used to traverse to other TaskHeader nodes in the linked list
unsafe fn links(ptr: NonNull<TaskHeader>) -> NonNull<Links<TaskHeader>> {
let ptr: *mut TaskHeader = ptr.as_ptr();
NonNull::new_unchecked(addr_of_mut!((*ptr).run_queue_item))
}
}
/// Atomic task queue using a very, very simple lock-free linked-list queue:
///
/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
///
/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
/// null. Then the batch is iterated following the next pointers until null is reached.
///
/// Note that batches will be iterated in the reverse order from how they were enqueued. This is OK
/// for our purposes: it can't create fairness problems, since the next batch won't run until the
/// current batch is completely processed, so even a task that enqueues itself instantly (for example
/// by waking its own waker) can't prevent other tasks from running.
pub(crate) struct RunQueue {
stack: TransferStack<TaskHeader>,
}
impl RunQueue {
pub const fn new() -> Self {
Self {
stack: TransferStack::new(),
}
}
/// Enqueues an item. Returns true if the queue was empty.
///
/// # Safety
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef, _tok: super::state::Token) -> bool {
self.stack.push_was_empty(
task,
#[cfg(not(target_has_atomic = "ptr"))]
_tok,
)
}
/// # Standard atomic runqueue
///
/// Empty the queue, then call `on_task` for each task that was in the queue.
/// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
/// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
#[cfg(not(feature = "scheduler-deadline"))]
pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
let taken = self.stack.take_all();
for taskref in taken {
run_dequeue(&taskref);
on_task(taskref);
}
}
/// # Earliest Deadline First Scheduler
///
/// This algorithm will loop until all enqueued tasks are processed.
///
/// Before polling a task, all currently enqueued tasks will be popped from the
/// runqueue, and will be added to the working `sorted` list, a linked-list that
/// sorts tasks by their deadline, with nearest deadline items in the front, and
/// furthest deadline items in the back.
///
/// After popping and sorting all pending tasks, the SOONEST task will be popped
/// from the front of the queue, and polled by calling `on_task` on it.
///
/// This process will repeat until the local `sorted` queue AND the global
/// runqueue are both empty, at which point this function will return.
#[cfg(feature = "scheduler-deadline")]
pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
let mut sorted =
SortedList::<TaskHeader>::new_with_cmp(|lhs, rhs| lhs.metadata.deadline().cmp(&rhs.metadata.deadline()));
loop {
// For each loop, grab any newly pended items
let taken = self.stack.take_all();
// Sort these into the list - this is potentially expensive! We do an
// insertion sort of new items, which iterates the linked list.
//
// Something on the order of `O(n * m)`, where `n` is the number
// of new tasks, and `m` is the number of already pending tasks.
sorted.extend(taken);
// Pop the task with the SOONEST deadline. If there are no tasks
// pending, then we are done.
let Some(taskref) = sorted.pop_front() else {
return;
};
// We got one task, mark it as dequeued, and process the task.
run_dequeue(&taskref);
on_task(taskref);
}
}
}
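
To make the resulting order concrete (an editor's sketch using plain Vecs, not the executor's intrusive types): with pending tasks A, B, C holding deadlines 500, 100, and unset, the dequeue order is B, A, C.

// Standalone illustration of EDF dequeue order. The executor keeps tasks in a
// cordyceps::SortedList; a plain sort by deadline yields the same order.
fn main() {
    const UNSET: u64 = u64::MAX; // same sentinel as Deadline::UNSET_TICKS

    // (task name, deadline in ticks)
    let mut pending = vec![("A", 500u64), ("B", 100), ("C", UNSET)];
    pending.sort_by_key(|&(_, deadline)| deadline);

    for (name, deadline) in pending {
        println!("poll {name} (deadline {deadline})");
    }
    // Prints B, A, C: nearest deadline first, deadline-free tasks last.
}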
/// atomic state does not require a cs...
#[cfg(target_has_atomic = "ptr")]
#[inline(always)]
fn run_dequeue(taskref: &TaskRef) {
taskref.header().state.run_dequeue();
}
/// ...while non-atomic state does
#[cfg(not(target_has_atomic = "ptr"))]
#[inline(always)]
fn run_dequeue(taskref: &TaskRef) {
critical_section::with(|cs| {
taskref.header().state.run_dequeue(cs);
})
}
/// A wrapper type that acts like TransferStack by wrapping a normal Stack in a CS mutex
#[cfg(not(target_has_atomic = "ptr"))]
struct MutexTransferStack<T: Linked<cordyceps::stack::Links<T>>> {
inner: critical_section::Mutex<core::cell::UnsafeCell<cordyceps::Stack<T>>>,
}
#[cfg(not(target_has_atomic = "ptr"))]
impl<T: Linked<cordyceps::stack::Links<T>>> MutexTransferStack<T> {
const fn new() -> Self {
Self {
inner: critical_section::Mutex::new(core::cell::UnsafeCell::new(cordyceps::Stack::new())),
}
}
/// Push an item to the transfer stack, returning whether the stack was previously empty
fn push_was_empty(&self, item: T::Handle, token: super::state::Token) -> bool {
// SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
// for the lifetime of the token, but does NOT protect against re-entrant access.
// However, we never *return* the reference, nor do we recurse (or call another method
// like `take_all`) that could ever allow for re-entrant aliasing. Therefore, the
// presence of the critical section is sufficient to guarantee exclusive access to
// the `inner` field for the purposes of this function.
let inner = unsafe { &mut *self.inner.borrow(token).get() };
let is_empty = inner.is_empty();
inner.push(item);
is_empty
}
fn take_all(&self) -> cordyceps::Stack<T> {
critical_section::with(|cs| {
// SAFETY: The critical-section mutex guarantees that there is no *concurrent* access
// for the lifetime of the token, but does NOT protect against re-entrant access.
// However, we never *return* the reference, nor do we recurse (or call another method
// like `push_was_empty`) that could ever allow for re-entrant aliasing. Therefore, the
// presence of the critical section is sufficient to guarantee exclusive access to
// the `inner` field for the purposes of this function.
let inner = unsafe { &mut *self.inner.borrow(cs).get() };
inner.take_all()
})
}
}

View File

@@ -1,88 +0,0 @@
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicPtr, Ordering};
use super::{TaskHeader, TaskRef};
use crate::raw::util::SyncUnsafeCell;
pub(crate) struct RunQueueItem {
next: SyncUnsafeCell<Option<TaskRef>>,
}
impl RunQueueItem {
pub const fn new() -> Self {
Self {
next: SyncUnsafeCell::new(None),
}
}
}
/// Atomic task queue using a very, very simple lock-free linked-list queue:
///
/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
///
/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
/// null. Then the batch is iterated following the next pointers until null is reached.
///
/// Note that batches will be iterated in the reverse order from how they were enqueued. This is OK
/// for our purposes: it can't create fairness problems, since the next batch won't run until the
/// current batch is completely processed, so even a task that enqueues itself instantly (for example
/// by waking its own waker) can't prevent other tasks from running.
pub(crate) struct RunQueue {
head: AtomicPtr<TaskHeader>,
}
impl RunQueue {
pub const fn new() -> Self {
Self {
head: AtomicPtr::new(ptr::null_mut()),
}
}
/// Enqueues an item. Returns true if the queue was empty.
///
/// # Safety
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
let mut was_empty = false;
self.head
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |prev| {
was_empty = prev.is_null();
unsafe {
// safety: the pointer is either null or valid
let prev = NonNull::new(prev).map(|ptr| TaskRef::from_ptr(ptr.as_ptr()));
// safety: there are no concurrent accesses to `next`
task.header().run_queue_item.next.set(prev);
}
Some(task.as_ptr() as *mut _)
})
.ok();
was_empty
}
/// Empty the queue, then call `on_task` for each task that was in the queue.
/// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
/// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
// Atomically empty the queue.
let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
// safety: the pointer is either null or valid
let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
// Iterate the linked list of tasks that were previously in the queue.
while let Some(task) = next {
// If the task re-enqueues itself, the `next` pointer will get overwritten.
// Therefore, first read the next pointer, and only then process the task.
// safety: there are no concurrent accesses to `next`
next = unsafe { task.header().run_queue_item.next.get() };
task.header().state.run_dequeue();
on_task(task);
}
}
}

View File

@@ -1,74 +0,0 @@
use core::cell::Cell;
use critical_section::{CriticalSection, Mutex};
use super::TaskRef;
pub(crate) struct RunQueueItem {
next: Mutex<Cell<Option<TaskRef>>>,
}
impl RunQueueItem {
pub const fn new() -> Self {
Self {
next: Mutex::new(Cell::new(None)),
}
}
}
/// Atomic task queue using a very, very simple lock-free linked-list queue:
///
/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
///
/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
/// null. Then the batch is iterated following the next pointers until null is reached.
///
/// Note that batches will be iterated in the reverse order from how they were enqueued. This is OK
/// for our purposes: it can't create fairness problems, since the next batch won't run until the
/// current batch is completely processed, so even a task that enqueues itself instantly (for example
/// by waking its own waker) can't prevent other tasks from running.
pub(crate) struct RunQueue {
head: Mutex<Cell<Option<TaskRef>>>,
}
impl RunQueue {
pub const fn new() -> Self {
Self {
head: Mutex::new(Cell::new(None)),
}
}
/// Enqueues an item. Returns true if the queue was empty.
///
/// # Safety
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
let prev = self.head.borrow(cs).replace(Some(task));
task.header().run_queue_item.next.borrow(cs).set(prev);
prev.is_none()
}
/// Empty the queue, then call `on_task` for each task that was in the queue.
/// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
/// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
// Atomically empty the queue.
let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
// Iterate the linked list of tasks that were previously in the queue.
while let Some(task) = next {
// If the task re-enqueues itself, the `next` pointer will get overwritten.
// Therefore, first read the next pointer, and only then process the task.
critical_section::with(|cs| {
next = task.header().run_queue_item.next.borrow(cs).get();
task.header().state.run_dequeue(cs);
});
on_task(task);
}
}
}

View File

@@ -0,0 +1,9 @@
[target.'cfg(all(target_arch = "arm", target_os = "none"))']
# replace nRF52840_xxAA with your chip as listed in `probe-rs chip list`
runner = "probe-rs run --chip nRF52840_xxAA"
[build]
target = "thumbv7em-none-eabi"
[env]
DEFMT_LOG = "debug"

View File

@@ -0,0 +1,27 @@
[package]
edition = "2021"
name = "embassy-nrf52840-edf-examples"
version = "0.1.0"
license = "MIT OR Apache-2.0"
publish = false
[dependencies]
# NOTE: "scheduler-deadline" and "embassy-time-driver" features are enabled
embassy-executor = { version = "0.9.0", path = "../../embassy-executor", features = ["arch-cortex-m", "executor-thread", "executor-interrupt", "defmt", "scheduler-deadline", "embassy-time-driver"] }
embassy-time = { version = "0.5.0", path = "../../embassy-time", features = ["defmt", "defmt-timestamp-uptime"] }
embassy-nrf = { version = "0.7.0", path = "../../embassy-nrf", features = ["defmt", "nrf52840", "time-driver-rtc1", "gpiote", "unstable-pac", "time"] }
defmt = "1.0.1"
defmt-rtt = "1.0.0"
cortex-m = { version = "0.7.6", features = ["inline-asm", "critical-section-single-core"] }
cortex-m-rt = "0.7.0"
panic-probe = { version = "1.0.0", features = ["print-defmt"] }
[profile.release]
debug = 2
[package.metadata.embassy]
build = [
{ target = "thumbv7em-none-eabi", artifact-dir = "out/examples/nrf52840-edf" }
]

View File

@@ -0,0 +1,35 @@
//! This build script copies the `memory.x` file from the crate root into
//! a directory where the linker can always find it at build time.
//! For many projects this is optional, as the linker always searches the
//! project root directory -- wherever `Cargo.toml` is. However, if you
//! are using a workspace or have a more complicated build setup, this
//! build script becomes required. Additionally, by requesting that
//! Cargo re-run the build script whenever `memory.x` is changed,
//! updating `memory.x` ensures a rebuild of the application with the
//! new memory settings.
use std::env;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
fn main() {
// Put `memory.x` in our output directory and ensure it's
// on the linker search path.
let out = &PathBuf::from(env::var_os("OUT_DIR").unwrap());
File::create(out.join("memory.x"))
.unwrap()
.write_all(include_bytes!("memory.x"))
.unwrap();
println!("cargo:rustc-link-search={}", out.display());
// By default, Cargo will re-run a build script whenever
// any file in the project changes. By specifying `memory.x`
// here, we ensure the build script is only re-run when
// `memory.x` is changed.
println!("cargo:rerun-if-changed=memory.x");
println!("cargo:rustc-link-arg-bins=--nmagic");
println!("cargo:rustc-link-arg-bins=-Tlink.x");
println!("cargo:rustc-link-arg-bins=-Tdefmt.x");
}

View File

@@ -0,0 +1,12 @@
MEMORY
{
/* NOTE 1 K = 1 KiBi = 1024 bytes */
FLASH : ORIGIN = 0x00000000, LENGTH = 1024K
RAM : ORIGIN = 0x20000000, LENGTH = 256K
/* These values correspond to the NRF52840 with Softdevices S140 7.3.0 */
/*
FLASH : ORIGIN = 0x00027000, LENGTH = 868K
RAM : ORIGIN = 0x20020000, LENGTH = 128K
*/
}

View File

@@ -0,0 +1,194 @@
//! Basic side-by-side example of the Earliest Deadline First scheduler
//!
//! This test spawns a number of background "ambient system load" workers
//! that are constantly working, and runs two sets of trials.
//!
//! The first trial runs with no deadline set, so our trial task is at the
//! same prioritization level as the background worker tasks.
//!
//! The second trial sets a deadline, meaning that it will be given higher
//! scheduling priority than the background tasks, which have no deadline set.
#![no_std]
#![no_main]
use core::sync::atomic::{compiler_fence, Ordering};
use defmt::unwrap;
use embassy_executor::Spawner;
use embassy_time::{Duration, Instant, Timer};
use {defmt_rtt as _, panic_probe as _};
#[embassy_executor::main]
async fn main(spawner: Spawner) {
embassy_nrf::init(Default::default());
// Enable flash cache to remove some flash latency jitter
compiler_fence(Ordering::SeqCst);
embassy_nrf::pac::NVMC.icachecnf().write(|w| {
w.set_cacheen(true);
});
compiler_fence(Ordering::SeqCst);
//
// Baseline system load tunables
//
// how many load tasks? More load tasks means more tasks contending
// for the runqueue
let tasks = 32;
// how long should each task work for? The longer the working time,
// the longer the max jitter possible, even when a task is prioritized,
// as EDF is still cooperative and not pre-emptive
//
// 33 ticks ~= 1ms
let work_time_ticks = 33;
// what fraction, 1/denominator, should the system be busy?
// bigger number means **less** busy
//
// 2 => 50%
// 4 => 25%
// 10 => 10%
let denominator = 2;
// Total time window, so each worker is working 1/denominator
// amount of the total time
let time_window = work_time_ticks * u64::from(tasks) * denominator;
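// (Editor's note, worked example with the defaults above: 33 ticks * 32 tasks
// * 2 = a 2112-tick window. Each worker is then busy 33/2112 ~= 1.6% of the
// time, so all 32 workers together are ~50% aggregate load: "2 => 50%".)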
// Spawn all of our load workers!
for i in 0..tasks {
spawner.spawn(unwrap!(load_task(i, work_time_ticks, time_window)));
}
// Let all the tasks spin up
defmt::println!("Spinning up load tasks...");
Timer::after_secs(1).await;
//
// Trial task worker tunables
//
// How many steps should the workers under test run?
// More steps means more chances to have to wait for other tasks
// in line ahead of us.
let num_steps = 100;
// How many ticks should the worker take working on each step?
//
// 33 ticks ~= 1ms
let work_ticks = 33;
// How many ticks should the worker wait on each step?
//
// 66 ticks ~= 2ms
let idle_ticks = 66;
// How many times to repeat each trial?
let trials = 3;
// The total time a trial would take, in a perfect unloaded system
let theoretical = (num_steps * work_ticks) + (num_steps * idle_ticks);
defmt::println!("");
defmt::println!("Starting UNPRIORITIZED worker trials");
for _ in 0..trials {
//
// UNPRIORITIZED worker
//
defmt::println!("");
defmt::println!("Starting unprioritized worker");
let start = Instant::now();
for _ in 0..num_steps {
let now = Instant::now();
while now.elapsed().as_ticks() < work_ticks {}
Timer::after_ticks(idle_ticks).await;
}
let elapsed = start.elapsed().as_ticks();
defmt::println!(
"Trial complete, theoretical ticks: {=u64}, actual ticks: {=u64}",
theoretical,
elapsed
);
let ratio = ((elapsed as f32) / (theoretical as f32)) * 100.0;
defmt::println!("Took {=f32}% of ideal time", ratio);
Timer::after_millis(500).await;
}
Timer::after_secs(1).await;
defmt::println!("");
defmt::println!("Starting PRIORITIZED worker trials");
for _ in 0..trials {
//
// PRIORITIZED worker
//
defmt::println!("");
defmt::println!("Starting prioritized worker");
let start = Instant::now();
// Set the deadline to ~2x the theoretical time. In practice, setting any deadline
// here elevates the current task above all other worker tasks.
let meta = embassy_executor::Metadata::for_current_task().await;
meta.set_deadline_after(theoretical * 2);
// Perform the trial
for _ in 0..num_steps {
let now = Instant::now();
while now.elapsed().as_ticks() < work_ticks {}
Timer::after_ticks(idle_ticks).await;
}
let elapsed = start.elapsed().as_ticks();
defmt::println!(
"Trial complete, theoretical ticks: {=u64}, actual ticks: {=u64}",
theoretical,
elapsed
);
let ratio = ((elapsed as f32) / (theoretical as f32)) * 100.0;
defmt::println!("Took {=f32}% of ideal time", ratio);
// Unset the deadline: deadlines are not automatically cleared, and if our
// deadline is in the past, then we get very high priority!
meta.unset_deadline();
Timer::after_millis(500).await;
}
defmt::println!("");
defmt::println!("Trials Complete.");
}
#[embassy_executor::task(pool_size = 32)]
async fn load_task(id: u32, ticks_on: u64, ttl_ticks: u64) {
let mut last_print = Instant::now();
let mut last_tick = last_print;
let mut variance = 0;
let mut max_variance = 0;
loop {
let tgt = last_tick + Duration::from_ticks(ttl_ticks);
assert!(tgt > Instant::now(), "fell too far behind!");
Timer::at(tgt).await;
let now = Instant::now();
// How late are we from the target?
let var = now.duration_since(tgt).as_ticks();
max_variance = max_variance.max(var);
variance += var;
// blocking work
while now.elapsed().as_ticks() < ticks_on {}
if last_print.elapsed() >= Duration::from_secs(1) {
defmt::trace!(
"Task {=u32} variance ticks (1s): {=u64}, max: {=u64}, act: {=u64}",
id,
variance,
max_variance,
ticks_on,
);
max_variance = 0;
variance = 0;
last_print = Instant::now();
}
last_tick = tgt;
}
}