From b268b1795fed58544c166c41842ce0d66328aa3e Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Sun, 8 Dec 2024 23:27:32 +0100 Subject: [PATCH] Merge time-driver and time-queue-driver traits, make HALs own and handle the queue. --- .github/ci/test.sh | 2 +- embassy-nrf/src/time_driver.rs | 109 ++++++------ embassy-rp/src/time_driver.rs | 72 ++++---- embassy-stm32/src/time_driver.rs | 104 ++++++----- embassy-time-driver/src/lib.rs | 17 ++ embassy-time-queue-driver/src/lib.rs | 140 ++------------- .../src/queue_integrated.rs | 12 +- embassy-time/Cargo.toml | 1 - embassy-time/src/driver_mock.rs | 113 ++++-------- embassy-time/src/driver_std.rs | 163 ++++++------------ embassy-time/src/driver_wasm.rs | 112 ++++++------ 11 files changed, 328 insertions(+), 517 deletions(-) diff --git a/.github/ci/test.sh b/.github/ci/test.sh index 285f3f29e..0fd6820d2 100755 --- a/.github/ci/test.sh +++ b/.github/ci/test.sh @@ -17,7 +17,7 @@ cargo test --manifest-path ./embassy-futures/Cargo.toml cargo test --manifest-path ./embassy-sync/Cargo.toml cargo test --manifest-path ./embassy-embedded-hal/Cargo.toml cargo test --manifest-path ./embassy-hal-internal/Cargo.toml -cargo test --manifest-path ./embassy-time/Cargo.toml --features mock-driver +cargo test --manifest-path ./embassy-time/Cargo.toml --features mock-driver,embassy-time-queue-driver/generic-queue-8 cargo test --manifest-path ./embassy-time-driver/Cargo.toml cargo test --manifest-path ./embassy-boot/Cargo.toml diff --git a/embassy-nrf/src/time_driver.rs b/embassy-nrf/src/time_driver.rs index f8b3c4bbc..a27fae9a8 100644 --- a/embassy-nrf/src/time_driver.rs +++ b/embassy-nrf/src/time_driver.rs @@ -1,11 +1,11 @@ -use core::cell::Cell; +use core::cell::{Cell, RefCell}; use core::sync::atomic::{compiler_fence, AtomicU32, Ordering}; use critical_section::CriticalSection; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::CriticalSectionMutex as Mutex; use embassy_time_driver::Driver; -use embassy_time_queue_driver::GlobalTimerQueue; +use embassy_time_queue_driver::Queue; use crate::interrupt::InterruptExt; use crate::{interrupt, pac}; @@ -111,11 +111,13 @@ struct RtcDriver { period: AtomicU32, /// Timestamp at which to fire alarm. u64::MAX if no alarm is scheduled. alarms: Mutex, + queue: Mutex>, } embassy_time_driver::time_driver_impl!(static DRIVER: RtcDriver = RtcDriver { period: AtomicU32::new(0), alarms: Mutex::const_new(CriticalSectionRawMutex::new(), AlarmState::new()), + queue: Mutex::new(RefCell::new(Queue::new())), }); impl RtcDriver { @@ -194,59 +196,60 @@ impl RtcDriver { alarm.timestamp.set(u64::MAX); // Call after clearing alarm, so the callback can set another alarm. - TIMER_QUEUE_DRIVER.dispatch(); + let mut next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now()); + while !self.set_alarm(cs, next) { + next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now()); + } } - fn set_alarm(&self, timestamp: u64) -> bool { - critical_section::with(|cs| { - let n = 0; - let alarm = &self.alarms.borrow(cs); - alarm.timestamp.set(timestamp); + fn set_alarm(&self, cs: CriticalSection, timestamp: u64) -> bool { + let n = 0; + let alarm = &self.alarms.borrow(cs); + alarm.timestamp.set(timestamp); - let r = rtc(); + let r = rtc(); - let t = self.now(); - if timestamp <= t { - // If alarm timestamp has passed the alarm will not fire. - // Disarm the alarm and return `false` to indicate that. - r.intenclr().write(|w| w.0 = compare_n(n)); + let t = self.now(); + if timestamp <= t { + // If alarm timestamp has passed the alarm will not fire. + // Disarm the alarm and return `false` to indicate that. + r.intenclr().write(|w| w.0 = compare_n(n)); - alarm.timestamp.set(u64::MAX); + alarm.timestamp.set(u64::MAX); - return false; - } + return false; + } - // If it hasn't triggered yet, setup it in the compare channel. + // If it hasn't triggered yet, setup it in the compare channel. - // Write the CC value regardless of whether we're going to enable it now or not. - // This way, when we enable it later, the right value is already set. + // Write the CC value regardless of whether we're going to enable it now or not. + // This way, when we enable it later, the right value is already set. - // nrf52 docs say: - // If the COUNTER is N, writing N or N+1 to a CC register may not trigger a COMPARE event. - // To workaround this, we never write a timestamp smaller than N+3. - // N+2 is not safe because rtc can tick from N to N+1 between calling now() and writing cc. - // - // It is impossible for rtc to tick more than once because - // - this code takes less time than 1 tick - // - it runs with interrupts disabled so nothing else can preempt it. - // - // This means that an alarm can be delayed for up to 2 ticks (from t+1 to t+3), but this is allowed - // by the Alarm trait contract. What's not allowed is triggering alarms *before* their scheduled time, - // and we don't do that here. - let safe_timestamp = timestamp.max(t + 3); - r.cc(n).write(|w| w.set_compare(safe_timestamp as u32 & 0xFFFFFF)); + // nrf52 docs say: + // If the COUNTER is N, writing N or N+1 to a CC register may not trigger a COMPARE event. + // To workaround this, we never write a timestamp smaller than N+3. + // N+2 is not safe because rtc can tick from N to N+1 between calling now() and writing cc. + // + // It is impossible for rtc to tick more than once because + // - this code takes less time than 1 tick + // - it runs with interrupts disabled so nothing else can preempt it. + // + // This means that an alarm can be delayed for up to 2 ticks (from t+1 to t+3), but this is allowed + // by the Alarm trait contract. What's not allowed is triggering alarms *before* their scheduled time, + // and we don't do that here. + let safe_timestamp = timestamp.max(t + 3); + r.cc(n).write(|w| w.set_compare(safe_timestamp as u32 & 0xFFFFFF)); - let diff = timestamp - t; - if diff < 0xc00000 { - r.intenset().write(|w| w.0 = compare_n(n)); - } else { - // If it's too far in the future, don't setup the compare channel yet. - // It will be setup later by `next_period`. - r.intenclr().write(|w| w.0 = compare_n(n)); - } + let diff = timestamp - t; + if diff < 0xc00000 { + r.intenset().write(|w| w.0 = compare_n(n)); + } else { + // If it's too far in the future, don't setup the compare channel yet. + // It will be setup later by `next_period`. + r.intenclr().write(|w| w.0 = compare_n(n)); + } - true - }) + true } } @@ -258,6 +261,19 @@ impl Driver for RtcDriver { let counter = rtc().counter().read().0; calc_now(period, counter) } + + fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + critical_section::with(|cs| { + let mut queue = self.queue.borrow(cs).borrow_mut(); + + if queue.schedule_wake(at, waker) { + let mut next = queue.next_expiration(self.now()); + while !self.set_alarm(cs, next) { + next = queue.next_expiration(self.now()); + } + } + }) + } } #[cfg(feature = "_nrf54l")] @@ -277,8 +293,3 @@ fn RTC1() { pub(crate) fn init(irq_prio: crate::interrupt::Priority) { DRIVER.init(irq_prio) } - -embassy_time_queue_driver::timer_queue_impl!( - static TIMER_QUEUE_DRIVER: GlobalTimerQueue - = GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration)) -); diff --git a/embassy-rp/src/time_driver.rs b/embassy-rp/src/time_driver.rs index 17ae5fff3..a0eaec10e 100644 --- a/embassy-rp/src/time_driver.rs +++ b/embassy-rp/src/time_driver.rs @@ -1,10 +1,11 @@ //! Timer driver. -use core::cell::Cell; +use core::cell::{Cell, RefCell}; +use critical_section::CriticalSection; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::Mutex; use embassy_time_driver::Driver; -use embassy_time_queue_driver::GlobalTimerQueue; +use embassy_time_queue_driver::Queue; #[cfg(feature = "rp2040")] use pac::TIMER; #[cfg(feature = "_rp235x")] @@ -20,12 +21,14 @@ unsafe impl Send for AlarmState {} struct TimerDriver { alarms: Mutex, + queue: Mutex>, } embassy_time_driver::time_driver_impl!(static DRIVER: TimerDriver = TimerDriver{ alarms: Mutex::const_new(CriticalSectionRawMutex::new(), AlarmState { timestamp: Cell::new(0), }), + queue: Mutex::new(RefCell::new(Queue::new())) }); impl Driver for TimerDriver { @@ -39,34 +42,45 @@ impl Driver for TimerDriver { } } } + + fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + critical_section::with(|cs| { + let mut queue = self.queue.borrow(cs).borrow_mut(); + + if queue.schedule_wake(at, waker) { + let mut next = queue.next_expiration(self.now()); + while !self.set_alarm(cs, next) { + next = queue.next_expiration(self.now()); + } + } + }) + } } impl TimerDriver { - fn set_alarm(&self, timestamp: u64) -> bool { + fn set_alarm(&self, cs: CriticalSection, timestamp: u64) -> bool { let n = 0; - critical_section::with(|cs| { - let alarm = &self.alarms.borrow(cs); - alarm.timestamp.set(timestamp); + let alarm = &self.alarms.borrow(cs); + alarm.timestamp.set(timestamp); - // Arm it. - // Note that we're not checking the high bits at all. This means the irq may fire early - // if the alarm is more than 72 minutes (2^32 us) in the future. This is OK, since on irq fire - // it is checked if the alarm time has passed. - TIMER.alarm(n).write_value(timestamp as u32); + // Arm it. + // Note that we're not checking the high bits at all. This means the irq may fire early + // if the alarm is more than 72 minutes (2^32 us) in the future. This is OK, since on irq fire + // it is checked if the alarm time has passed. + TIMER.alarm(n).write_value(timestamp as u32); - let now = self.now(); - if timestamp <= now { - // If alarm timestamp has passed the alarm will not fire. - // Disarm the alarm and return `false` to indicate that. - TIMER.armed().write(|w| w.set_armed(1 << n)); + let now = self.now(); + if timestamp <= now { + // If alarm timestamp has passed the alarm will not fire. + // Disarm the alarm and return `false` to indicate that. + TIMER.armed().write(|w| w.set_armed(1 << n)); - alarm.timestamp.set(u64::MAX); + alarm.timestamp.set(u64::MAX); - false - } else { - true - } - }) + false + } else { + true + } } fn check_alarm(&self) { @@ -75,7 +89,7 @@ impl TimerDriver { let alarm = &self.alarms.borrow(cs); let timestamp = alarm.timestamp.get(); if timestamp <= self.now() { - self.trigger_alarm() + self.trigger_alarm(cs) } else { // Not elapsed, arm it again. // This can happen if it was set more than 2^32 us in the future. @@ -87,8 +101,11 @@ impl TimerDriver { TIMER.intr().write(|w| w.set_alarm(n, true)); } - fn trigger_alarm(&self) { - TIMER_QUEUE_DRIVER.dispatch(); + fn trigger_alarm(&self, cs: CriticalSection) { + let mut next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now()); + while !self.set_alarm(cs, next) { + next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now()); + } } } @@ -125,8 +142,3 @@ fn TIMER_IRQ_0() { fn TIMER0_IRQ_0() { DRIVER.check_alarm() } - -embassy_time_queue_driver::timer_queue_impl!( - static TIMER_QUEUE_DRIVER: GlobalTimerQueue - = GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration)) -); diff --git a/embassy-stm32/src/time_driver.rs b/embassy-stm32/src/time_driver.rs index 290f857ad..a4c333d82 100644 --- a/embassy-stm32/src/time_driver.rs +++ b/embassy-stm32/src/time_driver.rs @@ -1,13 +1,13 @@ #![allow(non_snake_case)] -use core::cell::Cell; +use core::cell::{Cell, RefCell}; use core::sync::atomic::{compiler_fence, AtomicU32, Ordering}; use critical_section::CriticalSection; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::Mutex; use embassy_time_driver::{Driver, TICK_HZ}; -use embassy_time_queue_driver::GlobalTimerQueue; +use embassy_time_queue_driver::Queue; use stm32_metapac::timer::{regs, TimGp16}; use crate::interrupt::typelevel::Interrupt; @@ -214,6 +214,7 @@ pub(crate) struct RtcDriver { alarm: Mutex, #[cfg(feature = "low-power")] rtc: Mutex>>, + queue: Mutex>, } embassy_time_driver::time_driver_impl!(static DRIVER: RtcDriver = RtcDriver { @@ -221,6 +222,7 @@ embassy_time_driver::time_driver_impl!(static DRIVER: RtcDriver = RtcDriver { alarm: Mutex::const_new(CriticalSectionRawMutex::new(), AlarmState::new()), #[cfg(feature = "low-power")] rtc: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)), + queue: Mutex::new(RefCell::new(Queue::new())) }); impl RtcDriver { @@ -266,8 +268,7 @@ impl RtcDriver { fn on_interrupt(&self) { let r = regs_gp16(); - // XXX: reduce the size of this critical section ? - critical_section::with(|_cs| { + critical_section::with(|cs| { let sr = r.sr().read(); let dier = r.dier().read(); @@ -288,7 +289,7 @@ impl RtcDriver { let n = 0; if sr.ccif(n + 1) && dier.ccie(n + 1) { - self.trigger_alarm(); + self.trigger_alarm(cs); } }) } @@ -315,8 +316,11 @@ impl RtcDriver { }) } - fn trigger_alarm(&self) { - TIMER_QUEUE_DRIVER.dispatch(); + fn trigger_alarm(&self, cs: CriticalSection) { + let mut next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now()); + while !self.set_alarm(cs, next) { + next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now()); + } } /* @@ -366,9 +370,9 @@ impl RtcDriver { // Now, recompute alarm let alarm = self.alarm.borrow(cs); - if !self.set_alarm(alarm.timestamp.get()) { + if !self.set_alarm(cs, alarm.timestamp.get()) { // If the alarm timestamp has passed, we need to trigger it - self.trigger_alarm(); + self.trigger_alarm(cs); } } @@ -441,49 +445,47 @@ impl RtcDriver { }) } - fn set_alarm(&self, timestamp: u64) -> bool { - critical_section::with(|cs| { - let r = regs_gp16(); + fn set_alarm(&self, cs: CriticalSection, timestamp: u64) -> bool { + let r = regs_gp16(); - let n = 0; - self.alarm.borrow(cs).timestamp.set(timestamp); + let n = 0; + self.alarm.borrow(cs).timestamp.set(timestamp); - let t = self.now(); - if timestamp <= t { - // If alarm timestamp has passed the alarm will not fire. - // Disarm the alarm and return `false` to indicate that. - r.dier().modify(|w| w.set_ccie(n + 1, false)); + let t = self.now(); + if timestamp <= t { + // If alarm timestamp has passed the alarm will not fire. + // Disarm the alarm and return `false` to indicate that. + r.dier().modify(|w| w.set_ccie(n + 1, false)); - self.alarm.borrow(cs).timestamp.set(u64::MAX); + self.alarm.borrow(cs).timestamp.set(u64::MAX); - return false; - } + return false; + } - // Write the CCR value regardless of whether we're going to enable it now or not. - // This way, when we enable it later, the right value is already set. - r.ccr(n + 1).write(|w| w.set_ccr(timestamp as u16)); + // Write the CCR value regardless of whether we're going to enable it now or not. + // This way, when we enable it later, the right value is already set. + r.ccr(n + 1).write(|w| w.set_ccr(timestamp as u16)); - // Enable it if it'll happen soon. Otherwise, `next_period` will enable it. - let diff = timestamp - t; - r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000)); + // Enable it if it'll happen soon. Otherwise, `next_period` will enable it. + let diff = timestamp - t; + r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000)); - // Reevaluate if the alarm timestamp is still in the future - let t = self.now(); - if timestamp <= t { - // If alarm timestamp has passed since we set it, we have a race condition and - // the alarm may or may not have fired. - // Disarm the alarm and return `false` to indicate that. - // It is the caller's responsibility to handle this ambiguity. - r.dier().modify(|w| w.set_ccie(n + 1, false)); + // Reevaluate if the alarm timestamp is still in the future + let t = self.now(); + if timestamp <= t { + // If alarm timestamp has passed since we set it, we have a race condition and + // the alarm may or may not have fired. + // Disarm the alarm and return `false` to indicate that. + // It is the caller's responsibility to handle this ambiguity. + r.dier().modify(|w| w.set_ccie(n + 1, false)); - self.alarm.borrow(cs).timestamp.set(u64::MAX); + self.alarm.borrow(cs).timestamp.set(u64::MAX); - return false; - } + return false; + } - // We're confident the alarm will ring in the future. - true - }) + // We're confident the alarm will ring in the future. + true } } @@ -496,6 +498,19 @@ impl Driver for RtcDriver { let counter = r.cnt().read().cnt(); calc_now(period, counter) } + + fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + critical_section::with(|cs| { + let mut queue = self.queue.borrow(cs).borrow_mut(); + + if queue.schedule_wake(at, waker) { + let mut next = queue.next_expiration(self.now()); + while !self.set_alarm(cs, next) { + next = queue.next_expiration(self.now()); + } + } + }) + } } #[cfg(feature = "low-power")] @@ -506,8 +521,3 @@ pub(crate) fn get_driver() -> &'static RtcDriver { pub(crate) fn init(cs: CriticalSection) { DRIVER.init(cs) } - -embassy_time_queue_driver::timer_queue_impl!( - static TIMER_QUEUE_DRIVER: GlobalTimerQueue - = GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration)) -); diff --git a/embassy-time-driver/src/lib.rs b/embassy-time-driver/src/lib.rs index ffb363cd7..090969d8c 100644 --- a/embassy-time-driver/src/lib.rs +++ b/embassy-time-driver/src/lib.rs @@ -38,6 +38,8 @@ //! # Example //! //! ``` +//! use core::task::Waker; +//! //! use embassy_time_driver::Driver; //! //! struct MyDriver{} // not public! @@ -46,6 +48,10 @@ //! fn now(&self) -> u64 { //! todo!() //! } +//! +//! fn schedule_wake(&self, at: u64, waker: &Waker) { +//! todo!() +//! } //! } //! //! embassy_time_driver::time_driver_impl!(static DRIVER: MyDriver = MyDriver{}); @@ -54,6 +60,8 @@ //! ## Feature flags #![doc = document_features::document_features!(feature_label = r#"{feature}"#)] +use core::task::Waker; + mod tick; /// Ticks per second of the global timebase. @@ -74,6 +82,10 @@ pub trait Driver: Send + Sync + 'static { /// you MUST extend them to 64-bit, for example by counting overflows in software, /// or chaining multiple timers together. fn now(&self) -> u64; + + /// Schedules a waker to be awoken at moment `at`. + /// If this moment is in the past, the waker might be awoken immediately. + fn schedule_wake(&self, at: u64, waker: &Waker); } extern "Rust" { @@ -97,5 +109,10 @@ macro_rules! time_driver_impl { fn _embassy_time_now() -> u64 { <$t as $crate::Driver>::now(&$name) } + + #[no_mangle] + fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker) { + <$t as $crate::Driver>::schedule_wake(&$name, at, waker); + } }; } diff --git a/embassy-time-queue-driver/src/lib.rs b/embassy-time-queue-driver/src/lib.rs index 2d5fd449a..ed490a0ef 100644 --- a/embassy-time-queue-driver/src/lib.rs +++ b/embassy-time-queue-driver/src/lib.rs @@ -49,23 +49,18 @@ //! embassy_time_queue_driver::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{}); //! ``` +use core::task::Waker; + #[cfg(not(feature = "integrated-timers"))] pub mod queue_generic; #[cfg(feature = "integrated-timers")] pub mod queue_integrated; -use core::cell::RefCell; -use core::task::Waker; +#[cfg(feature = "integrated-timers")] +pub use queue_integrated::Queue; -use critical_section::Mutex; - -/// Timer queue -pub trait TimerQueue { - /// Schedules a waker in the queue to be awoken at moment `at`. - /// - /// If this moment is in the past, the waker might be awoken immediately. - fn schedule_wake(&'static self, at: u64, waker: &Waker); -} +#[cfg(not(feature = "integrated-timers"))] +pub use queue_generic::Queue; extern "Rust" { fn _embassy_time_schedule_wake(at: u64, waker: &Waker); @@ -73,7 +68,10 @@ extern "Rust" { /// Schedule the given waker to be woken at `at`. pub fn schedule_wake(at: u64, waker: &Waker) { - #[cfg(feature = "integrated-timers")] + // This function is not implemented in embassy-time-driver because it needs access to executor + // internals. The function updates task state, then delegates to the implementation provided + // by the time driver. + #[cfg(not(feature = "_generic-queue"))] { use embassy_executor::raw::task_from_waker; use embassy_executor::raw::timer_queue::TimerEnqueueOperation; @@ -89,121 +87,3 @@ pub fn schedule_wake(at: u64, waker: &Waker) { } unsafe { _embassy_time_schedule_wake(at, waker) } } - -/// Set the TimerQueue implementation. -/// -/// See the module documentation for an example. -#[macro_export] -macro_rules! timer_queue_impl { - (static $name:ident: $t: ty = $val:expr) => { - static $name: $t = $val; - - #[no_mangle] - fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker) { - <$t as $crate::TimerQueue>::schedule_wake(&$name, at, waker); - } - }; -} - -#[cfg(feature = "integrated-timers")] -type InnerQueue = queue_integrated::TimerQueue; - -#[cfg(not(feature = "integrated-timers"))] -type InnerQueue = queue_generic::Queue; - -/// A timer queue implementation that can be used as a global timer queue. -/// -/// This implementation is not thread-safe, and should be protected by a mutex of some sort. -pub struct GenericTimerQueue bool> { - queue: InnerQueue, - set_alarm: F, -} - -impl bool> GenericTimerQueue { - /// Creates a new timer queue. - /// - /// `set_alarm` is a function that should set the next alarm time. The function should - /// return `true` if the alarm was set, and `false` if the alarm was in the past. - pub const fn new(set_alarm: F) -> Self { - Self { - queue: InnerQueue::new(), - set_alarm, - } - } - - /// Schedules a task to run at a specific time, and returns whether any changes were made. - pub fn schedule_wake(&mut self, at: u64, waker: &core::task::Waker) { - #[cfg(feature = "integrated-timers")] - let waker = embassy_executor::raw::task_from_waker(waker); - - if self.queue.schedule_wake(at, waker) { - self.dispatch() - } - } - - /// Dequeues expired timers and returns the next alarm time. - pub fn next_expiration(&mut self, now: u64) -> u64 { - self.queue.next_expiration(now) - } - - /// Handle the alarm. - /// - /// Call this function when the next alarm is due. - pub fn dispatch(&mut self) { - let mut next_expiration = self.next_expiration(embassy_time_driver::now()); - - while !(self.set_alarm)(next_expiration) { - // next_expiration is in the past, dequeue and find a new expiration - next_expiration = self.next_expiration(next_expiration); - } - } -} - -/// A [`GenericTimerQueue`] protected by a critical section. Directly useable as a [`TimerQueue`]. -pub struct GlobalTimerQueue { - inner: Mutex bool>>>, -} - -impl GlobalTimerQueue { - /// Creates a new timer queue. - /// - /// `set_alarm` is a function that should set the next alarm time. The function should - /// return `true` if the alarm was set, and `false` if the alarm was in the past. - pub const fn new(set_alarm: fn(u64) -> bool) -> Self { - Self { - inner: Mutex::new(RefCell::new(GenericTimerQueue::new(set_alarm))), - } - } - - /// Schedules a task to run at a specific time, and returns whether any changes were made. - pub fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { - critical_section::with(|cs| { - let mut inner = self.inner.borrow_ref_mut(cs); - inner.schedule_wake(at, waker); - }); - } - - /// Dequeues expired timers and returns the next alarm time. - pub fn next_expiration(&self, now: u64) -> u64 { - critical_section::with(|cs| { - let mut inner = self.inner.borrow_ref_mut(cs); - inner.next_expiration(now) - }) - } - - /// Handle the alarm. - /// - /// Call this function when the next alarm is due. - pub fn dispatch(&self) { - critical_section::with(|cs| { - let mut inner = self.inner.borrow_ref_mut(cs); - inner.dispatch() - }) - } -} - -impl TimerQueue for GlobalTimerQueue { - fn schedule_wake(&'static self, at: u64, waker: &Waker) { - GlobalTimerQueue::schedule_wake(self, at, waker) - } -} diff --git a/embassy-time-queue-driver/src/queue_integrated.rs b/embassy-time-queue-driver/src/queue_integrated.rs index b905c00c3..6bb4c0c1a 100644 --- a/embassy-time-queue-driver/src/queue_integrated.rs +++ b/embassy-time-queue-driver/src/queue_integrated.rs @@ -1,15 +1,16 @@ //! Timer queue operations. use core::cell::Cell; use core::cmp::min; +use core::task::Waker; use embassy_executor::raw::TaskRef; /// A timer queue, with items integrated into tasks. -pub struct TimerQueue { +pub struct Queue { head: Cell>, } -impl TimerQueue { +impl Queue { /// Creates a new timer queue. pub const fn new() -> Self { Self { head: Cell::new(None) } @@ -19,11 +20,12 @@ impl TimerQueue { /// /// If this function returns `true`, the called should find the next expiration time and set /// a new alarm for that time. - pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool { - let item = p.timer_queue_item(); + pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool { + let task = embassy_executor::raw::task_from_waker(waker); + let item = task.timer_queue_item(); if item.next.get().is_none() { // If not in the queue, add it and update. - let prev = self.head.replace(Some(p)); + let prev = self.head.replace(Some(task)); item.next.set(if prev.is_none() { Some(unsafe { TaskRef::dangling() }) } else { diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml index e3074119f..9959e2863 100644 --- a/embassy-time/Cargo.toml +++ b/embassy-time/Cargo.toml @@ -384,7 +384,6 @@ tick-hz-5_242_880_000 = ["embassy-time-driver/tick-hz-5_242_880_000"] [dependencies] embassy-time-driver = { version = "0.1.0", path = "../embassy-time-driver" } -embassy-time-queue-driver = { version = "0.1.0", path = "../embassy-time-queue-driver" } defmt = { version = "0.3", optional = true } log = { version = "0.4.14", optional = true } diff --git a/embassy-time/src/driver_mock.rs b/embassy-time/src/driver_mock.rs index 829eb0437..138d60499 100644 --- a/embassy-time/src/driver_mock.rs +++ b/embassy-time/src/driver_mock.rs @@ -1,7 +1,9 @@ use core::cell::RefCell; +use core::task::Waker; use critical_section::Mutex as CsMutex; use embassy_time_driver::Driver; +use embassy_time_queue_driver::Queue; use crate::{Duration, Instant}; @@ -52,50 +54,12 @@ impl MockDriver { /// Advances the time by the specified [`Duration`]. /// Calling any alarm callbacks that are due. pub fn advance(&self, duration: Duration) { - let notify = { - critical_section::with(|cs| { - let mut inner = self.0.borrow_ref_mut(cs); - - inner.now += duration; - - let now = inner.now.as_ticks(); - - if inner.alarm.timestamp <= now { - inner.alarm.timestamp = u64::MAX; - - Some((inner.alarm.callback, inner.alarm.ctx)) - } else { - None - } - }) - }; - - if let Some((callback, ctx)) = notify { - (callback)(ctx); - } - } - - /// Configures a callback to be called when the alarm fires. - pub fn set_alarm_callback(&self, callback: fn(*mut ()), ctx: *mut ()) { critical_section::with(|cs| { - let mut inner = self.0.borrow_ref_mut(cs); + let inner = &mut *self.0.borrow_ref_mut(cs); - inner.alarm.callback = callback; - inner.alarm.ctx = ctx; - }); - } - - /// Sets the alarm to fire at the specified timestamp. - pub fn set_alarm(&self, timestamp: u64) -> bool { - critical_section::with(|cs| { - let mut inner = self.0.borrow_ref_mut(cs); - - if timestamp <= inner.now.as_ticks() { - false - } else { - inner.alarm.timestamp = timestamp; - true - } + inner.now += duration; + // wake expired tasks. + inner.queue.next_expiration(inner.now.as_ticks()); }) } } @@ -104,44 +68,38 @@ impl Driver for MockDriver { fn now(&self) -> u64 { critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks() } + + fn schedule_wake(&self, at: u64, waker: &Waker) { + critical_section::with(|cs| { + let inner = &mut *self.0.borrow_ref_mut(cs); + // enqueue it + inner.queue.schedule_wake(at, waker); + // wake it if it's in the past. + inner.queue.next_expiration(inner.now.as_ticks()); + }) + } } struct InnerMockDriver { now: Instant, - alarm: AlarmState, + queue: Queue, } impl InnerMockDriver { const fn new() -> Self { Self { now: Instant::from_ticks(0), - alarm: AlarmState::new(), + queue: Queue::new(), } } } -struct AlarmState { - timestamp: u64, - callback: fn(*mut ()), - ctx: *mut (), -} - -impl AlarmState { - const fn new() -> Self { - Self { - timestamp: u64::MAX, - callback: Self::noop, - ctx: core::ptr::null_mut(), - } - } - - fn noop(_ctx: *mut ()) {} -} - -unsafe impl Send for AlarmState {} - #[cfg(test)] mod tests { + use core::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::task::Wake; + use serial_test::serial; use super::*; @@ -163,24 +121,25 @@ mod tests { #[test] #[serial] - fn test_set_alarm_not_in_future() { + fn test_schedule_wake() { setup(); - let driver = MockDriver::get(); - assert_eq!(false, driver.set_alarm(driver.now())); - } + static CALLBACK_CALLED: AtomicBool = AtomicBool::new(false); - #[test] - #[serial] - fn test_alarm() { - setup(); + struct MockWaker; + + impl Wake for MockWaker { + fn wake(self: Arc) { + CALLBACK_CALLED.store(true, Ordering::Relaxed); + } + } + let waker = Arc::new(MockWaker).into(); let driver = MockDriver::get(); - static mut CALLBACK_CALLED: bool = false; - driver.set_alarm_callback(|_| unsafe { CALLBACK_CALLED = true }, core::ptr::null_mut()); - driver.set_alarm(driver.now() + 1); - assert_eq!(false, unsafe { CALLBACK_CALLED }); + + driver.schedule_wake(driver.now() + 1, &waker); + assert_eq!(false, CALLBACK_CALLED.load(Ordering::Relaxed)); driver.advance(Duration::from_secs(1)); - assert_eq!(true, unsafe { CALLBACK_CALLED }); + assert_eq!(true, CALLBACK_CALLED.load(Ordering::Relaxed)); } } diff --git a/embassy-time/src/driver_std.rs b/embassy-time/src/driver_std.rs index 45467f09b..35888fddd 100644 --- a/embassy-time/src/driver_std.rs +++ b/embassy-time/src/driver_std.rs @@ -1,97 +1,67 @@ -use std::cell::{RefCell, UnsafeCell}; -use std::mem::MaybeUninit; -use std::sync::{Condvar, Mutex, Once}; +use std::sync::{Condvar, Mutex}; +use std::thread; use std::time::{Duration as StdDuration, Instant as StdInstant}; -use std::{ptr, thread}; -use critical_section::Mutex as CsMutex; use embassy_time_driver::Driver; -use embassy_time_queue_driver::GlobalTimerQueue; - -struct AlarmState { - timestamp: u64, -} - -unsafe impl Send for AlarmState {} - -impl AlarmState { - const fn new() -> Self { - Self { timestamp: u64::MAX } - } -} +use embassy_time_queue_driver::Queue; struct TimeDriver { - once: Once, - // The STD Driver implementation requires the alarm's mutex to be reentrant, which the STD Mutex isn't - // Fortunately, mutexes based on the `critical-section` crate are reentrant, because the critical sections - // themselves are reentrant - alarm: UninitCell>>, - zero_instant: UninitCell, - signaler: UninitCell, + signaler: Signaler, + inner: Mutex, +} + +struct Inner { + zero_instant: Option, + queue: Queue, } embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver { - once: Once::new(), - alarm: UninitCell::uninit(), - zero_instant: UninitCell::uninit(), - signaler: UninitCell::uninit(), + inner: Mutex::new(Inner{ + zero_instant: None, + queue: Queue::new(), + }), + signaler: Signaler::new(), }); -impl TimeDriver { - fn init(&self) { - self.once.call_once(|| unsafe { - self.alarm - .write(CsMutex::new(RefCell::new(const { AlarmState::new() }))); - self.zero_instant.write(StdInstant::now()); - self.signaler.write(Signaler::new()); - - thread::spawn(Self::alarm_thread); - }); - } - - fn alarm_thread() { - let zero = unsafe { DRIVER.zero_instant.read() }; - loop { - let now = DRIVER.now(); - - let next_alarm = critical_section::with(|cs| { - let mut alarm = unsafe { DRIVER.alarm.as_ref() }.borrow_ref_mut(cs); - if alarm.timestamp <= now { - alarm.timestamp = u64::MAX; - - TIMER_QUEUE_DRIVER.dispatch(); - } - alarm.timestamp - }); - - // Ensure we don't overflow - let until = zero - .checked_add(StdDuration::from_micros(next_alarm)) - .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1)); - - unsafe { DRIVER.signaler.as_ref() }.wait_until(until); - } - } - - fn set_alarm(&self, timestamp: u64) -> bool { - self.init(); - critical_section::with(|cs| { - let mut alarm = unsafe { self.alarm.as_ref() }.borrow_ref_mut(cs); - alarm.timestamp = timestamp; - unsafe { self.signaler.as_ref() }.signal(); - }); - - true +impl Inner { + fn init(&mut self) -> StdInstant { + *self.zero_instant.get_or_insert_with(|| { + thread::spawn(alarm_thread); + StdInstant::now() + }) } } impl Driver for TimeDriver { fn now(&self) -> u64 { - self.init(); - - let zero = unsafe { self.zero_instant.read() }; + let mut inner = self.inner.lock().unwrap(); + let zero = inner.init(); StdInstant::now().duration_since(zero).as_micros() as u64 } + + fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + let mut inner = self.inner.lock().unwrap(); + inner.init(); + if inner.queue.schedule_wake(at, waker) { + self.signaler.signal(); + } + } +} + +fn alarm_thread() { + let zero = DRIVER.inner.lock().unwrap().zero_instant.unwrap(); + loop { + let now = DRIVER.now(); + + let next_alarm = DRIVER.inner.lock().unwrap().queue.next_expiration(now); + + // Ensure we don't overflow + let until = zero + .checked_add(StdDuration::from_micros(next_alarm)) + .unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1)); + + DRIVER.signaler.wait_until(until); + } } struct Signaler { @@ -100,7 +70,7 @@ struct Signaler { } impl Signaler { - fn new() -> Self { + const fn new() -> Self { Self { mutex: Mutex::new(false), condvar: Condvar::new(), @@ -132,40 +102,3 @@ impl Signaler { self.condvar.notify_one(); } } - -pub(crate) struct UninitCell(MaybeUninit>); -unsafe impl Send for UninitCell {} -unsafe impl Sync for UninitCell {} - -impl UninitCell { - pub const fn uninit() -> Self { - Self(MaybeUninit::uninit()) - } - - pub unsafe fn as_ptr(&self) -> *const T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_mut_ptr(&self) -> *mut T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_ref(&self) -> &T { - &*self.as_ptr() - } - - pub unsafe fn write(&self, val: T) { - ptr::write(self.as_mut_ptr(), val) - } -} - -impl UninitCell { - pub unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) - } -} - -embassy_time_queue_driver::timer_queue_impl!( - static TIMER_QUEUE_DRIVER: GlobalTimerQueue - = GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration)) -); diff --git a/embassy-time/src/driver_wasm.rs b/embassy-time/src/driver_wasm.rs index dcc935fde..bcdd1670b 100644 --- a/embassy-time/src/driver_wasm.rs +++ b/embassy-time/src/driver_wasm.rs @@ -1,10 +1,7 @@ -use std::cell::UnsafeCell; -use std::mem::MaybeUninit; -use std::ptr; -use std::sync::{Mutex, Once}; +use std::sync::Mutex; use embassy_time_driver::Driver; -use embassy_time_queue_driver::GlobalTimerQueue; +use embassy_time_queue_driver::Queue; use wasm_bindgen::prelude::*; use wasm_timer::Instant as StdInstant; @@ -12,8 +9,6 @@ struct AlarmState { token: Option, } -unsafe impl Send for AlarmState {} - impl AlarmState { const fn new() -> Self { Self { token: None } @@ -27,33 +22,38 @@ extern "C" { } struct TimeDriver { - once: Once, - alarm: UninitCell>, - zero_instant: UninitCell, - closure: UninitCell>, + inner: Mutex, } +struct Inner { + alarm: AlarmState, + zero_instant: Option, + queue: Queue, + closure: Option>, +} + +unsafe impl Send for Inner {} + embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver { - once: Once::new(), - alarm: UninitCell::uninit(), - zero_instant: UninitCell::uninit(), - closure: UninitCell::uninit() + inner: Mutex::new(Inner{ + zero_instant: None, + queue: Queue::new(), + alarm: AlarmState::new(), + closure: None, + }), }); -impl TimeDriver { - fn init(&self) { - self.once.call_once(|| unsafe { - self.alarm.write(Mutex::new(const { AlarmState::new() })); - self.zero_instant.write(StdInstant::now()); - self.closure - .write(Closure::new(Box::new(|| TIMER_QUEUE_DRIVER.dispatch()))); - }); +impl Inner { + fn init(&mut self) -> StdInstant { + *self.zero_instant.get_or_insert_with(StdInstant::now) } - fn set_alarm(&self, timestamp: u64) -> bool { - self.init(); - let mut alarm = unsafe { self.alarm.as_ref() }.lock().unwrap(); - if let Some(token) = alarm.token { + fn now(&mut self) -> u64 { + StdInstant::now().duration_since(self.zero_instant.unwrap()).as_micros() as u64 + } + + fn set_alarm(&mut self, timestamp: u64) -> bool { + if let Some(token) = self.alarm.token { clearTimeout(token); } @@ -62,7 +62,8 @@ impl TimeDriver { false } else { let timeout = (timestamp - now) as u32; - alarm.token = Some(setTimeout(unsafe { self.closure.as_ref() }, timeout / 1000)); + let closure = self.closure.get_or_insert_with(|| Closure::new(dispatch)); + self.alarm.token = Some(setTimeout(closure, timeout / 1000)); true } @@ -71,45 +72,32 @@ impl TimeDriver { impl Driver for TimeDriver { fn now(&self) -> u64 { - self.init(); - - let zero = unsafe { self.zero_instant.read() }; + let mut inner = self.inner.lock().unwrap(); + let zero = inner.init(); StdInstant::now().duration_since(zero).as_micros() as u64 } -} -pub(crate) struct UninitCell(MaybeUninit>); -unsafe impl Send for UninitCell {} -unsafe impl Sync for UninitCell {} - -impl UninitCell { - pub const fn uninit() -> Self { - Self(MaybeUninit::uninit()) - } - unsafe fn as_ptr(&self) -> *const T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_mut_ptr(&self) -> *mut T { - (*self.0.as_ptr()).get() - } - - pub unsafe fn as_ref(&self) -> &T { - &*self.as_ptr() - } - - pub unsafe fn write(&self, val: T) { - ptr::write(self.as_mut_ptr(), val) + fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { + let mut inner = self.inner.lock().unwrap(); + inner.init(); + if inner.queue.schedule_wake(at, waker) { + let now = inner.now(); + let mut next = inner.queue.next_expiration(now); + while !inner.set_alarm(next) { + let now = inner.now(); + next = inner.queue.next_expiration(now); + } + } } } -impl UninitCell { - pub unsafe fn read(&self) -> T { - ptr::read(self.as_mut_ptr()) +fn dispatch() { + let inner = &mut *DRIVER.inner.lock().unwrap(); + + let now = inner.now(); + let mut next = inner.queue.next_expiration(now); + while !inner.set_alarm(next) { + let now = inner.now(); + next = inner.queue.next_expiration(now); } } - -embassy_time_queue_driver::timer_queue_impl!( - static TIMER_QUEUE_DRIVER: GlobalTimerQueue - = GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration)) -);