From 332bc44b8c3687492b85ff3ad3f4ea7b67608dd2 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 2 Aug 2021 10:45:50 +0200 Subject: [PATCH 1/3] Expose SendFuture and RecvFuture types Having these types available makes it easier to store futures for later use as the named types can be embedded in other types at compile time. --- embassy/src/util/mpsc.rs | 45 ++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index cc9e2a5dd..d41c86291 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs @@ -156,18 +156,10 @@ where /// closed by `recv` until they are all consumed. /// /// [`close`]: Self::close - pub async fn recv(&mut self) -> Option { - futures::future::poll_fn(|cx| self.recv_poll(cx)).await - } - - fn recv_poll(&mut self, cx: &mut Context<'_>) -> Poll> { - Channel::lock(self.channel_cell, |c| { - match c.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(Some(v)), - Err(TryRecvError::Closed) => Poll::Ready(None), - Err(TryRecvError::Empty) => Poll::Pending, - } - }) + pub fn recv(&mut self) -> RecvFuture<'ch, M, T, N> { + RecvFuture { + channel_cell: self.channel_cell, + } } /// Attempts to immediately receive a message on this `Receiver` @@ -202,6 +194,30 @@ where } } +pub struct RecvFuture<'ch, M, T, const N: usize> +where + M: Mutex, +{ + channel_cell: &'ch UnsafeCell>, +} + +impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N> +where + M: Mutex, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Channel::lock(self.channel_cell, |c| { + match c.try_recv_with_context(Some(cx)) { + Ok(v) => Poll::Ready(Some(v)), + Err(TryRecvError::Closed) => Poll::Ready(None), + Err(TryRecvError::Empty) => Poll::Pending, + } + }) + } +} + impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> where M: Mutex, @@ -224,12 +240,11 @@ where /// /// [`close`]: Receiver::close /// [`Receiver`]: Receiver - pub async fn send(&self, message: T) -> Result<(), SendError> { + pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> { SendFuture { sender: self.clone(), message: Some(message), } - .await } /// Attempts to immediately send a message on this `Sender` @@ -278,7 +293,7 @@ where } } -struct SendFuture<'ch, M, T, const N: usize> +pub struct SendFuture<'ch, M, T, const N: usize> where M: Mutex, { From 4d8d8e386f9e85a6273cf93f1c8feb5871ed96af Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 2 Aug 2021 11:18:59 +0200 Subject: [PATCH 2/3] Make RecvFuture sync RecvFuture always locks the underlying Channel when polled. --- embassy/src/util/mpsc.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index d41c86291..4c6e8a6f0 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs @@ -218,6 +218,16 @@ where } } +// Safe to pass the receive future around since it locks channel whenever polled +unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where + M: Mutex + Sync +{ +} +unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where + M: Mutex + Sync +{ +} + impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N> where M: Mutex, From f2c2ad06caa0b05c4c8a9b3b88741afe5ab9f836 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 2 Aug 2021 12:42:06 +0200 Subject: [PATCH 3/3] Use lifetime to ensure only a single future is created at a time --- embassy/src/util/mpsc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index 4c6e8a6f0..4a934eb2f 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs @@ -156,7 +156,7 @@ where /// closed by `recv` until they are all consumed. /// /// [`close`]: Self::close - pub fn recv(&mut self) -> RecvFuture<'ch, M, T, N> { + pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> { RecvFuture { channel_cell: self.channel_cell, }