Allow hooking into the executor (#3737)

* Allow hooking into the executor

* Attempt to test that the callback API is not dysfunctional
This commit is contained in:
Dániel Buga 2025-07-03 11:17:34 +02:00 committed by GitHub
parent f8b43a9ca9
commit 583be70559
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 169 additions and 52 deletions

View File

@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `Executor::run_with_callbacks` and the associated `Callbacks` trait (#3737)
### Changed

View File

@ -6,7 +6,6 @@ use embassy_executor::Spawner;
#[cfg(all(low_power_wait, multi_core))]
use esp_hal::interrupt::software::SoftwareInterrupt;
use esp_hal::{interrupt::Priority, system::Cpu};
#[cfg(low_power_wait)]
use portable_atomic::{AtomicBool, Ordering};
use super::InnerExecutor;
@ -15,28 +14,36 @@ pub(crate) const THREAD_MODE_CONTEXT: usize = 16;
/// global atomic used to keep track of whether there is work to do since sev()
/// is not available on either Xtensa or RISC-V
#[cfg(low_power_wait)]
static SIGNAL_WORK_THREAD_MODE: [AtomicBool; Cpu::COUNT] =
[const { AtomicBool::new(false) }; Cpu::COUNT];
pub(crate) fn pend_thread_mode(_core: usize) {
#[cfg(low_power_wait)]
{
// Signal that there is work to be done.
SIGNAL_WORK_THREAD_MODE[_core].store(true, Ordering::Relaxed);
pub(crate) fn pend_thread_mode(core: usize) {
// Signal that there is work to be done.
SIGNAL_WORK_THREAD_MODE[core].store(true, Ordering::Relaxed);
// If we are pending a task on the current core, we're done. Otherwise, we
// need to make sure the other core wakes up.
#[cfg(multi_core)]
if _core != Cpu::current() as usize {
// We need to clear the interrupt from software. We don't actually
// need it to trigger and run the interrupt handler, we just need to
// kick waiti to return.
unsafe { SoftwareInterrupt::<3>::steal().raise() };
}
// If we are pending a task on the current core, we're done. Otherwise, we
// need to make sure the other core wakes up.
#[cfg(all(low_power_wait, multi_core))]
if core != Cpu::current() as usize {
// We need to clear the interrupt from software. We don't actually
// need it to trigger and run the interrupt handler, we just need to
// kick waiti to return.
unsafe { SoftwareInterrupt::<3>::steal().raise() };
}
}
/// Callbacks to run code before/after polling the task queue.
pub trait Callbacks {
/// Called just before polling the executor.
fn before_poll(&mut self);
/// Called after the executor is polled, if there is no work scheduled.
///
/// Note that tasks can become ready at any point during the execution
/// of this function.
fn on_idle(&mut self);
}
/// Thread mode executor.
///
/// This is the simplest and most common kind of executor. It runs on thread
@ -49,6 +56,7 @@ create one instance per core. The executors don't steal tasks from each other."
)]
pub struct Executor {
inner: InnerExecutor,
cpu: Cpu,
not_send: PhantomData<*mut ()>,
}
@ -61,14 +69,16 @@ impl Executor {
This will use software-interrupt 3 which isn't available for anything else to wake the other core(s)."#
)]
pub fn new() -> Self {
let cpu = Cpu::current();
Self {
inner: InnerExecutor::new(
// Priority 1 means the timer queue can be accessed at interrupt priority 1 - for
// the thread mode executor it needs to be one higher than the base run level, to
// allow alarm interrupts to be handled.
Priority::Priority1,
(THREAD_MODE_CONTEXT + Cpu::current() as usize) as *mut (),
(THREAD_MODE_CONTEXT + cpu as usize) as *mut (),
),
cpu,
not_send: PhantomData,
}
}
@ -94,6 +104,58 @@ This will use software-interrupt 3 which isn't available for anything else to wa
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
struct NoHooks(usize);
impl Callbacks for NoHooks {
fn before_poll(&mut self) {
#[cfg(low_power_wait)]
SIGNAL_WORK_THREAD_MODE[self.0].store(false, Ordering::Relaxed);
}
fn on_idle(&mut self) {}
}
self.run_inner(init, NoHooks(self.cpu as usize))
}
/// Run the executor with callbacks.
///
/// See [Callbacks] on when the callbacks are called.
///
/// See [Self::run] for more information about running the executor.
///
/// This function never returns.
pub fn run_with_callbacks(
&'static mut self,
init: impl FnOnce(Spawner),
callbacks: impl Callbacks,
) -> ! {
struct Hooks<'a, CB: Callbacks>(CB, &'a AtomicBool);
impl<CB: Callbacks> Callbacks for Hooks<'_, CB> {
fn before_poll(&mut self) {
// Clear the flag unconditionally since we'll use it to decide
// if on_idle should be called.
self.1.store(false, Ordering::Relaxed);
self.0.before_poll()
}
fn on_idle(&mut self) {
// Make sure we only call on_idle if the executor would otherwise go to sleep.
if !self.1.load(Ordering::Acquire) {
self.0.on_idle();
}
}
}
self.run_inner(
init,
Hooks(callbacks, &SIGNAL_WORK_THREAD_MODE[self.cpu as usize]),
)
}
fn run_inner(&'static mut self, init: impl FnOnce(Spawner), mut hooks: impl Callbacks) -> ! {
#[cfg(all(multi_core, low_power_wait))]
unwrap!(esp_hal::interrupt::enable(
esp_hal::peripherals::Interrupt::FROM_CPU_INTR3,
@ -104,14 +166,15 @@ This will use software-interrupt 3 which isn't available for anything else to wa
init(self.inner.inner.spawner());
#[cfg(low_power_wait)]
let cpu = Cpu::current() as usize;
loop {
hooks.before_poll();
unsafe { self.inner.inner.poll() };
hooks.on_idle();
#[cfg(low_power_wait)]
Self::wait_impl(cpu);
Self::wait_impl(self.cpu as usize);
}
}
@ -154,8 +217,6 @@ This will use software-interrupt 3 which isn't available for anything else to wa
_ => unsafe { core::arch::asm!("waiti 5") },
}
}
// If this races and some waker sets the signal, we'll reset it, but still poll.
SIGNAL_WORK_THREAD_MODE[cpu].store(false, Ordering::Relaxed);
}
#[cfg(all(riscv, low_power_wait))]
@ -169,9 +230,6 @@ This will use software-interrupt 3 which isn't available for anything else to wa
unsafe { core::arch::asm!("wfi") };
}
});
// if an interrupt occurred while waiting, it will be serviced here
// If this races and some waker sets the signal, we'll reset it, but still poll.
SIGNAL_WORK_THREAD_MODE[cpu].store(false, Ordering::Relaxed);
}
}

View File

@ -56,7 +56,7 @@ use esp_hal::timer::{AnyTimer, timg::Timer as TimgTimer};
pub use macros::embassy_main as main;
#[cfg(feature = "executors")]
pub use self::executor::{Executor, InterruptExecutor};
pub use self::executor::{Callbacks, Executor, InterruptExecutor};
use self::time_driver::{EmbassyTimer, Timer};
#[cfg(feature = "executors")]

View File

@ -19,10 +19,8 @@ use esp_hal::interrupt::{
software::{SoftwareInterrupt, SoftwareInterruptControl},
};
#[cfg(multi_core)]
use esp_hal::system::{CpuControl, Stack};
#[cfg(multi_core)]
use esp_hal_embassy::Executor;
use esp_hal_embassy::InterruptExecutor;
use esp_hal::system::{AppCoreGuard, CpuControl, Stack};
use esp_hal_embassy::{Executor, InterruptExecutor};
use hil_test::mk_static;
esp_bootloader_esp_idf::esp_app_desc!();
@ -39,6 +37,36 @@ async fn responder_task(
}
}
#[embassy_executor::task]
async fn tester_task(
signal: &'static Signal<CriticalSectionRawMutex, ()>,
response: &'static Signal<CriticalSectionRawMutex, ()>,
) {
response.wait().await;
for _ in 0..3 {
signal.signal(());
response.wait().await;
}
embedded_test::export::check_outcome(());
}
#[embassy_executor::task]
#[cfg(multi_core)]
async fn tester_task_multi_core(
signal: &'static Signal<CriticalSectionRawMutex, ()>,
response: &'static Signal<CriticalSectionRawMutex, ()>,
core_guard: AppCoreGuard<'static>,
) {
response.wait().await;
for _ in 0..3 {
signal.signal(());
response.wait().await;
}
core::mem::drop(core_guard);
embedded_test::export::check_outcome(());
}
struct Context {
interrupt: SoftwareInterrupt<'static, 1>,
#[cfg(multi_core)]
@ -46,8 +74,10 @@ struct Context {
}
#[cfg(test)]
#[embedded_test::tests(default_timeout = 3, executor = esp_hal_embassy::Executor::new())]
#[embedded_test::tests(default_timeout = 3)]
mod test {
use esp_hal_embassy::Callbacks;
use super::*;
#[init]
@ -65,26 +95,54 @@ mod test {
}
#[test]
async fn run_interrupt_executor_test(ctx: Context) {
fn run_test_with_callbacks_api(ctx: Context) {
let interrupt_executor =
mk_static!(InterruptExecutor<1>, InterruptExecutor::new(ctx.interrupt));
let signal = mk_static!(Signal<CriticalSectionRawMutex, ()>, Signal::new());
let response = mk_static!(Signal<CriticalSectionRawMutex, ()>, Signal::new());
let spawner = interrupt_executor.start(Priority::Priority3);
spawner.must_spawn(responder_task(signal, response));
response.wait().await;
for _ in 0..3 {
signal.signal(());
response.wait().await;
let thread_executor = mk_static!(Executor, Executor::new());
struct NoCallbacks;
impl Callbacks for NoCallbacks {
fn before_poll(&mut self) {}
fn on_idle(&mut self) {}
}
let callbacks = NoCallbacks;
thread_executor.run_with_callbacks(
|spawner| {
spawner.must_spawn(tester_task(signal, response));
},
callbacks,
)
}
#[test]
fn run_interrupt_executor_test(ctx: Context) {
let interrupt_executor =
mk_static!(InterruptExecutor<1>, InterruptExecutor::new(ctx.interrupt));
let signal = mk_static!(Signal<CriticalSectionRawMutex, ()>, Signal::new());
let response = mk_static!(Signal<CriticalSectionRawMutex, ()>, Signal::new());
let spawner = interrupt_executor.start(Priority::Priority3);
spawner.must_spawn(responder_task(signal, response));
let thread_executor = mk_static!(Executor, Executor::new());
thread_executor.run(|spawner| {
spawner.must_spawn(tester_task(signal, response));
})
}
#[test]
#[cfg(multi_core)]
async fn run_interrupt_executor_test_on_core_1(mut ctx: Context) {
fn run_interrupt_executor_test_on_core_1(mut ctx: Context) {
let app_core_stack = mk_static!(Stack<8192>, Stack::new());
let response = &*mk_static!(Signal<CriticalSectionRawMutex, ()>, Signal::new());
let signal = &*mk_static!(Signal<CriticalSectionRawMutex, ()>, Signal::new());
@ -102,21 +160,21 @@ mod test {
}
};
let _guard = ctx
let guard = ctx
.cpu_control
.start_app_core(app_core_stack, cpu1_fnctn)
.unwrap();
response.wait().await;
for _ in 0..3 {
signal.signal(());
response.wait().await;
}
let thread_executor = mk_static!(Executor, Executor::new());
thread_executor.run(|spawner| {
spawner.must_spawn(tester_task_multi_core(signal, response, guard));
})
}
#[test]
#[cfg(multi_core)]
async fn run_thread_executor_test_on_core_1(mut ctx: Context) {
fn run_thread_executor_test_on_core_1(mut ctx: Context) {
let app_core_stack = mk_static!(Stack<8192>, Stack::new());
let signal = mk_static!(Signal<CriticalSectionRawMutex, ()>, Signal::new());
let response = mk_static!(Signal<CriticalSectionRawMutex, ()>, Signal::new());
@ -128,15 +186,15 @@ mod test {
});
};
let _guard = ctx
let guard = ctx
.cpu_control
.start_app_core(app_core_stack, cpu1_fnctn)
.unwrap();
response.wait().await;
for _ in 0..3 {
signal.signal(());
response.wait().await;
}
let thread_executor = mk_static!(Executor, Executor::new());
thread_executor.run(|spawner| {
spawner.must_spawn(tester_task_multi_core(signal, response, guard));
})
}
}