mirror of
https://github.com/embassy-rs/embassy.git
synced 2025-09-27 04:10:25 +00:00
Implement Deadline Ranked Scheduling
This implements a minimal version of Deadline Rank Scheduling, as well as ways to access and set Deadlines. This still needs some UX improvements, but is likely Enough for testing.
This commit is contained in:
parent
535c80e61f
commit
1f50e4d496
@ -89,7 +89,6 @@ embassy-sync = { path = "../embassy-sync" }
|
||||
rustversion = "1.0.21"
|
||||
|
||||
[features]
|
||||
|
||||
## Enable nightly-only features
|
||||
nightly = ["embassy-executor-macros/nightly"]
|
||||
|
||||
@ -133,4 +132,4 @@ rtos-trace = ["_any_trace", "metadata-name", "dep:rtos-trace", "dep:embassy-time
|
||||
_any_trace = []
|
||||
|
||||
## Enable "Deadline Rank Scheduler"
|
||||
drs-scheduler = ["dep:cordyceps"]
|
||||
drs-scheduler = ["dep:cordyceps", "dep:embassy-time-driver"]
|
||||
|
@ -68,6 +68,9 @@ extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static
|
||||
#[cfg(feature = "drs-scheduler")]
|
||||
use cordyceps::{stack, Linked};
|
||||
|
||||
#[cfg(feature = "drs-scheduler")]
|
||||
pub use run_queue::Deadline;
|
||||
|
||||
/// Raw task header for use in task pointers.
|
||||
///
|
||||
/// A task can be in one of the following states:
|
||||
@ -124,6 +127,9 @@ pub(crate) struct TaskHeader {
|
||||
#[cfg(feature = "drs-scheduler")]
|
||||
pub(crate) links: stack::Links<TaskHeader>,
|
||||
|
||||
#[cfg(feature = "drs-scheduler")]
|
||||
pub(crate) deadline: SyncUnsafeCell<u64>,
|
||||
|
||||
// TODO(AJM): We could potentially replace RunQueueItem for other runqueue impls, though
|
||||
// right now cordyceps doesn't work on non-atomic systems
|
||||
#[cfg(not(feature = "drs-scheduler"))]
|
||||
@ -255,6 +261,8 @@ impl<F: Future + 'static> TaskStorage<F> {
|
||||
run_queue_item: RunQueueItem::new(),
|
||||
#[cfg(feature = "drs-scheduler")]
|
||||
links: stack::Links::new(),
|
||||
#[cfg(feature = "drs-scheduler")]
|
||||
deadline: SyncUnsafeCell::new(0u64),
|
||||
state: State::new(),
|
||||
executor: AtomicPtr::new(core::ptr::null_mut()),
|
||||
// Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
|
||||
@ -352,6 +360,10 @@ impl<F: Future + 'static> AvailableTask<F> {
|
||||
self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
|
||||
self.task.future.write_in_place(future);
|
||||
|
||||
// TODO(AJM): Some other way of setting this? Just a placeholder
|
||||
#[cfg(feature = "drs-scheduler")]
|
||||
self.task.raw.deadline.set(u64::MAX);
|
||||
|
||||
let task = TaskRef::new(self.task);
|
||||
|
||||
SpawnToken::new(task)
|
||||
|
@ -1,6 +1,7 @@
|
||||
use super::{TaskHeader, TaskRef};
|
||||
use cordyceps::TransferStack;
|
||||
|
||||
use cordyceps::{SortedList, TransferStack};
|
||||
use core::future::{Future, poll_fn};
|
||||
use core::task::Poll;
|
||||
|
||||
/// Atomic task queue using a very, very simple lock-free linked-list queue:
|
||||
///
|
||||
@ -38,10 +39,142 @@ impl RunQueue {
|
||||
/// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
|
||||
/// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
|
||||
pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
|
||||
let taken = self.stack.take_all();
|
||||
for taskref in taken {
|
||||
let mut sorted = SortedList::<TaskHeader>::new(|lhs, rhs| unsafe {
|
||||
// TODO: Do we need any kind of access control here? Not if we say that
|
||||
// tasks can only set their own priority, which they can't do if we're in
|
||||
// the scheduler
|
||||
lhs.deadline.get().cmp(&rhs.deadline.get())
|
||||
});
|
||||
|
||||
loop {
|
||||
// For each loop, grab any newly pended items
|
||||
let taken = self.stack.take_all();
|
||||
|
||||
// Sort these into the list - this is potentially expensive! We do an
|
||||
// insertion sort of new items, which iterates the linked list.
|
||||
//
|
||||
// Something on the order of `O(n * m)`, where `n` is the number
|
||||
// of new tasks, and `m` is the number of already pending tasks.
|
||||
sorted.extend(taken);
|
||||
|
||||
// Pop the task with the SOONEST deadline. If there are no tasks
|
||||
// pending, then we are done.
|
||||
let Some(taskref) = sorted.pop_front() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// We got one task, mark it as dequeued, and process the task.
|
||||
taskref.header().state.run_dequeue();
|
||||
on_task(taskref);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A type for interacting with the deadline of the current task
|
||||
pub struct Deadline {
|
||||
/// Deadline value in ticks, same time base and ticks as `embassy-time`
|
||||
pub instant_ticks: u64,
|
||||
}
|
||||
|
||||
impl Deadline {
|
||||
/// Set the current task's deadline at exactly `instant_ticks`
|
||||
///
|
||||
/// This method is a future in order to access the currently executing task's
|
||||
/// header which contains the deadline.
|
||||
///
|
||||
/// Analogous to `Timer::at`.
|
||||
///
|
||||
/// TODO: Should we check/panic if the deadline is in the past?
|
||||
#[must_use = "Setting deadline must be polled to be effective"]
|
||||
pub fn set_current_task_deadline(instant_ticks: u64) -> impl Future<Output = ()> {
|
||||
poll_fn(move |cx| {
|
||||
let task = super::task_from_waker(cx.waker());
|
||||
// SAFETY: A task can only modify its own deadline, while the task is being
|
||||
// polled, meaning that there cannot be concurrent access to the deadline.
|
||||
unsafe {
|
||||
task.header().deadline.set(instant_ticks);
|
||||
}
|
||||
Poll::Ready(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Set the current task's deadline `duration_ticks` in the future from when
|
||||
/// this future is polled.
|
||||
///
|
||||
/// This method is a future in order to access the currently executing task's
|
||||
/// header which contains the deadline
|
||||
///
|
||||
/// Analogous to `Timer::after`
|
||||
///
|
||||
/// TODO: Do we want to return what the deadline is?
|
||||
#[must_use = "Setting deadline must be polled to be effective"]
|
||||
pub fn set_current_task_deadline_after(duration_ticks: u64) -> impl Future<Output = ()> {
|
||||
poll_fn(move |cx| {
|
||||
let task = super::task_from_waker(cx.waker());
|
||||
let now = embassy_time_driver::now();
|
||||
|
||||
// Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
|
||||
// it for now, we can probably make this wrapping_add for performance
|
||||
// reasons later.
|
||||
let deadline = now.saturating_add(duration_ticks);
|
||||
|
||||
// SAFETY: A task can only modify its own deadline, while the task is being
|
||||
// polled, meaning that there cannot be concurrent access to the deadline.
|
||||
unsafe {
|
||||
task.header().deadline.set(deadline);
|
||||
}
|
||||
Poll::Ready(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Set the current task's deadline `increment_ticks` from the previous deadline.
|
||||
///
|
||||
/// Note that by default (unless otherwise set), tasks start life with the deadline
|
||||
/// u64::MAX, which means this method will have no effect.
|
||||
///
|
||||
/// This method is a future in order to access the currently executing task's
|
||||
/// header which contains the deadline
|
||||
///
|
||||
/// Analogous to one increment of `Ticker::every().next()`.
|
||||
///
|
||||
/// TODO: Do we want to return what the deadline is?
|
||||
#[must_use = "Setting deadline must be polled to be effective"]
|
||||
pub fn increment_current_task_deadline(increment_ticks: u64) -> impl Future<Output = ()> {
|
||||
poll_fn(move |cx| {
|
||||
let task = super::task_from_waker(cx.waker());
|
||||
|
||||
// SAFETY: A task can only modify its own deadline, while the task is being
|
||||
// polled, meaning that there cannot be concurrent access to the deadline.
|
||||
unsafe {
|
||||
// Get the last value
|
||||
let last = task.header().deadline.get();
|
||||
|
||||
// Since ticks is a u64, saturating add is PROBABLY overly cautious, leave
|
||||
// it for now, we can probably make this wrapping_add for performance
|
||||
// reasons later.
|
||||
let deadline = last.saturating_add(increment_ticks);
|
||||
|
||||
// Store the new value
|
||||
task.header().deadline.set(deadline);
|
||||
}
|
||||
Poll::Ready(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the current task's deadline as a tick value.
|
||||
///
|
||||
/// This method is a future in order to access the currently executing task's
|
||||
/// header which contains the deadline
|
||||
pub fn get_current_task_deadline() -> impl Future<Output = Self> {
|
||||
poll_fn(move |cx| {
|
||||
let task = super::task_from_waker(cx.waker());
|
||||
|
||||
// SAFETY: A task can only modify its own deadline, while the task is being
|
||||
// polled, meaning that there cannot be concurrent access to the deadline.
|
||||
let deadline = unsafe {
|
||||
task.header().deadline.get()
|
||||
};
|
||||
Poll::Ready(Self { instant_ticks: deadline })
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user