Merge pull request #4263 from embassy-rs/channel-peek

feat: add support for channel peek
This commit is contained in:
Ulf Lilleengen 2025-05-28 12:26:06 +02:00 committed by GitHub
commit 58db2f7d94
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 142 additions and 0 deletions

View File

@ -208,6 +208,16 @@ where
self.channel.try_receive()
}
/// Peek at the next value without removing it from the queue.
///
/// See [`Channel::try_peek()`]
pub fn try_peek(&self) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.channel.try_peek()
}
/// Allows a poll_fn to poll until the channel is ready to receive
///
/// See [`Channel::poll_ready_to_receive()`]
@ -293,6 +303,16 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
self.channel.try_receive_with_context(None)
}
/// Peek at the next value without removing it from the queue.
///
/// See [`Channel::try_peek()`]
pub fn try_peek(&self) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.channel.try_peek_with_context(None)
}
/// Allows a poll_fn to poll until the channel is ready to receive
///
/// See [`Channel::poll_ready_to_receive()`]
@ -463,6 +483,10 @@ pub(crate) trait DynamicChannel<T> {
fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
where
T: Clone;
fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
@ -505,6 +529,31 @@ impl<T, const N: usize> ChannelState<T, N> {
self.try_receive_with_context(None)
}
fn try_peek(&mut self) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.try_peek_with_context(None)
}
fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
where
T: Clone,
{
if self.queue.is_full() {
self.senders_waker.wake();
}
if let Some(message) = self.queue.front() {
Ok(message.clone())
} else {
if let Some(cx) = cx {
self.receiver_waker.register(cx.waker());
}
Err(TryReceiveError::Empty)
}
}
fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
if self.queue.is_full() {
self.senders_waker.wake();
@ -634,6 +683,13 @@ where
self.lock(|c| c.try_receive_with_context(cx))
}
fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.lock(|c| c.try_peek_with_context(cx))
}
/// Poll the channel for the next message
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
self.lock(|c| c.poll_receive(cx))
@ -722,6 +778,17 @@ where
self.lock(|c| c.try_receive())
}
/// Peek at the next value without removing it from the queue.
///
/// This method will either receive a copy of the message from the channel immediately or return
/// an error if the channel is empty.
pub fn try_peek(&self) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.lock(|c| c.try_peek())
}
/// Returns the maximum number of elements the channel can hold.
pub const fn capacity(&self) -> usize {
N
@ -769,6 +836,13 @@ where
Channel::try_receive_with_context(self, cx)
}
fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
where
T: Clone,
{
Channel::try_peek_with_context(self, cx)
}
fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
Channel::poll_ready_to_send(self, cx)
}
@ -851,6 +925,8 @@ mod tests {
fn simple_send_and_receive() {
let c = Channel::<NoopRawMutex, u32, 3>::new();
assert!(c.try_send(1).is_ok());
assert_eq!(c.try_peek().unwrap(), 1);
assert_eq!(c.try_peek().unwrap(), 1);
assert_eq!(c.try_receive().unwrap(), 1);
}
@ -881,6 +957,8 @@ mod tests {
let r = c.dyn_receiver();
assert!(s.try_send(1).is_ok());
assert_eq!(r.try_peek().unwrap(), 1);
assert_eq!(r.try_peek().unwrap(), 1);
assert_eq!(r.try_receive().unwrap(), 1);
}

View File

@ -175,6 +175,16 @@ where
self.channel.try_receive()
}
/// Peek at the next value without removing it from the queue.
///
/// See [`PriorityChannel::try_peek()`]
pub fn try_peek(&self) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.channel.try_peek_with_context(None)
}
/// Allows a poll_fn to poll until the channel is ready to receive
///
/// See [`PriorityChannel::poll_ready_to_receive()`]
@ -343,6 +353,31 @@ where
self.try_receive_with_context(None)
}
fn try_peek(&mut self) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.try_peek_with_context(None)
}
fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
where
T: Clone,
{
if self.queue.len() == self.queue.capacity() {
self.senders_waker.wake();
}
if let Some(message) = self.queue.peek() {
Ok(message.clone())
} else {
if let Some(cx) = cx {
self.receiver_waker.register(cx.waker());
}
Err(TryReceiveError::Empty)
}
}
fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
if self.queue.len() == self.queue.capacity() {
self.senders_waker.wake();
@ -478,6 +513,13 @@ where
self.lock(|c| c.try_receive_with_context(cx))
}
fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.lock(|c| c.try_peek_with_context(cx))
}
/// Poll the channel for the next message
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
self.lock(|c| c.poll_receive(cx))
@ -548,6 +590,17 @@ where
self.lock(|c| c.try_receive())
}
/// Peek at the next value without removing it from the queue.
///
/// This method will either receive a copy of the message from the channel immediately or return
/// an error if the channel is empty.
pub fn try_peek(&self) -> Result<T, TryReceiveError>
where
T: Clone,
{
self.lock(|c| c.try_peek())
}
/// Removes elements from the channel based on the given predicate.
pub fn remove_if<F>(&self, predicate: F)
where
@ -617,6 +670,13 @@ where
PriorityChannel::try_receive_with_context(self, cx)
}
fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
where
T: Clone,
{
PriorityChannel::try_peek_with_context(self, cx)
}
fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
PriorityChannel::poll_ready_to_send(self, cx)
}
@ -705,6 +765,8 @@ mod tests {
fn simple_send_and_receive() {
let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
assert!(c.try_send(1).is_ok());
assert_eq!(c.try_peek().unwrap(), 1);
assert_eq!(c.try_peek().unwrap(), 1);
assert_eq!(c.try_receive().unwrap(), 1);
}
@ -725,6 +787,8 @@ mod tests {
let r: DynamicReceiver<'_, u32> = c.receiver().into();
assert!(s.try_send(1).is_ok());
assert_eq!(r.try_peek().unwrap(), 1);
assert_eq!(r.try_peek().unwrap(), 1);
assert_eq!(r.try_receive().unwrap(), 1);
}