diff --git a/cyw43/src/runner.rs b/cyw43/src/runner.rs index c72cf0def..e90316302 100644 --- a/cyw43/src/runner.rs +++ b/cyw43/src/runner.rs @@ -1,6 +1,5 @@ use embassy_futures::select::{select3, Either3}; use embassy_net_driver_channel as ch; -use embassy_sync::pubsub::PubSubBehavior; use embassy_time::{block_for, Duration, Timer}; use embedded_hal_1::digital::OutputPin; @@ -438,13 +437,16 @@ where // publish() is a deadlock risk in the current design as awaiting here prevents ioctls // The `Runner` always yields when accessing the device, so consumers always have a chance to receive the event // (if they are actively awaiting the queue) - self.events.queue.publish_immediate(events::Message::new( - Status { - event_type: evt_type, - status, - }, - event_payload, - )); + self.events + .queue + .immediate_publisher() + .publish_immediate(events::Message::new( + Status { + event_type: evt_type, + status, + }, + event_payload, + )); } } CHANNEL_TYPE_DATA => { diff --git a/embassy-sync/CHANGELOG.md b/embassy-sync/CHANGELOG.md index 7a830a853..bb919b28a 100644 --- a/embassy-sync/CHANGELOG.md +++ b/embassy-sync/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `Channel`. - Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `PriorityChannel`. - Add `capacity`, `free_capacity`, `len`, `is_empty` and `is_full` functions to `PubSubChannel`. +- Made `PubSubBehavior` sealed + - If you called `.publish_immediate(...)` on the queue directly before, then now call `.immediate_publisher().publish_immediate(...)` ## 0.5.0 - 2023-12-04 diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index 754747ab8..af3d6db2a 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -189,7 +189,7 @@ impl PubSubBehavior +impl SealedPubSubBehavior for PubSubChannel { fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll> { @@ -248,13 +248,6 @@ impl usize { - self.inner.lock(|s| { - let s = s.borrow(); - s.queue.capacity() - s.queue.len() - }) - } - fn unregister_subscriber(&self, subscriber_next_message_id: u64) { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -268,6 +261,26 @@ impl usize { + self.capacity() + } + + fn free_capacity(&self) -> usize { + self.free_capacity() + } + + fn len(&self) -> usize { + self.len() + } + + fn is_empty(&self) -> bool { + self.is_empty() + } + + fn is_full(&self) -> bool { + self.is_full() + } } /// Internal state for the PubSub channel @@ -421,7 +434,7 @@ pub enum Error { /// 'Middle level' behaviour of the pubsub channel. /// This trait is used so that Sub and Pub can be generic over the channel. -pub trait PubSubBehavior { +trait SealedPubSubBehavior { /// Try to get a message from the queue with the given message id. /// /// If the message is not yet present and a context is given, then its waker is registered in the subsriber wakers. @@ -439,8 +452,22 @@ pub trait PubSubBehavior { /// Publish a message immediately fn publish_immediate(&self, message: T); - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers - fn space(&self) -> usize; + /// Returns the maximum number of elements the channel can hold. + fn capacity(&self) -> usize; + + /// Returns the free capacity of the channel. + /// + /// This is equivalent to `capacity() - len()` + fn free_capacity(&self) -> usize; + + /// Returns the number of elements currently in the channel. + fn len(&self) -> usize; + + /// Returns whether the channel is empty. + fn is_empty(&self) -> bool; + + /// Returns whether the channel is full. + fn is_full(&self) -> bool; /// Let the channel know that a subscriber has dropped fn unregister_subscriber(&self, subscriber_next_message_id: u64); @@ -449,6 +476,13 @@ pub trait PubSubBehavior { fn unregister_publisher(&self); } +/// 'Middle level' behaviour of the pubsub channel. +/// This trait is used so that Sub and Pub can be generic over the channel. +#[allow(private_bounds)] +pub trait PubSubBehavior: SealedPubSubBehavior {} + +impl> PubSubBehavior for C {} + /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] @@ -581,6 +615,7 @@ mod tests { assert_eq!(pub0.try_publish(0), Ok(())); assert_eq!(pub0.try_publish(0), Ok(())); assert_eq!(pub0.try_publish(0), Ok(())); + assert!(pub0.is_full()); assert_eq!(pub0.try_publish(0), Err(0)); drop(sub0); @@ -613,32 +648,42 @@ mod tests { } #[futures_test::test] - async fn correct_space() { + async fn correct_len() { let channel = PubSubChannel::::new(); let mut sub0 = channel.subscriber().unwrap(); let mut sub1 = channel.subscriber().unwrap(); let pub0 = channel.publisher().unwrap(); - assert_eq!(pub0.space(), 4); + assert!(sub0.is_empty()); + assert!(sub1.is_empty()); + assert!(pub0.is_empty()); + assert_eq!(pub0.free_capacity(), 4); + assert_eq!(pub0.len(), 0); pub0.publish(42).await; - assert_eq!(pub0.space(), 3); + assert_eq!(pub0.free_capacity(), 3); + assert_eq!(pub0.len(), 1); pub0.publish(42).await; - assert_eq!(pub0.space(), 2); + assert_eq!(pub0.free_capacity(), 2); + assert_eq!(pub0.len(), 2); sub0.next_message().await; sub0.next_message().await; - assert_eq!(pub0.space(), 2); + assert_eq!(pub0.free_capacity(), 2); + assert_eq!(pub0.len(), 2); sub1.next_message().await; - assert_eq!(pub0.space(), 3); + assert_eq!(pub0.free_capacity(), 3); + assert_eq!(pub0.len(), 1); + sub1.next_message().await; - assert_eq!(pub0.space(), 4); + assert_eq!(pub0.free_capacity(), 4); + assert_eq!(pub0.len(), 0); } #[futures_test::test] @@ -649,29 +694,29 @@ mod tests { let mut sub0 = channel.subscriber().unwrap(); let mut sub1 = channel.subscriber().unwrap(); - assert_eq!(4, pub0.space()); + assert_eq!(4, pub0.free_capacity()); pub0.publish(1).await; pub0.publish(2).await; - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); assert_eq!(1, sub0.try_next_message_pure().unwrap()); assert_eq!(2, sub0.try_next_message_pure().unwrap()); - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); drop(sub0); - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); assert_eq!(1, sub1.try_next_message_pure().unwrap()); - assert_eq!(3, channel.space()); + assert_eq!(3, channel.free_capacity()); drop(sub1); - assert_eq!(4, channel.space()); + assert_eq!(4, channel.free_capacity()); } struct CloneCallCounter(usize); diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index e1edc9eb9..26e2c63b7 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -43,12 +43,31 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { self.channel.publish_with_context(message, None) } - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. /// - /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. - /// So checking doesn't give any guarantees.* - pub fn space(&self) -> usize { - self.channel.space() + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() } } @@ -124,12 +143,31 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { self.channel.publish_with_context(message, None) } - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. /// - /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. - /// So checking doesn't give any guarantees.* - pub fn space(&self) -> usize { - self.channel.space() + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() } } diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index f420a75f0..2e2bd26a9 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -65,10 +65,39 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } } - /// The amount of messages this subscriber hasn't received yet + /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically + /// for this subscriber. pub fn available(&self) -> u64 { self.channel.available(self.next_message_id) } + + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. + /// + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + /// See [Self::available] for how many messages are available for this subscriber. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() + } } impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {