sync: add mpsc::Receiver::blocking_recv_many (#6867)

Fixes: #6865
Co-authored-by: Rafael Bachmann <rafael.bachmann@paessler.com>
This commit is contained in:
Rafael Bachmann 2024-10-17 12:02:12 +03:00 committed by GitHub
parent c9e998e4b3
commit 1656d8e231
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 53 additions and 0 deletions

View File

@ -419,6 +419,16 @@ impl<T> Receiver<T> {
crate::future::block_on(self.recv())
}
/// Variant of [`Self::recv_many`] for blocking contexts.
///
/// The same conditions as in [`Self::blocking_recv`] apply.
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
crate::future::block_on(self.recv_many(buffer, limit))
}
/// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while

View File

@ -319,6 +319,16 @@ impl<T> UnboundedReceiver<T> {
crate::future::block_on(self.recv())
}
/// Variant of [`Self::recv_many`] for blocking contexts.
///
/// The same conditions as in [`Self::blocking_recv`] apply.
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
crate::future::block_on(self.recv_many(buffer, limit))
}
/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while

View File

@ -129,6 +129,22 @@ fn mpsc_bounded_receiver_blocking_recv_panic_caller() -> Result<(), Box<dyn Erro
Ok(())
}
#[test]
fn mpsc_bounded_receiver_blocking_recv_many_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = current_thread();
let (_tx, mut rx) = mpsc::channel::<u8>(1);
rt.block_on(async {
let _ = rx.blocking_recv();
});
});
// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());
Ok(())
}
#[test]
fn mpsc_bounded_sender_blocking_send_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
@ -161,6 +177,23 @@ fn mpsc_unbounded_receiver_blocking_recv_panic_caller() -> Result<(), Box<dyn Er
Ok(())
}
#[test]
fn mpsc_unbounded_receiver_blocking_recv_many_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = current_thread();
let (_tx, mut rx) = mpsc::unbounded_channel::<u8>();
let mut vec = vec![];
rt.block_on(async {
let _ = rx.blocking_recv_many(&mut vec, 1);
});
});
// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());
Ok(())
}
#[test]
fn semaphore_merge_unrelated_owned_permits() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {