diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index df31f8df1..a05accbf4 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -26,7 +26,9 @@ cfg_sync! { pub mod oneshot; - pub(crate) mod semaphore; + pub(crate) mod semaphore_ll; + mod semaphore; + pub use semaphore::{Semaphore, SemaphorePermit}; mod task; pub(crate) use task::AtomicWaker; @@ -48,7 +50,7 @@ cfg_not_sync! { cfg_signal! { pub(crate) mod mpsc; - pub(crate) mod semaphore; + pub(crate) mod semaphore_ll; } } diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 5cca1596d..7294e4d56 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,6 +1,6 @@ use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; -use crate::sync::semaphore; +use crate::sync::semaphore_ll as semaphore; use std::fmt; use std::task::{Context, Poll}; diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index b6e94d5a0..7a15e8b3a 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -382,7 +382,7 @@ impl Drop for Chan { } } -use crate::sync::semaphore::TryAcquireError; +use crate::sync::semaphore_ll::TryAcquireError; impl From for TrySendError { fn from(src: TryAcquireError) -> TrySendError { @@ -398,9 +398,9 @@ impl From for TrySendError { // ===== impl Semaphore for (::Semaphore, capacity) ===== -use crate::sync::semaphore::Permit; +use crate::sync::semaphore_ll::Permit; -impl Semaphore for (crate::sync::semaphore::Semaphore, usize) { +impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { type Permit = Permit; fn new_permit() -> Permit { diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index bee00df41..ec59d6953 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -35,7 +35,7 @@ //! [`MutexGuard`]: struct.MutexGuard.html use crate::future::poll_fn; -use crate::sync::semaphore; +use crate::sync::semaphore_ll as semaphore; use std::cell::UnsafeCell; use std::error::Error; @@ -74,7 +74,7 @@ unsafe impl Sync for Mutex where T: Send {} unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {} /// An enumeration of possible errors associated with a `TryLockResult` -/// which can occur while trying to aquire a lock from the `try_lock` +/// which can occur while trying to acquire a lock from the `try_lock` /// method on a `Mutex`. #[derive(Debug)] pub enum TryLockError { @@ -129,7 +129,7 @@ impl Mutex { guard } - /// Try to aquire the lock + /// Try to acquire the lock pub fn try_lock(&self) -> Result, TryLockError> { let mut permit = semaphore::Permit::new(); match permit.try_acquire(&self.s) { diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index dab73a099..c1e9ef3ed 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,1070 +1,91 @@ -#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] +use super::semaphore_ll as ll; // low level implementation +use crate::future::poll_fn; -//! Thread-safe, asynchronous counting semaphore. -//! -//! A `Semaphore` instance holds a set of permits. Permits are used to -//! synchronize access to a shared resource. -//! -//! Before accessing the shared resource, callers acquire a permit from the -//! semaphore. Once the permit is acquired, the caller then enters the critical -//! section. If no permits are available, then acquiring the semaphore returns -//! `Pending`. The task is woken once a permit becomes available. - -use crate::loom::{ - cell::CausalCell, - future::AtomicWaker, - sync::atomic::{AtomicPtr, AtomicUsize}, - thread, -}; - -use std::fmt; -use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release}; -use std::sync::Arc; -use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; -use std::usize; - -/// Futures-aware semaphore. -pub(crate) struct Semaphore { - /// Tracks both the waiter queue tail pointer and the number of remaining - /// permits. - state: AtomicUsize, - - /// waiter queue head pointer. - head: CausalCell>, - - /// Coordinates access to the queue head. - rx_lock: AtomicUsize, - - /// Stub waiter node used as part of the MPSC channel algorithm. - stub: Box, -} - -/// A semaphore permit +/// Counting semaphore performing asynchronous permit aquisition. /// -/// Tracks the lifecycle of a semaphore permit. +/// A semaphore maintains a set of permits. Permits are used to synchronize +/// access to a shared resource. A semaphore differs from a mutex in that it +/// can allow more than one concurrent caller to access the shared resource at a +/// time. /// -/// An instance of `Permit` is intended to be used with a **single** instance of -/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore -/// instances will result in unexpected behavior. -/// -/// `Permit` does **not** release the permit back to the semaphore on drop. It -/// is the user's responsibility to ensure that `Permit::release` is called -/// before dropping the permit. +/// When `acquire` is called and the semaphore has remaining permits, the +/// function immediately returns a permit. However, if no remaining permits are +/// available, `acquire` (asynchronously) waits until an outstanding permit is +/// dropped. At this point, the freed permit is assigned to the caller. #[derive(Debug)] -pub(crate) struct Permit { - waiter: Option>, - state: PermitState, +pub struct Semaphore { + /// The low level semaphore + ll_sem: ll::Semaphore, } -/// Error returned by `Permit::poll_acquire`. +/// A permit from the semaphore +#[must_use] #[derive(Debug)] -pub(crate) struct AcquireError(()); - -/// Error returned by `Permit::try_acquire`. -#[derive(Debug)] -pub(crate) struct TryAcquireError { - kind: ErrorKind, +pub struct SemaphorePermit<'a> { + sem: &'a Semaphore, + // the low level permit + ll_permit: ll::Permit, } -#[derive(Debug)] -enum ErrorKind { - Closed, - NoPermits, -} - -/// Node used to notify the semaphore waiter when permit is available. -#[derive(Debug)] -struct WaiterNode { - /// Stores waiter state. - /// - /// See `NodeState` for more details. - state: AtomicUsize, - - /// Task to wake when a permit is made available. - waker: AtomicWaker, - - /// Next pointer in the queue of waiting senders. - next: AtomicPtr, -} - -/// Semaphore state +/// Error returned from the [`Semaphore::try_acquire`] function. /// -/// The 2 low bits track the modes. +/// A `try_acquire` operation can only fail if the semaphore has no available +/// permits. /// -/// - Closed -/// - Full -/// -/// When not full, the rest of the `usize` tracks the total number of messages -/// in the channel. When full, the rest of the `usize` is a pointer to the tail -/// of the "waiting senders" queue. -#[derive(Copy, Clone)] -struct SemState(usize); - -/// Permit state -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -enum PermitState { - /// The permit has not been requested. - Idle, - - /// Currently waiting for a permit to be made available and assigned to the - /// waiter. - Waiting, - - /// The permit has been acquired. - Acquired, -} - -/// Waiter node state -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -#[repr(usize)] -enum NodeState { - /// Not waiting for a permit and the node is not in the wait queue. - /// - /// This is the initial state. - Idle = 0, - - /// Not waiting for a permit but the node is in the wait queue. - /// - /// This happens when the waiter has previously requested a permit, but has - /// since canceled the request. The node cannot be removed by the waiter, so - /// this state informs the receiver to skip the node when it pops it from - /// the wait queue. - Queued = 1, - - /// Waiting for a permit and the node is in the wait queue. - QueuedWaiting = 2, - - /// The waiter has been assigned a permit and the node has been removed from - /// the queue. - Assigned = 3, - - /// The semaphore has been closed. No more permits will be issued. - Closed = 4, -} - -// ===== impl Semaphore ===== +/// [`Semaphore::try_acquire`]: Semaphore::try_acquire +#[derive(Debug)] +pub struct TryAcquireError(()); impl Semaphore { /// Creates a new semaphore with the initial number of permits - /// - /// # Panics - /// - /// Panics if `permits` is zero. - pub(crate) fn new(permits: usize) -> Semaphore { - let stub = Box::new(WaiterNode::new()); - let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap(); - - // Allocations are aligned - debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0); - - let state = SemState::new(permits, &stub); - - Semaphore { - state: AtomicUsize::new(state.to_usize()), - head: CausalCell::new(ptr), - rx_lock: AtomicUsize::new(0), - stub, + pub fn new(permits: usize) -> Self { + Self { + ll_sem: ll::Semaphore::new(permits), } } /// Returns the current number of available permits - pub(crate) fn available_permits(&self) -> usize { - let curr = SemState::load(&self.state, Acquire); - curr.available_permits() - } - - /// Poll for a permit - fn poll_permit( - &self, - mut permit: Option<(&mut Context<'_>, &mut Permit)>, - ) -> Poll> { - // Load the current state - let mut curr = SemState::load(&self.state, Acquire); - - // Tracks a *mut WaiterNode representing an Arc clone. - // - // This avoids having to bump the ref count unless required. - let mut maybe_strong: Option> = None; - - macro_rules! undo_strong { - () => { - if let Some(waiter) = maybe_strong { - // The waiter was cloned, but never got queued. - // Before entering `poll_permit`, the waiter was in the - // `Idle` state. We must transition the node back to the - // idle state. - let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) }; - waiter.revert_to_idle(); - } - }; - } - - loop { - let mut next = curr; - - if curr.is_closed() { - undo_strong!(); - return Ready(Err(AcquireError::closed())); - } - - if !next.acquire_permit(&self.stub) { - debug_assert!(curr.waiter().is_some()); - - if maybe_strong.is_none() { - if let Some((ref mut cx, ref mut permit)) = permit { - // Get the Sender's waiter node, or initialize one - let waiter = permit - .waiter - .get_or_insert_with(|| Arc::new(WaiterNode::new())); - - waiter.register(cx); - - if !waiter.to_queued_waiting() { - // The node is alrady queued, there is no further work - // to do. - return Pending; - } - - maybe_strong = Some(WaiterNode::into_non_null(waiter.clone())); - } else { - // If no `waiter`, then the task is not registered and there - // is no further work to do. - return Pending; - } - } - - next.set_waiter(maybe_strong.unwrap()); - } - - debug_assert_ne!(curr.0, 0); - debug_assert_ne!(next.0, 0); - - match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { - Ok(_) => { - match curr.waiter() { - Some(prev_waiter) => { - let waiter = maybe_strong.unwrap(); - - // Finish pushing - unsafe { - prev_waiter.as_ref().next.store(waiter.as_ptr(), Release); - } - - return Pending; - } - None => { - undo_strong!(); - - return Ready(Ok(())); - } - } - } - Err(actual) => { - curr = actual; - } - } - } - } - - /// Close the semaphore. This prevents the semaphore from issuing new - /// permits and notifies all pending waiters. - pub(crate) fn close(&self) { - // Acquire the `rx_lock`, setting the "closed" flag on the lock. - let prev = self.rx_lock.fetch_or(1, AcqRel); - - if prev != 0 { - // Another thread has the lock and will be responsible for notifying - // pending waiters. - return; - } - - self.add_permits_locked(0, true); + pub fn available_permits(&self) -> usize { + self.ll_sem.available_permits() } /// Add `n` new permits to the semaphore. - pub(crate) fn add_permits(&self, n: usize) { - if n == 0 { - return; - } - - // TODO: Handle overflow. A panic is not sufficient, the process must - // abort. - let prev = self.rx_lock.fetch_add(n << 1, AcqRel); - - if prev != 0 { - // Another thread has the lock and will be responsible for notifying - // pending waiters. - return; - } - - self.add_permits_locked(n, false); + pub fn add_permits(&self, n: usize) { + self.ll_sem.add_permits(n); } - fn add_permits_locked(&self, mut rem: usize, mut closed: bool) { - while rem > 0 || closed { - if closed { - SemState::fetch_set_closed(&self.state, AcqRel); - } - - // Release the permits and notify - self.add_permits_locked2(rem, closed); - - let n = rem << 1; - - let actual = if closed { - let actual = self.rx_lock.fetch_sub(n | 1, AcqRel); - closed = false; - actual - } else { - let actual = self.rx_lock.fetch_sub(n, AcqRel); - closed = actual & 1 == 1; - actual - }; - - rem = (actual >> 1) - rem; - } - } - - /// Release a specific amount of permits to the semaphore - /// - /// This function is called by `add_permits` after the add lock has been - /// acquired. - fn add_permits_locked2(&self, mut n: usize, closed: bool) { - while n > 0 || closed { - let waiter = match self.pop(n, closed) { - Some(waiter) => waiter, - None => { - return; - } - }; - - if waiter.notify(closed) { - n = n.saturating_sub(1); - } - } - } - - /// Pop a waiter - /// - /// `rem` represents the remaining number of times the caller will pop. If - /// there are no more waiters to pop, `rem` is used to set the available - /// permits. - fn pop(&self, rem: usize, closed: bool) -> Option> { - 'outer: loop { - unsafe { - let mut head = self.head.with(|head| *head); - let mut next_ptr = head.as_ref().next.load(Acquire); - - let stub = self.stub(); - - if head == stub { - let next = match NonNull::new(next_ptr) { - Some(next) => next, - None => { - // This loop is not part of the standard intrusive mpsc - // channel algorithm. This is where we atomically pop - // the last task and add `rem` to the remaining capacity. - // - // This modification to the pop algorithm works because, - // at this point, we have not done any work (only done - // reading). We have a *pretty* good idea that there is - // no concurrent pusher. - // - // The capacity is then atomically added by doing an - // AcqRel CAS on `state`. The `state` cell is the - // linchpin of the algorithm. - // - // By successfully CASing `head` w/ AcqRel, we ensure - // that, if any thread was racing and entered a push, we - // see that and abort pop, retrying as it is - // "inconsistent". - let mut curr = SemState::load(&self.state, Acquire); - - loop { - if curr.has_waiter(&self.stub) { - // Inconsistent - thread::yield_now(); - continue 'outer; - } - - // When closing the semaphore, nodes are popped - // with `rem == 0`. In this case, we are not - // adding permits, but notifying waiters of the - // semaphore's closed state. - if rem == 0 { - debug_assert!(curr.is_closed(), "state = {:?}", curr); - return None; - } - - let mut next = curr; - next.release_permits(rem, &self.stub); - - match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { - Ok(_) => return None, - Err(actual) => { - curr = actual; - } - } - } - } - }; - - self.head.with_mut(|head| *head = next); - head = next; - next_ptr = next.as_ref().next.load(Acquire); - } - - if let Some(next) = NonNull::new(next_ptr) { - self.head.with_mut(|head| *head = next); - - return Some(Arc::from_raw(head.as_ptr())); - } - - let state = SemState::load(&self.state, Acquire); - - // This must always be a pointer as the wait list is not empty. - let tail = state.waiter().unwrap(); - - if tail != head { - // Inconsistent - thread::yield_now(); - continue 'outer; - } - - self.push_stub(closed); - - next_ptr = head.as_ref().next.load(Acquire); - - if let Some(next) = NonNull::new(next_ptr) { - self.head.with_mut(|head| *head = next); - - return Some(Arc::from_raw(head.as_ptr())); - } - - // Inconsistent state, loop - thread::yield_now(); - } - } - } - - unsafe fn push_stub(&self, closed: bool) { - let stub = self.stub(); - - // Set the next pointer. This does not require an atomic operation as - // this node is not accessible. The write will be flushed with the next - // operation - stub.as_ref().next.store(ptr::null_mut(), Relaxed); - - // Update the tail to point to the new node. We need to see the previous - // node in order to update the next pointer as well as release `task` - // to any other threads calling `push`. - let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel); - - debug_assert_eq!(closed, prev.is_closed()); - - // The stub is only pushed when there are pending tasks. Because of - // this, the state must *always* be in pointer mode. - let prev = prev.waiter().unwrap(); - - // We don't want the *existing* pointer to be a stub. - debug_assert_ne!(prev, stub); - - // Release `task` to the consume end. - prev.as_ref().next.store(stub.as_ptr(), Release); - } - - fn stub(&self) -> NonNull { - unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) } - } -} - -impl fmt::Debug for Semaphore { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Semaphore") - .field("state", &SemState::load(&self.state, Relaxed)) - .field("head", &self.head.with(|ptr| ptr)) - .field("rx_lock", &self.rx_lock.load(Relaxed)) - .field("stub", &self.stub) - .finish() - } -} - -unsafe impl Send for Semaphore {} -unsafe impl Sync for Semaphore {} - -// ===== impl Permit ===== - -impl Permit { - /// Create a new `Permit`. - /// - /// The permit begins in the "unacquired" state. - pub(crate) fn new() -> Permit { - Permit { - waiter: None, - state: PermitState::Idle, - } - } - - /// Returns true if the permit has been acquired - pub(crate) fn is_acquired(&self) -> bool { - self.state == PermitState::Acquired - } - - /// Try to acquire the permit. If no permits are available, the current task - /// is notified once a new permit becomes available. - pub(crate) fn poll_acquire( - &mut self, - cx: &mut Context<'_>, - semaphore: &Semaphore, - ) -> Poll> { - match self.state { - PermitState::Idle => {} - PermitState::Waiting => { - let waiter = self.waiter.as_ref().unwrap(); - - if waiter.acquire(cx)? { - self.state = PermitState::Acquired; - return Ready(Ok(())); - } else { - return Pending; - } - } - PermitState::Acquired => { - return Ready(Ok(())); - } - } - - match semaphore.poll_permit(Some((cx, self)))? { - Ready(()) => { - self.state = PermitState::Acquired; - Ready(Ok(())) - } - Pending => { - self.state = PermitState::Waiting; - Pending - } - } - } - - /// Try to acquire the permit. - pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { - match self.state { - PermitState::Idle => {} - PermitState::Waiting => { - let waiter = self.waiter.as_ref().unwrap(); - - if waiter.acquire2().map_err(to_try_acquire)? { - self.state = PermitState::Acquired; - return Ok(()); - } else { - return Err(TryAcquireError::no_permits()); - } - } - PermitState::Acquired => { - return Ok(()); - } - } - - match semaphore.poll_permit(None).map_err(to_try_acquire)? { - Ready(()) => { - self.state = PermitState::Acquired; - Ok(()) - } - Pending => Err(TryAcquireError::no_permits()), - } - } - - /// Release a permit back to the semaphore - pub(crate) fn release(&mut self, semaphore: &Semaphore) { - if self.forget2() { - semaphore.add_permits(1); - } - } - - /// Forget the permit **without** releasing it back to the semaphore. - /// - /// After calling `forget`, `poll_acquire` is able to acquire new permit - /// from the sempahore. - /// - /// Repeatedly calling `forget` without associated calls to `add_permit` - /// will result in the semaphore losing all permits. - pub(crate) fn forget(&mut self) { - self.forget2(); - } - - /// Returns `true` if the permit was acquired - fn forget2(&mut self) -> bool { - match self.state { - PermitState::Idle => false, - PermitState::Waiting => { - let ret = self.waiter.as_ref().unwrap().cancel_interest(); - self.state = PermitState::Idle; - ret - } - PermitState::Acquired => { - self.state = PermitState::Idle; - true - } - } - } -} - -impl Default for Permit { - fn default() -> Self { - Self::new() - } -} - -// ===== impl AcquireError ==== - -impl AcquireError { - fn closed() -> AcquireError { - AcquireError(()) - } -} - -fn to_try_acquire(_: AcquireError) -> TryAcquireError { - TryAcquireError::closed() -} - -impl fmt::Display for AcquireError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "semaphore closed") - } -} - -impl ::std::error::Error for AcquireError {} - -// ===== impl TryAcquireError ===== - -impl TryAcquireError { - fn closed() -> TryAcquireError { - TryAcquireError { - kind: ErrorKind::Closed, - } - } - - fn no_permits() -> TryAcquireError { - TryAcquireError { - kind: ErrorKind::NoPermits, - } - } - - /// Returns true if the error was caused by a closed semaphore. - pub(crate) fn is_closed(&self) -> bool { - match self.kind { - ErrorKind::Closed => true, - _ => false, - } - } - - /// Returns true if the error was caused by calling `try_acquire` on a - /// semaphore with no available permits. - pub(crate) fn is_no_permits(&self) -> bool { - match self.kind { - ErrorKind::NoPermits => true, - _ => false, - } - } -} - -impl fmt::Display for TryAcquireError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = match self.kind { - ErrorKind::Closed => "semaphore closed", - ErrorKind::NoPermits => "no permits available", + /// Acquire permit from the semaphore + pub async fn acquire(&self) -> SemaphorePermit<'_> { + let mut permit = SemaphorePermit { + sem: &self, + ll_permit: ll::Permit::new(), }; - write!(fmt, "{}", descr) + poll_fn(|cx| permit.ll_permit.poll_acquire(cx, &self.ll_sem)).await.unwrap(); + permit + } + + /// Try to acquire a permit form the semaphore + pub fn try_acquire(&self) -> Result, TryAcquireError> { + let mut ll_permit = ll::Permit::new(); + match ll_permit.try_acquire(&self.ll_sem) { + Ok(_) => Ok(SemaphorePermit { sem: self, ll_permit }), + Err(_) => Err(TryAcquireError(())), + } + } } -impl ::std::error::Error for TryAcquireError {} - -// ===== impl WaiterNode ===== - -impl WaiterNode { - fn new() -> WaiterNode { - WaiterNode { - state: AtomicUsize::new(NodeState::new().to_usize()), - waker: AtomicWaker::new(), - next: AtomicPtr::new(ptr::null_mut()), - } - } - - fn acquire(&self, cx: &mut Context<'_>) -> Result { - if self.acquire2()? { - return Ok(true); - } - - self.waker.register_by_ref(cx.waker()); - - self.acquire2() - } - - fn acquire2(&self) -> Result { - use self::NodeState::*; - - match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) { - Ok(_) => Ok(true), - Err(Closed) => Err(AcquireError::closed()), - Err(_) => Ok(false), - } - } - - fn register(&self, cx: &mut Context<'_>) { - self.waker.register_by_ref(cx.waker()) - } - - /// Returns `true` if the permit has been acquired - fn cancel_interest(&self) -> bool { - use self::NodeState::*; - - match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) { - // Successfully removed interest from the queued node. The permit - // has not been assigned to the node. - Ok(_) => false, - // The semaphore has been closed, there is no further action to - // take. - Err(Closed) => false, - // The permit has been assigned. It must be acquired in order to - // be released back to the semaphore. - Err(Assigned) => { - match self.acquire2() { - Ok(true) => true, - // Not a reachable state - Ok(false) => panic!(), - // The semaphore has been closed, no further action to take. - Err(_) => false, - } - } - Err(state) => panic!("unexpected state = {:?}", state), - } - } - - /// Transition the state to `QueuedWaiting`. - /// - /// This step can only happen from `Queued` or from `Idle`. - /// - /// Returns `true` if transitioning into a queued state. - fn to_queued_waiting(&self) -> bool { - use self::NodeState::*; - - let mut curr = NodeState::load(&self.state, Acquire); - - loop { - debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr); - let next = QueuedWaiting; - - match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { - Ok(_) => { - if curr.is_queued() { - return false; - } else { - // Transitioned to queued, reset next pointer - self.next.store(ptr::null_mut(), Relaxed); - return true; - } - } - Err(actual) => { - curr = actual; - } - } - } - } - - /// Notify the waiter - /// - /// Returns `true` if the waiter accepts the notification - fn notify(&self, closed: bool) -> bool { - use self::NodeState::*; - - // Assume QueuedWaiting state - let mut curr = QueuedWaiting; - - loop { - let next = match curr { - Queued => Idle, - QueuedWaiting => { - if closed { - Closed - } else { - Assigned - } - } - actual => panic!("actual = {:?}", actual), - }; - - match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { - Ok(_) => match curr { - QueuedWaiting => { - self.waker.wake(); - return true; - } - _ => return false, - }, - Err(actual) => curr = actual, - } - } - } - - fn revert_to_idle(&self) { - use self::NodeState::Idle; - - // There are no other handles to the node - NodeState::store(&self.state, Idle, Relaxed); - } - - #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4293 - fn into_non_null(self: Arc) -> NonNull { - let ptr = Arc::into_raw(self); - unsafe { NonNull::new_unchecked(ptr as *mut _) } +impl<'a> SemaphorePermit<'a> { + /// Forget the permit **without** releasing it back to the semaphore. + /// This can be used to reduce the amount of permits available from a + /// semaphore. + pub fn forget(mut self) { + self.ll_permit.forget(); } } -// ===== impl State ===== - -/// Flag differentiating between available permits and waiter pointers. -/// -/// If we assume pointers are properly aligned, then the least significant bit -/// will always be zero. So, we use that bit to track if the value represents a -/// number. -const NUM_FLAG: usize = 0b01; - -const CLOSED_FLAG: usize = 0b10; - -const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT; - -/// When representing "numbers", the state has to be shifted this much (to get -/// rid of the flag bit). -const NUM_SHIFT: usize = 2; - -impl SemState { - /// Returns a new default `State` value. - fn new(permits: usize, stub: &WaiterNode) -> SemState { - assert!(permits <= MAX_PERMITS); - - if permits > 0 { - SemState((permits << NUM_SHIFT) | NUM_FLAG) - } else { - SemState(stub as *const _ as usize) - } - } - - /// Returns a `State` tracking `ptr` as the tail of the queue. - fn new_ptr(tail: NonNull, closed: bool) -> SemState { - let mut val = tail.as_ptr() as usize; - - if closed { - val |= CLOSED_FLAG; - } - - SemState(val) - } - - /// Returns the amount of remaining capacity - fn available_permits(self) -> usize { - if !self.has_available_permits() { - return 0; - } - - self.0 >> NUM_SHIFT - } - - /// Returns true if the state has permits that can be claimed by a waiter. - fn has_available_permits(self) -> bool { - self.0 & NUM_FLAG == NUM_FLAG - } - - fn has_waiter(self, stub: &WaiterNode) -> bool { - !self.has_available_permits() && !self.is_stub(stub) - } - - /// Try to acquire a permit - /// - /// # Return - /// - /// Returns `true` if the permit was acquired, `false` otherwise. If `false` - /// is returned, it can be assumed that `State` represents the head pointer - /// in the mpsc channel. - fn acquire_permit(&mut self, stub: &WaiterNode) -> bool { - if !self.has_available_permits() { - return false; - } - - debug_assert!(self.waiter().is_none()); - - self.0 -= 1 << NUM_SHIFT; - - if self.0 == NUM_FLAG { - // Set the state to the stub pointer. - self.0 = stub as *const _ as usize; - } - - true - } - - /// Release permits - /// - /// Returns `true` if the permits were accepted. - fn release_permits(&mut self, permits: usize, stub: &WaiterNode) { - debug_assert!(permits > 0); - - if self.is_stub(stub) { - self.0 = (permits << NUM_SHIFT) | NUM_FLAG | (self.0 & CLOSED_FLAG); - return; - } - - debug_assert!(self.has_available_permits()); - - self.0 += permits << NUM_SHIFT; - } - - fn is_waiter(self) -> bool { - self.0 & NUM_FLAG == 0 - } - - /// Returns the waiter, if one is set. - fn waiter(self) -> Option> { - if self.is_waiter() { - let waiter = NonNull::new(self.as_ptr()).expect("null pointer stored"); - - Some(waiter) - } else { - None - } - } - - /// Assumes `self` represents a pointer - fn as_ptr(self) -> *mut WaiterNode { - (self.0 & !CLOSED_FLAG) as *mut WaiterNode - } - - /// Set to a pointer to a waiter. - /// - /// This can only be done from the full state. - fn set_waiter(&mut self, waiter: NonNull) { - let waiter = waiter.as_ptr() as usize; - debug_assert!(waiter & NUM_FLAG == 0); - debug_assert!(!self.is_closed()); - - self.0 = waiter; - } - - fn is_stub(self, stub: &WaiterNode) -> bool { - self.as_ptr() as usize == stub as *const _ as usize - } - - /// Load the state from an AtomicUsize. - fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState { - let value = cell.load(ordering); - SemState(value) - } - - /// Swap the values - fn swap(self, cell: &AtomicUsize, ordering: Ordering) -> SemState { - let prev = SemState(cell.swap(self.to_usize(), ordering)); - debug_assert_eq!(prev.is_closed(), self.is_closed()); - prev - } - - /// Compare and exchange the current value into the provided cell - fn compare_exchange( - self, - cell: &AtomicUsize, - prev: SemState, - success: Ordering, - failure: Ordering, - ) -> Result { - debug_assert_eq!(prev.is_closed(), self.is_closed()); - - let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure); - - res.map(SemState).map_err(SemState) - } - - fn fetch_set_closed(cell: &AtomicUsize, ordering: Ordering) -> SemState { - let value = cell.fetch_or(CLOSED_FLAG, ordering); - SemState(value) - } - - fn is_closed(self) -> bool { - self.0 & CLOSED_FLAG == CLOSED_FLAG - } - - /// Converts the state into a `usize` representation. - fn to_usize(self) -> usize { - self.0 - } -} - -impl fmt::Debug for SemState { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut fmt = fmt.debug_struct("SemState"); - - if self.is_waiter() { - fmt.field("state", &""); - } else { - fmt.field("permits", &self.available_permits()); - } - - fmt.finish() - } -} - -// ===== impl NodeState ===== - -impl NodeState { - fn new() -> NodeState { - NodeState::Idle - } - - fn from_usize(value: usize) -> NodeState { - use self::NodeState::*; - - match value { - 0 => Idle, - 1 => Queued, - 2 => QueuedWaiting, - 3 => Assigned, - 4 => Closed, - _ => panic!(), - } - } - - fn load(cell: &AtomicUsize, ordering: Ordering) -> NodeState { - NodeState::from_usize(cell.load(ordering)) - } - - /// Store a value - fn store(cell: &AtomicUsize, value: NodeState, ordering: Ordering) { - cell.store(value.to_usize(), ordering); - } - - fn compare_exchange( - self, - cell: &AtomicUsize, - prev: NodeState, - success: Ordering, - failure: Ordering, - ) -> Result { - cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure) - .map(NodeState::from_usize) - .map_err(NodeState::from_usize) - } - - /// Returns `true` if `self` represents a queued state. - fn is_queued(self) -> bool { - use self::NodeState::*; - - match self { - Queued | QueuedWaiting => true, - _ => false, - } - } - - fn to_usize(self) -> usize { - self as usize +impl<'a> Drop for SemaphorePermit<'_> { + fn drop(&mut self) { + self.ll_permit.release(&self.sem.ll_sem); } } diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs new file mode 100644 index 000000000..0ce858386 --- /dev/null +++ b/tokio/src/sync/semaphore_ll.rs @@ -0,0 +1,1070 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] + +//! Thread-safe, asynchronous counting semaphore. +//! +//! A `Semaphore` instance holds a set of permits. Permits are used to +//! synchronize access to a shared resource. +//! +//! Before accessing the shared resource, callers acquire a permit from the +//! semaphore. Once the permit is acquired, the caller then enters the critical +//! section. If no permits are available, then acquiring the semaphore returns +//! `Pending`. The task is woken once a permit becomes available. + +use crate::loom::{ + cell::CausalCell, + future::AtomicWaker, + sync::atomic::{AtomicPtr, AtomicUsize}, + thread, +}; + +use std::fmt; +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release}; +use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; +use std::usize; + +/// Futures-aware semaphore. +pub(crate) struct Semaphore { + /// Tracks both the waiter queue tail pointer and the number of remaining + /// permits. + state: AtomicUsize, + + /// waiter queue head pointer. + head: CausalCell>, + + /// Coordinates access to the queue head. + rx_lock: AtomicUsize, + + /// Stub waiter node used as part of the MPSC channel algorithm. + stub: Box, +} + +/// A semaphore permit +/// +/// Tracks the lifecycle of a semaphore permit. +/// +/// An instance of `Permit` is intended to be used with a **single** instance of +/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore +/// instances will result in unexpected behavior. +/// +/// `Permit` does **not** release the permit back to the semaphore on drop. It +/// is the user's responsibility to ensure that `Permit::release` is called +/// before dropping the permit. +#[derive(Debug)] +pub(crate) struct Permit { + waiter: Option>, + state: PermitState, +} + +/// Error returned by `Permit::poll_acquire`. +#[derive(Debug)] +pub(crate) struct AcquireError(()); + +/// Error returned by `Permit::try_acquire`. +#[derive(Debug)] +pub(crate) struct TryAcquireError { + pub(crate) kind: ErrorKind, +} + +#[derive(Debug)] +pub(crate) enum ErrorKind { + Closed, + NoPermits, +} + +/// Node used to notify the semaphore waiter when permit is available. +#[derive(Debug)] +struct WaiterNode { + /// Stores waiter state. + /// + /// See `NodeState` for more details. + state: AtomicUsize, + + /// Task to wake when a permit is made available. + waker: AtomicWaker, + + /// Next pointer in the queue of waiting senders. + next: AtomicPtr, +} + +/// Semaphore state +/// +/// The 2 low bits track the modes. +/// +/// - Closed +/// - Full +/// +/// When not full, the rest of the `usize` tracks the total number of messages +/// in the channel. When full, the rest of the `usize` is a pointer to the tail +/// of the "waiting senders" queue. +#[derive(Copy, Clone)] +struct SemState(usize); + +/// Permit state +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +enum PermitState { + /// The permit has not been requested. + Idle, + + /// Currently waiting for a permit to be made available and assigned to the + /// waiter. + Waiting, + + /// The permit has been acquired. + Acquired, +} + +/// Waiter node state +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[repr(usize)] +enum NodeState { + /// Not waiting for a permit and the node is not in the wait queue. + /// + /// This is the initial state. + Idle = 0, + + /// Not waiting for a permit but the node is in the wait queue. + /// + /// This happens when the waiter has previously requested a permit, but has + /// since canceled the request. The node cannot be removed by the waiter, so + /// this state informs the receiver to skip the node when it pops it from + /// the wait queue. + Queued = 1, + + /// Waiting for a permit and the node is in the wait queue. + QueuedWaiting = 2, + + /// The waiter has been assigned a permit and the node has been removed from + /// the queue. + Assigned = 3, + + /// The semaphore has been closed. No more permits will be issued. + Closed = 4, +} + +// ===== impl Semaphore ===== + +impl Semaphore { + /// Creates a new semaphore with the initial number of permits + /// + /// # Panics + /// + /// Panics if `permits` is zero. + pub(crate) fn new(permits: usize) -> Semaphore { + let stub = Box::new(WaiterNode::new()); + let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap(); + + // Allocations are aligned + debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0); + + let state = SemState::new(permits, &stub); + + Semaphore { + state: AtomicUsize::new(state.to_usize()), + head: CausalCell::new(ptr), + rx_lock: AtomicUsize::new(0), + stub, + } + } + + /// Returns the current number of available permits + pub(crate) fn available_permits(&self) -> usize { + let curr = SemState::load(&self.state, Acquire); + curr.available_permits() + } + + /// Poll for a permit + fn poll_permit( + &self, + mut permit: Option<(&mut Context<'_>, &mut Permit)>, + ) -> Poll> { + // Load the current state + let mut curr = SemState::load(&self.state, Acquire); + + // Tracks a *mut WaiterNode representing an Arc clone. + // + // This avoids having to bump the ref count unless required. + let mut maybe_strong: Option> = None; + + macro_rules! undo_strong { + () => { + if let Some(waiter) = maybe_strong { + // The waiter was cloned, but never got queued. + // Before entering `poll_permit`, the waiter was in the + // `Idle` state. We must transition the node back to the + // idle state. + let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) }; + waiter.revert_to_idle(); + } + }; + } + + loop { + let mut next = curr; + + if curr.is_closed() { + undo_strong!(); + return Ready(Err(AcquireError::closed())); + } + + if !next.acquire_permit(&self.stub) { + debug_assert!(curr.waiter().is_some()); + + if maybe_strong.is_none() { + if let Some((ref mut cx, ref mut permit)) = permit { + // Get the Sender's waiter node, or initialize one + let waiter = permit + .waiter + .get_or_insert_with(|| Arc::new(WaiterNode::new())); + + waiter.register(cx); + + if !waiter.to_queued_waiting() { + // The node is alrady queued, there is no further work + // to do. + return Pending; + } + + maybe_strong = Some(WaiterNode::into_non_null(waiter.clone())); + } else { + // If no `waiter`, then the task is not registered and there + // is no further work to do. + return Pending; + } + } + + next.set_waiter(maybe_strong.unwrap()); + } + + debug_assert_ne!(curr.0, 0); + debug_assert_ne!(next.0, 0); + + match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { + Ok(_) => { + match curr.waiter() { + Some(prev_waiter) => { + let waiter = maybe_strong.unwrap(); + + // Finish pushing + unsafe { + prev_waiter.as_ref().next.store(waiter.as_ptr(), Release); + } + + return Pending; + } + None => { + undo_strong!(); + + return Ready(Ok(())); + } + } + } + Err(actual) => { + curr = actual; + } + } + } + } + + /// Close the semaphore. This prevents the semaphore from issuing new + /// permits and notifies all pending waiters. + pub(crate) fn close(&self) { + // Acquire the `rx_lock`, setting the "closed" flag on the lock. + let prev = self.rx_lock.fetch_or(1, AcqRel); + + if prev != 0 { + // Another thread has the lock and will be responsible for notifying + // pending waiters. + return; + } + + self.add_permits_locked(0, true); + } + + /// Add `n` new permits to the semaphore. + pub(crate) fn add_permits(&self, n: usize) { + if n == 0 { + return; + } + + // TODO: Handle overflow. A panic is not sufficient, the process must + // abort. + let prev = self.rx_lock.fetch_add(n << 1, AcqRel); + + if prev != 0 { + // Another thread has the lock and will be responsible for notifying + // pending waiters. + return; + } + + self.add_permits_locked(n, false); + } + + fn add_permits_locked(&self, mut rem: usize, mut closed: bool) { + while rem > 0 || closed { + if closed { + SemState::fetch_set_closed(&self.state, AcqRel); + } + + // Release the permits and notify + self.add_permits_locked2(rem, closed); + + let n = rem << 1; + + let actual = if closed { + let actual = self.rx_lock.fetch_sub(n | 1, AcqRel); + closed = false; + actual + } else { + let actual = self.rx_lock.fetch_sub(n, AcqRel); + closed = actual & 1 == 1; + actual + }; + + rem = (actual >> 1) - rem; + } + } + + /// Release a specific amount of permits to the semaphore + /// + /// This function is called by `add_permits` after the add lock has been + /// acquired. + fn add_permits_locked2(&self, mut n: usize, closed: bool) { + while n > 0 || closed { + let waiter = match self.pop(n, closed) { + Some(waiter) => waiter, + None => { + return; + } + }; + + if waiter.notify(closed) { + n = n.saturating_sub(1); + } + } + } + + /// Pop a waiter + /// + /// `rem` represents the remaining number of times the caller will pop. If + /// there are no more waiters to pop, `rem` is used to set the available + /// permits. + fn pop(&self, rem: usize, closed: bool) -> Option> { + 'outer: loop { + unsafe { + let mut head = self.head.with(|head| *head); + let mut next_ptr = head.as_ref().next.load(Acquire); + + let stub = self.stub(); + + if head == stub { + let next = match NonNull::new(next_ptr) { + Some(next) => next, + None => { + // This loop is not part of the standard intrusive mpsc + // channel algorithm. This is where we atomically pop + // the last task and add `rem` to the remaining capacity. + // + // This modification to the pop algorithm works because, + // at this point, we have not done any work (only done + // reading). We have a *pretty* good idea that there is + // no concurrent pusher. + // + // The capacity is then atomically added by doing an + // AcqRel CAS on `state`. The `state` cell is the + // linchpin of the algorithm. + // + // By successfully CASing `head` w/ AcqRel, we ensure + // that, if any thread was racing and entered a push, we + // see that and abort pop, retrying as it is + // "inconsistent". + let mut curr = SemState::load(&self.state, Acquire); + + loop { + if curr.has_waiter(&self.stub) { + // Inconsistent + thread::yield_now(); + continue 'outer; + } + + // When closing the semaphore, nodes are popped + // with `rem == 0`. In this case, we are not + // adding permits, but notifying waiters of the + // semaphore's closed state. + if rem == 0 { + debug_assert!(curr.is_closed(), "state = {:?}", curr); + return None; + } + + let mut next = curr; + next.release_permits(rem, &self.stub); + + match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { + Ok(_) => return None, + Err(actual) => { + curr = actual; + } + } + } + } + }; + + self.head.with_mut(|head| *head = next); + head = next; + next_ptr = next.as_ref().next.load(Acquire); + } + + if let Some(next) = NonNull::new(next_ptr) { + self.head.with_mut(|head| *head = next); + + return Some(Arc::from_raw(head.as_ptr())); + } + + let state = SemState::load(&self.state, Acquire); + + // This must always be a pointer as the wait list is not empty. + let tail = state.waiter().unwrap(); + + if tail != head { + // Inconsistent + thread::yield_now(); + continue 'outer; + } + + self.push_stub(closed); + + next_ptr = head.as_ref().next.load(Acquire); + + if let Some(next) = NonNull::new(next_ptr) { + self.head.with_mut(|head| *head = next); + + return Some(Arc::from_raw(head.as_ptr())); + } + + // Inconsistent state, loop + thread::yield_now(); + } + } + } + + unsafe fn push_stub(&self, closed: bool) { + let stub = self.stub(); + + // Set the next pointer. This does not require an atomic operation as + // this node is not accessible. The write will be flushed with the next + // operation + stub.as_ref().next.store(ptr::null_mut(), Relaxed); + + // Update the tail to point to the new node. We need to see the previous + // node in order to update the next pointer as well as release `task` + // to any other threads calling `push`. + let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel); + + debug_assert_eq!(closed, prev.is_closed()); + + // The stub is only pushed when there are pending tasks. Because of + // this, the state must *always* be in pointer mode. + let prev = prev.waiter().unwrap(); + + // We don't want the *existing* pointer to be a stub. + debug_assert_ne!(prev, stub); + + // Release `task` to the consume end. + prev.as_ref().next.store(stub.as_ptr(), Release); + } + + fn stub(&self) -> NonNull { + unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) } + } +} + +impl fmt::Debug for Semaphore { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Semaphore") + .field("state", &SemState::load(&self.state, Relaxed)) + .field("head", &self.head.with(|ptr| ptr)) + .field("rx_lock", &self.rx_lock.load(Relaxed)) + .field("stub", &self.stub) + .finish() + } +} + +unsafe impl Send for Semaphore {} +unsafe impl Sync for Semaphore {} + +// ===== impl Permit ===== + +impl Permit { + /// Create a new `Permit`. + /// + /// The permit begins in the "unacquired" state. + pub(crate) fn new() -> Permit { + Permit { + waiter: None, + state: PermitState::Idle, + } + } + + /// Returns true if the permit has been acquired + pub(crate) fn is_acquired(&self) -> bool { + self.state == PermitState::Acquired + } + + /// Try to acquire the permit. If no permits are available, the current task + /// is notified once a new permit becomes available. + pub(crate) fn poll_acquire( + &mut self, + cx: &mut Context<'_>, + semaphore: &Semaphore, + ) -> Poll> { + match self.state { + PermitState::Idle => {} + PermitState::Waiting => { + let waiter = self.waiter.as_ref().unwrap(); + + if waiter.acquire(cx)? { + self.state = PermitState::Acquired; + return Ready(Ok(())); + } else { + return Pending; + } + } + PermitState::Acquired => { + return Ready(Ok(())); + } + } + + match semaphore.poll_permit(Some((cx, self)))? { + Ready(()) => { + self.state = PermitState::Acquired; + Ready(Ok(())) + } + Pending => { + self.state = PermitState::Waiting; + Pending + } + } + } + + /// Try to acquire the permit. + pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { + match self.state { + PermitState::Idle => {} + PermitState::Waiting => { + let waiter = self.waiter.as_ref().unwrap(); + + if waiter.acquire2().map_err(to_try_acquire)? { + self.state = PermitState::Acquired; + return Ok(()); + } else { + return Err(TryAcquireError::no_permits()); + } + } + PermitState::Acquired => { + return Ok(()); + } + } + + match semaphore.poll_permit(None).map_err(to_try_acquire)? { + Ready(()) => { + self.state = PermitState::Acquired; + Ok(()) + } + Pending => Err(TryAcquireError::no_permits()), + } + } + + /// Release a permit back to the semaphore + pub(crate) fn release(&mut self, semaphore: &Semaphore) { + if self.forget2() { + semaphore.add_permits(1); + } + } + + /// Forget the permit **without** releasing it back to the semaphore. + /// + /// After calling `forget`, `poll_acquire` is able to acquire new permit + /// from the sempahore. + /// + /// Repeatedly calling `forget` without associated calls to `add_permit` + /// will result in the semaphore losing all permits. + pub(crate) fn forget(&mut self) { + self.forget2(); + } + + /// Returns `true` if the permit was acquired + fn forget2(&mut self) -> bool { + match self.state { + PermitState::Idle => false, + PermitState::Waiting => { + let ret = self.waiter.as_ref().unwrap().cancel_interest(); + self.state = PermitState::Idle; + ret + } + PermitState::Acquired => { + self.state = PermitState::Idle; + true + } + } + } +} + +impl Default for Permit { + fn default() -> Self { + Self::new() + } +} + +// ===== impl AcquireError ==== + +impl AcquireError { + fn closed() -> AcquireError { + AcquireError(()) + } +} + +fn to_try_acquire(_: AcquireError) -> TryAcquireError { + TryAcquireError::closed() +} + +impl fmt::Display for AcquireError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "semaphore closed") + } +} + +impl ::std::error::Error for AcquireError {} + +// ===== impl TryAcquireError ===== + +impl TryAcquireError { + fn closed() -> TryAcquireError { + TryAcquireError { + kind: ErrorKind::Closed, + } + } + + fn no_permits() -> TryAcquireError { + TryAcquireError { + kind: ErrorKind::NoPermits, + } + } + + /// Returns true if the error was caused by a closed semaphore. + pub(crate) fn is_closed(&self) -> bool { + match self.kind { + ErrorKind::Closed => true, + _ => false, + } + } + + /// Returns true if the error was caused by calling `try_acquire` on a + /// semaphore with no available permits. + pub(crate) fn is_no_permits(&self) -> bool { + match self.kind { + ErrorKind::NoPermits => true, + _ => false, + } + } +} + +impl fmt::Display for TryAcquireError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let descr = match self.kind { + ErrorKind::Closed => "semaphore closed", + ErrorKind::NoPermits => "no permits available", + }; + write!(fmt, "{}", descr) + } +} + +impl ::std::error::Error for TryAcquireError {} + +// ===== impl WaiterNode ===== + +impl WaiterNode { + fn new() -> WaiterNode { + WaiterNode { + state: AtomicUsize::new(NodeState::new().to_usize()), + waker: AtomicWaker::new(), + next: AtomicPtr::new(ptr::null_mut()), + } + } + + fn acquire(&self, cx: &mut Context<'_>) -> Result { + if self.acquire2()? { + return Ok(true); + } + + self.waker.register_by_ref(cx.waker()); + + self.acquire2() + } + + fn acquire2(&self) -> Result { + use self::NodeState::*; + + match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) { + Ok(_) => Ok(true), + Err(Closed) => Err(AcquireError::closed()), + Err(_) => Ok(false), + } + } + + fn register(&self, cx: &mut Context<'_>) { + self.waker.register_by_ref(cx.waker()) + } + + /// Returns `true` if the permit has been acquired + fn cancel_interest(&self) -> bool { + use self::NodeState::*; + + match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) { + // Successfully removed interest from the queued node. The permit + // has not been assigned to the node. + Ok(_) => false, + // The semaphore has been closed, there is no further action to + // take. + Err(Closed) => false, + // The permit has been assigned. It must be acquired in order to + // be released back to the semaphore. + Err(Assigned) => { + match self.acquire2() { + Ok(true) => true, + // Not a reachable state + Ok(false) => panic!(), + // The semaphore has been closed, no further action to take. + Err(_) => false, + } + } + Err(state) => panic!("unexpected state = {:?}", state), + } + } + + /// Transition the state to `QueuedWaiting`. + /// + /// This step can only happen from `Queued` or from `Idle`. + /// + /// Returns `true` if transitioning into a queued state. + fn to_queued_waiting(&self) -> bool { + use self::NodeState::*; + + let mut curr = NodeState::load(&self.state, Acquire); + + loop { + debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr); + let next = QueuedWaiting; + + match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { + Ok(_) => { + if curr.is_queued() { + return false; + } else { + // Transitioned to queued, reset next pointer + self.next.store(ptr::null_mut(), Relaxed); + return true; + } + } + Err(actual) => { + curr = actual; + } + } + } + } + + /// Notify the waiter + /// + /// Returns `true` if the waiter accepts the notification + fn notify(&self, closed: bool) -> bool { + use self::NodeState::*; + + // Assume QueuedWaiting state + let mut curr = QueuedWaiting; + + loop { + let next = match curr { + Queued => Idle, + QueuedWaiting => { + if closed { + Closed + } else { + Assigned + } + } + actual => panic!("actual = {:?}", actual), + }; + + match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { + Ok(_) => match curr { + QueuedWaiting => { + self.waker.wake(); + return true; + } + _ => return false, + }, + Err(actual) => curr = actual, + } + } + } + + fn revert_to_idle(&self) { + use self::NodeState::Idle; + + // There are no other handles to the node + NodeState::store(&self.state, Idle, Relaxed); + } + + #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4293 + fn into_non_null(self: Arc) -> NonNull { + let ptr = Arc::into_raw(self); + unsafe { NonNull::new_unchecked(ptr as *mut _) } + } +} + +// ===== impl State ===== + +/// Flag differentiating between available permits and waiter pointers. +/// +/// If we assume pointers are properly aligned, then the least significant bit +/// will always be zero. So, we use that bit to track if the value represents a +/// number. +const NUM_FLAG: usize = 0b01; + +const CLOSED_FLAG: usize = 0b10; + +const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT; + +/// When representing "numbers", the state has to be shifted this much (to get +/// rid of the flag bit). +const NUM_SHIFT: usize = 2; + +impl SemState { + /// Returns a new default `State` value. + fn new(permits: usize, stub: &WaiterNode) -> SemState { + assert!(permits <= MAX_PERMITS); + + if permits > 0 { + SemState((permits << NUM_SHIFT) | NUM_FLAG) + } else { + SemState(stub as *const _ as usize) + } + } + + /// Returns a `State` tracking `ptr` as the tail of the queue. + fn new_ptr(tail: NonNull, closed: bool) -> SemState { + let mut val = tail.as_ptr() as usize; + + if closed { + val |= CLOSED_FLAG; + } + + SemState(val) + } + + /// Returns the amount of remaining capacity + fn available_permits(self) -> usize { + if !self.has_available_permits() { + return 0; + } + + self.0 >> NUM_SHIFT + } + + /// Returns true if the state has permits that can be claimed by a waiter. + fn has_available_permits(self) -> bool { + self.0 & NUM_FLAG == NUM_FLAG + } + + fn has_waiter(self, stub: &WaiterNode) -> bool { + !self.has_available_permits() && !self.is_stub(stub) + } + + /// Try to acquire a permit + /// + /// # Return + /// + /// Returns `true` if the permit was acquired, `false` otherwise. If `false` + /// is returned, it can be assumed that `State` represents the head pointer + /// in the mpsc channel. + fn acquire_permit(&mut self, stub: &WaiterNode) -> bool { + if !self.has_available_permits() { + return false; + } + + debug_assert!(self.waiter().is_none()); + + self.0 -= 1 << NUM_SHIFT; + + if self.0 == NUM_FLAG { + // Set the state to the stub pointer. + self.0 = stub as *const _ as usize; + } + + true + } + + /// Release permits + /// + /// Returns `true` if the permits were accepted. + fn release_permits(&mut self, permits: usize, stub: &WaiterNode) { + debug_assert!(permits > 0); + + if self.is_stub(stub) { + self.0 = (permits << NUM_SHIFT) | NUM_FLAG | (self.0 & CLOSED_FLAG); + return; + } + + debug_assert!(self.has_available_permits()); + + self.0 += permits << NUM_SHIFT; + } + + fn is_waiter(self) -> bool { + self.0 & NUM_FLAG == 0 + } + + /// Returns the waiter, if one is set. + fn waiter(self) -> Option> { + if self.is_waiter() { + let waiter = NonNull::new(self.as_ptr()).expect("null pointer stored"); + + Some(waiter) + } else { + None + } + } + + /// Assumes `self` represents a pointer + fn as_ptr(self) -> *mut WaiterNode { + (self.0 & !CLOSED_FLAG) as *mut WaiterNode + } + + /// Set to a pointer to a waiter. + /// + /// This can only be done from the full state. + fn set_waiter(&mut self, waiter: NonNull) { + let waiter = waiter.as_ptr() as usize; + debug_assert!(waiter & NUM_FLAG == 0); + debug_assert!(!self.is_closed()); + + self.0 = waiter; + } + + fn is_stub(self, stub: &WaiterNode) -> bool { + self.as_ptr() as usize == stub as *const _ as usize + } + + /// Load the state from an AtomicUsize. + fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState { + let value = cell.load(ordering); + SemState(value) + } + + /// Swap the values + fn swap(self, cell: &AtomicUsize, ordering: Ordering) -> SemState { + let prev = SemState(cell.swap(self.to_usize(), ordering)); + debug_assert_eq!(prev.is_closed(), self.is_closed()); + prev + } + + /// Compare and exchange the current value into the provided cell + fn compare_exchange( + self, + cell: &AtomicUsize, + prev: SemState, + success: Ordering, + failure: Ordering, + ) -> Result { + debug_assert_eq!(prev.is_closed(), self.is_closed()); + + let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure); + + res.map(SemState).map_err(SemState) + } + + fn fetch_set_closed(cell: &AtomicUsize, ordering: Ordering) -> SemState { + let value = cell.fetch_or(CLOSED_FLAG, ordering); + SemState(value) + } + + fn is_closed(self) -> bool { + self.0 & CLOSED_FLAG == CLOSED_FLAG + } + + /// Converts the state into a `usize` representation. + fn to_usize(self) -> usize { + self.0 + } +} + +impl fmt::Debug for SemState { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut fmt = fmt.debug_struct("SemState"); + + if self.is_waiter() { + fmt.field("state", &""); + } else { + fmt.field("permits", &self.available_permits()); + } + + fmt.finish() + } +} + +// ===== impl NodeState ===== + +impl NodeState { + fn new() -> NodeState { + NodeState::Idle + } + + fn from_usize(value: usize) -> NodeState { + use self::NodeState::*; + + match value { + 0 => Idle, + 1 => Queued, + 2 => QueuedWaiting, + 3 => Assigned, + 4 => Closed, + _ => panic!(), + } + } + + fn load(cell: &AtomicUsize, ordering: Ordering) -> NodeState { + NodeState::from_usize(cell.load(ordering)) + } + + /// Store a value + fn store(cell: &AtomicUsize, value: NodeState, ordering: Ordering) { + cell.store(value.to_usize(), ordering); + } + + fn compare_exchange( + self, + cell: &AtomicUsize, + prev: NodeState, + success: Ordering, + failure: Ordering, + ) -> Result { + cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure) + .map(NodeState::from_usize) + .map_err(NodeState::from_usize) + } + + /// Returns `true` if `self` represents a queued state. + fn is_queued(self) -> bool { + use self::NodeState::*; + + match self { + Queued | QueuedWaiting => true, + _ => false, + } + } + + fn to_usize(self) -> usize { + self as usize + } +} diff --git a/tokio/src/sync/tests/loom_semaphore.rs b/tokio/src/sync/tests/loom_semaphore_ll.rs similarity index 98% rename from tokio/src/sync/tests/loom_semaphore.rs rename to tokio/src/sync/tests/loom_semaphore_ll.rs index 7b8de0f05..cd4314ca0 100644 --- a/tokio/src/sync/tests/loom_semaphore.rs +++ b/tokio/src/sync/tests/loom_semaphore_ll.rs @@ -1,4 +1,4 @@ -use crate::sync::semaphore::*; +use crate::sync::semaphore_ll::*; use futures::future::poll_fn; use loom::future::block_on; diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs index 06d18e9a9..0b50cc952 100644 --- a/tokio/src/sync/tests/mod.rs +++ b/tokio/src/sync/tests/mod.rs @@ -1,6 +1,6 @@ cfg_not_loom! { mod atomic_waker; - mod semaphore; + mod semaphore_ll; } cfg_loom! { @@ -8,5 +8,5 @@ cfg_loom! { mod loom_list; mod loom_mpsc; mod loom_oneshot; - mod loom_semaphore; + mod loom_semaphore_ll; } diff --git a/tokio/src/sync/tests/semaphore.rs b/tokio/src/sync/tests/semaphore_ll.rs similarity index 98% rename from tokio/src/sync/tests/semaphore.rs rename to tokio/src/sync/tests/semaphore_ll.rs index 86dd7da58..8dd56c858 100644 --- a/tokio/src/sync/tests/semaphore.rs +++ b/tokio/src/sync/tests/semaphore_ll.rs @@ -1,4 +1,4 @@ -use crate::sync::semaphore::{Permit, Semaphore}; +use crate::sync::semaphore_ll::{Permit, Semaphore}; use tokio_test::task; use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok}; diff --git a/tokio/tests/sync_semaphore.rs b/tokio/tests/sync_semaphore.rs new file mode 100644 index 000000000..1cb0c749d --- /dev/null +++ b/tokio/tests/sync_semaphore.rs @@ -0,0 +1,81 @@ +#![cfg(feature = "full")] + +use std::sync::Arc; +use tokio::sync::Semaphore; + +#[test] +fn no_permits() { + // this should not panic + Semaphore::new(0); +} + +#[test] +fn try_acquire() { + let sem = Semaphore::new(1); + { + let p1 = sem.try_acquire(); + assert!(p1.is_ok()); + let p2 = sem.try_acquire(); + assert!(p2.is_err()); + } + let p3 = sem.try_acquire(); + assert!(p3.is_ok()); +} + +#[tokio::test] +async fn acquire() { + let sem = Arc::new(Semaphore::new(1)); + let p1 = sem.try_acquire().unwrap(); + let sem_clone = sem.clone(); + let j = tokio::spawn(async move { + let _p2 = sem_clone.acquire().await; + }); + drop(p1); + j.await.unwrap(); +} + +#[tokio::test] +async fn add_permits() { + let sem = Arc::new(Semaphore::new(0)); + let sem_clone = sem.clone(); + let j = tokio::spawn(async move { + let _p2 = sem_clone.acquire().await; + }); + sem.add_permits(1); + j.await.unwrap(); +} + +#[test] +fn forget() { + let sem = Arc::new(Semaphore::new(1)); + { + let p = sem.try_acquire().unwrap(); + assert_eq!(sem.available_permits(), 0); + p.forget(); + assert_eq!(sem.available_permits(), 0); + } + assert_eq!(sem.available_permits(), 0); + assert!(sem.try_acquire().is_err()); +} + +#[tokio::test] +async fn stresstest() { + let sem = Arc::new(Semaphore::new(5)); + let mut join_handles = Vec::new(); + for _ in 0..1000 { + let sem_clone = sem.clone(); + join_handles.push(tokio::spawn(async move { + let _p = sem_clone.acquire().await; + })); + } + for j in join_handles { + j.await.unwrap(); + } + // there should be exactly 5 semaphores available now + let _p1 = sem.try_acquire().unwrap(); + let _p2 = sem.try_acquire().unwrap(); + let _p3 = sem.try_acquire().unwrap(); + let _p4 = sem.try_acquire().unwrap(); + let _p5 = sem.try_acquire().unwrap(); + assert!(sem.try_acquire().is_err()); +}