From 583be70559915f183084f82b92de8a48a437aa01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Thu, 3 Jul 2025 11:17:34 +0200 Subject: [PATCH] Allow hooking into the executor (#3737) * Allow hooking into the executor * Attempt to test that the callback API is not dysfunctional --- esp-hal-embassy/CHANGELOG.md | 1 + esp-hal-embassy/src/executor/thread.rs | 110 ++++++++++++++----- esp-hal-embassy/src/lib.rs | 2 +- hil-test/tests/embassy_interrupt_executor.rs | 108 +++++++++++++----- 4 files changed, 169 insertions(+), 52 deletions(-) diff --git a/esp-hal-embassy/CHANGELOG.md b/esp-hal-embassy/CHANGELOG.md index 8948cd130..609cb5dec 100644 --- a/esp-hal-embassy/CHANGELOG.md +++ b/esp-hal-embassy/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `Executor::run_with_callbacks` and the associated `Callbacks` trait (#3737) ### Changed diff --git a/esp-hal-embassy/src/executor/thread.rs b/esp-hal-embassy/src/executor/thread.rs index 12b6735bb..06e856898 100644 --- a/esp-hal-embassy/src/executor/thread.rs +++ b/esp-hal-embassy/src/executor/thread.rs @@ -6,7 +6,6 @@ use embassy_executor::Spawner; #[cfg(all(low_power_wait, multi_core))] use esp_hal::interrupt::software::SoftwareInterrupt; use esp_hal::{interrupt::Priority, system::Cpu}; -#[cfg(low_power_wait)] use portable_atomic::{AtomicBool, Ordering}; use super::InnerExecutor; @@ -15,28 +14,36 @@ pub(crate) const THREAD_MODE_CONTEXT: usize = 16; /// global atomic used to keep track of whether there is work to do since sev() /// is not available on either Xtensa or RISC-V -#[cfg(low_power_wait)] static SIGNAL_WORK_THREAD_MODE: [AtomicBool; Cpu::COUNT] = [const { AtomicBool::new(false) }; Cpu::COUNT]; -pub(crate) fn pend_thread_mode(_core: usize) { - #[cfg(low_power_wait)] - { - // Signal that there is work to be done. - SIGNAL_WORK_THREAD_MODE[_core].store(true, Ordering::Relaxed); +pub(crate) fn pend_thread_mode(core: usize) { + // Signal that there is work to be done. + SIGNAL_WORK_THREAD_MODE[core].store(true, Ordering::Relaxed); - // If we are pending a task on the current core, we're done. Otherwise, we - // need to make sure the other core wakes up. - #[cfg(multi_core)] - if _core != Cpu::current() as usize { - // We need to clear the interrupt from software. We don't actually - // need it to trigger and run the interrupt handler, we just need to - // kick waiti to return. - unsafe { SoftwareInterrupt::<3>::steal().raise() }; - } + // If we are pending a task on the current core, we're done. Otherwise, we + // need to make sure the other core wakes up. + #[cfg(all(low_power_wait, multi_core))] + if core != Cpu::current() as usize { + // We need to clear the interrupt from software. We don't actually + // need it to trigger and run the interrupt handler, we just need to + // kick waiti to return. + unsafe { SoftwareInterrupt::<3>::steal().raise() }; } } +/// Callbacks to run code before/after polling the task queue. +pub trait Callbacks { + /// Called just before polling the executor. + fn before_poll(&mut self); + + /// Called after the executor is polled, if there is no work scheduled. + /// + /// Note that tasks can become ready at any point during the execution + /// of this function. + fn on_idle(&mut self); +} + /// Thread mode executor. /// /// This is the simplest and most common kind of executor. It runs on thread @@ -49,6 +56,7 @@ create one instance per core. The executors don't steal tasks from each other." )] pub struct Executor { inner: InnerExecutor, + cpu: Cpu, not_send: PhantomData<*mut ()>, } @@ -61,14 +69,16 @@ impl Executor { This will use software-interrupt 3 which isn't available for anything else to wake the other core(s)."# )] pub fn new() -> Self { + let cpu = Cpu::current(); Self { inner: InnerExecutor::new( // Priority 1 means the timer queue can be accessed at interrupt priority 1 - for // the thread mode executor it needs to be one higher than the base run level, to // allow alarm interrupts to be handled. Priority::Priority1, - (THREAD_MODE_CONTEXT + Cpu::current() as usize) as *mut (), + (THREAD_MODE_CONTEXT + cpu as usize) as *mut (), ), + cpu, not_send: PhantomData, } } @@ -94,6 +104,58 @@ This will use software-interrupt 3 which isn't available for anything else to wa /// /// This function never returns. pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! { + struct NoHooks(usize); + + impl Callbacks for NoHooks { + fn before_poll(&mut self) { + #[cfg(low_power_wait)] + SIGNAL_WORK_THREAD_MODE[self.0].store(false, Ordering::Relaxed); + } + + fn on_idle(&mut self) {} + } + + self.run_inner(init, NoHooks(self.cpu as usize)) + } + + /// Run the executor with callbacks. + /// + /// See [Callbacks] on when the callbacks are called. + /// + /// See [Self::run] for more information about running the executor. + /// + /// This function never returns. + pub fn run_with_callbacks( + &'static mut self, + init: impl FnOnce(Spawner), + callbacks: impl Callbacks, + ) -> ! { + struct Hooks<'a, CB: Callbacks>(CB, &'a AtomicBool); + + impl Callbacks for Hooks<'_, CB> { + fn before_poll(&mut self) { + // Clear the flag unconditionally since we'll use it to decide + // if on_idle should be called. + self.1.store(false, Ordering::Relaxed); + + self.0.before_poll() + } + + fn on_idle(&mut self) { + // Make sure we only call on_idle if the executor would otherwise go to sleep. + if !self.1.load(Ordering::Acquire) { + self.0.on_idle(); + } + } + } + + self.run_inner( + init, + Hooks(callbacks, &SIGNAL_WORK_THREAD_MODE[self.cpu as usize]), + ) + } + + fn run_inner(&'static mut self, init: impl FnOnce(Spawner), mut hooks: impl Callbacks) -> ! { #[cfg(all(multi_core, low_power_wait))] unwrap!(esp_hal::interrupt::enable( esp_hal::peripherals::Interrupt::FROM_CPU_INTR3, @@ -104,14 +166,15 @@ This will use software-interrupt 3 which isn't available for anything else to wa init(self.inner.inner.spawner()); - #[cfg(low_power_wait)] - let cpu = Cpu::current() as usize; - loop { + hooks.before_poll(); + unsafe { self.inner.inner.poll() }; + hooks.on_idle(); + #[cfg(low_power_wait)] - Self::wait_impl(cpu); + Self::wait_impl(self.cpu as usize); } } @@ -154,8 +217,6 @@ This will use software-interrupt 3 which isn't available for anything else to wa _ => unsafe { core::arch::asm!("waiti 5") }, } } - // If this races and some waker sets the signal, we'll reset it, but still poll. - SIGNAL_WORK_THREAD_MODE[cpu].store(false, Ordering::Relaxed); } #[cfg(all(riscv, low_power_wait))] @@ -169,9 +230,6 @@ This will use software-interrupt 3 which isn't available for anything else to wa unsafe { core::arch::asm!("wfi") }; } }); - // if an interrupt occurred while waiting, it will be serviced here - // If this races and some waker sets the signal, we'll reset it, but still poll. - SIGNAL_WORK_THREAD_MODE[cpu].store(false, Ordering::Relaxed); } } diff --git a/esp-hal-embassy/src/lib.rs b/esp-hal-embassy/src/lib.rs index 16c797f7b..9a40bf6a5 100644 --- a/esp-hal-embassy/src/lib.rs +++ b/esp-hal-embassy/src/lib.rs @@ -56,7 +56,7 @@ use esp_hal::timer::{AnyTimer, timg::Timer as TimgTimer}; pub use macros::embassy_main as main; #[cfg(feature = "executors")] -pub use self::executor::{Executor, InterruptExecutor}; +pub use self::executor::{Callbacks, Executor, InterruptExecutor}; use self::time_driver::{EmbassyTimer, Timer}; #[cfg(feature = "executors")] diff --git a/hil-test/tests/embassy_interrupt_executor.rs b/hil-test/tests/embassy_interrupt_executor.rs index dc3a26602..98df7bf27 100644 --- a/hil-test/tests/embassy_interrupt_executor.rs +++ b/hil-test/tests/embassy_interrupt_executor.rs @@ -19,10 +19,8 @@ use esp_hal::interrupt::{ software::{SoftwareInterrupt, SoftwareInterruptControl}, }; #[cfg(multi_core)] -use esp_hal::system::{CpuControl, Stack}; -#[cfg(multi_core)] -use esp_hal_embassy::Executor; -use esp_hal_embassy::InterruptExecutor; +use esp_hal::system::{AppCoreGuard, CpuControl, Stack}; +use esp_hal_embassy::{Executor, InterruptExecutor}; use hil_test::mk_static; esp_bootloader_esp_idf::esp_app_desc!(); @@ -39,6 +37,36 @@ async fn responder_task( } } +#[embassy_executor::task] +async fn tester_task( + signal: &'static Signal, + response: &'static Signal, +) { + response.wait().await; + for _ in 0..3 { + signal.signal(()); + response.wait().await; + } + embedded_test::export::check_outcome(()); +} + +#[embassy_executor::task] +#[cfg(multi_core)] +async fn tester_task_multi_core( + signal: &'static Signal, + response: &'static Signal, + core_guard: AppCoreGuard<'static>, +) { + response.wait().await; + for _ in 0..3 { + signal.signal(()); + response.wait().await; + } + + core::mem::drop(core_guard); + embedded_test::export::check_outcome(()); +} + struct Context { interrupt: SoftwareInterrupt<'static, 1>, #[cfg(multi_core)] @@ -46,8 +74,10 @@ struct Context { } #[cfg(test)] -#[embedded_test::tests(default_timeout = 3, executor = esp_hal_embassy::Executor::new())] +#[embedded_test::tests(default_timeout = 3)] mod test { + use esp_hal_embassy::Callbacks; + use super::*; #[init] @@ -65,26 +95,54 @@ mod test { } #[test] - async fn run_interrupt_executor_test(ctx: Context) { + fn run_test_with_callbacks_api(ctx: Context) { let interrupt_executor = mk_static!(InterruptExecutor<1>, InterruptExecutor::new(ctx.interrupt)); let signal = mk_static!(Signal, Signal::new()); let response = mk_static!(Signal, Signal::new()); let spawner = interrupt_executor.start(Priority::Priority3); - spawner.must_spawn(responder_task(signal, response)); - response.wait().await; - for _ in 0..3 { - signal.signal(()); - response.wait().await; + let thread_executor = mk_static!(Executor, Executor::new()); + + struct NoCallbacks; + + impl Callbacks for NoCallbacks { + fn before_poll(&mut self) {} + fn on_idle(&mut self) {} } + + let callbacks = NoCallbacks; + + thread_executor.run_with_callbacks( + |spawner| { + spawner.must_spawn(tester_task(signal, response)); + }, + callbacks, + ) + } + + #[test] + fn run_interrupt_executor_test(ctx: Context) { + let interrupt_executor = + mk_static!(InterruptExecutor<1>, InterruptExecutor::new(ctx.interrupt)); + let signal = mk_static!(Signal, Signal::new()); + let response = mk_static!(Signal, Signal::new()); + + let spawner = interrupt_executor.start(Priority::Priority3); + spawner.must_spawn(responder_task(signal, response)); + + let thread_executor = mk_static!(Executor, Executor::new()); + + thread_executor.run(|spawner| { + spawner.must_spawn(tester_task(signal, response)); + }) } #[test] #[cfg(multi_core)] - async fn run_interrupt_executor_test_on_core_1(mut ctx: Context) { + fn run_interrupt_executor_test_on_core_1(mut ctx: Context) { let app_core_stack = mk_static!(Stack<8192>, Stack::new()); let response = &*mk_static!(Signal, Signal::new()); let signal = &*mk_static!(Signal, Signal::new()); @@ -102,21 +160,21 @@ mod test { } }; - let _guard = ctx + let guard = ctx .cpu_control .start_app_core(app_core_stack, cpu1_fnctn) .unwrap(); - response.wait().await; - for _ in 0..3 { - signal.signal(()); - response.wait().await; - } + let thread_executor = mk_static!(Executor, Executor::new()); + + thread_executor.run(|spawner| { + spawner.must_spawn(tester_task_multi_core(signal, response, guard)); + }) } #[test] #[cfg(multi_core)] - async fn run_thread_executor_test_on_core_1(mut ctx: Context) { + fn run_thread_executor_test_on_core_1(mut ctx: Context) { let app_core_stack = mk_static!(Stack<8192>, Stack::new()); let signal = mk_static!(Signal, Signal::new()); let response = mk_static!(Signal, Signal::new()); @@ -128,15 +186,15 @@ mod test { }); }; - let _guard = ctx + let guard = ctx .cpu_control .start_app_core(app_core_stack, cpu1_fnctn) .unwrap(); - response.wait().await; - for _ in 0..3 { - signal.signal(()); - response.wait().await; - } + let thread_executor = mk_static!(Executor, Executor::new()); + + thread_executor.run(|spawner| { + spawner.must_spawn(tester_task_multi_core(signal, response, guard)); + }) } }