Merge time-driver and time-queue-driver traits, make HALs own and handle the queue.

This commit is contained in:
Dario Nieuwenhuis 2024-12-08 23:27:32 +01:00 committed by Dániel Buga
parent ec96395d08
commit b268b1795f
No known key found for this signature in database
11 changed files with 328 additions and 517 deletions

2
.github/ci/test.sh vendored
View File

@ -17,7 +17,7 @@ cargo test --manifest-path ./embassy-futures/Cargo.toml
cargo test --manifest-path ./embassy-sync/Cargo.toml
cargo test --manifest-path ./embassy-embedded-hal/Cargo.toml
cargo test --manifest-path ./embassy-hal-internal/Cargo.toml
cargo test --manifest-path ./embassy-time/Cargo.toml --features mock-driver
cargo test --manifest-path ./embassy-time/Cargo.toml --features mock-driver,embassy-time-queue-driver/generic-queue-8
cargo test --manifest-path ./embassy-time-driver/Cargo.toml
cargo test --manifest-path ./embassy-boot/Cargo.toml

View File

@ -1,11 +1,11 @@
use core::cell::Cell;
use core::cell::{Cell, RefCell};
use core::sync::atomic::{compiler_fence, AtomicU32, Ordering};
use critical_section::CriticalSection;
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::blocking_mutex::CriticalSectionMutex as Mutex;
use embassy_time_driver::Driver;
use embassy_time_queue_driver::GlobalTimerQueue;
use embassy_time_queue_driver::Queue;
use crate::interrupt::InterruptExt;
use crate::{interrupt, pac};
@ -111,11 +111,13 @@ struct RtcDriver {
period: AtomicU32,
/// Timestamp at which to fire alarm. u64::MAX if no alarm is scheduled.
alarms: Mutex<AlarmState>,
queue: Mutex<RefCell<Queue>>,
}
embassy_time_driver::time_driver_impl!(static DRIVER: RtcDriver = RtcDriver {
period: AtomicU32::new(0),
alarms: Mutex::const_new(CriticalSectionRawMutex::new(), AlarmState::new()),
queue: Mutex::new(RefCell::new(Queue::new())),
});
impl RtcDriver {
@ -194,59 +196,60 @@ impl RtcDriver {
alarm.timestamp.set(u64::MAX);
// Call after clearing alarm, so the callback can set another alarm.
TIMER_QUEUE_DRIVER.dispatch();
let mut next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now());
while !self.set_alarm(cs, next) {
next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now());
}
}
fn set_alarm(&self, timestamp: u64) -> bool {
critical_section::with(|cs| {
let n = 0;
let alarm = &self.alarms.borrow(cs);
alarm.timestamp.set(timestamp);
fn set_alarm(&self, cs: CriticalSection, timestamp: u64) -> bool {
let n = 0;
let alarm = &self.alarms.borrow(cs);
alarm.timestamp.set(timestamp);
let r = rtc();
let r = rtc();
let t = self.now();
if timestamp <= t {
// If alarm timestamp has passed the alarm will not fire.
// Disarm the alarm and return `false` to indicate that.
r.intenclr().write(|w| w.0 = compare_n(n));
let t = self.now();
if timestamp <= t {
// If alarm timestamp has passed the alarm will not fire.
// Disarm the alarm and return `false` to indicate that.
r.intenclr().write(|w| w.0 = compare_n(n));
alarm.timestamp.set(u64::MAX);
alarm.timestamp.set(u64::MAX);
return false;
}
return false;
}
// If it hasn't triggered yet, setup it in the compare channel.
// If it hasn't triggered yet, setup it in the compare channel.
// Write the CC value regardless of whether we're going to enable it now or not.
// This way, when we enable it later, the right value is already set.
// Write the CC value regardless of whether we're going to enable it now or not.
// This way, when we enable it later, the right value is already set.
// nrf52 docs say:
// If the COUNTER is N, writing N or N+1 to a CC register may not trigger a COMPARE event.
// To workaround this, we never write a timestamp smaller than N+3.
// N+2 is not safe because rtc can tick from N to N+1 between calling now() and writing cc.
//
// It is impossible for rtc to tick more than once because
// - this code takes less time than 1 tick
// - it runs with interrupts disabled so nothing else can preempt it.
//
// This means that an alarm can be delayed for up to 2 ticks (from t+1 to t+3), but this is allowed
// by the Alarm trait contract. What's not allowed is triggering alarms *before* their scheduled time,
// and we don't do that here.
let safe_timestamp = timestamp.max(t + 3);
r.cc(n).write(|w| w.set_compare(safe_timestamp as u32 & 0xFFFFFF));
// nrf52 docs say:
// If the COUNTER is N, writing N or N+1 to a CC register may not trigger a COMPARE event.
// To workaround this, we never write a timestamp smaller than N+3.
// N+2 is not safe because rtc can tick from N to N+1 between calling now() and writing cc.
//
// It is impossible for rtc to tick more than once because
// - this code takes less time than 1 tick
// - it runs with interrupts disabled so nothing else can preempt it.
//
// This means that an alarm can be delayed for up to 2 ticks (from t+1 to t+3), but this is allowed
// by the Alarm trait contract. What's not allowed is triggering alarms *before* their scheduled time,
// and we don't do that here.
let safe_timestamp = timestamp.max(t + 3);
r.cc(n).write(|w| w.set_compare(safe_timestamp as u32 & 0xFFFFFF));
let diff = timestamp - t;
if diff < 0xc00000 {
r.intenset().write(|w| w.0 = compare_n(n));
} else {
// If it's too far in the future, don't setup the compare channel yet.
// It will be setup later by `next_period`.
r.intenclr().write(|w| w.0 = compare_n(n));
}
let diff = timestamp - t;
if diff < 0xc00000 {
r.intenset().write(|w| w.0 = compare_n(n));
} else {
// If it's too far in the future, don't setup the compare channel yet.
// It will be setup later by `next_period`.
r.intenclr().write(|w| w.0 = compare_n(n));
}
true
})
true
}
}
@ -258,6 +261,19 @@ impl Driver for RtcDriver {
let counter = rtc().counter().read().0;
calc_now(period, counter)
}
fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
critical_section::with(|cs| {
let mut queue = self.queue.borrow(cs).borrow_mut();
if queue.schedule_wake(at, waker) {
let mut next = queue.next_expiration(self.now());
while !self.set_alarm(cs, next) {
next = queue.next_expiration(self.now());
}
}
})
}
}
#[cfg(feature = "_nrf54l")]
@ -277,8 +293,3 @@ fn RTC1() {
pub(crate) fn init(irq_prio: crate::interrupt::Priority) {
DRIVER.init(irq_prio)
}
embassy_time_queue_driver::timer_queue_impl!(
static TIMER_QUEUE_DRIVER: GlobalTimerQueue
= GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration))
);

View File

@ -1,10 +1,11 @@
//! Timer driver.
use core::cell::Cell;
use core::cell::{Cell, RefCell};
use critical_section::CriticalSection;
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::blocking_mutex::Mutex;
use embassy_time_driver::Driver;
use embassy_time_queue_driver::GlobalTimerQueue;
use embassy_time_queue_driver::Queue;
#[cfg(feature = "rp2040")]
use pac::TIMER;
#[cfg(feature = "_rp235x")]
@ -20,12 +21,14 @@ unsafe impl Send for AlarmState {}
struct TimerDriver {
alarms: Mutex<CriticalSectionRawMutex, AlarmState>,
queue: Mutex<CriticalSectionRawMutex, RefCell<Queue>>,
}
embassy_time_driver::time_driver_impl!(static DRIVER: TimerDriver = TimerDriver{
alarms: Mutex::const_new(CriticalSectionRawMutex::new(), AlarmState {
timestamp: Cell::new(0),
}),
queue: Mutex::new(RefCell::new(Queue::new()))
});
impl Driver for TimerDriver {
@ -39,34 +42,45 @@ impl Driver for TimerDriver {
}
}
}
fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
critical_section::with(|cs| {
let mut queue = self.queue.borrow(cs).borrow_mut();
if queue.schedule_wake(at, waker) {
let mut next = queue.next_expiration(self.now());
while !self.set_alarm(cs, next) {
next = queue.next_expiration(self.now());
}
}
})
}
}
impl TimerDriver {
fn set_alarm(&self, timestamp: u64) -> bool {
fn set_alarm(&self, cs: CriticalSection, timestamp: u64) -> bool {
let n = 0;
critical_section::with(|cs| {
let alarm = &self.alarms.borrow(cs);
alarm.timestamp.set(timestamp);
let alarm = &self.alarms.borrow(cs);
alarm.timestamp.set(timestamp);
// Arm it.
// Note that we're not checking the high bits at all. This means the irq may fire early
// if the alarm is more than 72 minutes (2^32 us) in the future. This is OK, since on irq fire
// it is checked if the alarm time has passed.
TIMER.alarm(n).write_value(timestamp as u32);
// Arm it.
// Note that we're not checking the high bits at all. This means the irq may fire early
// if the alarm is more than 72 minutes (2^32 us) in the future. This is OK, since on irq fire
// it is checked if the alarm time has passed.
TIMER.alarm(n).write_value(timestamp as u32);
let now = self.now();
if timestamp <= now {
// If alarm timestamp has passed the alarm will not fire.
// Disarm the alarm and return `false` to indicate that.
TIMER.armed().write(|w| w.set_armed(1 << n));
let now = self.now();
if timestamp <= now {
// If alarm timestamp has passed the alarm will not fire.
// Disarm the alarm and return `false` to indicate that.
TIMER.armed().write(|w| w.set_armed(1 << n));
alarm.timestamp.set(u64::MAX);
alarm.timestamp.set(u64::MAX);
false
} else {
true
}
})
false
} else {
true
}
}
fn check_alarm(&self) {
@ -75,7 +89,7 @@ impl TimerDriver {
let alarm = &self.alarms.borrow(cs);
let timestamp = alarm.timestamp.get();
if timestamp <= self.now() {
self.trigger_alarm()
self.trigger_alarm(cs)
} else {
// Not elapsed, arm it again.
// This can happen if it was set more than 2^32 us in the future.
@ -87,8 +101,11 @@ impl TimerDriver {
TIMER.intr().write(|w| w.set_alarm(n, true));
}
fn trigger_alarm(&self) {
TIMER_QUEUE_DRIVER.dispatch();
fn trigger_alarm(&self, cs: CriticalSection) {
let mut next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now());
while !self.set_alarm(cs, next) {
next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now());
}
}
}
@ -125,8 +142,3 @@ fn TIMER_IRQ_0() {
fn TIMER0_IRQ_0() {
DRIVER.check_alarm()
}
embassy_time_queue_driver::timer_queue_impl!(
static TIMER_QUEUE_DRIVER: GlobalTimerQueue
= GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration))
);

View File

@ -1,13 +1,13 @@
#![allow(non_snake_case)]
use core::cell::Cell;
use core::cell::{Cell, RefCell};
use core::sync::atomic::{compiler_fence, AtomicU32, Ordering};
use critical_section::CriticalSection;
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::blocking_mutex::Mutex;
use embassy_time_driver::{Driver, TICK_HZ};
use embassy_time_queue_driver::GlobalTimerQueue;
use embassy_time_queue_driver::Queue;
use stm32_metapac::timer::{regs, TimGp16};
use crate::interrupt::typelevel::Interrupt;
@ -214,6 +214,7 @@ pub(crate) struct RtcDriver {
alarm: Mutex<CriticalSectionRawMutex, AlarmState>,
#[cfg(feature = "low-power")]
rtc: Mutex<CriticalSectionRawMutex, Cell<Option<&'static Rtc>>>,
queue: Mutex<CriticalSectionRawMutex, RefCell<Queue>>,
}
embassy_time_driver::time_driver_impl!(static DRIVER: RtcDriver = RtcDriver {
@ -221,6 +222,7 @@ embassy_time_driver::time_driver_impl!(static DRIVER: RtcDriver = RtcDriver {
alarm: Mutex::const_new(CriticalSectionRawMutex::new(), AlarmState::new()),
#[cfg(feature = "low-power")]
rtc: Mutex::const_new(CriticalSectionRawMutex::new(), Cell::new(None)),
queue: Mutex::new(RefCell::new(Queue::new()))
});
impl RtcDriver {
@ -266,8 +268,7 @@ impl RtcDriver {
fn on_interrupt(&self) {
let r = regs_gp16();
// XXX: reduce the size of this critical section ?
critical_section::with(|_cs| {
critical_section::with(|cs| {
let sr = r.sr().read();
let dier = r.dier().read();
@ -288,7 +289,7 @@ impl RtcDriver {
let n = 0;
if sr.ccif(n + 1) && dier.ccie(n + 1) {
self.trigger_alarm();
self.trigger_alarm(cs);
}
})
}
@ -315,8 +316,11 @@ impl RtcDriver {
})
}
fn trigger_alarm(&self) {
TIMER_QUEUE_DRIVER.dispatch();
fn trigger_alarm(&self, cs: CriticalSection) {
let mut next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now());
while !self.set_alarm(cs, next) {
next = self.queue.borrow(cs).borrow_mut().next_expiration(self.now());
}
}
/*
@ -366,9 +370,9 @@ impl RtcDriver {
// Now, recompute alarm
let alarm = self.alarm.borrow(cs);
if !self.set_alarm(alarm.timestamp.get()) {
if !self.set_alarm(cs, alarm.timestamp.get()) {
// If the alarm timestamp has passed, we need to trigger it
self.trigger_alarm();
self.trigger_alarm(cs);
}
}
@ -441,49 +445,47 @@ impl RtcDriver {
})
}
fn set_alarm(&self, timestamp: u64) -> bool {
critical_section::with(|cs| {
let r = regs_gp16();
fn set_alarm(&self, cs: CriticalSection, timestamp: u64) -> bool {
let r = regs_gp16();
let n = 0;
self.alarm.borrow(cs).timestamp.set(timestamp);
let n = 0;
self.alarm.borrow(cs).timestamp.set(timestamp);
let t = self.now();
if timestamp <= t {
// If alarm timestamp has passed the alarm will not fire.
// Disarm the alarm and return `false` to indicate that.
r.dier().modify(|w| w.set_ccie(n + 1, false));
let t = self.now();
if timestamp <= t {
// If alarm timestamp has passed the alarm will not fire.
// Disarm the alarm and return `false` to indicate that.
r.dier().modify(|w| w.set_ccie(n + 1, false));
self.alarm.borrow(cs).timestamp.set(u64::MAX);
self.alarm.borrow(cs).timestamp.set(u64::MAX);
return false;
}
return false;
}
// Write the CCR value regardless of whether we're going to enable it now or not.
// This way, when we enable it later, the right value is already set.
r.ccr(n + 1).write(|w| w.set_ccr(timestamp as u16));
// Write the CCR value regardless of whether we're going to enable it now or not.
// This way, when we enable it later, the right value is already set.
r.ccr(n + 1).write(|w| w.set_ccr(timestamp as u16));
// Enable it if it'll happen soon. Otherwise, `next_period` will enable it.
let diff = timestamp - t;
r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000));
// Enable it if it'll happen soon. Otherwise, `next_period` will enable it.
let diff = timestamp - t;
r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000));
// Reevaluate if the alarm timestamp is still in the future
let t = self.now();
if timestamp <= t {
// If alarm timestamp has passed since we set it, we have a race condition and
// the alarm may or may not have fired.
// Disarm the alarm and return `false` to indicate that.
// It is the caller's responsibility to handle this ambiguity.
r.dier().modify(|w| w.set_ccie(n + 1, false));
// Reevaluate if the alarm timestamp is still in the future
let t = self.now();
if timestamp <= t {
// If alarm timestamp has passed since we set it, we have a race condition and
// the alarm may or may not have fired.
// Disarm the alarm and return `false` to indicate that.
// It is the caller's responsibility to handle this ambiguity.
r.dier().modify(|w| w.set_ccie(n + 1, false));
self.alarm.borrow(cs).timestamp.set(u64::MAX);
self.alarm.borrow(cs).timestamp.set(u64::MAX);
return false;
}
return false;
}
// We're confident the alarm will ring in the future.
true
})
// We're confident the alarm will ring in the future.
true
}
}
@ -496,6 +498,19 @@ impl Driver for RtcDriver {
let counter = r.cnt().read().cnt();
calc_now(period, counter)
}
fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
critical_section::with(|cs| {
let mut queue = self.queue.borrow(cs).borrow_mut();
if queue.schedule_wake(at, waker) {
let mut next = queue.next_expiration(self.now());
while !self.set_alarm(cs, next) {
next = queue.next_expiration(self.now());
}
}
})
}
}
#[cfg(feature = "low-power")]
@ -506,8 +521,3 @@ pub(crate) fn get_driver() -> &'static RtcDriver {
pub(crate) fn init(cs: CriticalSection) {
DRIVER.init(cs)
}
embassy_time_queue_driver::timer_queue_impl!(
static TIMER_QUEUE_DRIVER: GlobalTimerQueue
= GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration))
);

View File

@ -38,6 +38,8 @@
//! # Example
//!
//! ```
//! use core::task::Waker;
//!
//! use embassy_time_driver::Driver;
//!
//! struct MyDriver{} // not public!
@ -46,6 +48,10 @@
//! fn now(&self) -> u64 {
//! todo!()
//! }
//!
//! fn schedule_wake(&self, at: u64, waker: &Waker) {
//! todo!()
//! }
//! }
//!
//! embassy_time_driver::time_driver_impl!(static DRIVER: MyDriver = MyDriver{});
@ -54,6 +60,8 @@
//! ## Feature flags
#![doc = document_features::document_features!(feature_label = r#"<span class="stab portability"><code>{feature}</code></span>"#)]
use core::task::Waker;
mod tick;
/// Ticks per second of the global timebase.
@ -74,6 +82,10 @@ pub trait Driver: Send + Sync + 'static {
/// you MUST extend them to 64-bit, for example by counting overflows in software,
/// or chaining multiple timers together.
fn now(&self) -> u64;
/// Schedules a waker to be awoken at moment `at`.
/// If this moment is in the past, the waker might be awoken immediately.
fn schedule_wake(&self, at: u64, waker: &Waker);
}
extern "Rust" {
@ -97,5 +109,10 @@ macro_rules! time_driver_impl {
fn _embassy_time_now() -> u64 {
<$t as $crate::Driver>::now(&$name)
}
#[no_mangle]
fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker) {
<$t as $crate::Driver>::schedule_wake(&$name, at, waker);
}
};
}

View File

@ -49,23 +49,18 @@
//! embassy_time_queue_driver::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{});
//! ```
use core::task::Waker;
#[cfg(not(feature = "integrated-timers"))]
pub mod queue_generic;
#[cfg(feature = "integrated-timers")]
pub mod queue_integrated;
use core::cell::RefCell;
use core::task::Waker;
#[cfg(feature = "integrated-timers")]
pub use queue_integrated::Queue;
use critical_section::Mutex;
/// Timer queue
pub trait TimerQueue {
/// Schedules a waker in the queue to be awoken at moment `at`.
///
/// If this moment is in the past, the waker might be awoken immediately.
fn schedule_wake(&'static self, at: u64, waker: &Waker);
}
#[cfg(not(feature = "integrated-timers"))]
pub use queue_generic::Queue;
extern "Rust" {
fn _embassy_time_schedule_wake(at: u64, waker: &Waker);
@ -73,7 +68,10 @@ extern "Rust" {
/// Schedule the given waker to be woken at `at`.
pub fn schedule_wake(at: u64, waker: &Waker) {
#[cfg(feature = "integrated-timers")]
// This function is not implemented in embassy-time-driver because it needs access to executor
// internals. The function updates task state, then delegates to the implementation provided
// by the time driver.
#[cfg(not(feature = "_generic-queue"))]
{
use embassy_executor::raw::task_from_waker;
use embassy_executor::raw::timer_queue::TimerEnqueueOperation;
@ -89,121 +87,3 @@ pub fn schedule_wake(at: u64, waker: &Waker) {
}
unsafe { _embassy_time_schedule_wake(at, waker) }
}
/// Set the TimerQueue implementation.
///
/// See the module documentation for an example.
#[macro_export]
macro_rules! timer_queue_impl {
(static $name:ident: $t: ty = $val:expr) => {
static $name: $t = $val;
#[no_mangle]
fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker) {
<$t as $crate::TimerQueue>::schedule_wake(&$name, at, waker);
}
};
}
#[cfg(feature = "integrated-timers")]
type InnerQueue = queue_integrated::TimerQueue;
#[cfg(not(feature = "integrated-timers"))]
type InnerQueue = queue_generic::Queue;
/// A timer queue implementation that can be used as a global timer queue.
///
/// This implementation is not thread-safe, and should be protected by a mutex of some sort.
pub struct GenericTimerQueue<F: Fn(u64) -> bool> {
queue: InnerQueue,
set_alarm: F,
}
impl<F: Fn(u64) -> bool> GenericTimerQueue<F> {
/// Creates a new timer queue.
///
/// `set_alarm` is a function that should set the next alarm time. The function should
/// return `true` if the alarm was set, and `false` if the alarm was in the past.
pub const fn new(set_alarm: F) -> Self {
Self {
queue: InnerQueue::new(),
set_alarm,
}
}
/// Schedules a task to run at a specific time, and returns whether any changes were made.
pub fn schedule_wake(&mut self, at: u64, waker: &core::task::Waker) {
#[cfg(feature = "integrated-timers")]
let waker = embassy_executor::raw::task_from_waker(waker);
if self.queue.schedule_wake(at, waker) {
self.dispatch()
}
}
/// Dequeues expired timers and returns the next alarm time.
pub fn next_expiration(&mut self, now: u64) -> u64 {
self.queue.next_expiration(now)
}
/// Handle the alarm.
///
/// Call this function when the next alarm is due.
pub fn dispatch(&mut self) {
let mut next_expiration = self.next_expiration(embassy_time_driver::now());
while !(self.set_alarm)(next_expiration) {
// next_expiration is in the past, dequeue and find a new expiration
next_expiration = self.next_expiration(next_expiration);
}
}
}
/// A [`GenericTimerQueue`] protected by a critical section. Directly useable as a [`TimerQueue`].
pub struct GlobalTimerQueue {
inner: Mutex<RefCell<GenericTimerQueue<fn(u64) -> bool>>>,
}
impl GlobalTimerQueue {
/// Creates a new timer queue.
///
/// `set_alarm` is a function that should set the next alarm time. The function should
/// return `true` if the alarm was set, and `false` if the alarm was in the past.
pub const fn new(set_alarm: fn(u64) -> bool) -> Self {
Self {
inner: Mutex::new(RefCell::new(GenericTimerQueue::new(set_alarm))),
}
}
/// Schedules a task to run at a specific time, and returns whether any changes were made.
pub fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
critical_section::with(|cs| {
let mut inner = self.inner.borrow_ref_mut(cs);
inner.schedule_wake(at, waker);
});
}
/// Dequeues expired timers and returns the next alarm time.
pub fn next_expiration(&self, now: u64) -> u64 {
critical_section::with(|cs| {
let mut inner = self.inner.borrow_ref_mut(cs);
inner.next_expiration(now)
})
}
/// Handle the alarm.
///
/// Call this function when the next alarm is due.
pub fn dispatch(&self) {
critical_section::with(|cs| {
let mut inner = self.inner.borrow_ref_mut(cs);
inner.dispatch()
})
}
}
impl TimerQueue for GlobalTimerQueue {
fn schedule_wake(&'static self, at: u64, waker: &Waker) {
GlobalTimerQueue::schedule_wake(self, at, waker)
}
}

View File

@ -1,15 +1,16 @@
//! Timer queue operations.
use core::cell::Cell;
use core::cmp::min;
use core::task::Waker;
use embassy_executor::raw::TaskRef;
/// A timer queue, with items integrated into tasks.
pub struct TimerQueue {
pub struct Queue {
head: Cell<Option<TaskRef>>,
}
impl TimerQueue {
impl Queue {
/// Creates a new timer queue.
pub const fn new() -> Self {
Self { head: Cell::new(None) }
@ -19,11 +20,12 @@ impl TimerQueue {
///
/// If this function returns `true`, the called should find the next expiration time and set
/// a new alarm for that time.
pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool {
let item = p.timer_queue_item();
pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool {
let task = embassy_executor::raw::task_from_waker(waker);
let item = task.timer_queue_item();
if item.next.get().is_none() {
// If not in the queue, add it and update.
let prev = self.head.replace(Some(p));
let prev = self.head.replace(Some(task));
item.next.set(if prev.is_none() {
Some(unsafe { TaskRef::dangling() })
} else {

View File

@ -384,7 +384,6 @@ tick-hz-5_242_880_000 = ["embassy-time-driver/tick-hz-5_242_880_000"]
[dependencies]
embassy-time-driver = { version = "0.1.0", path = "../embassy-time-driver" }
embassy-time-queue-driver = { version = "0.1.0", path = "../embassy-time-queue-driver" }
defmt = { version = "0.3", optional = true }
log = { version = "0.4.14", optional = true }

View File

@ -1,7 +1,9 @@
use core::cell::RefCell;
use core::task::Waker;
use critical_section::Mutex as CsMutex;
use embassy_time_driver::Driver;
use embassy_time_queue_driver::Queue;
use crate::{Duration, Instant};
@ -52,50 +54,12 @@ impl MockDriver {
/// Advances the time by the specified [`Duration`].
/// Calling any alarm callbacks that are due.
pub fn advance(&self, duration: Duration) {
let notify = {
critical_section::with(|cs| {
let mut inner = self.0.borrow_ref_mut(cs);
inner.now += duration;
let now = inner.now.as_ticks();
if inner.alarm.timestamp <= now {
inner.alarm.timestamp = u64::MAX;
Some((inner.alarm.callback, inner.alarm.ctx))
} else {
None
}
})
};
if let Some((callback, ctx)) = notify {
(callback)(ctx);
}
}
/// Configures a callback to be called when the alarm fires.
pub fn set_alarm_callback(&self, callback: fn(*mut ()), ctx: *mut ()) {
critical_section::with(|cs| {
let mut inner = self.0.borrow_ref_mut(cs);
let inner = &mut *self.0.borrow_ref_mut(cs);
inner.alarm.callback = callback;
inner.alarm.ctx = ctx;
});
}
/// Sets the alarm to fire at the specified timestamp.
pub fn set_alarm(&self, timestamp: u64) -> bool {
critical_section::with(|cs| {
let mut inner = self.0.borrow_ref_mut(cs);
if timestamp <= inner.now.as_ticks() {
false
} else {
inner.alarm.timestamp = timestamp;
true
}
inner.now += duration;
// wake expired tasks.
inner.queue.next_expiration(inner.now.as_ticks());
})
}
}
@ -104,44 +68,38 @@ impl Driver for MockDriver {
fn now(&self) -> u64 {
critical_section::with(|cs| self.0.borrow_ref(cs).now).as_ticks()
}
fn schedule_wake(&self, at: u64, waker: &Waker) {
critical_section::with(|cs| {
let inner = &mut *self.0.borrow_ref_mut(cs);
// enqueue it
inner.queue.schedule_wake(at, waker);
// wake it if it's in the past.
inner.queue.next_expiration(inner.now.as_ticks());
})
}
}
struct InnerMockDriver {
now: Instant,
alarm: AlarmState,
queue: Queue,
}
impl InnerMockDriver {
const fn new() -> Self {
Self {
now: Instant::from_ticks(0),
alarm: AlarmState::new(),
queue: Queue::new(),
}
}
}
struct AlarmState {
timestamp: u64,
callback: fn(*mut ()),
ctx: *mut (),
}
impl AlarmState {
const fn new() -> Self {
Self {
timestamp: u64::MAX,
callback: Self::noop,
ctx: core::ptr::null_mut(),
}
}
fn noop(_ctx: *mut ()) {}
}
unsafe impl Send for AlarmState {}
#[cfg(test)]
mod tests {
use core::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Wake;
use serial_test::serial;
use super::*;
@ -163,24 +121,25 @@ mod tests {
#[test]
#[serial]
fn test_set_alarm_not_in_future() {
fn test_schedule_wake() {
setup();
let driver = MockDriver::get();
assert_eq!(false, driver.set_alarm(driver.now()));
}
static CALLBACK_CALLED: AtomicBool = AtomicBool::new(false);
#[test]
#[serial]
fn test_alarm() {
setup();
struct MockWaker;
impl Wake for MockWaker {
fn wake(self: Arc<Self>) {
CALLBACK_CALLED.store(true, Ordering::Relaxed);
}
}
let waker = Arc::new(MockWaker).into();
let driver = MockDriver::get();
static mut CALLBACK_CALLED: bool = false;
driver.set_alarm_callback(|_| unsafe { CALLBACK_CALLED = true }, core::ptr::null_mut());
driver.set_alarm(driver.now() + 1);
assert_eq!(false, unsafe { CALLBACK_CALLED });
driver.schedule_wake(driver.now() + 1, &waker);
assert_eq!(false, CALLBACK_CALLED.load(Ordering::Relaxed));
driver.advance(Duration::from_secs(1));
assert_eq!(true, unsafe { CALLBACK_CALLED });
assert_eq!(true, CALLBACK_CALLED.load(Ordering::Relaxed));
}
}

View File

@ -1,97 +1,67 @@
use std::cell::{RefCell, UnsafeCell};
use std::mem::MaybeUninit;
use std::sync::{Condvar, Mutex, Once};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::time::{Duration as StdDuration, Instant as StdInstant};
use std::{ptr, thread};
use critical_section::Mutex as CsMutex;
use embassy_time_driver::Driver;
use embassy_time_queue_driver::GlobalTimerQueue;
struct AlarmState {
timestamp: u64,
}
unsafe impl Send for AlarmState {}
impl AlarmState {
const fn new() -> Self {
Self { timestamp: u64::MAX }
}
}
use embassy_time_queue_driver::Queue;
struct TimeDriver {
once: Once,
// The STD Driver implementation requires the alarm's mutex to be reentrant, which the STD Mutex isn't
// Fortunately, mutexes based on the `critical-section` crate are reentrant, because the critical sections
// themselves are reentrant
alarm: UninitCell<CsMutex<RefCell<AlarmState>>>,
zero_instant: UninitCell<StdInstant>,
signaler: UninitCell<Signaler>,
signaler: Signaler,
inner: Mutex<Inner>,
}
struct Inner {
zero_instant: Option<StdInstant>,
queue: Queue,
}
embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
once: Once::new(),
alarm: UninitCell::uninit(),
zero_instant: UninitCell::uninit(),
signaler: UninitCell::uninit(),
inner: Mutex::new(Inner{
zero_instant: None,
queue: Queue::new(),
}),
signaler: Signaler::new(),
});
impl TimeDriver {
fn init(&self) {
self.once.call_once(|| unsafe {
self.alarm
.write(CsMutex::new(RefCell::new(const { AlarmState::new() })));
self.zero_instant.write(StdInstant::now());
self.signaler.write(Signaler::new());
thread::spawn(Self::alarm_thread);
});
}
fn alarm_thread() {
let zero = unsafe { DRIVER.zero_instant.read() };
loop {
let now = DRIVER.now();
let next_alarm = critical_section::with(|cs| {
let mut alarm = unsafe { DRIVER.alarm.as_ref() }.borrow_ref_mut(cs);
if alarm.timestamp <= now {
alarm.timestamp = u64::MAX;
TIMER_QUEUE_DRIVER.dispatch();
}
alarm.timestamp
});
// Ensure we don't overflow
let until = zero
.checked_add(StdDuration::from_micros(next_alarm))
.unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1));
unsafe { DRIVER.signaler.as_ref() }.wait_until(until);
}
}
fn set_alarm(&self, timestamp: u64) -> bool {
self.init();
critical_section::with(|cs| {
let mut alarm = unsafe { self.alarm.as_ref() }.borrow_ref_mut(cs);
alarm.timestamp = timestamp;
unsafe { self.signaler.as_ref() }.signal();
});
true
impl Inner {
fn init(&mut self) -> StdInstant {
*self.zero_instant.get_or_insert_with(|| {
thread::spawn(alarm_thread);
StdInstant::now()
})
}
}
impl Driver for TimeDriver {
fn now(&self) -> u64 {
self.init();
let zero = unsafe { self.zero_instant.read() };
let mut inner = self.inner.lock().unwrap();
let zero = inner.init();
StdInstant::now().duration_since(zero).as_micros() as u64
}
fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
let mut inner = self.inner.lock().unwrap();
inner.init();
if inner.queue.schedule_wake(at, waker) {
self.signaler.signal();
}
}
}
fn alarm_thread() {
let zero = DRIVER.inner.lock().unwrap().zero_instant.unwrap();
loop {
let now = DRIVER.now();
let next_alarm = DRIVER.inner.lock().unwrap().queue.next_expiration(now);
// Ensure we don't overflow
let until = zero
.checked_add(StdDuration::from_micros(next_alarm))
.unwrap_or_else(|| StdInstant::now() + StdDuration::from_secs(1));
DRIVER.signaler.wait_until(until);
}
}
struct Signaler {
@ -100,7 +70,7 @@ struct Signaler {
}
impl Signaler {
fn new() -> Self {
const fn new() -> Self {
Self {
mutex: Mutex::new(false),
condvar: Condvar::new(),
@ -132,40 +102,3 @@ impl Signaler {
self.condvar.notify_one();
}
}
pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
unsafe impl<T> Send for UninitCell<T> {}
unsafe impl<T> Sync for UninitCell<T> {}
impl<T> UninitCell<T> {
pub const fn uninit() -> Self {
Self(MaybeUninit::uninit())
}
pub unsafe fn as_ptr(&self) -> *const T {
(*self.0.as_ptr()).get()
}
pub unsafe fn as_mut_ptr(&self) -> *mut T {
(*self.0.as_ptr()).get()
}
pub unsafe fn as_ref(&self) -> &T {
&*self.as_ptr()
}
pub unsafe fn write(&self, val: T) {
ptr::write(self.as_mut_ptr(), val)
}
}
impl<T: Copy> UninitCell<T> {
pub unsafe fn read(&self) -> T {
ptr::read(self.as_mut_ptr())
}
}
embassy_time_queue_driver::timer_queue_impl!(
static TIMER_QUEUE_DRIVER: GlobalTimerQueue
= GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration))
);

View File

@ -1,10 +1,7 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::{Mutex, Once};
use std::sync::Mutex;
use embassy_time_driver::Driver;
use embassy_time_queue_driver::GlobalTimerQueue;
use embassy_time_queue_driver::Queue;
use wasm_bindgen::prelude::*;
use wasm_timer::Instant as StdInstant;
@ -12,8 +9,6 @@ struct AlarmState {
token: Option<f64>,
}
unsafe impl Send for AlarmState {}
impl AlarmState {
const fn new() -> Self {
Self { token: None }
@ -27,33 +22,38 @@ extern "C" {
}
struct TimeDriver {
once: Once,
alarm: UninitCell<Mutex<AlarmState>>,
zero_instant: UninitCell<StdInstant>,
closure: UninitCell<Closure<dyn FnMut()>>,
inner: Mutex<Inner>,
}
struct Inner {
alarm: AlarmState,
zero_instant: Option<StdInstant>,
queue: Queue,
closure: Option<Closure<dyn FnMut()>>,
}
unsafe impl Send for Inner {}
embassy_time_driver::time_driver_impl!(static DRIVER: TimeDriver = TimeDriver {
once: Once::new(),
alarm: UninitCell::uninit(),
zero_instant: UninitCell::uninit(),
closure: UninitCell::uninit()
inner: Mutex::new(Inner{
zero_instant: None,
queue: Queue::new(),
alarm: AlarmState::new(),
closure: None,
}),
});
impl TimeDriver {
fn init(&self) {
self.once.call_once(|| unsafe {
self.alarm.write(Mutex::new(const { AlarmState::new() }));
self.zero_instant.write(StdInstant::now());
self.closure
.write(Closure::new(Box::new(|| TIMER_QUEUE_DRIVER.dispatch())));
});
impl Inner {
fn init(&mut self) -> StdInstant {
*self.zero_instant.get_or_insert_with(StdInstant::now)
}
fn set_alarm(&self, timestamp: u64) -> bool {
self.init();
let mut alarm = unsafe { self.alarm.as_ref() }.lock().unwrap();
if let Some(token) = alarm.token {
fn now(&mut self) -> u64 {
StdInstant::now().duration_since(self.zero_instant.unwrap()).as_micros() as u64
}
fn set_alarm(&mut self, timestamp: u64) -> bool {
if let Some(token) = self.alarm.token {
clearTimeout(token);
}
@ -62,7 +62,8 @@ impl TimeDriver {
false
} else {
let timeout = (timestamp - now) as u32;
alarm.token = Some(setTimeout(unsafe { self.closure.as_ref() }, timeout / 1000));
let closure = self.closure.get_or_insert_with(|| Closure::new(dispatch));
self.alarm.token = Some(setTimeout(closure, timeout / 1000));
true
}
@ -71,45 +72,32 @@ impl TimeDriver {
impl Driver for TimeDriver {
fn now(&self) -> u64 {
self.init();
let zero = unsafe { self.zero_instant.read() };
let mut inner = self.inner.lock().unwrap();
let zero = inner.init();
StdInstant::now().duration_since(zero).as_micros() as u64
}
}
pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
unsafe impl<T> Send for UninitCell<T> {}
unsafe impl<T> Sync for UninitCell<T> {}
impl<T> UninitCell<T> {
pub const fn uninit() -> Self {
Self(MaybeUninit::uninit())
}
unsafe fn as_ptr(&self) -> *const T {
(*self.0.as_ptr()).get()
}
pub unsafe fn as_mut_ptr(&self) -> *mut T {
(*self.0.as_ptr()).get()
}
pub unsafe fn as_ref(&self) -> &T {
&*self.as_ptr()
}
pub unsafe fn write(&self, val: T) {
ptr::write(self.as_mut_ptr(), val)
fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
let mut inner = self.inner.lock().unwrap();
inner.init();
if inner.queue.schedule_wake(at, waker) {
let now = inner.now();
let mut next = inner.queue.next_expiration(now);
while !inner.set_alarm(next) {
let now = inner.now();
next = inner.queue.next_expiration(now);
}
}
}
}
impl<T: Copy> UninitCell<T> {
pub unsafe fn read(&self) -> T {
ptr::read(self.as_mut_ptr())
fn dispatch() {
let inner = &mut *DRIVER.inner.lock().unwrap();
let now = inner.now();
let mut next = inner.queue.next_expiration(now);
while !inner.set_alarm(next) {
let now = inner.now();
next = inner.queue.next_expiration(now);
}
}
embassy_time_queue_driver::timer_queue_impl!(
static TIMER_QUEUE_DRIVER: GlobalTimerQueue
= GlobalTimerQueue::new(|next_expiration| DRIVER.set_alarm(next_expiration))
);