Merge pull request #4262 from cschuhen/send_on_dynamic_channel

Enable Sync and Send for DynamicSender and DynamicReceiver.
This commit is contained in:
Ulf Lilleengen 2025-05-28 10:43:33 +00:00 committed by GitHub
commit 62cf9d592b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -164,6 +164,57 @@ impl<'ch, T> DynamicSender<'ch, T> {
}
}
/// Send-only access to a [`Channel`] without knowing channel size.
/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
pub struct SendDynamicSender<'ch, T> {
pub(crate) channel: &'ch dyn DynamicChannel<T>,
}
impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
fn clone(&self) -> Self {
*self
}
}
impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
where
M: RawMutex + Sync + Send,
{
fn from(s: Sender<'ch, M, T, N>) -> Self {
Self { channel: s.channel }
}
}
impl<'ch, T> SendDynamicSender<'ch, T> {
/// Sends a value.
///
/// See [`Channel::send()`]
pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
DynamicSendFuture {
channel: self.channel,
message: Some(message),
}
}
/// Attempt to immediately send a message.
///
/// See [`Channel::send()`]
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
self.channel.try_send_with_context(message, None)
}
/// Allows a poll_fn to poll until the channel is ready to send
///
/// See [`Channel::poll_ready_to_send()`]
pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
self.channel.poll_ready_to_send(cx)
}
}
/// Receive-only access to a [`Channel`].
pub struct Receiver<'ch, M, T, const N: usize>
where
@ -337,6 +388,61 @@ where
}
}
/// Receive-only access to a [`Channel`] without knowing channel size.
/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
pub struct SendableDynamicReceiver<'ch, T> {
pub(crate) channel: &'ch dyn DynamicChannel<T>,
}
impl<'ch, T> Clone for SendableDynamicReceiver<'ch, T> {
fn clone(&self) -> Self {
*self
}
}
impl<'ch, T> Copy for SendableDynamicReceiver<'ch, T> {}
unsafe impl<'ch, T: Send> Send for SendableDynamicReceiver<'ch, T> {}
unsafe impl<'ch, T: Send> Sync for SendableDynamicReceiver<'ch, T> {}
impl<'ch, T> SendableDynamicReceiver<'ch, T> {
/// Receive the next value.
///
/// See [`Channel::receive()`].
pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
DynamicReceiveFuture { channel: self.channel }
}
/// Attempt to immediately receive the next value.
///
/// See [`Channel::try_receive()`]
pub fn try_receive(&self) -> Result<T, TryReceiveError> {
self.channel.try_receive_with_context(None)
}
/// Allows a poll_fn to poll until the channel is ready to receive
///
/// See [`Channel::poll_ready_to_receive()`]
pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
self.channel.poll_ready_to_receive(cx)
}
/// Poll the channel for the next item
///
/// See [`Channel::poll_receive()`]
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
self.channel.poll_receive(cx)
}
}
impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendableDynamicReceiver<'ch, T>
where
M: RawMutex + Sync + Send,
{
fn from(s: Receiver<'ch, M, T, N>) -> Self {
Self { channel: s.channel }
}
}
impl<'ch, M, T, const N: usize> futures_util::Stream for Receiver<'ch, M, T, N>
where
M: RawMutex,