mirror of
				https://github.com/tokio-rs/tokio.git
				synced 2025-11-03 14:02:47 +00:00 
			
		
		
		
	Add Mutex::try_lock and (Unbounded)Receiver::try_recv (#1939)
This commit is contained in:
		
							parent
							
								
									5d5755dca4
								
							
						
					
					
						commit
						975576952f
					
				@ -1,5 +1,5 @@
 | 
			
		||||
use crate::sync::mpsc::chan;
 | 
			
		||||
use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError};
 | 
			
		||||
use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
 | 
			
		||||
use crate::sync::semaphore;
 | 
			
		||||
 | 
			
		||||
use std::fmt;
 | 
			
		||||
@ -150,6 +150,21 @@ impl<T> Receiver<T> {
 | 
			
		||||
        self.chan.recv(cx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Attempts to return a pending value on this receiver without blocking.
 | 
			
		||||
    ///
 | 
			
		||||
    /// This method will never block the caller in order to wait for data to
 | 
			
		||||
    /// become available. Instead, this will always return immediately with
 | 
			
		||||
    /// a possible option of pending data on the channel.
 | 
			
		||||
    ///
 | 
			
		||||
    /// This is useful for a flavor of "optimistic check" before deciding to
 | 
			
		||||
    /// block on a receiver.
 | 
			
		||||
    ///
 | 
			
		||||
    /// Compared with recv, this function has two failure cases instead of
 | 
			
		||||
    /// one (one for disconnection, one for an empty buffer).
 | 
			
		||||
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
 | 
			
		||||
        self.chan.try_recv()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Closes the receiving half of a channel, without dropping it.
 | 
			
		||||
    ///
 | 
			
		||||
    /// This prevents any further messages from being sent on the channel while
 | 
			
		||||
 | 
			
		||||
@ -2,7 +2,7 @@ use crate::loom::cell::CausalCell;
 | 
			
		||||
use crate::loom::future::AtomicWaker;
 | 
			
		||||
use crate::loom::sync::atomic::AtomicUsize;
 | 
			
		||||
use crate::loom::sync::Arc;
 | 
			
		||||
use crate::sync::mpsc::error::ClosedError;
 | 
			
		||||
use crate::sync::mpsc::error::{ClosedError, TryRecvError};
 | 
			
		||||
use crate::sync::mpsc::{error, list};
 | 
			
		||||
 | 
			
		||||
use std::fmt;
 | 
			
		||||
@ -306,6 +306,22 @@ where
 | 
			
		||||
            }
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Receive the next value without blocking
 | 
			
		||||
    pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
 | 
			
		||||
        use super::block::Read::*;
 | 
			
		||||
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
 | 
			
		||||
            let rx_fields = unsafe { &mut *rx_fields_ptr };
 | 
			
		||||
            match rx_fields.list.pop(&self.inner.tx) {
 | 
			
		||||
                Some(Value(value)) => {
 | 
			
		||||
                    self.inner.semaphore.add_permit();
 | 
			
		||||
                    Ok(value)
 | 
			
		||||
                }
 | 
			
		||||
                Some(Closed) => Err(TryRecvError::Closed),
 | 
			
		||||
                None => Err(TryRecvError::Empty),
 | 
			
		||||
            }
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T, S> Drop for Rx<T, S>
 | 
			
		||||
 | 
			
		||||
@ -65,6 +65,35 @@ impl fmt::Display for RecvError {
 | 
			
		||||
 | 
			
		||||
impl Error for RecvError {}
 | 
			
		||||
 | 
			
		||||
// ===== TryRecvError =====
 | 
			
		||||
 | 
			
		||||
/// This enumeration is the list of the possible reasons that try_recv
 | 
			
		||||
/// could not return data when called.
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub enum TryRecvError {
 | 
			
		||||
    /// This channel is currently empty, but the Sender(s) have not yet
 | 
			
		||||
    /// disconnected, so data may yet become available.
 | 
			
		||||
    Empty,
 | 
			
		||||
    /// The channel's sending half has been closed, and there will
 | 
			
		||||
    /// never be any more data received on it.
 | 
			
		||||
    Closed,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl fmt::Display for TryRecvError {
 | 
			
		||||
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
 | 
			
		||||
        write!(
 | 
			
		||||
            fmt,
 | 
			
		||||
            "{}",
 | 
			
		||||
            match self {
 | 
			
		||||
                TryRecvError::Empty => "channel empty",
 | 
			
		||||
                TryRecvError::Closed => "channel closed",
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Error for TryRecvError {}
 | 
			
		||||
 | 
			
		||||
// ===== ClosedError =====
 | 
			
		||||
 | 
			
		||||
/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,6 @@
 | 
			
		||||
use crate::loom::sync::atomic::AtomicUsize;
 | 
			
		||||
use crate::sync::mpsc::chan;
 | 
			
		||||
use crate::sync::mpsc::error::SendError;
 | 
			
		||||
use crate::sync::mpsc::error::{SendError, TryRecvError};
 | 
			
		||||
 | 
			
		||||
use std::fmt;
 | 
			
		||||
use std::task::{Context, Poll};
 | 
			
		||||
@ -123,6 +123,21 @@ impl<T> UnboundedReceiver<T> {
 | 
			
		||||
        poll_fn(|cx| self.poll_recv(cx)).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Attempts to return a pending value on this receiver without blocking.
 | 
			
		||||
    ///
 | 
			
		||||
    /// This method will never block the caller in order to wait for data to
 | 
			
		||||
    /// become available. Instead, this will always return immediately with
 | 
			
		||||
    /// a possible option of pending data on the channel.
 | 
			
		||||
    ///
 | 
			
		||||
    /// This is useful for a flavor of "optimistic check" before deciding to
 | 
			
		||||
    /// block on a receiver.
 | 
			
		||||
    ///
 | 
			
		||||
    /// Compared with recv, this function has two failure cases instead of
 | 
			
		||||
    /// one (one for disconnection, one for an empty buffer).
 | 
			
		||||
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
 | 
			
		||||
        self.chan.try_recv()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Closes the receiving half of a channel, without dropping it.
 | 
			
		||||
    ///
 | 
			
		||||
    /// This prevents any further messages from being sent on the channel while
 | 
			
		||||
 | 
			
		||||
@ -38,6 +38,7 @@ use crate::future::poll_fn;
 | 
			
		||||
use crate::sync::semaphore;
 | 
			
		||||
 | 
			
		||||
use std::cell::UnsafeCell;
 | 
			
		||||
use std::error::Error;
 | 
			
		||||
use std::fmt;
 | 
			
		||||
use std::ops::{Deref, DerefMut};
 | 
			
		||||
 | 
			
		||||
@ -73,6 +74,30 @@ unsafe impl<T> Send for Mutex<T> where T: Send {}
 | 
			
		||||
unsafe impl<T> Sync for Mutex<T> where T: Send {}
 | 
			
		||||
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
 | 
			
		||||
 | 
			
		||||
/// An enumeration of possible errors associated with a `TryLockResult`
 | 
			
		||||
/// which can occur while trying to aquire a lock from the `try_lock`
 | 
			
		||||
/// method on a `Mutex`.
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub enum TryLockError {
 | 
			
		||||
    /// The lock could not be acquired at this time because the operation
 | 
			
		||||
    /// would otherwise block.
 | 
			
		||||
    WouldBlock,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl fmt::Display for TryLockError {
 | 
			
		||||
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
 | 
			
		||||
        write!(
 | 
			
		||||
            fmt,
 | 
			
		||||
            "{}",
 | 
			
		||||
            match self {
 | 
			
		||||
                TryLockError::WouldBlock => "operation would block"
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Error for TryLockError {}
 | 
			
		||||
 | 
			
		||||
#[test]
 | 
			
		||||
#[cfg(not(loom))]
 | 
			
		||||
fn bounds() {
 | 
			
		||||
@ -104,6 +129,15 @@ impl<T> Mutex<T> {
 | 
			
		||||
            });
 | 
			
		||||
        guard
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Try to aquire the lock
 | 
			
		||||
    pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
 | 
			
		||||
        let mut permit = semaphore::Permit::new();
 | 
			
		||||
        match permit.try_acquire(&self.s) {
 | 
			
		||||
            Ok(_) => Ok(MutexGuard { lock: self, permit }),
 | 
			
		||||
            Err(_) => Err(TryLockError::WouldBlock),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<'a, T> Drop for MutexGuard<'a, T> {
 | 
			
		||||
 | 
			
		||||
@ -2,7 +2,7 @@
 | 
			
		||||
#![cfg(feature = "full")]
 | 
			
		||||
 | 
			
		||||
use tokio::sync::mpsc;
 | 
			
		||||
use tokio::sync::mpsc::error::TrySendError;
 | 
			
		||||
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
 | 
			
		||||
use tokio_test::task;
 | 
			
		||||
use tokio_test::{
 | 
			
		||||
    assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
 | 
			
		||||
@ -413,3 +413,41 @@ fn unconsumed_messages_are_dropped() {
 | 
			
		||||
 | 
			
		||||
    assert_eq!(1, Arc::strong_count(&msg));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[test]
 | 
			
		||||
fn try_recv() {
 | 
			
		||||
    let (mut tx, mut rx) = mpsc::channel(1);
 | 
			
		||||
    match rx.try_recv() {
 | 
			
		||||
        Err(TryRecvError::Empty) => {}
 | 
			
		||||
        _ => panic!(),
 | 
			
		||||
    }
 | 
			
		||||
    tx.try_send(42).unwrap();
 | 
			
		||||
    match rx.try_recv() {
 | 
			
		||||
        Ok(42) => {}
 | 
			
		||||
        _ => panic!(),
 | 
			
		||||
    }
 | 
			
		||||
    drop(tx);
 | 
			
		||||
    match rx.try_recv() {
 | 
			
		||||
        Err(TryRecvError::Closed) => {}
 | 
			
		||||
        _ => panic!(),
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[test]
 | 
			
		||||
fn try_recv_unbounded() {
 | 
			
		||||
    let (tx, mut rx) = mpsc::unbounded_channel();
 | 
			
		||||
    match rx.try_recv() {
 | 
			
		||||
        Err(TryRecvError::Empty) => {}
 | 
			
		||||
        _ => panic!(),
 | 
			
		||||
    }
 | 
			
		||||
    tx.send(42).unwrap();
 | 
			
		||||
    match rx.try_recv() {
 | 
			
		||||
        Ok(42) => {}
 | 
			
		||||
        _ => panic!(),
 | 
			
		||||
    }
 | 
			
		||||
    drop(tx);
 | 
			
		||||
    match rx.try_recv() {
 | 
			
		||||
        Err(TryRecvError::Closed) => {}
 | 
			
		||||
        _ => panic!(),
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -134,3 +134,16 @@ async fn aborted_future_2() {
 | 
			
		||||
    .await
 | 
			
		||||
    .expect("Mutex is locked");
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[test]
 | 
			
		||||
fn try_lock() {
 | 
			
		||||
    let m: Mutex<usize> = Mutex::new(0);
 | 
			
		||||
    {
 | 
			
		||||
        let g1 = m.try_lock();
 | 
			
		||||
        assert_eq!(g1.is_ok(), true);
 | 
			
		||||
        let g2 = m.try_lock();
 | 
			
		||||
        assert_eq!(g2.is_ok(), false);
 | 
			
		||||
    }
 | 
			
		||||
    let g3 = m.try_lock();
 | 
			
		||||
    assert_eq!(g3.is_ok(), true);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user