mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
sync: add mpsc::Receiver::{capacity,max_capacity}
(#6511)
This commit is contained in:
parent
3c8d8e60ca
commit
b7d4fba707
@ -481,7 +481,7 @@ impl<T> Receiver<T> {
|
||||
/// assert!(!rx.is_closed());
|
||||
///
|
||||
/// rx.close();
|
||||
///
|
||||
///
|
||||
/// assert!(rx.is_closed());
|
||||
/// }
|
||||
/// ```
|
||||
@ -530,6 +530,86 @@ impl<T> Receiver<T> {
|
||||
self.chan.len()
|
||||
}
|
||||
|
||||
/// Returns the current capacity of the channel.
|
||||
///
|
||||
/// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
|
||||
/// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
|
||||
/// This is distinct from [`max_capacity`], which always returns buffer capacity initially
|
||||
/// specified when calling [`channel`].
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::sync::mpsc;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let (tx, mut rx) = mpsc::channel::<()>(5);
|
||||
///
|
||||
/// assert_eq!(rx.capacity(), 5);
|
||||
///
|
||||
/// // Making a reservation drops the capacity by one.
|
||||
/// let permit = tx.reserve().await.unwrap();
|
||||
/// assert_eq!(rx.capacity(), 4);
|
||||
/// assert_eq!(rx.len(), 0);
|
||||
///
|
||||
/// // Sending and receiving a value increases the capacity by one.
|
||||
/// permit.send(());
|
||||
/// assert_eq!(rx.len(), 1);
|
||||
/// rx.recv().await.unwrap();
|
||||
/// assert_eq!(rx.capacity(), 5);
|
||||
///
|
||||
/// // Directly sending a message drops the capacity by one.
|
||||
/// tx.send(()).await.unwrap();
|
||||
/// assert_eq!(rx.capacity(), 4);
|
||||
/// assert_eq!(rx.len(), 1);
|
||||
///
|
||||
/// // Receiving the message increases the capacity by one.
|
||||
/// rx.recv().await.unwrap();
|
||||
/// assert_eq!(rx.capacity(), 5);
|
||||
/// assert_eq!(rx.len(), 0);
|
||||
/// }
|
||||
/// ```
|
||||
/// [`capacity`]: Receiver::capacity
|
||||
/// [`max_capacity`]: Receiver::max_capacity
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.chan.semaphore().semaphore.available_permits()
|
||||
}
|
||||
|
||||
/// Returns the maximum buffer capacity of the channel.
|
||||
///
|
||||
/// The maximum capacity is the buffer capacity initially specified when calling
|
||||
/// [`channel`]. This is distinct from [`capacity`], which returns the *current*
|
||||
/// available buffer capacity: as messages are sent and received, the value
|
||||
/// returned by [`capacity`] will go up or down, whereas the value
|
||||
/// returned by [`max_capacity`] will remain constant.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::sync::mpsc;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let (tx, rx) = mpsc::channel::<()>(5);
|
||||
///
|
||||
/// // both max capacity and capacity are the same at first
|
||||
/// assert_eq!(rx.max_capacity(), 5);
|
||||
/// assert_eq!(rx.capacity(), 5);
|
||||
///
|
||||
/// // Making a reservation doesn't change the max capacity.
|
||||
/// let permit = tx.reserve().await.unwrap();
|
||||
/// assert_eq!(rx.max_capacity(), 5);
|
||||
/// // but drops the capacity by one
|
||||
/// assert_eq!(rx.capacity(), 4);
|
||||
/// }
|
||||
/// ```
|
||||
/// [`capacity`]: Receiver::capacity
|
||||
/// [`max_capacity`]: Receiver::max_capacity
|
||||
pub fn max_capacity(&self) -> usize {
|
||||
self.chan.semaphore().bound
|
||||
}
|
||||
|
||||
/// Polls to receive the next message on this channel.
|
||||
///
|
||||
/// This method returns:
|
||||
@ -1059,7 +1139,7 @@ impl<T> Sender<T> {
|
||||
///
|
||||
/// // The iterator should now be exhausted
|
||||
/// assert!(permit.next().is_none());
|
||||
///
|
||||
///
|
||||
/// // The value sent on the permit is received
|
||||
/// assert_eq!(rx.recv().await.unwrap(), 456);
|
||||
/// assert_eq!(rx.recv().await.unwrap(), 457);
|
||||
@ -1274,7 +1354,7 @@ impl<T> Sender<T> {
|
||||
/// // The value sent on the permit is received
|
||||
/// assert_eq!(rx.recv().await.unwrap(), 456);
|
||||
/// assert_eq!(rx.recv().await.unwrap(), 457);
|
||||
///
|
||||
///
|
||||
/// // Trying to call try_reserve_many with 0 will return an empty iterator
|
||||
/// let mut permit = tx.try_reserve_many(0).unwrap();
|
||||
/// assert!(permit.next().is_none());
|
||||
@ -1447,7 +1527,7 @@ impl<T> Sender<T> {
|
||||
/// [`channel`]. This is distinct from [`capacity`], which returns the *current*
|
||||
/// available buffer capacity: as messages are sent and received, the
|
||||
/// value returned by [`capacity`] will go up or down, whereas the value
|
||||
/// returned by `max_capacity` will remain constant.
|
||||
/// returned by [`max_capacity`] will remain constant.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
|
@ -465,6 +465,10 @@ impl<T, S: Semaphore> Rx<T, S> {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn semaphore(&self) -> &S {
|
||||
&self.inner.semaphore
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, S: Semaphore> Drop for Rx<T, S> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user