Implement priority levels (#4090)

* Add priority to tasks

* Separate Mutex out of counting Semaphore

* Add priority inheritance to mutexes

* Test priority inheritance, fix issues

* Compare against the owner's current priority
Dániel Buga 2025-09-11 10:03:44 +02:00 committed by GitHub
parent 9e190f112d
commit cf80e6a8f1
13 changed files with 366 additions and 113 deletions

View File

@@ -136,10 +136,12 @@ impl Queue {
         loop {
             let enqueued = self.inner.with(|queue| {
                 if unsafe { queue.try_enqueue(item) } {
+                    trace!("Queue - notify with item");
                     queue.waiting_for_item.notify();
                     true
                 } else {
                     // The task will go to sleep when the above critical section is released.
+                    trace!("Queue - wait for space - {:?}", deadline);
                     queue.waiting_for_space.wait_with_deadline(deadline);
                     false
                 }
@@ -157,6 +159,7 @@ impl Queue {
             if let Some(deadline) = deadline
                 && deadline < Instant::now()
             {
+                debug!("Queue - send to back - timed out");
                 // We have a deadline and we've timed out.
                 return false;
             }
@@ -182,10 +185,12 @@ impl Queue {
             // Attempt to dequeue an item from the queue
             let dequeued = self.inner.with(|queue| {
                 if unsafe { queue.try_dequeue(item) } {
+                    trace!("Queue - notify with space");
                     queue.waiting_for_space.notify();
                     true
                 } else {
                     // The task will go to sleep when the above critical section is released.
+                    trace!("Queue - wait for item - {:?}", deadline);
                     queue.waiting_for_item.wait_with_deadline(deadline);
                     false
                 }
@@ -204,6 +209,7 @@ impl Queue {
                 && deadline < Instant::now()
             {
                 // We have a deadline and we've timed out.
+                debug!("Queue - timed out waiting for item");
                 return false;
             }
             // We can block more, so let's attempt to dequeue again.
@@ -213,6 +219,7 @@ impl Queue {
     unsafe fn try_receive(&self, item: *mut u8) -> bool {
         self.inner.with(|queue| {
             if unsafe { queue.try_dequeue(item) } {
+                trace!("Queue - notify with space");
                 queue.waiting_for_space.notify();
                 true
             } else {
@@ -230,6 +237,7 @@ impl Queue {
             }
             if was_full && !queue.full() {
+                trace!("Queue - notify with space");
                 queue.waiting_for_space.notify();
             }
         })

View File

@@ -1,45 +1,100 @@
-use crate::task::{TaskPtr, TaskQueue, TaskReadyQueueElement, TaskState};
+use crate::task::{TaskExt, TaskPtr, TaskQueue, TaskReadyQueueElement, TaskState};
+
+pub(crate) struct MaxPriority {
+    max: usize,
+    mask: usize,
+}
+
+impl MaxPriority {
+    pub const MAX_PRIORITY: usize = 31;
+
+    const fn new() -> Self {
+        Self { max: 0, mask: 0 }
+    }
+
+    fn mark_ready(&mut self, level: usize) {
+        self.max = self.max.max(level);
+        self.mask |= 1 << level;
+    }
+
+    fn unmark(&mut self, level: usize) {
+        self.mask &= !(1 << level);
+        self.max = Self::MAX_PRIORITY.saturating_sub(self.mask.leading_zeros() as usize);
+    }
+
+    fn ready(&self) -> usize {
+        // Priority 0 must always be ready
+        self.max
+    }
+}
 
 pub(crate) struct RunQueue {
-    // TODO: one queue per priority level
-    pub(crate) ready_tasks: TaskQueue<TaskReadyQueueElement>,
+    pub(crate) ready_priority: MaxPriority,
+    pub(crate) ready_tasks: [TaskQueue<TaskReadyQueueElement>; MaxPriority::MAX_PRIORITY + 1],
 }
 
 impl RunQueue {
-    pub(crate) const PRIORITY_LEVELS: usize = 32;
-
     pub(crate) const fn new() -> Self {
         Self {
-            ready_tasks: TaskQueue::new(),
+            ready_priority: MaxPriority::new(),
+            ready_tasks: [const { TaskQueue::new() }; MaxPriority::MAX_PRIORITY + 1],
         }
     }
 
-    pub(crate) fn mark_same_priority_task_ready(&mut self, ready_task: TaskPtr) {
-        self.mark_task_ready(ready_task);
-    }
-
-    pub(crate) fn mark_task_ready(&mut self, mut ready_task: TaskPtr) {
-        // TODO: this will need to track max ready priority.
-        unsafe { ready_task.as_mut().state = TaskState::Ready };
+    pub(crate) fn mark_task_ready(&mut self, mut ready_task: TaskPtr) -> bool {
+        let priority = ready_task.priority(self);
+        let current_prio = self.ready_priority.ready();
+
+        ready_task.set_state(TaskState::Ready);
         if let Some(mut containing_queue) = unsafe { ready_task.as_mut().current_queue.take() } {
             unsafe {
                 containing_queue.as_mut().remove(ready_task);
             }
         }
-        self.ready_tasks.push(ready_task);
+        self.ready_tasks[priority].remove(ready_task);
+        self.ready_tasks[priority].push(ready_task);
+        self.ready_priority.mark_ready(priority);
+
+        if priority > current_prio {
+            debug!(
+                "mark_task_ready - New prio level: {}",
+                self.ready_priority.ready()
+            );
+            true
+        } else {
+            false
+        }
     }
 
     pub(crate) fn pop(&mut self) -> Option<TaskPtr> {
-        // TODO: on current prio level
-        self.ready_tasks.pop()
+        let current_prio = self.ready_priority.ready();
+        debug!("pop - from level: {}", current_prio);
+        let popped = self.ready_tasks[current_prio].pop();
+
+        if self.ready_tasks[current_prio].is_empty() {
+            self.ready_priority.unmark(current_prio);
+            debug!("pop - New prio level: {}", self.ready_priority.ready());
+        }
+
+        popped
     }
 
-    pub(crate) fn is_empty(&self) -> bool {
-        // TODO: on current prio level
-        self.ready_tasks.is_empty()
+    pub(crate) fn is_level_empty(&self, level: usize) -> bool {
+        self.ready_tasks[level].is_empty()
     }
 
     pub(crate) fn remove(&mut self, to_delete: TaskPtr) {
-        self.ready_tasks.remove(to_delete);
+        let priority = to_delete.priority(self);
+        self.ready_tasks[priority].remove(to_delete);
+
+        if self.ready_tasks[priority].is_empty() {
+            self.ready_priority.unmark(priority);
+            debug!(
+                "remove - last task removed - New prio level: {}",
+                self.ready_priority.ready()
+            );
+        }
     }
 }
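The run queue above keeps one FIFO per priority level plus a bitmask of the levels that currently hold ready tasks; the highest set bit decides which queue pop() drains next, and level 0 (the idle task) is treated as always ready. A minimal standalone sketch of that bookkeeping follows. It is an illustration only: it mirrors MaxPriority from the diff but uses an explicit u32 mask (on the 32-bit chips this crate targets, usize is 32 bits, so the leading_zeros arithmetic works out the same).

// Illustration only: mirrors the MaxPriority bookkeeping from the diff above.
struct ReadyMask {
    max: usize,
    mask: u32,
}

impl ReadyMask {
    const MAX_PRIORITY: usize = 31;

    fn mark_ready(&mut self, level: usize) {
        self.max = self.max.max(level);
        self.mask |= 1 << level;
    }

    fn unmark(&mut self, level: usize) {
        self.mask &= !(1 << level);
        // Highest remaining set bit; falls back to 0 when the mask is empty,
        // which is why priority 0 (idle) must always stay runnable.
        self.max = Self::MAX_PRIORITY.saturating_sub(self.mask.leading_zeros() as usize);
    }
}

fn main() {
    let mut ready = ReadyMask { max: 0, mask: 0 };
    ready.mark_ready(2); // e.g. the timer task becomes ready
    ready.mark_ready(5); // a higher-priority task becomes ready
    assert_eq!(ready.max, 5); // pop() drains level 5 first
    ready.unmark(5); // level 5 has no ready tasks left
    assert_eq!(ready.max, 2); // scheduling falls back to the next-highest ready level
}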

View File

@@ -7,17 +7,9 @@ use esp_sync::NonReentrantMutex;
 use crate::{
     InternalMemory,
-    run_queue::RunQueue,
+    run_queue::{MaxPriority, RunQueue},
     semaphore::Semaphore,
-    task::{
-        self,
-        Context,
-        TaskAllocListElement,
-        TaskDeleteListElement,
-        TaskList,
-        TaskPtr,
-        TaskState,
-    },
+    task::{self, Task, TaskAllocListElement, TaskDeleteListElement, TaskExt, TaskList, TaskPtr},
     timer::TimeDriver,
     timer_queue,
 };
@@ -133,7 +125,12 @@ impl SchedulerState {
         if event.is_timer_event() {
             unwrap!(self.time_driver.as_mut()).handle_alarm(|ready_task| {
-                debug_assert_eq!(ready_task.state, task::TaskState::Sleeping);
+                debug_assert_eq!(
+                    ready_task.state,
+                    task::TaskState::Sleeping,
+                    "task: {:?}",
+                    ready_task as *const Task
+                );
 
                 debug!("Task {:?} is ready", ready_task as *const _);
@@ -146,11 +143,16 @@ impl SchedulerState {
         {
             // Current task is still ready, mark it as such.
             debug!("re-queueing current task: {:?}", current_task);
-            self.run_queue.mark_same_priority_task_ready(current_task);
+            self.run_queue.mark_task_ready(current_task);
         }
 
         if let Some(next_task) = self.select_next_task() {
-            debug_assert_eq!(unsafe { next_task.as_ref().state }, task::TaskState::Ready);
+            debug_assert_eq!(
+                next_task.state(),
+                task::TaskState::Ready,
+                "task: {:?}",
+                next_task
+            );
 
             trace!("Switching task {:?} -> {:?}", self.current_task, next_task);
@@ -189,11 +191,15 @@ impl SchedulerState {
     }
 
     fn arm_time_slice_alarm(&mut self) {
-        let ready_tasks = !self.run_queue.is_empty();
+        // The current task is not in the run queue. If the run queue on the current priority level
+        // is empty, the current task is the only one running at its priority level. In this
+        // case, we don't need time slicing.
+        let current_priority = unwrap!(self.current_task).priority(&mut self.run_queue);
+        let ready_tasks = !self.run_queue.is_level_empty(current_priority);
         unwrap!(self.time_driver.as_mut()).arm_next_wakeup(ready_tasks);
     }
 
-    pub(crate) fn schedule_task_deletion(&mut self, task_to_delete: *mut Context) -> bool {
+    pub(crate) fn schedule_task_deletion(&mut self, task_to_delete: *mut Task) -> bool {
         let current_task = unwrap!(self.current_task);
         let task_to_delete = NonNull::new(task_to_delete).unwrap_or(current_task);
         let is_current = task_to_delete == current_task;
@@ -218,16 +224,12 @@ impl SchedulerState {
     }
 
     pub(crate) fn resume_task(&mut self, task: TaskPtr) {
-        if unsafe { task.as_ref().state == TaskState::Ready } {
-            return;
-        }
-
         let timer_queue = unwrap!(self.time_driver.as_mut());
         timer_queue.timer_queue.remove(task);
 
-        self.run_queue.mark_task_ready(task);
-        // if task.priority > current_task.priority
-        task::yield_task();
+        if self.run_queue.mark_task_ready(task) {
+            task::yield_task();
+        }
     }
 
     fn delete_task(&mut self, to_delete: TaskPtr) {
@@ -271,13 +273,19 @@ impl Scheduler {
         task: extern "C" fn(*mut c_void),
         param: *mut c_void,
         task_stack_size: usize,
+        priority: u32,
     ) -> TaskPtr {
-        let task = Box::new_in(Context::new(task, param, task_stack_size), InternalMemory);
+        let task = Box::new_in(
+            Task::new(task, param, task_stack_size, priority),
+            InternalMemory,
+        );
         let task_ptr = NonNull::from(Box::leak(task));
 
         SCHEDULER.with(|state| {
             state.all_tasks.push(task_ptr);
-            state.run_queue.mark_task_ready(task_ptr);
+            if state.run_queue.mark_task_ready(task_ptr) {
+                task::yield_task();
+            }
         });
 
         debug!("Task created: {:?}", task_ptr);
@@ -308,6 +316,7 @@ impl esp_radio_preempt_driver::Scheduler for Scheduler {
         timer_queue::create_timer_task();
 
         self.with(|scheduler| unwrap!(scheduler.time_driver.as_mut()).start());
+        task::yield_task();
     }
 
     fn disable(&self) {
@@ -325,20 +334,25 @@ impl esp_radio_preempt_driver::Scheduler for Scheduler {
     }
 
     fn max_task_priority(&self) -> u32 {
-        RunQueue::PRIORITY_LEVELS as u32 - 1
+        MaxPriority::MAX_PRIORITY as u32
     }
 
     fn task_create(
         &self,
         task: extern "C" fn(*mut c_void),
         param: *mut c_void,
-        _priority: u32,
+        priority: u32,
         _pin_to_core: Option<u32>,
         task_stack_size: usize,
     ) -> *mut c_void {
-        self.create_task(task, param, task_stack_size)
-            .as_ptr()
-            .cast()
+        self.create_task(
+            task,
+            param,
+            task_stack_size,
+            priority.min(self.max_task_priority()),
+        )
+        .as_ptr()
+        .cast()
     }
 
     fn current_task(&self) -> *mut c_void {
@@ -346,7 +360,7 @@ impl esp_radio_preempt_driver::Scheduler for Scheduler {
     }
 
     fn schedule_task_deletion(&self, task_handle: *mut c_void) {
-        task::schedule_task_deletion(task_handle as *mut Context)
+        task::schedule_task_deletion(task_handle as *mut Task)
     }
 
     fn current_task_thread_semaphore(&self) -> SemaphorePtr {

View File

@@ -9,7 +9,8 @@ use esp_radio_preempt_driver::{
 use esp_sync::NonReentrantMutex;
 
 use crate::{
-    task::{TaskPtr, current_task},
+    SCHEDULER,
+    task::{TaskExt, TaskPtr, current_task},
     wait_queue::WaitQueue,
 };
@@ -19,8 +20,10 @@ enum SemaphoreInner {
         max: u32,
         waiting: WaitQueue,
     },
-    RecursiveMutex {
+    Mutex {
+        recursive: bool,
         owner: Option<TaskPtr>,
+        original_priority: usize,
         lock_counter: u32,
         waiting: WaitQueue,
     },
@@ -37,18 +40,36 @@ impl SemaphoreInner {
                     false
                 }
             }
-            SemaphoreInner::RecursiveMutex {
+            SemaphoreInner::Mutex {
+                recursive,
                 owner,
                 lock_counter,
+                original_priority,
                 ..
             } => {
-                // TODO: priority inheritance
                 let current = current_task();
-                if owner.is_none() || owner.unwrap() == current {
-                    *lock_counter += 1;
-                    true
+                if let Some(owner) = owner {
+                    if *owner == current && *recursive {
+                        *lock_counter += 1;
+                        true
+                    } else {
+                        // We can't lock the mutex. Make sure the mutex holder has a high enough
+                        // priority to avoid priority inversion.
+                        SCHEDULER.with(|scheduler| {
+                            let current_priority = current.priority(&mut scheduler.run_queue);
+                            if owner.priority(&mut scheduler.run_queue) < current_priority {
+                                owner.set_priority(&mut scheduler.run_queue, current_priority);
+                                scheduler.resume_task(*owner);
+                            }
+                            false
+                        })
+                    }
                 } else {
-                    false
+                    *owner = Some(current);
+                    *lock_counter += 1;
+                    *original_priority =
+                        SCHEDULER.with(|scheduler| current.priority(&mut scheduler.run_queue));
+                    true
                 }
             }
         }
@@ -64,9 +85,10 @@ impl SemaphoreInner {
                     false
                 }
             }
-            SemaphoreInner::RecursiveMutex {
+            SemaphoreInner::Mutex {
                 owner,
                 lock_counter,
+                original_priority,
                 ..
             } => {
                 let current = current_task();
@@ -74,7 +96,11 @@ impl SemaphoreInner {
                 if *owner == Some(current) && *lock_counter > 0 {
                     *lock_counter -= 1;
                     if *lock_counter == 0 {
-                        *owner = None;
+                        if let Some(owner) = owner.take() {
+                            SCHEDULER.with(|scheduler| {
+                                owner.set_priority(&mut scheduler.run_queue, *original_priority);
+                            });
+                        }
                     }
                     true
                 } else {
@@ -87,23 +113,25 @@ impl SemaphoreInner {
     fn current_count(&mut self) -> u32 {
         match self {
             SemaphoreInner::Counting { current, .. } => *current,
-            SemaphoreInner::RecursiveMutex { .. } => {
+            SemaphoreInner::Mutex { .. } => {
                 panic!("RecursiveMutex does not support current_count")
             }
         }
     }
 
     fn wait_with_deadline(&mut self, deadline: Option<Instant>) {
+        trace!("Semaphore wait_with_deadline - {:?}", deadline);
         match self {
             SemaphoreInner::Counting { waiting, .. } => waiting.wait_with_deadline(deadline),
-            SemaphoreInner::RecursiveMutex { waiting, .. } => waiting.wait_with_deadline(deadline),
+            SemaphoreInner::Mutex { waiting, .. } => waiting.wait_with_deadline(deadline),
         }
     }
 
-    fn notify_one(&mut self) {
+    fn notify(&mut self) {
+        trace!("Semaphore notify");
         match self {
             SemaphoreInner::Counting { waiting, .. } => waiting.notify(),
-            SemaphoreInner::RecursiveMutex { waiting, .. } => waiting.notify(),
+            SemaphoreInner::Mutex { waiting, .. } => waiting.notify(),
         }
     }
 }
@@ -120,9 +148,11 @@ impl Semaphore {
                 max,
                 waiting: WaitQueue::new(),
             },
-            SemaphoreKind::RecursiveMutex => SemaphoreInner::RecursiveMutex {
+            SemaphoreKind::Mutex | SemaphoreKind::RecursiveMutex => SemaphoreInner::Mutex {
+                recursive: matches!(kind, SemaphoreKind::RecursiveMutex),
                 owner: None,
                 lock_counter: 0,
+                original_priority: 0,
                 waiting: WaitQueue::new(),
             },
         };
@@ -153,6 +183,7 @@ impl Semaphore {
             });
 
             if taken {
+                debug!("Semaphore - take - success");
                 return true;
             }
@@ -165,6 +196,7 @@ impl Semaphore {
                 && deadline < Instant::now()
             {
                 // We have a deadline and we've timed out.
+                trace!("Semaphore - take - timed out");
                 return false;
             }
             // We can block more, so let's attempt to take the semaphore again.
@@ -178,7 +210,7 @@ impl Semaphore {
     pub fn give(&self) -> bool {
         self.inner.with(|sem| {
             if sem.try_give() {
-                sem.notify_one();
+                sem.notify();
                 true
             } else {
                 false
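Priority inheritance in the Mutex variant above comes down to two rules: on a contended take, the owner is raised to at least the waiter's priority (and passed through resume_task so the boost takes effect), and on the final give, the owner drops back to the priority recorded when it first took the lock. A toy model of just those two rules follows — illustration only, with a stand-in task type instead of the real TaskExt/run-queue plumbing.

// Illustration only: the inheritance rule, applied to a stand-in task type.
struct ToyTask {
    priority: usize,
}

// Contended take: boost the owner so tasks between the two priorities cannot starve it.
fn on_contended_take(owner: &mut ToyTask, waiter: &ToyTask) {
    if owner.priority < waiter.priority {
        owner.priority = waiter.priority;
    }
}

// Final give: restore the priority recorded when the mutex was first taken.
fn on_final_give(owner: &mut ToyTask, original_priority: usize) {
    owner.priority = original_priority;
}

fn main() {
    let mut low = ToyTask { priority: 1 }; // mutex owner
    let high = ToyTask { priority: 3 }; // blocked waiter
    let original = low.priority;

    on_contended_take(&mut low, &high);
    assert_eq!(low.priority, 3); // the owner now runs ahead of medium-priority tasks

    on_final_give(&mut low, original);
    assert_eq!(low.priority, 1);
}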

View File

@@ -21,7 +21,7 @@ pub(crate) enum TaskState {
     Sleeping,
 }
 
-pub(crate) type TaskPtr = NonNull<Context>;
+pub(crate) type TaskPtr = NonNull<Task>;
 pub(crate) type TaskListItem = Option<TaskPtr>;
 
 /// An abstraction that allows the task to contain multiple different queue pointers.
@@ -57,12 +57,34 @@ task_list_item!(TaskTimerQueueElement, timer_queue_item);
 /// implement stuff for NonNull.
 pub(crate) trait TaskExt {
     fn resume(self);
+    fn priority(self, _: &mut RunQueue) -> usize;
+    fn set_priority(self, _: &mut RunQueue, new_pro: usize);
+    fn state(self) -> TaskState;
+    fn set_state(self, state: TaskState);
 }
 
 impl TaskExt for TaskPtr {
     fn resume(self) {
         SCHEDULER.with(|scheduler| scheduler.resume_task(self))
     }
+
+    fn priority(self, _: &mut RunQueue) -> usize {
+        unsafe { self.as_ref().priority as usize }
+    }
+
+    fn set_priority(mut self, run_queue: &mut RunQueue, new_pro: usize) {
+        run_queue.remove(self);
+        unsafe { self.as_mut().priority = new_pro as u32 };
+    }
+
+    fn state(self) -> TaskState {
+        unsafe { self.as_ref().state }
+    }
+
+    fn set_state(mut self, state: TaskState) {
+        trace!("Task {:?} state changed to {:?}", self, state);
+        unsafe { self.as_mut().state = state };
+    }
 }
 
 /// A singly linked list of tasks.
@@ -173,7 +195,7 @@ impl<E: TaskListElement> TaskQueue<E> {
 }
 
 #[repr(C)]
-pub(crate) struct Context {
+pub(crate) struct Task {
     #[cfg(riscv)]
     pub trap_frame: Registers,
     #[cfg(xtensa)]
@@ -181,6 +203,7 @@ pub(crate) struct Context {
     pub thread_semaphore: Option<SemaphorePtr>,
     pub state: TaskState,
     pub _allocated_stack: Box<[MaybeUninit<u32>], InternalMemory>,
+    pub priority: u32,
 
     pub wakeup_at: u64,
@@ -203,13 +226,17 @@ pub(crate) struct Context {
 const STACK_CANARY: u32 = 0xDEEDBAAD;
 
-impl Context {
+impl Task {
     pub(crate) fn new(
         task_fn: extern "C" fn(*mut c_void),
         param: *mut c_void,
         task_stack_size: usize,
+        priority: u32,
     ) -> Self {
-        trace!("task_create {:?} {:?} {}", task_fn, param, task_stack_size);
+        trace!(
+            "task_create {:?}({:?}) stack_size = {} priority = {}",
+            task_fn, param, task_stack_size, priority
+        );
         let task_stack_size_words = task_stack_size / 4;
         let mut stack = Box::<[u32], _>::new_uninit_slice_in(task_stack_size_words, InternalMemory);
@@ -218,12 +245,13 @@ impl Context {
         stack[0] = MaybeUninit::new(STACK_CANARY);
 
-        Context {
+        Task {
             trap_frame: new_task_context(task_fn, param, stack_top),
             thread_semaphore: None,
             state: TaskState::Ready,
             _allocated_stack: stack,
             current_queue: None,
+            priority,
 
             wakeup_at: 0,
@@ -243,14 +271,14 @@ impl Context {
             unsafe { self._allocated_stack[0].assume_init() },
             STACK_CANARY,
             "Stack overflow detected in {:?}",
-            self as *const Context
+            self as *const Task
         );
     }
 }
 
-impl Drop for Context {
+impl Drop for Task {
     fn drop(&mut self) {
-        debug!("Dropping task: {:?}", self as *mut Context);
+        debug!("Dropping task: {:?}", self as *mut Task);
         if let Some(sem) = self.thread_semaphore {
             let sem = unsafe { SemaphoreHandle::from_ptr(sem) };
             core::mem::drop(sem)
@@ -261,7 +289,7 @@ impl Drop for Context {
 pub(super) fn allocate_main_task() {
     // This context will be filled out by the first context switch.
     let task = Box::new_in(
-        Context {
+        Task {
             #[cfg(riscv)]
             trap_frame: Registers::default(),
             #[cfg(xtensa)]
@@ -270,6 +298,7 @@ pub(super) fn allocate_main_task() {
             state: TaskState::Ready,
             _allocated_stack: Box::<[u32], _>::new_uninit_slice_in(0, InternalMemory),
             current_queue: None,
+            priority: 1,
 
             wakeup_at: 0,
@@ -292,26 +321,26 @@ pub(super) fn allocate_main_task() {
         // The main task is already running, no need to add it to the ready queue.
         state.all_tasks.push(main_task_ptr);
         state.current_task = Some(main_task_ptr);
+        state.run_queue.mark_task_ready(main_task_ptr);
     })
 }
 
 pub(crate) fn spawn_idle_task() {
-    let ptr = SCHEDULER.create_task(idle_task, core::ptr::null_mut(), 4096);
+    let ptr = SCHEDULER.create_task(idle_task, core::ptr::null_mut(), 4096, 0);
     debug!("Idle task created: {:?}", ptr);
 }
 
 pub(crate) extern "C" fn idle_task(_: *mut c_void) {
     loop {
-        yield_task();
-        // TODO: once we have priorities, we can waiti:
-        // #[cfg(xtensa)]
-        // unsafe {
-        //     core::arch::asm!("waiti 0")
-        // }
-        // #[cfg(riscv)]
-        // unsafe {
-        //     core::arch::asm!("wfi")
-        // }
+        // TODO: make this configurable.
+        #[cfg(xtensa)]
+        unsafe {
+            core::arch::asm!("waiti 0");
+        }
+        #[cfg(riscv)]
+        unsafe {
+            core::arch::asm!("wfi");
+        }
     }
 }
@@ -338,7 +367,7 @@ pub(super) fn delete_all_tasks() {
     }
 }
 
-pub(super) fn with_current_task<R>(mut cb: impl FnMut(&mut Context) -> R) -> R {
+pub(super) fn with_current_task<R>(mut cb: impl FnMut(&mut Task) -> R) -> R {
     SCHEDULER.with(|state| cb(unsafe { unwrap!(state.current_task).as_mut() }))
 }
 
@@ -346,7 +375,7 @@ pub(super) fn current_task() -> TaskPtr {
     with_current_task(|task| NonNull::from(task))
 }
 
-pub(super) fn schedule_task_deletion(task: *mut Context) {
+pub(super) fn schedule_task_deletion(task: *mut Task) {
     trace!("schedule_task_deletion {:?}", task);
     let deleting_current = SCHEDULER.with(|state| state.schedule_task_deletion(task));

View File

@@ -3,7 +3,7 @@ use core::ffi::c_void;
 pub(crate) use esp_hal::trapframe::TrapFrame;
 use esp_hal::{xtensa_lx, xtensa_lx_rt};
 
-use crate::{SCHEDULER, task::Context};
+use crate::{SCHEDULER, task::Task};
 
 pub(crate) fn new_task_context(
     task_fn: extern "C" fn(*mut c_void),
@@ -34,11 +34,11 @@ pub(crate) fn new_task_context(
     }
 }
 
-pub(crate) fn restore_task_context(ctx: &mut Context, trap_frame: &mut TrapFrame) {
+pub(crate) fn restore_task_context(ctx: &mut Task, trap_frame: &mut TrapFrame) {
     *trap_frame = ctx.trap_frame;
 }
 
-pub(crate) fn save_task_context(ctx: &mut Context, trap_frame: &TrapFrame) {
+pub(crate) fn save_task_context(ctx: &mut Task, trap_frame: &TrapFrame) {
     ctx.trap_frame = *trap_frame;
 }

View File

@@ -7,7 +7,7 @@ use crate::{
     SCHEDULER,
     TICK_RATE,
     TimeBase,
-    task::{Context, TaskPtr, TaskQueue, TaskState, TaskTimerQueueElement},
+    task::{Task, TaskExt, TaskPtr, TaskQueue, TaskState, TaskTimerQueueElement},
 };
 
 const TIMESLICE_DURATION: Duration = Rate::from_hz(TICK_RATE).as_duration();
@@ -91,7 +91,7 @@ impl TimeDriver {
         self.timer.stop();
     }
 
-    pub(crate) fn handle_alarm(&mut self, mut on_task_ready: impl FnMut(&mut Context)) {
+    pub(crate) fn handle_alarm(&mut self, mut on_task_ready: impl FnMut(&mut Task)) {
         let mut timer_queue = core::mem::take(&mut self.timer_queue);
         let now = Instant::now().duration_since_epoch().as_micros();
@@ -136,21 +136,28 @@ impl TimeDriver {
     }
 
     pub(crate) fn schedule_wakeup(&mut self, mut current_task: TaskPtr, at: Instant) -> bool {
-        unsafe { debug_assert_eq!(current_task.as_mut().state, TaskState::Ready) };
-
-        // Target time is in the past, don't sleep.
-        if at <= Instant::now() {
-            return false;
-        }
-
-        unsafe { current_task.as_mut().state = TaskState::Sleeping };
+        debug_assert_eq!(
+            current_task.state(),
+            TaskState::Ready,
+            "task: {:?}",
+            current_task
+        );
 
         // Target time is infinite, suspend task without waking up via timer.
         if at == Instant::EPOCH + Duration::MAX {
+            current_task.set_state(TaskState::Sleeping);
             debug!("Suspending task: {:?}", current_task);
             return true;
        }
 
+        // Target time is in the past, don't sleep.
+        if at <= Instant::now() {
+            debug!("Target time is in the past");
+            return false;
+        }
+
+        current_task.set_state(TaskState::Sleeping);
+
         let timestamp = at.duration_since_epoch().as_micros();
         debug!(
             "Scheduling wakeup for task {:?} at timestamp {}",

View File

@@ -320,7 +320,7 @@ register_timer_implementation!(Timer);
 pub(crate) fn create_timer_task() {
     // create the timer task
     TIMER_QUEUE.inner.with(|q| {
-        let task_ptr = SCHEDULER.create_task(timer_task, core::ptr::null_mut(), 8192);
+        let task_ptr = SCHEDULER.create_task(timer_task, core::ptr::null_mut(), 8192, 2);
         q.task = Some(task_ptr);
     });
 }

View File

@@ -28,12 +28,12 @@ pub type SemaphorePtr = NonNull<()>;
 /// The type of semaphore or mutex to create.
 pub enum SemaphoreKind {
-    /// Counting semaphore or non-recursive mutex.
-    ///
-    /// To obtain a non-recursive mutex, use [`SemaphoreKind::Counting`] with maximum and initial
-    /// counts of 1.
+    /// Counting semaphore.
     Counting { max: u32, initial: u32 },
+    /// Non-recursive mutex.
+    Mutex,
     /// Recursive mutex.
     RecursiveMutex,
 }
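With the dedicated Mutex kind, callers no longer emulate a non-recursive mutex with a binary counting semaphore. A short usage sketch, illustration only, using the SemaphoreHandle/SemaphoreKind calls exercised by the test at the end of this diff:

// Illustration only: choosing a kind through the driver API shown above.
use esp_radio_preempt_driver::semaphore::{SemaphoreHandle, SemaphoreKind};

fn make_primitives() -> (SemaphoreHandle, SemaphoreHandle) {
    // Counting semaphore, e.g. for signalling completion.
    let signal = SemaphoreHandle::new(SemaphoreKind::Counting { max: 1, initial: 0 });
    // Dedicated non-recursive mutex; before this change the closest equivalent was
    // SemaphoreKind::Counting { max: 1, initial: 1 }.
    let lock = SemaphoreHandle::new(SemaphoreKind::Mutex);
    (signal, lock)
}

fn with_lock(lock: &SemaphoreHandle) {
    lock.take(None); // block until available; a blocked waiter boosts the current owner
    // ... critical section ...
    lock.give();
}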

View File

@@ -7,7 +7,7 @@ pub(crate) fn mutex_create(recursive: bool) -> *mut c_void {
     let ptr = SemaphoreHandle::new(if recursive {
         SemaphoreKind::RecursiveMutex
     } else {
-        SemaphoreKind::Counting { max: 1, initial: 1 }
+        SemaphoreKind::Mutex
     })
     .leak()
     .as_ptr()

View File

@@ -141,12 +141,9 @@ use hal::{
     time::Rate,
 };
 
+use crate::radio::{setup_radio_isr, shutdown_radio_isr};
 #[cfg(feature = "wifi")]
 use crate::wifi::WifiError;
-use crate::{
-    preempt::yield_task,
-    radio::{setup_radio_isr, shutdown_radio_isr},
-};
 
 // can't use instability on inline module definitions, see https://github.com/rust-lang/rust/issues/54727
 #[doc(hidden)]
@@ -273,8 +270,6 @@ pub fn init<'d>() -> Result<Controller<'d>, InitializationError> {
     // This initializes the task switcher
     preempt::enable();
 
-    yield_task();
-
     wifi_set_log_verbose();
     init_radio_clocks();

View File

@@ -236,6 +236,7 @@ esp-preempt = { path = "../esp-preempt", optional = true }
 esp-storage = { path = "../esp-storage", optional = true }
 esp-sync = { path = "../esp-sync" }
 esp-radio = { path = "../esp-radio", optional = true }
+esp-radio-preempt-driver = { path = "../esp-radio-preempt-driver", optional = true }
 portable-atomic = "1.11.0"
 static_cell = { version = "2.1.0" }
 semihosting = { version = "0.1", features= ["stdio", "panic-handler"] }
@@ -262,7 +263,7 @@ default = []
 unstable = ["esp-hal/unstable"]
 defmt = ["dep:defmt-rtt", "esp-hal/defmt", "embedded-test/defmt", "esp-radio?/defmt", "esp-hal-embassy?/defmt", "esp-preempt?/defmt"]
-esp-radio = ["dep:esp-radio", "dep:esp-preempt"]
+esp-radio = ["dep:esp-radio", "dep:esp-preempt", "dep:esp-radio-preempt-driver"]
 
 # Device support (required!):
 esp32 = [

View File

@@ -45,6 +45,8 @@ async fn try_init(
 #[cfg(test)]
 #[embedded_test::tests(default_timeout = 3, executor = esp_hal_embassy::Executor::new())]
 mod tests {
+    use defmt::info;
+
     use super::*;
 
     #[init]
@@ -125,4 +127,114 @@ mod tests {
         // Now, can we do it again?
         let _wifi = esp_radio::wifi::new(&esp_radio_ctrl, p.WIFI.reborrow()).unwrap();
     }
+
+    #[test]
+    fn test_esp_preempt_priority_inheritance(p: Peripherals) {
+        use core::ffi::c_void;
+
+        use esp_radio_preempt_driver as preempt;
+        use portable_atomic::{AtomicBool, Ordering};
+        use preempt::semaphore::{SemaphoreHandle, SemaphoreKind};
+
+        let timg0 = TimerGroup::new(p.TIMG0);
+        esp_preempt::init(timg0.timer0);
+
+        preempt::enable();
+
+        // We need three tasks to test priority inheritance:
+        // - A high priority task that will attempt to acquire the mutex.
+        // - A medium priority task that will do some unrelated work.
+        // - A low priority task that will hold the mutex before the high priority task could
+        //   acquire it.
+        //
+        // Priority inversion is a situation where the higher priority task is being blocked, and a
+        // medium priority task is ready to run while the low priority task holds the mutex. The
+        // issue is that in this case the medium priority task is effectively prioritized over the
+        // high priority task.
+        //
+        // The test will be successful if the high priority task is able to acquire the mutex
+        // before the medium priority task runs.
+
+        // The main task serves as the low priority task.
+        // The main task will spawn the high and medium priority tasks after obtaining the mutex.
+        // The medium priority task will assert that the high priority task has finished.
+
+        struct TestContext {
+            ready_semaphore: SemaphoreHandle,
+            mutex: SemaphoreHandle,
+            high_priority_task_finished: AtomicBool,
+        }
+
+        let test_context = TestContext {
+            // This semaphore signals the end of the test
+            ready_semaphore: SemaphoreHandle::new(SemaphoreKind::Counting { max: 1, initial: 0 }),
+            // We'll use this mutex to test priority inheritance
+            mutex: SemaphoreHandle::new(SemaphoreKind::Mutex),
+            high_priority_task_finished: AtomicBool::new(false),
+        };
+
+        test_context.mutex.take(None);
+        info!("Low: mutex obtained");
+
+        // Spawn tasks
+        extern "C" fn high_priority_task(context: *mut c_void) {
+            let context = unsafe { &*(context as *const TestContext) };
+
+            info!("High: acquiring mutex");
+            context.mutex.take(None);
+            info!("High: acquired mutex, mark finished");
+            context
+                .high_priority_task_finished
+                .store(true, Ordering::SeqCst);
+            info!("High: released mutex");
+            context.mutex.give();
+
+            // TODO: support one-shot tasks in esp-preempt
+            unsafe {
+                preempt::schedule_task_deletion(core::ptr::null_mut());
+            }
+        }
+        extern "C" fn medium_priority_task(context: *mut c_void) {
+            let context = unsafe { &*(context as *const TestContext) };
+
+            info!("Medium: asserting high-priority task finished");
+            assert!(context.high_priority_task_finished.load(Ordering::SeqCst));
+            info!("Medium: marking test finished");
+            context.ready_semaphore.give();
+
+            // TODO: support one-shot tasks in esp-preempt
+            unsafe {
+                preempt::schedule_task_deletion(core::ptr::null_mut());
+            }
+        }
+
+        unsafe {
+            info!("Low: spawning high priority task");
+            preempt::task_create(
+                high_priority_task,
+                (&raw const test_context).cast::<c_void>().cast_mut(),
+                3,
+                None,
+                4096,
+            );
+            info!("Low: spawning medium priority task");
+            preempt::task_create(
+                medium_priority_task,
+                (&raw const test_context).cast::<c_void>().cast_mut(),
+                2,
+                None,
+                4096,
+            );
+        }
+
+        // Priority inheritance means this runs before the medium priority task
+        info!("Low: tasks spawned, returning mutex");
+        test_context.mutex.give();
+
+        info!("Low: wait for tasks to finish");
+        test_context.ready_semaphore.take(None);
+
+        preempt::disable();
+    }
 }