From 277f6f7331684a0008930a43b6705ba52873d1f5 Mon Sep 17 00:00:00 2001 From: Corey Schuhen Date: Wed, 28 May 2025 18:15:15 +1000 Subject: [PATCH] Make Sync capable versions of DynamicSender and DynamicReceiver. DynamicSender and DynamicReceiver, just seem to be a fat pointer to a Channel which is already protected by it's own Mutex already. In fact, you can share the Channel already betwen threads and create Dynamic*er's in the target threads. It should be safe to share the Dynamic*er's directly. Can only be used when Mutex M of channel supoorts Sync. --- embassy-sync/src/channel.rs | 106 ++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 4d1fa9e39..1b053ecad 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -164,6 +164,57 @@ impl<'ch, T> DynamicSender<'ch, T> { } } +/// Send-only access to a [`Channel`] without knowing channel size. +/// This version can be sent between threads but can only be created if the underlying mutex is Sync. +pub struct SendDynamicSender<'ch, T> { + pub(crate) channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for SendDynamicSender<'ch, T> { + fn clone(&self) -> Self { + *self + } +} + +impl<'ch, T> Copy for SendDynamicSender<'ch, T> {} +unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {} +unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {} + +impl<'ch, M, T, const N: usize> From> for SendDynamicSender<'ch, T> +where + M: RawMutex + Sync + Send, +{ + fn from(s: Sender<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + +impl<'ch, T> SendDynamicSender<'ch, T> { + /// Sends a value. + /// + /// See [`Channel::send()`] + pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> { + DynamicSendFuture { + channel: self.channel, + message: Some(message), + } + } + + /// Attempt to immediately send a message. + /// + /// See [`Channel::send()`] + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { + self.channel.try_send_with_context(message, None) + } + + /// Allows a poll_fn to poll until the channel is ready to send + /// + /// See [`Channel::poll_ready_to_send()`] + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_send(cx) + } +} + /// Receive-only access to a [`Channel`]. pub struct Receiver<'ch, M, T, const N: usize> where @@ -317,6 +368,61 @@ where } } +/// Receive-only access to a [`Channel`] without knowing channel size. +/// This version can be sent between threads but can only be created if the underlying mutex is Sync. +pub struct SendableDynamicReceiver<'ch, T> { + pub(crate) channel: &'ch dyn DynamicChannel, +} + +impl<'ch, T> Clone for SendableDynamicReceiver<'ch, T> { + fn clone(&self) -> Self { + *self + } +} + +impl<'ch, T> Copy for SendableDynamicReceiver<'ch, T> {} +unsafe impl<'ch, T: Send> Send for SendableDynamicReceiver<'ch, T> {} +unsafe impl<'ch, T: Send> Sync for SendableDynamicReceiver<'ch, T> {} + +impl<'ch, T> SendableDynamicReceiver<'ch, T> { + /// Receive the next value. + /// + /// See [`Channel::receive()`]. + pub fn receive(&self) -> DynamicReceiveFuture<'_, T> { + DynamicReceiveFuture { channel: self.channel } + } + + /// Attempt to immediately receive the next value. + /// + /// See [`Channel::try_receive()`] + pub fn try_receive(&self) -> Result { + self.channel.try_receive_with_context(None) + } + + /// Allows a poll_fn to poll until the channel is ready to receive + /// + /// See [`Channel::poll_ready_to_receive()`] + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { + self.channel.poll_ready_to_receive(cx) + } + + /// Poll the channel for the next item + /// + /// See [`Channel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } +} + +impl<'ch, M, T, const N: usize> From> for SendableDynamicReceiver<'ch, T> +where + M: RawMutex + Sync + Send, +{ + fn from(s: Receiver<'ch, M, T, N>) -> Self { + Self { channel: s.channel } + } +} + impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N> where M: RawMutex,