Make Sync capable versions of DynamicSender and DynamicReceiver.

DynamicSender and DynamicReceiver, just seem to be a fat pointer to a
Channel which is already protected by it's own Mutex already. In fact,
you can share the Channel already betwen threads and create Dynamic*er's
in the target threads. It should be safe to share the Dynamic*er's
directly. Can only be used when Mutex M of channel supoorts Sync.
This commit is contained in:
Corey Schuhen 2025-05-28 18:15:15 +10:00
parent 645883d874
commit 277f6f7331

View File

@ -164,6 +164,57 @@ impl<'ch, T> DynamicSender<'ch, T> {
}
}
/// Send-only access to a [`Channel`] without knowing channel size.
/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
pub struct SendDynamicSender<'ch, T> {
pub(crate) channel: &'ch dyn DynamicChannel<T>,
}
impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
fn clone(&self) -> Self {
*self
}
}
impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
where
M: RawMutex + Sync + Send,
{
fn from(s: Sender<'ch, M, T, N>) -> Self {
Self { channel: s.channel }
}
}
impl<'ch, T> SendDynamicSender<'ch, T> {
/// Sends a value.
///
/// See [`Channel::send()`]
pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
DynamicSendFuture {
channel: self.channel,
message: Some(message),
}
}
/// Attempt to immediately send a message.
///
/// See [`Channel::send()`]
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
self.channel.try_send_with_context(message, None)
}
/// Allows a poll_fn to poll until the channel is ready to send
///
/// See [`Channel::poll_ready_to_send()`]
pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
self.channel.poll_ready_to_send(cx)
}
}
/// Receive-only access to a [`Channel`].
pub struct Receiver<'ch, M, T, const N: usize>
where
@ -317,6 +368,61 @@ where
}
}
/// Receive-only access to a [`Channel`] without knowing channel size.
/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
pub struct SendableDynamicReceiver<'ch, T> {
pub(crate) channel: &'ch dyn DynamicChannel<T>,
}
impl<'ch, T> Clone for SendableDynamicReceiver<'ch, T> {
fn clone(&self) -> Self {
*self
}
}
impl<'ch, T> Copy for SendableDynamicReceiver<'ch, T> {}
unsafe impl<'ch, T: Send> Send for SendableDynamicReceiver<'ch, T> {}
unsafe impl<'ch, T: Send> Sync for SendableDynamicReceiver<'ch, T> {}
impl<'ch, T> SendableDynamicReceiver<'ch, T> {
/// Receive the next value.
///
/// See [`Channel::receive()`].
pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
DynamicReceiveFuture { channel: self.channel }
}
/// Attempt to immediately receive the next value.
///
/// See [`Channel::try_receive()`]
pub fn try_receive(&self) -> Result<T, TryReceiveError> {
self.channel.try_receive_with_context(None)
}
/// Allows a poll_fn to poll until the channel is ready to receive
///
/// See [`Channel::poll_ready_to_receive()`]
pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
self.channel.poll_ready_to_receive(cx)
}
/// Poll the channel for the next item
///
/// See [`Channel::poll_receive()`]
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
self.channel.poll_receive(cx)
}
}
impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendableDynamicReceiver<'ch, T>
where
M: RawMutex + Sync + Send,
{
fn from(s: Receiver<'ch, M, T, N>) -> Self {
Self { channel: s.channel }
}
}
impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N>
where
M: RawMutex,