Merge pull request #4550 from bugadani/time-queue

Make timer queue item opaque, move to new crate
This commit is contained in:
Ulf Lilleengen 2025-08-25 17:28:38 +00:00 committed by GitHub
commit ac60eaeddd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 283 additions and 146 deletions

View File

@ -45,6 +45,8 @@ jobs:
- 'embassy-executor/**'
embassy-executor-macros:
- 'embassy-executor-macros/**'
embassy-executor-timer-queue:
- 'embassy-executor-timer-queue/**'
embassy-futures:
- 'embassy-futures/**'
embassy-imxrt:
@ -162,6 +164,13 @@ jobs:
changeLogPath: embassy-executor-macros/CHANGELOG.md
skipLabels: "skip-changelog"
missingUpdateErrorMessage: "Please add a changelog entry in the embassy-executor-macros/CHANGELOG.md file."
- name: Check that changelog updated (embassy-executor-timer-queue)
if: steps.changes.outputs.embassy-executor-timer-queue == 'true'
uses: dangoslen/changelog-enforcer@v3
with:
changeLogPath: embassy-executor-timer-queue/CHANGELOG.md
skipLabels: "skip-changelog"
missingUpdateErrorMessage: "Please add a changelog entry in the embassy-executor-timer-queue/CHANGELOG.md file."
- name: Check that changelog updated (embassy-futures)
if: steps.changes.outputs.embassy-futures == 'true'
uses: dangoslen/changelog-enforcer@v3

View File

@ -0,0 +1,10 @@
# Changelog for embassy-time-queue-utils
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreeased
- Initial implementation

View File

@ -0,0 +1,35 @@
[package]
name = "embassy-executor-timer-queue"
version = "0.1.0"
edition = "2021"
description = "Timer queue item and interface between embassy-executor and timer queues"
repository = "https://github.com/embassy-rs/embassy"
documentation = "https://docs.embassy.dev/embassy-executor-timer-queue"
readme = "README.md"
license = "MIT OR Apache-2.0"
categories = [
"embedded",
"no-std",
"concurrency",
"asynchronous",
]
[dependencies]
[features]
#! ### Timer Queue Item Size
#! Sets the size of the timer items.
## 4 words
timer-item-size-4-words = []
## 6 words
timer-item-size-6-words = []
## 8 words
timer-item-size-8-words = []
[package.metadata.embassy_docs]
src_base = "https://github.com/embassy-rs/embassy/blob/embassy-executor-timer-queue-v$VERSION/embassy-executor-timer-queue/src/"
src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-executor-timer-queue/src/"
target = "x86_64-unknown-linux-gnu"

View File

@ -0,0 +1,10 @@
# embassy-executor-time-queue
This crate defines the timer queue item that embassy-executor provides, and a way to access it, for
executor-integrated timer queues. The crate decouples the release cycle of embassy-executor from
that of the queue implementations'.
As a HAL implementer, you only need to depend on this crate if you want to implement executor-integrated
timer queues yourself, without using [`embassy-time-queue-utils`](https://crates.io/crates/embassy-time-queue-utils).
As a HAL user, you should not need to depend on this crate.

View File

@ -0,0 +1,104 @@
//! Timer queue item for embassy-executor integrated timer queues
//!
//! `embassy-executor` provides the memory needed to implement integrated timer queues. This crate
//! exists to separate that memory from `embassy-executor` itself, to decouple the timer queue's
//! release cycle from `embassy-executor`.
//!
//! This crate contains two things:
//! - [`TimerQueueItem`]: The item type that can be requested from the executor. The size of this
//! type can be configured using the `timer-item-size-N-words` Cargo features.
//! - The expectation that `extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &mut TimerQueueItem`
//! is implemented (by `embassy-executor`, most likely). This function must return a mutable
//! reference to the `TimerQueueItem` associated with the given waker.
//!
//! As a queue implementor, you will need to choose one of the `timer-item-size-N-words` features to
//! select a queue item size. You can then define your own item type, which must be
//! `#[repr(align(8))]` (or less) and must fit into the size you selected.
//!
//! You can access the `TimerQueueItem` from a `Waker` using the [`from_embassy_waker`](TimerQueueItem::from_embassy_waker)
//! method. You can then use the [`as_ref`](TimerQueueItem::as_ref) and [`as_mut`](TimerQueueItem::as_mut)
//! methods to reinterpret the data stored in the item as your custom item type.
#![no_std]
use core::task::Waker;
const ITEM_WORDS: usize = if cfg!(feature = "timer-item-size-8-words") {
8
} else if cfg!(feature = "timer-item-size-6-words") {
6
} else if cfg!(feature = "timer-item-size-4-words") {
4
} else {
0
};
/// The timer queue item provided by the executor.
///
/// This type is opaque, it only provides the raw storage for a queue item. The queue implementation
/// is responsible for reinterpreting the contents of the item using [`TimerQueueItem::as_ref`] and
/// [`TimerQueueItem::as_mut`].
#[repr(align(8))]
pub struct TimerQueueItem {
data: [usize; ITEM_WORDS],
}
impl TimerQueueItem {
/// Creates a new, zero-initialized `TimerQueueItem`.
pub const fn new() -> Self {
Self { data: [0; ITEM_WORDS] }
}
/// Retrieves the `TimerQueueItem` reference that belongs to the task of the waker.
///
/// Panics if called with a non-embassy waker.
///
/// # Safety
///
/// The caller must ensure they are not violating Rust's aliasing rules - it is not allowed
/// to use this method to create multiple mutable references to the same `TimerQueueItem` at
/// the same time.
///
/// This function must only be called in the context of a timer queue implementation.
pub unsafe fn from_embassy_waker(waker: &Waker) -> &'static mut Self {
unsafe extern "Rust" {
// Waker -> TimerQueueItem, validates that Waker is an embassy Waker.
fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem;
}
unsafe { __embassy_time_queue_item_from_waker(waker) }
}
/// Access the data as a reference to a type `T`.
///
/// Safety:
///
/// - The type must be valid when zero-initialized.
/// - The timer queue should only be interpreted as a single type `T` during its lifetime.
pub unsafe fn as_ref<T>(&self) -> &T {
const { validate::<T>() }
unsafe { &*(self.data.as_ptr() as *const T) }
}
/// Access the data as a reference to a type `T`.
///
/// Safety:
///
/// - The type must be valid when zero-initialized.
/// - The timer queue should only be interpreted as a single type `T` during its lifetime.
pub unsafe fn as_mut<T>(&self) -> &mut T {
const { validate::<T>() }
unsafe { &mut *(self.data.as_ptr() as *mut T) }
}
}
const fn validate<T>() {
const {
assert!(
core::mem::size_of::<TimerQueueItem>() >= core::mem::size_of::<T>(),
"embassy-executor-timer-queue item size is smaller than the requested type. Select a larger timer-item-size-N-words feature."
);
assert!(
core::mem::align_of::<TimerQueueItem>() >= core::mem::align_of::<T>(),
"the alignment of the requested type is greater than 8"
);
}
}

View File

@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->
## Unreleased - ReleaseDate
- Added `extern "Rust" fn __embassy_time_queue_item_from_waker`
- Removed `TaskRef::dangling`
- Added `embassy_time_queue_utils` as a dependency
- Moved the `TimeQueueItem` struct and `timer-item-payload-size-*` features into embassy-time-queue-utils
## 0.8.0 - 2025-07-31
- Added `SpawnToken::id`

View File

@ -35,6 +35,7 @@ rtos-trace = { version = "0.1.3", optional = true }
embassy-executor-macros = { version = "0.7.0", path = "../embassy-executor-macros" }
embassy-time-driver = { version = "0.2", path = "../embassy-time-driver", optional = true }
embassy-executor-timer-queue = { version = "0.1", path = "../embassy-executor-timer-queue" }
critical-section = "1.1"
document-features = "0.2.7"
@ -98,19 +99,3 @@ executor-interrupt = []
trace = []
## Enable support for rtos-trace framework
rtos-trace = ["dep:rtos-trace", "trace", "dep:embassy-time-driver"]
#! ### Timer Item Payload Size
#! Sets the size of the payload for timer items, allowing integrated timer implementors to store
#! additional data in the timer item. The payload field will be aligned to this value as well.
#! If these features are not defined, the timer item will contain no payload field.
_timer-item-payload = [] # A size was picked
## 1 bytes
timer-item-payload-size-1 = ["_timer-item-payload"]
## 2 bytes
timer-item-payload-size-2 = ["_timer-item-payload"]
## 4 bytes
timer-item-payload-size-4 = ["_timer-item-payload"]
## 8 bytes
timer-item-payload-size-8 = ["_timer-item-payload"]

View File

@ -16,7 +16,6 @@ mod run_queue;
#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
mod state;
pub mod timer_queue;
#[cfg(feature = "trace")]
pub mod trace;
pub(crate) mod util;
@ -31,8 +30,9 @@ use core::ptr::NonNull;
#[cfg(not(feature = "arch-avr"))]
use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll};
use core::task::{Context, Poll, Waker};
use embassy_executor_timer_queue::TimerQueueItem;
#[cfg(feature = "arch-avr")]
use portable_atomic::AtomicPtr;
@ -42,6 +42,11 @@ use self::util::{SyncUnsafeCell, UninitCell};
pub use self::waker::task_from_waker;
use super::SpawnToken;
#[no_mangle]
extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
unsafe { task_from_waker(waker).timer_queue_item() }
}
/// Raw task header for use in task pointers.
///
/// A task can be in one of the following states:
@ -88,7 +93,7 @@ pub(crate) struct TaskHeader {
poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
/// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
pub(crate) timer_queue_item: TimerQueueItem,
#[cfg(feature = "trace")]
pub(crate) name: Option<&'static str>,
#[cfg(feature = "trace")]
@ -120,16 +125,6 @@ impl TaskRef {
}
}
/// # Safety
///
/// The result of this function must only be compared
/// for equality, or stored, but not used.
pub const unsafe fn dangling() -> Self {
Self {
ptr: NonNull::dangling(),
}
}
pub(crate) fn header(self) -> &'static TaskHeader {
unsafe { self.ptr.as_ref() }
}
@ -140,9 +135,13 @@ impl TaskRef {
executor.as_ref().map(|e| Executor::wrap(e))
}
/// Returns a reference to the timer queue item.
pub fn timer_queue_item(&self) -> &'static timer_queue::TimerQueueItem {
&self.header().timer_queue_item
/// Returns a mutable reference to the timer queue item.
///
/// Safety
///
/// This function must only be called in the context of the integrated timer queue.
pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
unsafe { &mut self.ptr.as_mut().timer_queue_item }
}
/// The returned pointer is valid for the entire TaskStorage.
@ -189,7 +188,7 @@ impl<F: Future + 'static> TaskStorage<F> {
// Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
poll_fn: SyncUnsafeCell::new(None),
timer_queue_item: timer_queue::TimerQueueItem::new(),
timer_queue_item: TimerQueueItem::new(),
#[cfg(feature = "trace")]
name: None,
#[cfg(feature = "trace")]

View File

@ -1,73 +0,0 @@
//! Timer queue operations.
use core::cell::Cell;
use super::TaskRef;
#[cfg(feature = "_timer-item-payload")]
macro_rules! define_opaque {
($size:tt) => {
/// An opaque data type.
#[repr(align($size))]
pub struct OpaqueData {
data: [u8; $size],
}
impl OpaqueData {
const fn new() -> Self {
Self { data: [0; $size] }
}
/// Access the data as a reference to a type `T`.
///
/// Safety:
///
/// The caller must ensure that the size of the type `T` is less than, or equal to
/// the size of the payload, and must ensure that the alignment of the type `T` is
/// less than, or equal to the alignment of the payload.
///
/// The type must be valid when zero-initialized.
pub unsafe fn as_ref<T>(&self) -> &T {
&*(self.data.as_ptr() as *const T)
}
}
};
}
#[cfg(feature = "timer-item-payload-size-1")]
define_opaque!(1);
#[cfg(feature = "timer-item-payload-size-2")]
define_opaque!(2);
#[cfg(feature = "timer-item-payload-size-4")]
define_opaque!(4);
#[cfg(feature = "timer-item-payload-size-8")]
define_opaque!(8);
/// An item in the timer queue.
pub struct TimerQueueItem {
/// The next item in the queue.
///
/// If this field contains `Some`, the item is in the queue. The last item in the queue has a
/// value of `Some(dangling_pointer)`
pub next: Cell<Option<TaskRef>>,
/// The time at which this item expires.
pub expires_at: Cell<u64>,
/// Some implementation-defined, zero-initialized piece of data.
#[cfg(feature = "_timer-item-payload")]
pub payload: OpaqueData,
}
unsafe impl Sync for TimerQueueItem {}
impl TimerQueueItem {
pub(crate) const fn new() -> Self {
Self {
next: Cell::new(None),
expires_at: Cell::new(0),
#[cfg(feature = "_timer-item-payload")]
payload: OpaqueData::new(),
}
}
}

View File

@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
- Removed the embassy-executor dependency
## 0.2.0 - 2025-08-04
Bumped embassy-executor
## 0.1.0 - 2024-01-11
Initial release

View File

@ -22,7 +22,7 @@ links = "embassy-time-queue"
[dependencies]
heapless = "0.8"
embassy-executor = { version = "0.8", path = "../embassy-executor" }
embassy-executor-timer-queue = { version = "0.1", path = "../embassy-executor-timer-queue", features = ["timer-item-size-6-words"] }
[features]
#! ### Generic Queue

View File

@ -1,7 +1,6 @@
#![no_std]
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
#![deny(missing_debug_implementations)]
#[cfg(feature = "_generic-queue")]
pub mod queue_generic;

View File

@ -1,16 +1,50 @@
//! Timer queue operations.
use core::cell::Cell;
use core::cmp::min;
use core::ptr::NonNull;
use core::task::Waker;
use embassy_executor::raw::TaskRef;
use embassy_executor_timer_queue::TimerQueueItem;
/// An item in the timer queue.
#[derive(Default)]
struct QueueItem {
/// The next item in the queue.
///
/// If this field contains `Some`, the item is in the queue. The last item in the queue has a
/// value of `Some(dangling_pointer)`
pub next: Cell<Option<NonNull<QueueItem>>>,
/// The time at which this item expires.
pub expires_at: u64,
/// The registered waker. If Some, the item is enqueued in the timer queue.
pub waker: Option<Waker>,
}
unsafe impl Sync for QueueItem {}
/// A timer queue, with items integrated into tasks.
#[derive(Debug)]
///
/// # Safety
///
/// **This Queue is only safe when there is a single integrated queue in the system.**
///
/// If there are multiple integrated queues, additional checks are necessary to ensure that a Waker
/// is not attempted to be enqueued in multiple queues.
pub struct Queue {
head: Cell<Option<TaskRef>>,
head: Cell<Option<NonNull<QueueItem>>>,
}
impl core::fmt::Debug for Queue {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Queue").finish()
}
}
unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}
impl Queue {
/// Creates a new timer queue.
pub const fn new() -> Self {
@ -22,25 +56,41 @@ impl Queue {
/// If this function returns `true`, the called should find the next expiration time and set
/// a new alarm for that time.
pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool {
let task = embassy_executor::raw::task_from_waker(waker);
let item = task.timer_queue_item();
if item.next.get().is_none() {
// If not in the queue, add it and update.
let prev = self.head.replace(Some(task));
item.next.set(if prev.is_none() {
Some(unsafe { TaskRef::dangling() })
} else {
prev
});
item.expires_at.set(at);
true
} else if at <= item.expires_at.get() {
// If expiration is sooner than previously set, update.
item.expires_at.set(at);
true
} else {
// Task does not need to be updated.
false
let item = unsafe {
// Safety: the `&mut self`, along with the Safety note of the Queue, are sufficient to
// ensure that this function creates the only mutable reference to the queue item.
TimerQueueItem::from_embassy_waker(waker)
};
let item = unsafe { item.as_mut::<QueueItem>() };
match item.waker.as_ref() {
Some(_) if at <= item.expires_at => {
// If expiration is sooner than previously set, update.
item.expires_at = at;
// The waker is always stored in its own queue item, so we don't need to update it.
// Trigger a queue update in case this item can be immediately dequeued.
true
}
Some(_) => {
// Queue item does not need to be updated, the task will be scheduled to be woken
// before the new expiration.
false
}
None => {
// If not in the queue, add it and update.
let mut item_ptr = NonNull::from(item);
let prev = self.head.replace(Some(item_ptr));
let item = unsafe { item_ptr.as_mut() };
item.expires_at = at;
item.waker = Some(waker.clone());
item.next.set(prev);
// The default implementation doesn't care about the
// opaque payload, leave it unchanged.
true
}
}
}
@ -51,33 +101,29 @@ impl Queue {
pub fn next_expiration(&mut self, now: u64) -> u64 {
let mut next_expiration = u64::MAX;
self.retain(|p| {
let item = p.timer_queue_item();
let expires = item.expires_at.get();
if expires <= now {
self.retain(|item| {
if item.expires_at <= now {
// Timer expired, process task.
embassy_executor::raw::wake_task(p);
if let Some(waker) = item.waker.take() {
waker.wake();
}
false
} else {
// Timer didn't yet expire, or never expires.
next_expiration = min(next_expiration, expires);
expires != u64::MAX
next_expiration = min(next_expiration, item.expires_at);
item.expires_at != u64::MAX
}
});
next_expiration
}
fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
fn retain(&mut self, mut f: impl FnMut(&mut QueueItem) -> bool) {
let mut prev = &self.head;
while let Some(p) = prev.get() {
if unsafe { p == TaskRef::dangling() } {
// prev was the last item, stop
break;
}
let item = p.timer_queue_item();
if f(p) {
while let Some(mut p) = prev.get() {
let mut item = unsafe { p.as_mut() };
if f(&mut item) {
// Skip to next
prev = &item.next;
} else {