diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 1b9460031..2cf2b1c32 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -361,38 +361,16 @@ struct RecvGuard<'a, T> { } /// Receive a value future -struct Recv -where - R: AsMut>, -{ +struct Recv<'a, T> { /// Receiver being waited on - receiver: R, + receiver: &'a mut Receiver, /// Entry in the waiter `LinkedList` waiter: UnsafeCell, - - _p: std::marker::PhantomData, } -/// `AsMut` is not implemented for `T` (coherence). Explicitly implementing -/// `AsMut` for `Receiver` would be included in the public API of the receiver -/// type. Instead, `Borrow` is used internally to bridge the gap. -struct Borrow(T); - -impl AsMut> for Borrow> { - fn as_mut(&mut self) -> &mut Receiver { - &mut self.0 - } -} - -impl<'a, T> AsMut> for Borrow<&'a mut Receiver> { - fn as_mut(&mut self) -> &mut Receiver { - &mut *self.0 - } -} - -unsafe impl> + Send, T: Send> Send for Recv {} -unsafe impl> + Sync, T: Send> Sync for Recv {} +unsafe impl<'a, T: Send> Send for Recv<'a, T> {} +unsafe impl<'a, T: Send> Sync for Recv<'a, T> {} /// Max number of receivers. Reserve space to lock. const MAX_RECEIVERS: usize = usize::MAX >> 2; @@ -892,7 +870,7 @@ impl Receiver { /// } /// ``` pub async fn recv(&mut self) -> Result { - let fut = Recv::<_, T>::new(Borrow(self)); + let fut = Recv::new(self); fut.await } @@ -965,11 +943,8 @@ impl Drop for Receiver { } } -impl Recv -where - R: AsMut>, -{ - fn new(receiver: R) -> Recv { +impl<'a, T> Recv<'a, T> { + fn new(receiver: &'a mut Receiver) -> Recv<'a, T> { Recv { receiver, waiter: UnsafeCell::new(Waiter { @@ -978,7 +953,6 @@ where pointers: linked_list::Pointers::new(), _p: PhantomPinned, }), - _p: std::marker::PhantomData, } } @@ -990,14 +964,13 @@ where is_unpin::<&mut Receiver>(); let me = self.get_unchecked_mut(); - (me.receiver.as_mut(), &me.waiter) + (me.receiver, &me.waiter) } } } -impl Future for Recv +impl<'a, T> Future for Recv<'a, T> where - R: AsMut>, T: Clone, { type Output = Result; @@ -1016,14 +989,11 @@ where } } -impl Drop for Recv -where - R: AsMut>, -{ +impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { // Acquire the tail lock. This is required for safety before accessing // the waiter node. - let mut tail = self.receiver.as_mut().shared.tail.lock(); + let mut tail = self.receiver.shared.tail.lock(); // safety: tail lock is held let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });