sync: add Semaphore (#1973)

Provide an asynchronous Semaphore implementation. This is useful for
synchronizing concurrent access to a shared resource.
This commit is contained in:
Michael P. Jung 2019-12-18 07:32:12 +01:00 committed by Carl Lerche
parent e5b99b0f7a
commit 9211adbe01
10 changed files with 1225 additions and 1051 deletions

View File

@ -26,7 +26,9 @@ cfg_sync! {
pub mod oneshot; pub mod oneshot;
pub(crate) mod semaphore; pub(crate) mod semaphore_ll;
mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit};
mod task; mod task;
pub(crate) use task::AtomicWaker; pub(crate) use task::AtomicWaker;
@ -48,7 +50,7 @@ cfg_not_sync! {
cfg_signal! { cfg_signal! {
pub(crate) mod mpsc; pub(crate) mod mpsc;
pub(crate) mod semaphore; pub(crate) mod semaphore_ll;
} }
} }

View File

@ -1,6 +1,6 @@
use crate::sync::mpsc::chan; use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
use crate::sync::semaphore; use crate::sync::semaphore_ll as semaphore;
use std::fmt; use std::fmt;
use std::task::{Context, Poll}; use std::task::{Context, Poll};

View File

@ -382,7 +382,7 @@ impl<T, S> Drop for Chan<T, S> {
} }
} }
use crate::sync::semaphore::TryAcquireError; use crate::sync::semaphore_ll::TryAcquireError;
impl From<TryAcquireError> for TrySendError { impl From<TryAcquireError> for TrySendError {
fn from(src: TryAcquireError) -> TrySendError { fn from(src: TryAcquireError) -> TrySendError {
@ -398,9 +398,9 @@ impl From<TryAcquireError> for TrySendError {
// ===== impl Semaphore for (::Semaphore, capacity) ===== // ===== impl Semaphore for (::Semaphore, capacity) =====
use crate::sync::semaphore::Permit; use crate::sync::semaphore_ll::Permit;
impl Semaphore for (crate::sync::semaphore::Semaphore, usize) { impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
type Permit = Permit; type Permit = Permit;
fn new_permit() -> Permit { fn new_permit() -> Permit {

View File

@ -35,7 +35,7 @@
//! [`MutexGuard`]: struct.MutexGuard.html //! [`MutexGuard`]: struct.MutexGuard.html
use crate::future::poll_fn; use crate::future::poll_fn;
use crate::sync::semaphore; use crate::sync::semaphore_ll as semaphore;
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::error::Error; use std::error::Error;
@ -74,7 +74,7 @@ unsafe impl<T> Sync for Mutex<T> where T: Send {}
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {} unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
/// An enumeration of possible errors associated with a `TryLockResult` /// An enumeration of possible errors associated with a `TryLockResult`
/// which can occur while trying to aquire a lock from the `try_lock` /// which can occur while trying to acquire a lock from the `try_lock`
/// method on a `Mutex`. /// method on a `Mutex`.
#[derive(Debug)] #[derive(Debug)]
pub enum TryLockError { pub enum TryLockError {
@ -129,7 +129,7 @@ impl<T> Mutex<T> {
guard guard
} }
/// Try to aquire the lock /// Try to acquire the lock
pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> { pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, TryLockError> {
let mut permit = semaphore::Permit::new(); let mut permit = semaphore::Permit::new();
match permit.try_acquire(&self.s) { match permit.try_acquire(&self.s) {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
use crate::sync::semaphore::*; use crate::sync::semaphore_ll::*;
use futures::future::poll_fn; use futures::future::poll_fn;
use loom::future::block_on; use loom::future::block_on;

View File

@ -1,6 +1,6 @@
cfg_not_loom! { cfg_not_loom! {
mod atomic_waker; mod atomic_waker;
mod semaphore; mod semaphore_ll;
} }
cfg_loom! { cfg_loom! {
@ -8,5 +8,5 @@ cfg_loom! {
mod loom_list; mod loom_list;
mod loom_mpsc; mod loom_mpsc;
mod loom_oneshot; mod loom_oneshot;
mod loom_semaphore; mod loom_semaphore_ll;
} }

View File

@ -1,4 +1,4 @@
use crate::sync::semaphore::{Permit, Semaphore}; use crate::sync::semaphore_ll::{Permit, Semaphore};
use tokio_test::task; use tokio_test::task;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok}; use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};

View File

@ -0,0 +1,81 @@
#![cfg(feature = "full")]
use std::sync::Arc;
use tokio::sync::Semaphore;
#[test]
fn no_permits() {
// this should not panic
Semaphore::new(0);
}
#[test]
fn try_acquire() {
let sem = Semaphore::new(1);
{
let p1 = sem.try_acquire();
assert!(p1.is_ok());
let p2 = sem.try_acquire();
assert!(p2.is_err());
}
let p3 = sem.try_acquire();
assert!(p3.is_ok());
}
#[tokio::test]
async fn acquire() {
let sem = Arc::new(Semaphore::new(1));
let p1 = sem.try_acquire().unwrap();
let sem_clone = sem.clone();
let j = tokio::spawn(async move {
let _p2 = sem_clone.acquire().await;
});
drop(p1);
j.await.unwrap();
}
#[tokio::test]
async fn add_permits() {
let sem = Arc::new(Semaphore::new(0));
let sem_clone = sem.clone();
let j = tokio::spawn(async move {
let _p2 = sem_clone.acquire().await;
});
sem.add_permits(1);
j.await.unwrap();
}
#[test]
fn forget() {
let sem = Arc::new(Semaphore::new(1));
{
let p = sem.try_acquire().unwrap();
assert_eq!(sem.available_permits(), 0);
p.forget();
assert_eq!(sem.available_permits(), 0);
}
assert_eq!(sem.available_permits(), 0);
assert!(sem.try_acquire().is_err());
}
#[tokio::test]
async fn stresstest() {
let sem = Arc::new(Semaphore::new(5));
let mut join_handles = Vec::new();
for _ in 0..1000 {
let sem_clone = sem.clone();
join_handles.push(tokio::spawn(async move {
let _p = sem_clone.acquire().await;
}));
}
for j in join_handles {
j.await.unwrap();
}
// there should be exactly 5 semaphores available now
let _p1 = sem.try_acquire().unwrap();
let _p2 = sem.try_acquire().unwrap();
let _p3 = sem.try_acquire().unwrap();
let _p4 = sem.try_acquire().unwrap();
let _p5 = sem.try_acquire().unwrap();
assert!(sem.try_acquire().is_err());
}