mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
tokio-sync: Add async mutual exclusion primitive (#964)
This PR introduces `Lock`: A concurrency primitive built on top of `Semaphore` that provides a `Mutex`-like primitive that interacts nicely with futures. Specifically, `LockGuard` (in contrast to `MutexGuard`) does _not_ borrow the `Lock`, and can thus be passed into a future where it will later be unlocked. This replaces #958, which attempted to introduce a less generic version. The primitive proposed there will instead live in [`async-lease`](https://github.com/jonhoo/async-lease).
This commit is contained in:
parent
7e51ab05e9
commit
cf06621998
@ -66,6 +66,7 @@ jobs:
|
||||
- timer
|
||||
- udp
|
||||
- uds
|
||||
- sync
|
||||
tokio-buf:
|
||||
- util
|
||||
|
||||
|
@ -24,6 +24,6 @@ futures = "0.1.19"
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = { version = "0.5", default-features = false }
|
||||
tokio = "0.1.15"
|
||||
tokio = { version = "0.1.15", path = "../tokio" }
|
||||
tokio-mock-task = "0.1.1"
|
||||
loom = { version = "0.1.1", features = ["futures"] }
|
||||
|
@ -24,6 +24,7 @@ macro_rules! if_fuzz {
|
||||
}}
|
||||
}
|
||||
|
||||
pub mod lock;
|
||||
mod loom;
|
||||
pub mod mpsc;
|
||||
pub mod oneshot;
|
||||
|
182
tokio-sync/src/lock.rs
Normal file
182
tokio-sync/src/lock.rs
Normal file
@ -0,0 +1,182 @@
|
||||
//! An asynchronous `Mutex`-like type.
|
||||
//!
|
||||
//! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one
|
||||
//! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the
|
||||
//! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then
|
||||
//! release it at some later point in time.
|
||||
//!
|
||||
//! This allows you to do something along the lines of:
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
//! # #[macro_use]
|
||||
//! # extern crate futures;
|
||||
//! # extern crate tokio;
|
||||
//! # use futures::{future, Poll, Async, Future, Stream};
|
||||
//! use tokio::sync::lock::{Lock, LockGuard};
|
||||
//! struct MyType<S> {
|
||||
//! lock: Lock<S>,
|
||||
//! }
|
||||
//!
|
||||
//! impl<S> Future for MyType<S>
|
||||
//! where S: Stream<Item = u32> + Send + 'static
|
||||
//! {
|
||||
//! type Item = ();
|
||||
//! type Error = ();
|
||||
//!
|
||||
//! fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
//! match self.lock.poll_lock() {
|
||||
//! Async::Ready(mut guard) => {
|
||||
//! tokio::spawn(future::poll_fn(move || {
|
||||
//! let item = try_ready!(guard.poll().map_err(|_| ()));
|
||||
//! println!("item = {:?}", item);
|
||||
//! Ok(().into())
|
||||
//! }));
|
||||
//! Ok(().into())
|
||||
//! },
|
||||
//! Async::NotReady => Ok(Async::NotReady)
|
||||
//! }
|
||||
//! }
|
||||
//! }
|
||||
//! # fn main() {}
|
||||
//! ```
|
||||
//!
|
||||
//! [`Lock`]: struct.Lock.html
|
||||
//! [`LockGuard`]: struct.LockGuard.html
|
||||
|
||||
use futures::Async;
|
||||
use semaphore;
|
||||
use std::cell::UnsafeCell;
|
||||
use std::fmt;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// An asynchronous mutual exclusion primitive useful for protecting shared data
|
||||
///
|
||||
/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
|
||||
/// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that
|
||||
/// the data is only ever accessed when the mutex is locked.
|
||||
#[derive(Debug)]
|
||||
pub struct Lock<T> {
|
||||
inner: Arc<State<T>>,
|
||||
permit: semaphore::Permit,
|
||||
}
|
||||
|
||||
/// A handle to a held `Lock`.
|
||||
///
|
||||
/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
|
||||
/// internally keeps a reference-couned pointer to the original `Lock`, so even if the lock goes
|
||||
/// away, the guard remains valid.
|
||||
///
|
||||
/// The lock is automatically released whenever the guard is dropped, at which point `poll_lock`
|
||||
/// will succeed yet again.
|
||||
#[derive(Debug)]
|
||||
pub struct LockGuard<T>(Lock<T>);
|
||||
|
||||
// As long as T: Send, it's fine to send Lock<T> to other threads.
|
||||
// If T was not Send, sending a Lock<T> would be bad, since you can access T through Lock<T>.
|
||||
unsafe impl<T> Send for Lock<T> where T: Send {}
|
||||
unsafe impl<T> Sync for LockGuard<T> where T: Send + Sync {}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct State<T> {
|
||||
c: UnsafeCell<T>,
|
||||
s: semaphore::Semaphore,
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bounds() {
|
||||
fn check<T: Send>() {}
|
||||
check::<LockGuard<u32>>();
|
||||
}
|
||||
|
||||
impl<T> Lock<T> {
|
||||
/// Creates a new lock in an unlocked state ready for use.
|
||||
pub fn new(t: T) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(State {
|
||||
c: UnsafeCell::new(t),
|
||||
s: semaphore::Semaphore::new(1),
|
||||
}),
|
||||
permit: semaphore::Permit::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to acquire the lock.
|
||||
///
|
||||
/// If the lock is already held, the current task is notified when it is released.
|
||||
pub fn poll_lock(&mut self) -> Async<LockGuard<T>> {
|
||||
if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
|
||||
// The semaphore was closed. but, we never explicitly close it, and we have a
|
||||
// handle to it through the Arc, which means that this can never happen.
|
||||
unreachable!()
|
||||
}) {
|
||||
return Async::NotReady;
|
||||
}
|
||||
|
||||
// We want to move the acquired permit into the guard,
|
||||
// and leave an unacquired one in self.
|
||||
let acquired = Self {
|
||||
inner: self.inner.clone(),
|
||||
permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()),
|
||||
};
|
||||
Async::Ready(LockGuard(acquired))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for LockGuard<T> {
|
||||
fn drop(&mut self) {
|
||||
if self.0.permit.is_acquired() {
|
||||
self.0.permit.release(&self.0.inner.s);
|
||||
} else if ::std::thread::panicking() {
|
||||
// A guard _should_ always hold its permit, but if the thread is already panicking,
|
||||
// we don't want to generate a panic-while-panicing, since that's just unhelpful!
|
||||
} else {
|
||||
unreachable!("Permit not held when LockGuard was dropped")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<T> for Lock<T> {
|
||||
fn from(s: T) -> Self {
|
||||
Self::new(s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Lock<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone(),
|
||||
permit: semaphore::Permit::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for Lock<T>
|
||||
where
|
||||
T: Default,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self::new(T::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for LockGuard<T> {
|
||||
type Target = T;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
assert!(self.0.permit.is_acquired());
|
||||
unsafe { &*self.0.inner.c.get() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for LockGuard<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
assert!(self.0.permit.is_acquired());
|
||||
unsafe { &mut *self.0.inner.c.get() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: fmt::Display> fmt::Display for LockGuard<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt::Display::fmt(&**self, f)
|
||||
}
|
||||
}
|
68
tokio-sync/tests/lock.rs
Normal file
68
tokio-sync/tests/lock.rs
Normal file
@ -0,0 +1,68 @@
|
||||
#![deny(warnings)]
|
||||
|
||||
extern crate futures;
|
||||
extern crate tokio_mock_task;
|
||||
extern crate tokio_sync;
|
||||
|
||||
use tokio_mock_task::*;
|
||||
use tokio_sync::lock::Lock;
|
||||
|
||||
macro_rules! assert_ready {
|
||||
($e:expr) => {{
|
||||
match $e {
|
||||
futures::Async::Ready(v) => v,
|
||||
futures::Async::NotReady => panic!("not ready"),
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
macro_rules! assert_not_ready {
|
||||
($e:expr) => {{
|
||||
match $e {
|
||||
futures::Async::NotReady => {}
|
||||
futures::Async::Ready(v) => panic!("ready; value = {:?}", v),
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn straight_execution() {
|
||||
let mut l = Lock::new(100);
|
||||
|
||||
// We can immediately acquire the lock and take the value
|
||||
let mut g = assert_ready!(l.poll_lock());
|
||||
assert_eq!(&*g, &100);
|
||||
*g = 99;
|
||||
drop(g);
|
||||
|
||||
let mut g = assert_ready!(l.poll_lock());
|
||||
assert_eq!(&*g, &99);
|
||||
*g = 98;
|
||||
drop(g);
|
||||
|
||||
let mut g = assert_ready!(l.poll_lock());
|
||||
assert_eq!(&*g, &98);
|
||||
|
||||
// We can continue to access the guard even if the lock is dropped
|
||||
drop(l);
|
||||
*g = 97;
|
||||
assert_eq!(&*g, &97);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn readiness() {
|
||||
let mut task = MockTask::new();
|
||||
|
||||
let mut l = Lock::new(100);
|
||||
let g = assert_ready!(l.poll_lock());
|
||||
|
||||
// We can't now acquire the lease since it's already held in g
|
||||
task.enter(|| {
|
||||
assert_not_ready!(l.poll_lock());
|
||||
});
|
||||
|
||||
// But once g unlocks, we can acquire it
|
||||
drop(g);
|
||||
assert!(task.is_notified());
|
||||
assert_ready!(l.poll_lock());
|
||||
}
|
@ -73,7 +73,7 @@ tokio-fs = { version = "0.1.6", optional = true }
|
||||
tokio-io = { version = "0.1.6", optional = true }
|
||||
tokio-executor = { version = "0.1.7", optional = true }
|
||||
tokio-reactor = { version = "0.1.1", optional = true }
|
||||
tokio-sync = { version = "0.1.3", optional = true }
|
||||
tokio-sync = { version = "0.1.3", optional = true, path = "../tokio-sync" }
|
||||
tokio-threadpool = { version = "0.1.13", optional = true }
|
||||
tokio-tcp = { version = "0.1.0", optional = true }
|
||||
tokio-udp = { version = "0.1.0", optional = true }
|
||||
|
@ -9,7 +9,8 @@
|
||||
//! from one task to another.
|
||||
//! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for
|
||||
//! sending values between tasks.
|
||||
//! - [lock](lock/index.html), an asynchronous `Mutex`-like type.
|
||||
//! - [watch](watch/index.html), a single-producer, multi-consumer channel that
|
||||
//! only stores the **most recently** sent value.
|
||||
|
||||
pub use tokio_sync::{mpsc, oneshot, watch};
|
||||
pub use tokio_sync::{lock, mpsc, oneshot, watch};
|
||||
|
Loading…
x
Reference in New Issue
Block a user