From cf0662199822c848b376db4973c7dbae30303c4e Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Thu, 18 Apr 2019 13:16:26 -0400 Subject: [PATCH] tokio-sync: Add async mutual exclusion primitive (#964) This PR introduces `Lock`: A concurrency primitive built on top of `Semaphore` that provides a `Mutex`-like primitive that interacts nicely with futures. Specifically, `LockGuard` (in contrast to `MutexGuard`) does _not_ borrow the `Lock`, and can thus be passed into a future where it will later be unlocked. This replaces #958, which attempted to introduce a less generic version. The primitive proposed there will instead live in [`async-lease`](https://github.com/jonhoo/async-lease). --- azure-pipelines.yml | 1 + tokio-sync/Cargo.toml | 2 +- tokio-sync/src/lib.rs | 1 + tokio-sync/src/lock.rs | 182 +++++++++++++++++++++++++++++++++++++++ tokio-sync/tests/lock.rs | 68 +++++++++++++++ tokio/Cargo.toml | 2 +- tokio/src/sync.rs | 3 +- 7 files changed, 256 insertions(+), 3 deletions(-) create mode 100644 tokio-sync/src/lock.rs create mode 100644 tokio-sync/tests/lock.rs diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 32b502505..73ef5c602 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -66,6 +66,7 @@ jobs: - timer - udp - uds + - sync tokio-buf: - util diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index 850b7e120..5137f17f4 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -24,6 +24,6 @@ futures = "0.1.19" [dev-dependencies] env_logger = { version = "0.5", default-features = false } -tokio = "0.1.15" +tokio = { version = "0.1.15", path = "../tokio" } tokio-mock-task = "0.1.1" loom = { version = "0.1.1", features = ["futures"] } diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs index 1ba441623..0737cea04 100644 --- a/tokio-sync/src/lib.rs +++ b/tokio-sync/src/lib.rs @@ -24,6 +24,7 @@ macro_rules! if_fuzz { }} } +pub mod lock; mod loom; pub mod mpsc; pub mod oneshot; diff --git a/tokio-sync/src/lock.rs b/tokio-sync/src/lock.rs new file mode 100644 index 000000000..b6153800f --- /dev/null +++ b/tokio-sync/src/lock.rs @@ -0,0 +1,182 @@ +//! An asynchronous `Mutex`-like type. +//! +//! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one +//! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the +//! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then +//! release it at some later point in time. +//! +//! This allows you to do something along the lines of: +//! +//! ```rust,no_run +//! # #[macro_use] +//! # extern crate futures; +//! # extern crate tokio; +//! # use futures::{future, Poll, Async, Future, Stream}; +//! use tokio::sync::lock::{Lock, LockGuard}; +//! struct MyType { +//! lock: Lock, +//! } +//! +//! impl Future for MyType +//! where S: Stream + Send + 'static +//! { +//! type Item = (); +//! type Error = (); +//! +//! fn poll(&mut self) -> Poll { +//! match self.lock.poll_lock() { +//! Async::Ready(mut guard) => { +//! tokio::spawn(future::poll_fn(move || { +//! let item = try_ready!(guard.poll().map_err(|_| ())); +//! println!("item = {:?}", item); +//! Ok(().into()) +//! })); +//! Ok(().into()) +//! }, +//! Async::NotReady => Ok(Async::NotReady) +//! } +//! } +//! } +//! # fn main() {} +//! ``` +//! +//! [`Lock`]: struct.Lock.html +//! [`LockGuard`]: struct.LockGuard.html + +use futures::Async; +use semaphore; +use std::cell::UnsafeCell; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +/// An asynchronous mutual exclusion primitive useful for protecting shared data +/// +/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data +/// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that +/// the data is only ever accessed when the mutex is locked. +#[derive(Debug)] +pub struct Lock { + inner: Arc>, + permit: semaphore::Permit, +} + +/// A handle to a held `Lock`. +/// +/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard +/// internally keeps a reference-couned pointer to the original `Lock`, so even if the lock goes +/// away, the guard remains valid. +/// +/// The lock is automatically released whenever the guard is dropped, at which point `poll_lock` +/// will succeed yet again. +#[derive(Debug)] +pub struct LockGuard(Lock); + +// As long as T: Send, it's fine to send Lock to other threads. +// If T was not Send, sending a Lock would be bad, since you can access T through Lock. +unsafe impl Send for Lock where T: Send {} +unsafe impl Sync for LockGuard where T: Send + Sync {} + +#[derive(Debug)] +struct State { + c: UnsafeCell, + s: semaphore::Semaphore, +} + +#[test] +fn bounds() { + fn check() {} + check::>(); +} + +impl Lock { + /// Creates a new lock in an unlocked state ready for use. + pub fn new(t: T) -> Self { + Self { + inner: Arc::new(State { + c: UnsafeCell::new(t), + s: semaphore::Semaphore::new(1), + }), + permit: semaphore::Permit::new(), + } + } + + /// Try to acquire the lock. + /// + /// If the lock is already held, the current task is notified when it is released. + pub fn poll_lock(&mut self) -> Async> { + if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }) { + return Async::NotReady; + } + + // We want to move the acquired permit into the guard, + // and leave an unacquired one in self. + let acquired = Self { + inner: self.inner.clone(), + permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()), + }; + Async::Ready(LockGuard(acquired)) + } +} + +impl Drop for LockGuard { + fn drop(&mut self) { + if self.0.permit.is_acquired() { + self.0.permit.release(&self.0.inner.s); + } else if ::std::thread::panicking() { + // A guard _should_ always hold its permit, but if the thread is already panicking, + // we don't want to generate a panic-while-panicing, since that's just unhelpful! + } else { + unreachable!("Permit not held when LockGuard was dropped") + } + } +} + +impl From for Lock { + fn from(s: T) -> Self { + Self::new(s) + } +} + +impl Clone for Lock { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + permit: semaphore::Permit::new(), + } + } +} + +impl Default for Lock +where + T: Default, +{ + fn default() -> Self { + Self::new(T::default()) + } +} + +impl Deref for LockGuard { + type Target = T; + fn deref(&self) -> &Self::Target { + assert!(self.0.permit.is_acquired()); + unsafe { &*self.0.inner.c.get() } + } +} + +impl DerefMut for LockGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + assert!(self.0.permit.is_acquired()); + unsafe { &mut *self.0.inner.c.get() } + } +} + +impl fmt::Display for LockGuard { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} diff --git a/tokio-sync/tests/lock.rs b/tokio-sync/tests/lock.rs new file mode 100644 index 000000000..3366d2be5 --- /dev/null +++ b/tokio-sync/tests/lock.rs @@ -0,0 +1,68 @@ +#![deny(warnings)] + +extern crate futures; +extern crate tokio_mock_task; +extern crate tokio_sync; + +use tokio_mock_task::*; +use tokio_sync::lock::Lock; + +macro_rules! assert_ready { + ($e:expr) => {{ + match $e { + futures::Async::Ready(v) => v, + futures::Async::NotReady => panic!("not ready"), + } + }}; +} + +macro_rules! assert_not_ready { + ($e:expr) => {{ + match $e { + futures::Async::NotReady => {} + futures::Async::Ready(v) => panic!("ready; value = {:?}", v), + } + }}; +} + +#[test] +fn straight_execution() { + let mut l = Lock::new(100); + + // We can immediately acquire the lock and take the value + let mut g = assert_ready!(l.poll_lock()); + assert_eq!(&*g, &100); + *g = 99; + drop(g); + + let mut g = assert_ready!(l.poll_lock()); + assert_eq!(&*g, &99); + *g = 98; + drop(g); + + let mut g = assert_ready!(l.poll_lock()); + assert_eq!(&*g, &98); + + // We can continue to access the guard even if the lock is dropped + drop(l); + *g = 97; + assert_eq!(&*g, &97); +} + +#[test] +fn readiness() { + let mut task = MockTask::new(); + + let mut l = Lock::new(100); + let g = assert_ready!(l.poll_lock()); + + // We can't now acquire the lease since it's already held in g + task.enter(|| { + assert_not_ready!(l.poll_lock()); + }); + + // But once g unlocks, we can acquire it + drop(g); + assert!(task.is_notified()); + assert_ready!(l.poll_lock()); +} diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index c2792971d..e2901c3c4 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -73,7 +73,7 @@ tokio-fs = { version = "0.1.6", optional = true } tokio-io = { version = "0.1.6", optional = true } tokio-executor = { version = "0.1.7", optional = true } tokio-reactor = { version = "0.1.1", optional = true } -tokio-sync = { version = "0.1.3", optional = true } +tokio-sync = { version = "0.1.3", optional = true, path = "../tokio-sync" } tokio-threadpool = { version = "0.1.13", optional = true } tokio-tcp = { version = "0.1.0", optional = true } tokio-udp = { version = "0.1.0", optional = true } diff --git a/tokio/src/sync.rs b/tokio/src/sync.rs index ff7438a25..c8fb75241 100644 --- a/tokio/src/sync.rs +++ b/tokio/src/sync.rs @@ -9,7 +9,8 @@ //! from one task to another. //! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for //! sending values between tasks. +//! - [lock](lock/index.html), an asynchronous `Mutex`-like type. //! - [watch](watch/index.html), a single-producer, multi-consumer channel that //! only stores the **most recently** sent value. -pub use tokio_sync::{mpsc, oneshot, watch}; +pub use tokio_sync::{lock, mpsc, oneshot, watch};