Remove ConcurrentQueue & other minor tweaks (#4058)

* Remove ConcurrentQueue

* Unrequire preempt driver for 802.15.4
This commit is contained in:
Dániel Buga 2025-09-05 16:59:08 +02:00 committed by GitHub
parent 105c163306
commit 24d2122c14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 150 additions and 248 deletions

View File

@ -68,6 +68,31 @@ impl QueueInner {
true
}
unsafe fn remove(&mut self, item: *const u8) {
// do what the ESP-IDF implementations does...
// just remove all elements and add them back except the one we need to remove -
// good enough for now
let count = self.len();
if count == 0 {
return;
}
let mut tmp_item = vec![0; self.item_size];
let item_slice = unsafe { core::slice::from_raw_parts(item, self.item_size) };
for _ in 0..count {
if !unsafe { self.try_dequeue(tmp_item.as_mut_ptr().cast()) } {
break;
}
if &tmp_item[..] != item_slice {
_ = unsafe { self.try_enqueue(tmp_item.as_mut_ptr().cast()) };
}
// Note that even if we find our item, we'll need to keep cycling through everything to
// keep insertion order.
}
}
fn len(&self) -> usize {
if self.current_write >= self.current_read {
self.current_write - self.current_read
@ -132,6 +157,10 @@ impl Queue {
self.inner.with(|queue| unsafe { queue.try_dequeue(item) })
}
unsafe fn remove(&self, item: *const u8) {
self.inner.with(|queue| unsafe { queue.remove(item) })
}
fn messages_waiting(&self) -> usize {
self.inner.with(|queue| queue.len())
}
@ -176,6 +205,12 @@ impl QueueImplementation for Queue {
unsafe { queue.try_receive(item) }
}
unsafe fn remove(queue: QueuePtr, item: *const u8) {
let queue = unsafe { Queue::from_ptr(queue) };
unsafe { queue.remove(item) }
}
fn messages_waiting(queue: QueuePtr) -> usize {
let queue = unsafe { Queue::from_ptr(queue) };

View File

@ -22,6 +22,7 @@ unsafe extern "Rust" {
fn esp_preempt_queue_try_send_to_back(queue: QueuePtr, item: *const u8) -> bool;
fn esp_preempt_queue_receive(queue: QueuePtr, item: *mut u8, timeout_us: Option<u32>) -> bool;
fn esp_preempt_queue_try_receive(queue: QueuePtr, item: *mut u8) -> bool;
fn esp_preempt_queue_remove(queue: QueuePtr, item: *const u8);
fn esp_preempt_queue_messages_waiting(queue: QueuePtr) -> usize;
}
@ -97,6 +98,14 @@ pub trait QueueImplementation {
/// a size equal to the queue's item size.
unsafe fn try_receive(queue: QueuePtr, item: *mut u8) -> bool;
/// Removes an item from the queue.
///
/// # Safety
///
/// The caller must ensure that `item` can be dereferenced and points to an allocation of
/// a size equal to the queue's item size.
unsafe fn remove(queue: QueuePtr, item: *const u8);
/// Returns the number of messages in the queue.
fn messages_waiting(queue: QueuePtr) -> usize;
}
@ -162,6 +171,12 @@ macro_rules! register_queue_implementation {
unsafe { <$t as $crate::queue::QueueImplementation>::try_receive(queue, item) }
}
#[unsafe(no_mangle)]
#[inline]
fn esp_preempt_queue_remove(queue: QueuePtr, item: *mut u8) {
unsafe { <$t as $crate::queue::QueueImplementation>::remove(queue, item) }
}
#[unsafe(no_mangle)]
#[inline]
fn esp_preempt_queue_messages_waiting(queue: QueuePtr) -> usize {
@ -278,6 +293,16 @@ impl QueueHandle {
unsafe { esp_preempt_queue_try_receive(self.0, item) }
}
/// Removes an item from the queue.
///
/// # Safety
///
/// The caller must ensure that `item` can be dereferenced and points to an allocation of
/// a size equal to the queue's item size.
pub unsafe fn remove(&self, item: *const u8) {
unsafe { esp_preempt_queue_remove(self.0, item) }
}
/// Returns the number of messages in the queue.
pub fn messages_waiting(&self) -> usize {
unsafe { esp_preempt_queue_messages_waiting(self.0) }

View File

@ -1,19 +1,15 @@
use alloc::boxed::Box;
use core::{
mem::{size_of_val, transmute},
mem::transmute,
ptr::{addr_of, addr_of_mut},
};
use esp_hal::time::{Duration, Instant};
use esp_hal::time::Instant;
use super::*;
use crate::{
binary::{c_types::*, include::*},
compat::{
self,
common::{ConcurrentQueue, str_from_c},
},
preempt::yield_task,
compat::{self, OSI_FUNCS_TIME_BLOCKING, common::str_from_c, queue},
};
#[cfg_attr(esp32c2, path = "os_adapter_esp32c2.rs")]
@ -426,24 +422,25 @@ unsafe extern "C" fn esp_intr_free(_ret_handle: *mut *mut c_void) -> i32 {
}
#[repr(C)]
#[allow(non_camel_case_types)]
/// Contains pointers to functions used by the BLE NPL (Non-Preemptive Layer).
pub(crate) struct npl_funcs_t {
p_ble_npl_os_started: Option<unsafe extern "C" fn() -> bool>,
p_ble_npl_get_current_task_id: Option<unsafe extern "C" fn() -> *const c_void>,
p_ble_npl_eventq_init: Option<unsafe extern "C" fn(queue: *const ble_npl_eventq)>,
p_ble_npl_eventq_deinit: Option<unsafe extern "C" fn(queue: *const ble_npl_eventq)>,
p_ble_npl_eventq_init: Option<unsafe extern "C" fn(queue: *mut ble_npl_eventq)>,
p_ble_npl_eventq_deinit: Option<unsafe extern "C" fn(queue: *mut ble_npl_eventq)>,
p_ble_npl_eventq_get: Option<
unsafe extern "C" fn(
queue: *const ble_npl_eventq,
queue: *mut ble_npl_eventq,
time: ble_npl_time_t,
) -> *const ble_npl_event,
>,
p_ble_npl_eventq_put:
Option<unsafe extern "C" fn(queue: *const ble_npl_eventq, event: *const ble_npl_event)>,
Option<unsafe extern "C" fn(queue: *mut ble_npl_eventq, event: *const ble_npl_event)>,
p_ble_npl_eventq_remove:
Option<unsafe extern "C" fn(queue: *const ble_npl_eventq, event: *const ble_npl_event)>,
Option<unsafe extern "C" fn(queue: *mut ble_npl_eventq, event: *const ble_npl_event)>,
p_ble_npl_event_run: Option<unsafe extern "C" fn(event: *const ble_npl_event)>,
p_ble_npl_eventq_is_empty: Option<unsafe extern "C" fn(queue: *const ble_npl_eventq) -> bool>,
p_ble_npl_eventq_is_empty: Option<unsafe extern "C" fn(queue: *mut ble_npl_eventq) -> bool>,
p_ble_npl_event_init: Option<
unsafe extern "C" fn(
event: *const ble_npl_event,
@ -630,22 +627,28 @@ unsafe extern "C" fn ble_npl_hw_set_isr(_no: i32, _mask: u32) {
todo!()
}
// NPL counts in milliseconds. Let's not lose range by using micros for our tick rate.
const fn npl_time_to_ms(time: ble_npl_time_t) -> u32 {
time
}
const fn npl_ms_to_time(ms: u32) -> ble_npl_time_t {
ms
}
unsafe extern "C" fn ble_npl_time_delay(time: ble_npl_time_t) {
let start = Instant::now();
let timeout = Duration::from_millis(time as u64);
while start.elapsed() <= timeout {
yield_task();
}
let time = npl_time_to_ms(time);
crate::preempt::usleep(time * 1000);
}
unsafe extern "C" fn ble_npl_time_ticks_to_ms32(time: ble_npl_time_t) -> u32 {
trace!("ble_npl_time_ticks_to_ms32 {}", time);
time
npl_time_to_ms(time)
}
unsafe extern "C" fn ble_npl_time_ms_to_ticks32(ms: u32) -> ble_npl_time_t {
trace!("ble_npl_time_ms_to_ticks32 {}", ms);
ms
npl_ms_to_time(ms)
}
unsafe extern "C" fn ble_npl_time_ticks_to_ms(
@ -653,9 +656,7 @@ unsafe extern "C" fn ble_npl_time_ticks_to_ms(
p_ms: *mut u32,
) -> ble_npl_error_t {
trace!("ble_npl_time_ticks_to_ms {}", time);
unsafe {
*p_ms = time;
}
unsafe { *p_ms = npl_time_to_ms(time) };
0
}
@ -664,9 +665,7 @@ unsafe extern "C" fn ble_npl_time_ms_to_ticks(
p_time: *mut ble_npl_time_t,
) -> ble_npl_error_t {
trace!("ble_npl_time_ms_to_ticks {}", ms);
unsafe {
*p_time = ms;
}
unsafe { *p_time = npl_ms_to_time(ms) };
0
}
@ -856,12 +855,11 @@ unsafe extern "C" fn ble_npl_event_init(
}
}
unsafe extern "C" fn ble_npl_eventq_is_empty(queue: *const ble_npl_eventq) -> bool {
unsafe extern "C" fn ble_npl_eventq_is_empty(queue: *mut ble_npl_eventq) -> bool {
trace!("ble_npl_eventq_is_empty {:?}", queue);
let wrapper = unwrap!(unsafe { queue.as_mut() }, "queue wrapper is null");
let queue = unsafe { (*queue).dummy } as *mut ConcurrentQueue;
assert!(!queue.is_null());
unsafe { (*queue).count() == 0 }
queue::queue_messages_waiting(wrapper.dummy as _) == 0
}
unsafe extern "C" fn ble_npl_event_run(event: *const ble_npl_event) {
@ -884,12 +882,12 @@ unsafe extern "C" fn ble_npl_event_run(event: *const ble_npl_event) {
}
unsafe extern "C" fn ble_npl_eventq_remove(
queue: *const ble_npl_eventq,
queue: *mut ble_npl_eventq,
event: *const ble_npl_event,
) {
trace!("ble_npl_eventq_remove {:?} {:?}", queue, event);
unsafe {
assert!((*queue).dummy != 0);
let evt = (*event).dummy as *mut Event;
assert!(!evt.is_null());
@ -897,17 +895,16 @@ unsafe extern "C" fn ble_npl_eventq_remove(
return;
}
let queue = (*queue).dummy as *mut ConcurrentQueue;
(*queue).remove(addr_of!(event) as *mut _);
let wrapper = unwrap!(queue.as_mut(), "queue wrapper is null");
queue::queue_remove(wrapper.dummy as _, (&raw const event).cast());
(*evt).queued = false;
}
}
unsafe extern "C" fn ble_npl_eventq_put(queue: *const ble_npl_eventq, event: *const ble_npl_event) {
unsafe extern "C" fn ble_npl_eventq_put(queue: *mut ble_npl_eventq, event: *const ble_npl_event) {
trace!("ble_npl_eventq_put {:?} {:?}", queue, event);
assert!(unsafe { (*queue).dummy } != 0);
let evt = unsafe { (*event).dummy } as *mut Event;
assert!(!evt.is_null());
@ -920,64 +917,52 @@ unsafe extern "C" fn ble_npl_eventq_put(queue: *const ble_npl_eventq, event: *co
(*evt).queued = true;
}
let queue = unsafe { (*queue).dummy } as *mut ConcurrentQueue;
let mut event = event as usize;
unsafe {
(*queue).enqueue(addr_of_mut!(event).cast());
}
let wrapper = unwrap!(unsafe { queue.as_mut() }, "queue wrapper is null");
// Store the pointer to the ble_npl_event in the queue - this is what we'll need to dequeue.
queue::queue_send_to_back(
wrapper.dummy as _,
(&raw const event).cast(),
OSI_FUNCS_TIME_BLOCKING,
);
}
unsafe extern "C" fn ble_npl_eventq_get(
queue: *const ble_npl_eventq,
time: ble_npl_time_t,
queue: *mut ble_npl_eventq,
timeout: ble_npl_time_t,
) -> *const ble_npl_event {
trace!("ble_npl_eventq_get {:?} {}", queue, time);
trace!("ble_npl_eventq_get {:?} {}", queue, timeout);
let queue = unsafe { (*queue).dummy } as *mut ConcurrentQueue;
let mut evt = core::ptr::null_mut::<ble_npl_event>();
let wrapper = unwrap!(unsafe { queue.as_mut() }, "queue wrapper is null");
let timeout = if time == TIME_FOREVER {
None
} else {
Some(Duration::from_millis(time as u64))
};
let start = Instant::now();
let mut event: usize = 0;
loop {
if unsafe { (*queue).try_dequeue(addr_of_mut!(event).cast()) } {
let event = event as *mut ble_npl_event;
let evt = unsafe { (*event).dummy } as *mut Event;
trace!("got {:x}", evt as usize);
unsafe {
(*evt).queued = false;
}
return event as *const ble_npl_event;
if queue::queue_receive(wrapper.dummy as _, (&raw mut evt).cast(), timeout) == 1 {
trace!("got {:x}", evt as usize);
unsafe {
let evt = (*evt).dummy as *mut Event;
(*evt).queued = false;
}
}
if let Some(timeout) = timeout
&& start.elapsed() >= timeout
{
debug!("No event in queue after timeout: {:?}", timeout);
return core::ptr::null();
}
evt.cast_const()
}
yield_task();
unsafe extern "C" fn ble_npl_eventq_init(queue: *mut ble_npl_eventq) {
trace!("ble_npl_eventq_init {:?}", queue);
let queue_ptr = queue::queue_create(EVENT_QUEUE_SIZE as _, core::mem::size_of::<usize>() as _);
unsafe {
(*queue).dummy = queue_ptr as i32;
}
}
unsafe extern "C" fn ble_npl_eventq_deinit(queue: *const ble_npl_eventq) {
unsafe extern "C" fn ble_npl_eventq_deinit(queue: *mut ble_npl_eventq) {
trace!("ble_npl_eventq_deinit {:?}", queue);
let queue = queue.cast_mut();
assert!(unsafe { (*queue).dummy } != 0);
let real_queue = unsafe { (*queue).dummy } as *mut ConcurrentQueue;
unsafe {
core::ptr::drop_in_place(real_queue);
}
unsafe {
(*queue).dummy = 0;
}
let wrapper = unwrap!(unsafe { queue.as_mut() }, "queue wrapper is null");
queue::queue_delete(wrapper.dummy as _);
wrapper.dummy = 0;
}
unsafe extern "C" fn ble_npl_callout_init(
@ -1017,30 +1002,13 @@ unsafe extern "C" fn callout_timer_callback_wrapper(arg: *mut c_void) {
unsafe {
if !(*co).eventq.is_null() {
ble_npl_eventq_put(addr_of!((*co).eventq).cast(), addr_of!((*co).events));
ble_npl_eventq_put((*co).eventq.cast_mut(), addr_of!((*co).events));
} else {
ble_npl_event_run(addr_of!((*co).events));
}
}
}
unsafe extern "C" fn ble_npl_eventq_init(queue: *const ble_npl_eventq) {
trace!("ble_npl_eventq_init {:?}", queue);
let queue = queue as *mut ble_npl_eventq;
let raw_queue = ConcurrentQueue::new(EVENT_QUEUE_SIZE, 4);
let ptr =
unsafe { crate::compat::malloc::malloc(size_of_val(&raw_queue)) } as *mut ConcurrentQueue;
unsafe {
ptr.write(raw_queue);
}
unsafe {
(*queue).dummy = ptr as i32;
}
}
unsafe extern "C" fn ble_npl_mutex_init(_mutex: *const ble_npl_mutex) -> u32 {
todo!()
}

View File

@ -308,7 +308,15 @@ pub unsafe extern "C" fn __esp_radio_gettimeofday(tv: *mut timeval, _tz: *mut ()
#[unsafe(no_mangle)]
pub unsafe extern "C" fn __esp_radio_esp_timer_get_time() -> i64 {
trace!("esp_timer_get_time");
crate::time::ticks_to_micros(crate::preempt::now()) as i64
// Just using IEEE802.15.4 doesn't need the current time. If we don't use `preempt::now`, users
// will not need to have a scheduler in their firmware.
cfg_if::cfg_if! {
if #[cfg(any(feature = "wifi", feature = "ble"))] {
crate::time::ticks_to_micros(crate::preempt::now()) as i64
} else {
unreachable!()
}
}
}
#[unsafe(no_mangle)]

View File

@ -27,147 +27,6 @@ struct Mutex {
recursive: bool,
}
pub(crate) struct ConcurrentQueue {
raw_queue: NonReentrantMutex<RawQueue>,
}
impl ConcurrentQueue {
pub(crate) fn new(count: usize, item_size: usize) -> Self {
Self {
raw_queue: NonReentrantMutex::new(RawQueue::new(count, item_size)),
}
}
pub(crate) fn enqueue(&mut self, item: *mut c_void) -> i32 {
self.raw_queue.with(|q| unsafe { q.enqueue(item) })
}
pub(crate) fn try_dequeue(&mut self, item: *mut c_void) -> bool {
self.raw_queue.with(|q| unsafe { q.try_dequeue(item) })
}
pub(crate) fn remove(&mut self, item: *mut c_void) {
self.raw_queue.with(|q| unsafe { q.remove(item) })
}
pub(crate) fn count(&self) -> usize {
self.raw_queue.with(|q| unsafe { q.count() })
}
}
/// A naive and pretty much unsafe queue to back the queues used in drivers and
/// supplicant code.
///
/// The [ConcurrentQueue] wrapper should be used.
pub struct RawQueue {
item_size: usize,
capacity: usize,
current_read: usize,
current_write: usize,
storage: Box<[u8]>,
}
impl RawQueue {
/// This allocates underlying storage. See [release_storage]
pub fn new(capacity: usize, item_size: usize) -> Self {
let storage = unsafe {
let mut boxed = Box::new_uninit_slice(capacity * item_size);
for i in 0..capacity * item_size {
boxed[i].write(0);
}
boxed.assume_init()
};
Self {
item_size,
capacity,
current_read: 0,
current_write: 0,
storage,
}
}
fn get(&self, index: usize) -> &[u8] {
let item_start = self.item_size * index;
&self.storage[item_start..][..self.item_size]
}
fn get_mut(&mut self, index: usize) -> &mut [u8] {
let item_start = self.item_size * index;
&mut self.storage[item_start..][..self.item_size]
}
fn full(&self) -> bool {
self.count() == self.capacity
}
fn empty(&self) -> bool {
self.count() == 0
}
unsafe fn enqueue(&mut self, item: *mut c_void) -> i32 {
if !self.full() {
let item = unsafe { core::slice::from_raw_parts(item as *const u8, self.item_size) };
let dst = self.get_mut(self.current_write);
dst.copy_from_slice(item);
self.current_write = (self.current_write + 1) % self.capacity;
1
} else {
0
}
}
unsafe fn try_dequeue(&mut self, item: *mut c_void) -> bool {
if !self.empty() {
let item = unsafe { core::slice::from_raw_parts_mut(item as *mut u8, self.item_size) };
let src = self.get(self.current_read);
item.copy_from_slice(src);
self.current_read = (self.current_read + 1) % self.capacity;
true
} else {
false
}
}
unsafe fn remove(&mut self, item: *mut c_void) {
// do what the ESP-IDF implementations does ...
// just remove all elements and add them back except the one we need to remove -
// good enough for now
let item_slice = unsafe { core::slice::from_raw_parts(item as *const u8, self.item_size) };
let count = self.count();
if count == 0 {
return;
}
let mut tmp_item = Vec::<u8>::new();
tmp_item.reserve_exact(self.item_size);
tmp_item.resize(self.item_size, 0);
for _ in 0..count {
if !unsafe { self.try_dequeue(tmp_item.as_mut_ptr().cast()) } {
break;
}
if &tmp_item[..] != item_slice {
unsafe { self.enqueue(tmp_item.as_mut_ptr().cast()) };
}
}
}
fn count(&self) -> usize {
if self.current_write >= self.current_read {
self.current_write - self.current_read
} else {
self.capacity - self.current_read + self.current_write
}
}
}
pub unsafe fn str_from_c<'a>(s: *const c_char) -> &'a str {
unsafe {
let c_str = core::ffi::CStr::from_ptr(s.cast());

View File

@ -95,6 +95,16 @@ pub(crate) fn queue_try_receive(queue: *mut c_void, item: *mut c_void) -> i32 {
unsafe { handle.try_receive(item.cast()) as i32 }
}
pub(crate) fn queue_remove(queue: *mut c_void, item: *const c_void) {
trace!("queue_remove queue {:?} item {:x}", queue, item as usize);
let ptr = unwrap!(QueuePtr::new(queue.cast()), "queue is null");
let handle = unsafe { QueueHandle::ref_from_ptr(&ptr) };
unsafe { handle.remove(item.cast()) }
}
pub(crate) fn queue_messages_waiting(queue: *mut c_void) -> u32 {
trace!("queue_msg_waiting {:?}", queue);

View File

@ -7,8 +7,8 @@ use crate::{
preempt::timer::TimerPtr,
};
pub(crate) fn compat_timer_arm(ets_timer: *mut ets_timer, tmout: u32, repeat: bool) {
compat_timer_arm_us(ets_timer, tmout * 1000, repeat);
pub(crate) fn compat_timer_arm(ets_timer: *mut ets_timer, tmout_ms: u32, repeat: bool) {
compat_timer_arm_us(ets_timer, tmout_ms * 1000, repeat);
}
pub(crate) fn compat_timer_arm_us(ets_timer: *mut ets_timer, us: u32, repeat: bool) {

View File

@ -1739,6 +1739,7 @@ impl WifiDeviceMode {
fn tx_token(&self) -> Option<WifiTxToken> {
if !self.can_send() {
// TODO: perhaps we can use a counting semaphore with a short blocking timeout
crate::preempt::yield_task();
}
@ -1752,6 +1753,7 @@ impl WifiDeviceMode {
fn rx_token(&self) -> Option<(WifiRxToken, WifiTxToken)> {
let is_empty = self.data_queue_rx().with(|q| q.is_empty());
if is_empty || !self.can_send() {
// TODO: use an OS queue with a short timeout
crate::preempt::yield_task();
}

View File

@ -4,7 +4,6 @@
use esp_alloc as _;
use esp_backtrace as _;
use esp_hal::main;
use esp_preempt as _;
use esp_println::println;
use esp_radio::ieee802154::{Config, Ieee802154};

View File

@ -4,7 +4,6 @@
use esp_alloc as _;
use esp_backtrace as _;
use esp_hal::main;
use esp_preempt as _;
use esp_println::println;
use esp_radio::ieee802154::{Config, Ieee802154};

View File

@ -4,7 +4,6 @@
use esp_alloc as _;
use esp_backtrace as _;
use esp_hal::{delay::Delay, main};
use esp_preempt as _;
use esp_println::println;
use esp_radio::ieee802154::{Config, Frame, Ieee802154};
use ieee802154::mac::{

View File

@ -4,7 +4,6 @@
use esp_alloc as _;
use esp_backtrace as _;
use esp_hal::{delay::Delay, main};
use esp_preempt as _;
use esp_println::println;
use esp_radio::ieee802154::{Config, Frame, Ieee802154};
use ieee802154::mac::{

View File

@ -14,7 +14,6 @@ use esp_hal::{
system::software_reset,
uart::{self, Uart},
};
use esp_preempt as _;
use esp_println::println;
use esp_radio::ieee802154::{Config, Ieee802154};