From 042abc805a84d09231174e41edb0e498baaf7295 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Wed, 28 May 2025 10:36:05 +0200 Subject: [PATCH] feat: add support for channel peek Add support for peeking into the front of the channel if the value implements Clone. This can be useful in single-receiver situations where you don't want to remove the item from the queue until you've successfully processed it. --- embassy-sync/src/channel.rs | 78 ++++++++++++++++++++++++++++ embassy-sync/src/priority_channel.rs | 64 +++++++++++++++++++++++ 2 files changed, 142 insertions(+) diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 4d1fa9e39..a229c52e7 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -208,6 +208,16 @@ where self.channel.try_receive() } + /// Peek at the next value without removing it from the queue. + /// + /// See [`Channel::try_peek()`] + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.channel.try_peek() + } + /// Allows a poll_fn to poll until the channel is ready to receive /// /// See [`Channel::poll_ready_to_receive()`] @@ -293,6 +303,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> { self.channel.try_receive_with_context(None) } + /// Peek at the next value without removing it from the queue. + /// + /// See [`Channel::try_peek()`] + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.channel.try_peek_with_context(None) + } + /// Allows a poll_fn to poll until the channel is ready to receive /// /// See [`Channel::poll_ready_to_receive()`] @@ -463,6 +483,10 @@ pub(crate) trait DynamicChannel { fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result; + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone; + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; @@ -505,6 +529,31 @@ impl ChannelState { self.try_receive_with_context(None) } + fn try_peek(&mut self) -> Result + where + T: Clone, + { + self.try_peek_with_context(None) + } + + fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + if self.queue.is_full() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.front() { + Ok(message.clone()) + } else { + if let Some(cx) = cx { + self.receiver_waker.register(cx.waker()); + } + Err(TryReceiveError::Empty) + } + } + fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { if self.queue.is_full() { self.senders_waker.wake(); @@ -634,6 +683,13 @@ where self.lock(|c| c.try_receive_with_context(cx)) } + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + self.lock(|c| c.try_peek_with_context(cx)) + } + /// Poll the channel for the next message pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { self.lock(|c| c.poll_receive(cx)) @@ -722,6 +778,17 @@ where self.lock(|c| c.try_receive()) } + /// Peek at the next value without removing it from the queue. + /// + /// This method will either receive a copy of the message from the channel immediately or return + /// an error if the channel is empty. + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.lock(|c| c.try_peek()) + } + /// Returns the maximum number of elements the channel can hold. pub const fn capacity(&self) -> usize { N @@ -769,6 +836,13 @@ where Channel::try_receive_with_context(self, cx) } + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + Channel::try_peek_with_context(self, cx) + } + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { Channel::poll_ready_to_send(self, cx) } @@ -851,6 +925,8 @@ mod tests { fn simple_send_and_receive() { let c = Channel::::new(); assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_peek().unwrap(), 1); + assert_eq!(c.try_peek().unwrap(), 1); assert_eq!(c.try_receive().unwrap(), 1); } @@ -881,6 +957,8 @@ mod tests { let r = c.dyn_receiver(); assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_peek().unwrap(), 1); + assert_eq!(r.try_peek().unwrap(), 1); assert_eq!(r.try_receive().unwrap(), 1); } diff --git a/embassy-sync/src/priority_channel.rs b/embassy-sync/src/priority_channel.rs index 36959204f..623c52993 100644 --- a/embassy-sync/src/priority_channel.rs +++ b/embassy-sync/src/priority_channel.rs @@ -175,6 +175,16 @@ where self.channel.try_receive() } + /// Peek at the next value without removing it from the queue. + /// + /// See [`PriorityChannel::try_peek()`] + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.channel.try_peek_with_context(None) + } + /// Allows a poll_fn to poll until the channel is ready to receive /// /// See [`PriorityChannel::poll_ready_to_receive()`] @@ -343,6 +353,31 @@ where self.try_receive_with_context(None) } + fn try_peek(&mut self) -> Result + where + T: Clone, + { + self.try_peek_with_context(None) + } + + fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + if self.queue.len() == self.queue.capacity() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.peek() { + Ok(message.clone()) + } else { + if let Some(cx) = cx { + self.receiver_waker.register(cx.waker()); + } + Err(TryReceiveError::Empty) + } + } + fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result { if self.queue.len() == self.queue.capacity() { self.senders_waker.wake(); @@ -478,6 +513,13 @@ where self.lock(|c| c.try_receive_with_context(cx)) } + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + self.lock(|c| c.try_peek_with_context(cx)) + } + /// Poll the channel for the next message pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { self.lock(|c| c.poll_receive(cx)) @@ -548,6 +590,17 @@ where self.lock(|c| c.try_receive()) } + /// Peek at the next value without removing it from the queue. + /// + /// This method will either receive a copy of the message from the channel immediately or return + /// an error if the channel is empty. + pub fn try_peek(&self) -> Result + where + T: Clone, + { + self.lock(|c| c.try_peek()) + } + /// Removes elements from the channel based on the given predicate. pub fn remove_if(&self, predicate: F) where @@ -617,6 +670,13 @@ where PriorityChannel::try_receive_with_context(self, cx) } + fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result + where + T: Clone, + { + PriorityChannel::try_peek_with_context(self, cx) + } + fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { PriorityChannel::poll_ready_to_send(self, cx) } @@ -705,6 +765,8 @@ mod tests { fn simple_send_and_receive() { let c = PriorityChannel::::new(); assert!(c.try_send(1).is_ok()); + assert_eq!(c.try_peek().unwrap(), 1); + assert_eq!(c.try_peek().unwrap(), 1); assert_eq!(c.try_receive().unwrap(), 1); } @@ -725,6 +787,8 @@ mod tests { let r: DynamicReceiver<'_, u32> = c.receiver().into(); assert!(s.try_send(1).is_ok()); + assert_eq!(r.try_peek().unwrap(), 1); + assert_eq!(r.try_peek().unwrap(), 1); assert_eq!(r.try_receive().unwrap(), 1); }