From cf80e6a8f1d5cd62b03b1cb540608e1902ececc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Thu, 11 Sep 2025 10:03:44 +0200 Subject: [PATCH] Implement priority levels (#4090) * Add priority to tasks * Separate Mutex out of counting Semaphore * Add priority inheritance to mutexes * Test priority inheritance, fix issues * Compare against the owner's current priority --- esp-preempt/src/queue.rs | 8 ++ esp-preempt/src/run_queue.rs | 93 ++++++++++++++---- esp-preempt/src/scheduler.rs | 74 ++++++++------ esp-preempt/src/semaphore.rs | 64 +++++++++---- esp-preempt/src/task/mod.rs | 73 +++++++++----- esp-preempt/src/task/xtensa.rs | 6 +- esp-preempt/src/timer/mod.rs | 27 ++++-- esp-preempt/src/timer_queue.rs | 2 +- esp-radio-preempt-driver/src/semaphore.rs | 8 +- esp-radio/src/compat/mutex.rs | 2 +- esp-radio/src/lib.rs | 7 +- hil-test/Cargo.toml | 3 +- hil-test/tests/esp_radio_init.rs | 112 ++++++++++++++++++++++ 13 files changed, 366 insertions(+), 113 deletions(-) diff --git a/esp-preempt/src/queue.rs b/esp-preempt/src/queue.rs index b1339714a..a6725489c 100644 --- a/esp-preempt/src/queue.rs +++ b/esp-preempt/src/queue.rs @@ -136,10 +136,12 @@ impl Queue { loop { let enqueued = self.inner.with(|queue| { if unsafe { queue.try_enqueue(item) } { + trace!("Queue - notify with item"); queue.waiting_for_item.notify(); true } else { // The task will go to sleep when the above critical section is released. + trace!("Queue - wait for space - {:?}", deadline); queue.waiting_for_space.wait_with_deadline(deadline); false } @@ -157,6 +159,7 @@ impl Queue { if let Some(deadline) = deadline && deadline < Instant::now() { + debug!("Queue - send to back - timed out"); // We have a deadline and we've timed out. return false; } @@ -182,10 +185,12 @@ impl Queue { // Attempt to dequeue an item from the queue let dequeued = self.inner.with(|queue| { if unsafe { queue.try_dequeue(item) } { + trace!("Queue - notify with space"); queue.waiting_for_space.notify(); true } else { // The task will go to sleep when the above critical section is released. + trace!("Queue - wait for item - {:?}", deadline); queue.waiting_for_item.wait_with_deadline(deadline); false } @@ -204,6 +209,7 @@ impl Queue { && deadline < Instant::now() { // We have a deadline and we've timed out. + debug!("Queue - timed out waiting for item"); return false; } // We can block more, so let's attempt to dequeue again. @@ -213,6 +219,7 @@ impl Queue { unsafe fn try_receive(&self, item: *mut u8) -> bool { self.inner.with(|queue| { if unsafe { queue.try_dequeue(item) } { + trace!("Queue - notify with space"); queue.waiting_for_space.notify(); true } else { @@ -230,6 +237,7 @@ impl Queue { } if was_full && !queue.full() { + trace!("Queue - notify with space"); queue.waiting_for_space.notify(); } }) diff --git a/esp-preempt/src/run_queue.rs b/esp-preempt/src/run_queue.rs index 416b532c5..93a8145bc 100644 --- a/esp-preempt/src/run_queue.rs +++ b/esp-preempt/src/run_queue.rs @@ -1,45 +1,100 @@ -use crate::task::{TaskPtr, TaskQueue, TaskReadyQueueElement, TaskState}; +use crate::task::{TaskExt, TaskPtr, TaskQueue, TaskReadyQueueElement, TaskState}; + +pub(crate) struct MaxPriority { + max: usize, + mask: usize, +} + +impl MaxPriority { + pub const MAX_PRIORITY: usize = 31; + + const fn new() -> Self { + Self { max: 0, mask: 0 } + } + + fn mark_ready(&mut self, level: usize) { + self.max = self.max.max(level); + self.mask |= 1 << level; + } + + fn unmark(&mut self, level: usize) { + self.mask &= !(1 << level); + self.max = Self::MAX_PRIORITY.saturating_sub(self.mask.leading_zeros() as usize); + } + + fn ready(&self) -> usize { + // Priority 0 must always be ready + self.max + } +} pub(crate) struct RunQueue { - // TODO: one queue per priority level - pub(crate) ready_tasks: TaskQueue, + pub(crate) ready_priority: MaxPriority, + + pub(crate) ready_tasks: [TaskQueue; MaxPriority::MAX_PRIORITY + 1], } impl RunQueue { - pub(crate) const PRIORITY_LEVELS: usize = 32; - pub(crate) const fn new() -> Self { Self { - ready_tasks: TaskQueue::new(), + ready_priority: MaxPriority::new(), + ready_tasks: [const { TaskQueue::new() }; MaxPriority::MAX_PRIORITY + 1], } } - pub(crate) fn mark_same_priority_task_ready(&mut self, ready_task: TaskPtr) { - self.mark_task_ready(ready_task); - } + pub(crate) fn mark_task_ready(&mut self, mut ready_task: TaskPtr) -> bool { + let priority = ready_task.priority(self); + let current_prio = self.ready_priority.ready(); - pub(crate) fn mark_task_ready(&mut self, mut ready_task: TaskPtr) { - // TODO: this will need to track max ready priority. - unsafe { ready_task.as_mut().state = TaskState::Ready }; + ready_task.set_state(TaskState::Ready); if let Some(mut containing_queue) = unsafe { ready_task.as_mut().current_queue.take() } { unsafe { containing_queue.as_mut().remove(ready_task); } } - self.ready_tasks.push(ready_task); + self.ready_tasks[priority].remove(ready_task); + self.ready_tasks[priority].push(ready_task); + + self.ready_priority.mark_ready(priority); + if priority > current_prio { + debug!( + "mark_task_ready - New prio level: {}", + self.ready_priority.ready() + ); + true + } else { + false + } } pub(crate) fn pop(&mut self) -> Option { - // TODO: on current prio level - self.ready_tasks.pop() + let current_prio = self.ready_priority.ready(); + debug!("pop - from level: {}", current_prio); + + let popped = self.ready_tasks[current_prio].pop(); + + if self.ready_tasks[current_prio].is_empty() { + self.ready_priority.unmark(current_prio); + debug!("pop - New prio level: {}", self.ready_priority.ready()); + } + + popped } - pub(crate) fn is_empty(&self) -> bool { - // TODO: on current prio level - self.ready_tasks.is_empty() + pub(crate) fn is_level_empty(&self, level: usize) -> bool { + self.ready_tasks[level].is_empty() } pub(crate) fn remove(&mut self, to_delete: TaskPtr) { - self.ready_tasks.remove(to_delete); + let priority = to_delete.priority(self); + self.ready_tasks[priority].remove(to_delete); + + if self.ready_tasks[priority].is_empty() { + self.ready_priority.unmark(priority); + debug!( + "remove - last task removed - New prio level: {}", + self.ready_priority.ready() + ); + } } } diff --git a/esp-preempt/src/scheduler.rs b/esp-preempt/src/scheduler.rs index f861b658c..f13622d58 100644 --- a/esp-preempt/src/scheduler.rs +++ b/esp-preempt/src/scheduler.rs @@ -7,17 +7,9 @@ use esp_sync::NonReentrantMutex; use crate::{ InternalMemory, - run_queue::RunQueue, + run_queue::{MaxPriority, RunQueue}, semaphore::Semaphore, - task::{ - self, - Context, - TaskAllocListElement, - TaskDeleteListElement, - TaskList, - TaskPtr, - TaskState, - }, + task::{self, Task, TaskAllocListElement, TaskDeleteListElement, TaskExt, TaskList, TaskPtr}, timer::TimeDriver, timer_queue, }; @@ -133,7 +125,12 @@ impl SchedulerState { if event.is_timer_event() { unwrap!(self.time_driver.as_mut()).handle_alarm(|ready_task| { - debug_assert_eq!(ready_task.state, task::TaskState::Sleeping); + debug_assert_eq!( + ready_task.state, + task::TaskState::Sleeping, + "task: {:?}", + ready_task as *const Task + ); debug!("Task {:?} is ready", ready_task as *const _); @@ -146,11 +143,16 @@ impl SchedulerState { { // Current task is still ready, mark it as such. debug!("re-queueing current task: {:?}", current_task); - self.run_queue.mark_same_priority_task_ready(current_task); + self.run_queue.mark_task_ready(current_task); } if let Some(next_task) = self.select_next_task() { - debug_assert_eq!(unsafe { next_task.as_ref().state }, task::TaskState::Ready); + debug_assert_eq!( + next_task.state(), + task::TaskState::Ready, + "task: {:?}", + next_task + ); trace!("Switching task {:?} -> {:?}", self.current_task, next_task); @@ -189,11 +191,15 @@ impl SchedulerState { } fn arm_time_slice_alarm(&mut self) { - let ready_tasks = !self.run_queue.is_empty(); + // The current task is not in the run queue. If the run queue on the current priority level + // is empty, the current task is the only one running at its priority level. In this + // case, we don't need time slicing. + let current_priority = unwrap!(self.current_task).priority(&mut self.run_queue); + let ready_tasks = !self.run_queue.is_level_empty(current_priority); unwrap!(self.time_driver.as_mut()).arm_next_wakeup(ready_tasks); } - pub(crate) fn schedule_task_deletion(&mut self, task_to_delete: *mut Context) -> bool { + pub(crate) fn schedule_task_deletion(&mut self, task_to_delete: *mut Task) -> bool { let current_task = unwrap!(self.current_task); let task_to_delete = NonNull::new(task_to_delete).unwrap_or(current_task); let is_current = task_to_delete == current_task; @@ -218,16 +224,12 @@ impl SchedulerState { } pub(crate) fn resume_task(&mut self, task: TaskPtr) { - if unsafe { task.as_ref().state == TaskState::Ready } { - return; - } let timer_queue = unwrap!(self.time_driver.as_mut()); timer_queue.timer_queue.remove(task); - self.run_queue.mark_task_ready(task); - - // if task.priority > current_task.priority - task::yield_task(); + if self.run_queue.mark_task_ready(task) { + task::yield_task(); + } } fn delete_task(&mut self, to_delete: TaskPtr) { @@ -271,13 +273,19 @@ impl Scheduler { task: extern "C" fn(*mut c_void), param: *mut c_void, task_stack_size: usize, + priority: u32, ) -> TaskPtr { - let task = Box::new_in(Context::new(task, param, task_stack_size), InternalMemory); + let task = Box::new_in( + Task::new(task, param, task_stack_size, priority), + InternalMemory, + ); let task_ptr = NonNull::from(Box::leak(task)); SCHEDULER.with(|state| { state.all_tasks.push(task_ptr); - state.run_queue.mark_task_ready(task_ptr); + if state.run_queue.mark_task_ready(task_ptr) { + task::yield_task(); + } }); debug!("Task created: {:?}", task_ptr); @@ -308,6 +316,7 @@ impl esp_radio_preempt_driver::Scheduler for Scheduler { timer_queue::create_timer_task(); self.with(|scheduler| unwrap!(scheduler.time_driver.as_mut()).start()); + task::yield_task(); } fn disable(&self) { @@ -325,20 +334,25 @@ impl esp_radio_preempt_driver::Scheduler for Scheduler { } fn max_task_priority(&self) -> u32 { - RunQueue::PRIORITY_LEVELS as u32 - 1 + MaxPriority::MAX_PRIORITY as u32 } fn task_create( &self, task: extern "C" fn(*mut c_void), param: *mut c_void, - _priority: u32, + priority: u32, _pin_to_core: Option, task_stack_size: usize, ) -> *mut c_void { - self.create_task(task, param, task_stack_size) - .as_ptr() - .cast() + self.create_task( + task, + param, + task_stack_size, + priority.min(self.max_task_priority()), + ) + .as_ptr() + .cast() } fn current_task(&self) -> *mut c_void { @@ -346,7 +360,7 @@ impl esp_radio_preempt_driver::Scheduler for Scheduler { } fn schedule_task_deletion(&self, task_handle: *mut c_void) { - task::schedule_task_deletion(task_handle as *mut Context) + task::schedule_task_deletion(task_handle as *mut Task) } fn current_task_thread_semaphore(&self) -> SemaphorePtr { diff --git a/esp-preempt/src/semaphore.rs b/esp-preempt/src/semaphore.rs index 9cfb16895..7250175e9 100644 --- a/esp-preempt/src/semaphore.rs +++ b/esp-preempt/src/semaphore.rs @@ -9,7 +9,8 @@ use esp_radio_preempt_driver::{ use esp_sync::NonReentrantMutex; use crate::{ - task::{TaskPtr, current_task}, + SCHEDULER, + task::{TaskExt, TaskPtr, current_task}, wait_queue::WaitQueue, }; @@ -19,8 +20,10 @@ enum SemaphoreInner { max: u32, waiting: WaitQueue, }, - RecursiveMutex { + Mutex { + recursive: bool, owner: Option, + original_priority: usize, lock_counter: u32, waiting: WaitQueue, }, @@ -37,18 +40,36 @@ impl SemaphoreInner { false } } - SemaphoreInner::RecursiveMutex { + SemaphoreInner::Mutex { + recursive, owner, lock_counter, + original_priority, .. } => { - // TODO: priority inheritance let current = current_task(); - if owner.is_none() || owner.unwrap() == current { - *lock_counter += 1; - true + if let Some(owner) = owner { + if *owner == current && *recursive { + *lock_counter += 1; + true + } else { + // We can't lock the mutex. Make sure the mutex holder has a high enough + // priority to avoid priority inversion. + SCHEDULER.with(|scheduler| { + let current_priority = current.priority(&mut scheduler.run_queue); + if owner.priority(&mut scheduler.run_queue) < current_priority { + owner.set_priority(&mut scheduler.run_queue, current_priority); + scheduler.resume_task(*owner); + } + false + }) + } } else { - false + *owner = Some(current); + *lock_counter += 1; + *original_priority = + SCHEDULER.with(|scheduler| current.priority(&mut scheduler.run_queue)); + true } } } @@ -64,9 +85,10 @@ impl SemaphoreInner { false } } - SemaphoreInner::RecursiveMutex { + SemaphoreInner::Mutex { owner, lock_counter, + original_priority, .. } => { let current = current_task(); @@ -74,7 +96,11 @@ impl SemaphoreInner { if *owner == Some(current) && *lock_counter > 0 { *lock_counter -= 1; if *lock_counter == 0 { - *owner = None; + if let Some(owner) = owner.take() { + SCHEDULER.with(|scheduler| { + owner.set_priority(&mut scheduler.run_queue, *original_priority); + }); + } } true } else { @@ -87,23 +113,25 @@ impl SemaphoreInner { fn current_count(&mut self) -> u32 { match self { SemaphoreInner::Counting { current, .. } => *current, - SemaphoreInner::RecursiveMutex { .. } => { + SemaphoreInner::Mutex { .. } => { panic!("RecursiveMutex does not support current_count") } } } fn wait_with_deadline(&mut self, deadline: Option) { + trace!("Semaphore wait_with_deadline - {:?}", deadline); match self { SemaphoreInner::Counting { waiting, .. } => waiting.wait_with_deadline(deadline), - SemaphoreInner::RecursiveMutex { waiting, .. } => waiting.wait_with_deadline(deadline), + SemaphoreInner::Mutex { waiting, .. } => waiting.wait_with_deadline(deadline), } } - fn notify_one(&mut self) { + fn notify(&mut self) { + trace!("Semaphore notify"); match self { SemaphoreInner::Counting { waiting, .. } => waiting.notify(), - SemaphoreInner::RecursiveMutex { waiting, .. } => waiting.notify(), + SemaphoreInner::Mutex { waiting, .. } => waiting.notify(), } } } @@ -120,9 +148,11 @@ impl Semaphore { max, waiting: WaitQueue::new(), }, - SemaphoreKind::RecursiveMutex => SemaphoreInner::RecursiveMutex { + SemaphoreKind::Mutex | SemaphoreKind::RecursiveMutex => SemaphoreInner::Mutex { + recursive: matches!(kind, SemaphoreKind::RecursiveMutex), owner: None, lock_counter: 0, + original_priority: 0, waiting: WaitQueue::new(), }, }; @@ -153,6 +183,7 @@ impl Semaphore { }); if taken { + debug!("Semaphore - take - success"); return true; } @@ -165,6 +196,7 @@ impl Semaphore { && deadline < Instant::now() { // We have a deadline and we've timed out. + trace!("Semaphore - take - timed out"); return false; } // We can block more, so let's attempt to take the semaphore again. @@ -178,7 +210,7 @@ impl Semaphore { pub fn give(&self) -> bool { self.inner.with(|sem| { if sem.try_give() { - sem.notify_one(); + sem.notify(); true } else { false diff --git a/esp-preempt/src/task/mod.rs b/esp-preempt/src/task/mod.rs index 67eb32148..a94bf0876 100644 --- a/esp-preempt/src/task/mod.rs +++ b/esp-preempt/src/task/mod.rs @@ -21,7 +21,7 @@ pub(crate) enum TaskState { Sleeping, } -pub(crate) type TaskPtr = NonNull; +pub(crate) type TaskPtr = NonNull; pub(crate) type TaskListItem = Option; /// An abstraction that allows the task to contain multiple different queue pointers. @@ -57,12 +57,34 @@ task_list_item!(TaskTimerQueueElement, timer_queue_item); /// implement stuff for NonNull. pub(crate) trait TaskExt { fn resume(self); + fn priority(self, _: &mut RunQueue) -> usize; + fn set_priority(self, _: &mut RunQueue, new_pro: usize); + fn state(self) -> TaskState; + fn set_state(self, state: TaskState); } impl TaskExt for TaskPtr { fn resume(self) { SCHEDULER.with(|scheduler| scheduler.resume_task(self)) } + + fn priority(self, _: &mut RunQueue) -> usize { + unsafe { self.as_ref().priority as usize } + } + + fn set_priority(mut self, run_queue: &mut RunQueue, new_pro: usize) { + run_queue.remove(self); + unsafe { self.as_mut().priority = new_pro as u32 }; + } + + fn state(self) -> TaskState { + unsafe { self.as_ref().state } + } + + fn set_state(mut self, state: TaskState) { + trace!("Task {:?} state changed to {:?}", self, state); + unsafe { self.as_mut().state = state }; + } } /// A singly linked list of tasks. @@ -173,7 +195,7 @@ impl TaskQueue { } #[repr(C)] -pub(crate) struct Context { +pub(crate) struct Task { #[cfg(riscv)] pub trap_frame: Registers, #[cfg(xtensa)] @@ -181,6 +203,7 @@ pub(crate) struct Context { pub thread_semaphore: Option, pub state: TaskState, pub _allocated_stack: Box<[MaybeUninit], InternalMemory>, + pub priority: u32, pub wakeup_at: u64, @@ -203,13 +226,17 @@ pub(crate) struct Context { const STACK_CANARY: u32 = 0xDEEDBAAD; -impl Context { +impl Task { pub(crate) fn new( task_fn: extern "C" fn(*mut c_void), param: *mut c_void, task_stack_size: usize, + priority: u32, ) -> Self { - trace!("task_create {:?} {:?} {}", task_fn, param, task_stack_size); + trace!( + "task_create {:?}({:?}) stack_size = {} priority = {}", + task_fn, param, task_stack_size, priority + ); let task_stack_size_words = task_stack_size / 4; let mut stack = Box::<[u32], _>::new_uninit_slice_in(task_stack_size_words, InternalMemory); @@ -218,12 +245,13 @@ impl Context { stack[0] = MaybeUninit::new(STACK_CANARY); - Context { + Task { trap_frame: new_task_context(task_fn, param, stack_top), thread_semaphore: None, state: TaskState::Ready, _allocated_stack: stack, current_queue: None, + priority, wakeup_at: 0, @@ -243,14 +271,14 @@ impl Context { unsafe { self._allocated_stack[0].assume_init() }, STACK_CANARY, "Stack overflow detected in {:?}", - self as *const Context + self as *const Task ); } } -impl Drop for Context { +impl Drop for Task { fn drop(&mut self) { - debug!("Dropping task: {:?}", self as *mut Context); + debug!("Dropping task: {:?}", self as *mut Task); if let Some(sem) = self.thread_semaphore { let sem = unsafe { SemaphoreHandle::from_ptr(sem) }; core::mem::drop(sem) @@ -261,7 +289,7 @@ impl Drop for Context { pub(super) fn allocate_main_task() { // This context will be filled out by the first context switch. let task = Box::new_in( - Context { + Task { #[cfg(riscv)] trap_frame: Registers::default(), #[cfg(xtensa)] @@ -270,6 +298,7 @@ pub(super) fn allocate_main_task() { state: TaskState::Ready, _allocated_stack: Box::<[u32], _>::new_uninit_slice_in(0, InternalMemory), current_queue: None, + priority: 1, wakeup_at: 0, @@ -292,26 +321,26 @@ pub(super) fn allocate_main_task() { // The main task is already running, no need to add it to the ready queue. state.all_tasks.push(main_task_ptr); state.current_task = Some(main_task_ptr); + state.run_queue.mark_task_ready(main_task_ptr); }) } pub(crate) fn spawn_idle_task() { - let ptr = SCHEDULER.create_task(idle_task, core::ptr::null_mut(), 4096); + let ptr = SCHEDULER.create_task(idle_task, core::ptr::null_mut(), 4096, 0); debug!("Idle task created: {:?}", ptr); } pub(crate) extern "C" fn idle_task(_: *mut c_void) { loop { - yield_task(); - // TODO: once we have priorities, we can waiti: - // #[cfg(xtensa)] - // unsafe { - // core::arch::asm!("waiti 0") - // } - // #[cfg(riscv)] - // unsafe { - // core::arch::asm!("wfi") - // } + // TODO: make this configurable. + #[cfg(xtensa)] + unsafe { + core::arch::asm!("waiti 0"); + } + #[cfg(riscv)] + unsafe { + core::arch::asm!("wfi"); + } } } @@ -338,7 +367,7 @@ pub(super) fn delete_all_tasks() { } } -pub(super) fn with_current_task(mut cb: impl FnMut(&mut Context) -> R) -> R { +pub(super) fn with_current_task(mut cb: impl FnMut(&mut Task) -> R) -> R { SCHEDULER.with(|state| cb(unsafe { unwrap!(state.current_task).as_mut() })) } @@ -346,7 +375,7 @@ pub(super) fn current_task() -> TaskPtr { with_current_task(|task| NonNull::from(task)) } -pub(super) fn schedule_task_deletion(task: *mut Context) { +pub(super) fn schedule_task_deletion(task: *mut Task) { trace!("schedule_task_deletion {:?}", task); let deleting_current = SCHEDULER.with(|state| state.schedule_task_deletion(task)); diff --git a/esp-preempt/src/task/xtensa.rs b/esp-preempt/src/task/xtensa.rs index e5f1407a8..f82172817 100644 --- a/esp-preempt/src/task/xtensa.rs +++ b/esp-preempt/src/task/xtensa.rs @@ -3,7 +3,7 @@ use core::ffi::c_void; pub(crate) use esp_hal::trapframe::TrapFrame; use esp_hal::{xtensa_lx, xtensa_lx_rt}; -use crate::{SCHEDULER, task::Context}; +use crate::{SCHEDULER, task::Task}; pub(crate) fn new_task_context( task_fn: extern "C" fn(*mut c_void), @@ -34,11 +34,11 @@ pub(crate) fn new_task_context( } } -pub(crate) fn restore_task_context(ctx: &mut Context, trap_frame: &mut TrapFrame) { +pub(crate) fn restore_task_context(ctx: &mut Task, trap_frame: &mut TrapFrame) { *trap_frame = ctx.trap_frame; } -pub(crate) fn save_task_context(ctx: &mut Context, trap_frame: &TrapFrame) { +pub(crate) fn save_task_context(ctx: &mut Task, trap_frame: &TrapFrame) { ctx.trap_frame = *trap_frame; } diff --git a/esp-preempt/src/timer/mod.rs b/esp-preempt/src/timer/mod.rs index 0771b8f26..bb69fe141 100644 --- a/esp-preempt/src/timer/mod.rs +++ b/esp-preempt/src/timer/mod.rs @@ -7,7 +7,7 @@ use crate::{ SCHEDULER, TICK_RATE, TimeBase, - task::{Context, TaskPtr, TaskQueue, TaskState, TaskTimerQueueElement}, + task::{Task, TaskExt, TaskPtr, TaskQueue, TaskState, TaskTimerQueueElement}, }; const TIMESLICE_DURATION: Duration = Rate::from_hz(TICK_RATE).as_duration(); @@ -91,7 +91,7 @@ impl TimeDriver { self.timer.stop(); } - pub(crate) fn handle_alarm(&mut self, mut on_task_ready: impl FnMut(&mut Context)) { + pub(crate) fn handle_alarm(&mut self, mut on_task_ready: impl FnMut(&mut Task)) { let mut timer_queue = core::mem::take(&mut self.timer_queue); let now = Instant::now().duration_since_epoch().as_micros(); @@ -136,21 +136,28 @@ impl TimeDriver { } pub(crate) fn schedule_wakeup(&mut self, mut current_task: TaskPtr, at: Instant) -> bool { - unsafe { debug_assert_eq!(current_task.as_mut().state, TaskState::Ready) }; - - // Target time is in the past, don't sleep. - if at <= Instant::now() { - return false; - } - - unsafe { current_task.as_mut().state = TaskState::Sleeping }; + debug_assert_eq!( + current_task.state(), + TaskState::Ready, + "task: {:?}", + current_task + ); // Target time is infinite, suspend task without waking up via timer. if at == Instant::EPOCH + Duration::MAX { + current_task.set_state(TaskState::Sleeping); debug!("Suspending task: {:?}", current_task); return true; } + // Target time is in the past, don't sleep. + if at <= Instant::now() { + debug!("Target time is in the past"); + return false; + } + + current_task.set_state(TaskState::Sleeping); + let timestamp = at.duration_since_epoch().as_micros(); debug!( "Scheduling wakeup for task {:?} at timestamp {}", diff --git a/esp-preempt/src/timer_queue.rs b/esp-preempt/src/timer_queue.rs index ef50ba6f1..65f244d12 100644 --- a/esp-preempt/src/timer_queue.rs +++ b/esp-preempt/src/timer_queue.rs @@ -320,7 +320,7 @@ register_timer_implementation!(Timer); pub(crate) fn create_timer_task() { // create the timer task TIMER_QUEUE.inner.with(|q| { - let task_ptr = SCHEDULER.create_task(timer_task, core::ptr::null_mut(), 8192); + let task_ptr = SCHEDULER.create_task(timer_task, core::ptr::null_mut(), 8192, 2); q.task = Some(task_ptr); }); } diff --git a/esp-radio-preempt-driver/src/semaphore.rs b/esp-radio-preempt-driver/src/semaphore.rs index 8867e395c..d1c468770 100644 --- a/esp-radio-preempt-driver/src/semaphore.rs +++ b/esp-radio-preempt-driver/src/semaphore.rs @@ -28,12 +28,12 @@ pub type SemaphorePtr = NonNull<()>; /// The type of semaphore or mutex to create. pub enum SemaphoreKind { - /// Counting semaphore or non-recursive mutex. - /// - /// To obtain a non-recursive mutex, use [`SemaphoreKind::Counting`] with maximum and initial - /// counts of 1. + /// Counting semaphore. Counting { max: u32, initial: u32 }, + /// Non-recursive mutex. + Mutex, + /// Recursive mutex. RecursiveMutex, } diff --git a/esp-radio/src/compat/mutex.rs b/esp-radio/src/compat/mutex.rs index ab25213a2..cf73b4ff4 100644 --- a/esp-radio/src/compat/mutex.rs +++ b/esp-radio/src/compat/mutex.rs @@ -7,7 +7,7 @@ pub(crate) fn mutex_create(recursive: bool) -> *mut c_void { let ptr = SemaphoreHandle::new(if recursive { SemaphoreKind::RecursiveMutex } else { - SemaphoreKind::Counting { max: 1, initial: 1 } + SemaphoreKind::Mutex }) .leak() .as_ptr() diff --git a/esp-radio/src/lib.rs b/esp-radio/src/lib.rs index fe94ad114..c11f337dc 100644 --- a/esp-radio/src/lib.rs +++ b/esp-radio/src/lib.rs @@ -141,12 +141,9 @@ use hal::{ time::Rate, }; +use crate::radio::{setup_radio_isr, shutdown_radio_isr}; #[cfg(feature = "wifi")] use crate::wifi::WifiError; -use crate::{ - preempt::yield_task, - radio::{setup_radio_isr, shutdown_radio_isr}, -}; // can't use instability on inline module definitions, see https://github.com/rust-lang/rust/issues/54727 #[doc(hidden)] @@ -273,8 +270,6 @@ pub fn init<'d>() -> Result, InitializationError> { // This initializes the task switcher preempt::enable(); - yield_task(); - wifi_set_log_verbose(); init_radio_clocks(); diff --git a/hil-test/Cargo.toml b/hil-test/Cargo.toml index fa8637db3..704252431 100644 --- a/hil-test/Cargo.toml +++ b/hil-test/Cargo.toml @@ -236,6 +236,7 @@ esp-preempt = { path = "../esp-preempt", optional = true } esp-storage = { path = "../esp-storage", optional = true } esp-sync = { path = "../esp-sync" } esp-radio = { path = "../esp-radio", optional = true } +esp-radio-preempt-driver = { path = "../esp-radio-preempt-driver", optional = true } portable-atomic = "1.11.0" static_cell = { version = "2.1.0" } semihosting = { version = "0.1", features= ["stdio", "panic-handler"] } @@ -262,7 +263,7 @@ default = [] unstable = ["esp-hal/unstable"] defmt = ["dep:defmt-rtt", "esp-hal/defmt", "embedded-test/defmt", "esp-radio?/defmt", "esp-hal-embassy?/defmt", "esp-preempt?/defmt"] -esp-radio = ["dep:esp-radio", "dep:esp-preempt"] +esp-radio = ["dep:esp-radio", "dep:esp-preempt", "dep:esp-radio-preempt-driver"] # Device support (required!): esp32 = [ diff --git a/hil-test/tests/esp_radio_init.rs b/hil-test/tests/esp_radio_init.rs index f01cbd60f..041af3102 100644 --- a/hil-test/tests/esp_radio_init.rs +++ b/hil-test/tests/esp_radio_init.rs @@ -45,6 +45,8 @@ async fn try_init( #[cfg(test)] #[embedded_test::tests(default_timeout = 3, executor = esp_hal_embassy::Executor::new())] mod tests { + use defmt::info; + use super::*; #[init] @@ -125,4 +127,114 @@ mod tests { // Now, can we do it again? let _wifi = esp_radio::wifi::new(&esp_radio_ctrl, p.WIFI.reborrow()).unwrap(); } + + #[test] + fn test_esp_preempt_priority_inheritance(p: Peripherals) { + use core::ffi::c_void; + + use esp_radio_preempt_driver as preempt; + use portable_atomic::{AtomicBool, Ordering}; + use preempt::semaphore::{SemaphoreHandle, SemaphoreKind}; + + let timg0 = TimerGroup::new(p.TIMG0); + esp_preempt::init(timg0.timer0); + preempt::enable(); + + // We need three tasks to test priority inheritance: + // - A high priority task that will attempt to acquire the mutex. + // - A medium priority task that will do some unrelated work. + // - A low priority task that will hold the mutex before the high priority task could + // acquire it. + // + // Priority inversion is a situation where the higher priority task is being blocked, and a + // medium priority task is ready to run while the low priority task holds the mutex. The + // issue is that in this case the medium priority task is effectively prioritized over the + // high priority task. + // + // The test will be successful if the high priority task is able to acquire the mutex + // before the medium priority task runs. + + // The main task serves as the low priority task. + // The main task will spawn the high and medium priority tasks after obtaining the mutex. + // The medium priority task will assert that the high priority task has finished. + + struct TestContext { + ready_semaphore: SemaphoreHandle, + mutex: SemaphoreHandle, + high_priority_task_finished: AtomicBool, + } + let test_context = TestContext { + // This semaphore signals the end of the test + ready_semaphore: SemaphoreHandle::new(SemaphoreKind::Counting { max: 1, initial: 0 }), + // We'll use this mutex to test priority inheritance + mutex: SemaphoreHandle::new(SemaphoreKind::Mutex), + high_priority_task_finished: AtomicBool::new(false), + }; + + test_context.mutex.take(None); + info!("Low: mutex obtained"); + + // Spawn tasks + extern "C" fn high_priority_task(context: *mut c_void) { + let context = unsafe { &*(context as *const TestContext) }; + + info!("High: acquiring mutex"); + context.mutex.take(None); + + info!("High: acquired mutex, mark finished"); + + context + .high_priority_task_finished + .store(true, Ordering::SeqCst); + + info!("High: released mutex"); + context.mutex.give(); + + // TODO: support one-shot tasks in esp-preempt + unsafe { + preempt::schedule_task_deletion(core::ptr::null_mut()); + } + } + extern "C" fn medium_priority_task(context: *mut c_void) { + let context = unsafe { &*(context as *const TestContext) }; + + info!("Medium: asserting high-priority task finished"); + assert!(context.high_priority_task_finished.load(Ordering::SeqCst)); + + info!("Medium: marking test finished"); + context.ready_semaphore.give(); + + // TODO: support one-shot tasks in esp-preempt + unsafe { + preempt::schedule_task_deletion(core::ptr::null_mut()); + } + } + + unsafe { + info!("Low: spawning high priority task"); + preempt::task_create( + high_priority_task, + (&raw const test_context).cast::().cast_mut(), + 3, + None, + 4096, + ); + info!("Low: spawning medium priority task"); + preempt::task_create( + medium_priority_task, + (&raw const test_context).cast::().cast_mut(), + 2, + None, + 4096, + ); + } + + // Priority inheritance means this runs before the medium priority task + info!("Low: tasks spawned, returning mutex"); + test_context.mutex.give(); + + info!("Low: wait for tasks to finish"); + test_context.ready_semaphore.take(None); + preempt::disable(); + } }