sync: add MappedOwnedMutexGuard (#5474)

This commit is contained in:
Alice Ryhl 2023-02-19 14:12:32 +01:00 committed by GitHub
parent eca24068f7
commit 2e0372be6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 306 additions and 5 deletions

View File

@ -449,7 +449,7 @@ cfg_sync! {
pub mod mpsc;
mod mutex;
pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard};
pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard};
pub(crate) mod notify;
pub use notify::Notify;

View File

@ -9,7 +9,7 @@ use std::error::Error;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, mem};
use std::{fmt, mem, ptr};
/// An asynchronous `Mutex`-like type.
///
@ -169,6 +169,8 @@ pub struct MutexGuard<'a, T: ?Sized> {
/// [`Arc`]: std::sync::Arc
#[clippy::has_significant_drop]
pub struct OwnedMutexGuard<T: ?Sized> {
// When changing the fields in this struct, make sure to update the
// `skip_drop` method.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
lock: Arc<Mutex<T>>,
@ -184,12 +186,31 @@ pub struct OwnedMutexGuard<T: ?Sized> {
pub struct MappedMutexGuard<'a, T: ?Sized> {
// When changing the fields in this struct, make sure to update the
// `skip_drop` method.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
s: &'a semaphore::Semaphore,
data: *mut T,
// Needed to tell the borrow checker that we are holding a `&mut T`
marker: PhantomData<&'a mut T>,
}
/// A owned handle to a held `Mutex` that has had a function applied to it via
/// [`OwnedMutexGuard::map`].
///
/// This can be used to hold a subfield of the protected data.
///
/// [`OwnedMutexGuard::map`]: method@OwnedMutexGuard::map
#[clippy::has_significant_drop]
#[must_use = "if unused the Mutex will immediately unlock"]
pub struct OwnedMappedMutexGuard<T: ?Sized, U: ?Sized = T> {
// When changing the fields in this struct, make sure to update the
// `skip_drop` method.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
data: *mut U,
lock: Arc<Mutex<T>>,
}
/// A helper type used when taking apart a `MutexGuard` without running its
/// Drop implementation.
#[allow(dead_code)] // Unused fields are still used in Drop.
@ -199,14 +220,34 @@ struct MutexGuardInner<'a, T: ?Sized> {
lock: &'a Mutex<T>,
}
/// A helper type used when taking apart a `OwnedMutexGuard` without running
/// its Drop implementation.
struct OwnedMutexGuardInner<T: ?Sized> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
lock: Arc<Mutex<T>>,
}
/// A helper type used when taking apart a `MappedMutexGuard` without running
/// its Drop implementation.
#[allow(dead_code)] // Unused fields are still used in Drop.
struct MappedMutexGuardInner<'a, T: ?Sized> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
s: &'a semaphore::Semaphore,
data: *mut T,
}
/// A helper type used when taking apart a `OwnedMappedMutexGuard` without running
/// its Drop implementation.
#[allow(dead_code)] // Unused fields are still used in Drop.
struct OwnedMappedMutexGuardInner<T: ?Sized, U: ?Sized> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
data: *mut U,
lock: Arc<Mutex<T>>,
}
// As long as T: Send, it's fine to send and share Mutex<T> between threads.
// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can
// access T through Mutex<T>.
@ -217,6 +258,19 @@ unsafe impl<T> Sync for OwnedMutexGuard<T> where T: ?Sized + Send + Sync {}
unsafe impl<'a, T> Sync for MappedMutexGuard<'a, T> where T: ?Sized + Sync + 'a {}
unsafe impl<'a, T> Send for MappedMutexGuard<'a, T> where T: ?Sized + Send + 'a {}
unsafe impl<T, U> Sync for OwnedMappedMutexGuard<T, U>
where
T: ?Sized + Send + Sync,
U: ?Sized + Send + Sync,
{
}
unsafe impl<T, U> Send for OwnedMappedMutexGuard<T, U>
where
T: ?Sized + Send,
U: ?Sized + Send,
{
}
/// Error returned from the [`Mutex::try_lock`], [`RwLock::try_read`] and
/// [`RwLock::try_write`] functions.
///
@ -799,6 +853,8 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
s: &inner.lock.s,
data,
marker: PhantomData,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: inner.resource_span,
}
}
@ -848,6 +904,8 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
s: &inner.lock.s,
data,
marker: PhantomData,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: inner.resource_span,
})
}
@ -880,6 +938,8 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.lock.s.release(1);
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
@ -887,7 +947,6 @@ impl<T: ?Sized> Drop for MutexGuard<'_, T> {
locked = false,
);
});
self.lock.s.release(1);
}
}
@ -919,6 +978,116 @@ impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
// === impl OwnedMutexGuard ===
impl<T: ?Sized> OwnedMutexGuard<T> {
fn skip_drop(self) -> OwnedMutexGuardInner<T> {
let me = mem::ManuallyDrop::new(self);
// SAFETY: This duplicates the values in every field of the guard, then
// forgets the originals, so in the end no value is duplicated.
unsafe {
OwnedMutexGuardInner {
lock: ptr::read(&me.lock),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: ptr::read(&me.resource_span),
}
}
}
/// Makes a new [`OwnedMappedMutexGuard`] for a component of the locked data.
///
/// This operation cannot fail as the [`OwnedMutexGuard`] passed in already locked the mutex.
///
/// This is an associated function that needs to be used as `OwnedMutexGuard::map(...)`. A method
/// would interfere with methods of the same name on the contents of the locked data.
///
/// # Examples
///
/// ```
/// use tokio::sync::{Mutex, OwnedMutexGuard};
/// use std::sync::Arc;
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// struct Foo(u32);
///
/// # #[tokio::main]
/// # async fn main() {
/// let foo = Arc::new(Mutex::new(Foo(1)));
///
/// {
/// let mut mapped = OwnedMutexGuard::map(foo.clone().lock_owned().await, |f| &mut f.0);
/// *mapped = 2;
/// }
///
/// assert_eq!(Foo(2), *foo.lock().await);
/// # }
/// ```
///
/// [`OwnedMutexGuard`]: struct@OwnedMutexGuard
/// [`OwnedMappedMutexGuard`]: struct@OwnedMappedMutexGuard
#[inline]
pub fn map<U, F>(mut this: Self, f: F) -> OwnedMappedMutexGuard<T, U>
where
F: FnOnce(&mut T) -> &mut U,
{
let data = f(&mut *this) as *mut U;
let inner = this.skip_drop();
OwnedMappedMutexGuard {
data,
lock: inner.lock,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: inner.resource_span,
}
}
/// Attempts to make a new [`OwnedMappedMutexGuard`] for a component of the locked data. The
/// original guard is returned if the closure returns `None`.
///
/// This operation cannot fail as the [`OwnedMutexGuard`] passed in already locked the mutex.
///
/// This is an associated function that needs to be used as `OwnedMutexGuard::try_map(...)`. A
/// method would interfere with methods of the same name on the contents of the locked data.
///
/// # Examples
///
/// ```
/// use tokio::sync::{Mutex, OwnedMutexGuard};
/// use std::sync::Arc;
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// struct Foo(u32);
///
/// # #[tokio::main]
/// # async fn main() {
/// let foo = Arc::new(Mutex::new(Foo(1)));
///
/// {
/// let mut mapped = OwnedMutexGuard::try_map(foo.clone().lock_owned().await, |f| Some(&mut f.0))
/// .expect("should not fail");
/// *mapped = 2;
/// }
///
/// assert_eq!(Foo(2), *foo.lock().await);
/// # }
/// ```
///
/// [`OwnedMutexGuard`]: struct@OwnedMutexGuard
/// [`OwnedMappedMutexGuard`]: struct@OwnedMappedMutexGuard
#[inline]
pub fn try_map<U, F>(mut this: Self, f: F) -> Result<OwnedMappedMutexGuard<T, U>, Self>
where
F: FnOnce(&mut T) -> Option<&mut U>,
{
let data = match f(&mut *this) {
Some(data) => data as *mut U,
None => return Err(this),
};
let inner = this.skip_drop();
Ok(OwnedMappedMutexGuard {
data,
lock: inner.lock,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: inner.resource_span,
})
}
/// Returns a reference to the original `Arc<Mutex>`.
///
/// ```
@ -949,6 +1118,8 @@ impl<T: ?Sized> OwnedMutexGuard<T> {
impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
fn drop(&mut self) {
self.lock.s.release(1);
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
@ -956,7 +1127,6 @@ impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
locked = false,
);
});
self.lock.s.release(1)
}
}
@ -993,6 +1163,8 @@ impl<'a, T: ?Sized> MappedMutexGuard<'a, T> {
MappedMutexGuardInner {
s: me.s,
data: me.data,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: unsafe { std::ptr::read(&me.resource_span) },
}
}
@ -1015,6 +1187,8 @@ impl<'a, T: ?Sized> MappedMutexGuard<'a, T> {
s: inner.s,
data,
marker: PhantomData,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: inner.resource_span,
}
}
@ -1041,13 +1215,23 @@ impl<'a, T: ?Sized> MappedMutexGuard<'a, T> {
s: inner.s,
data,
marker: PhantomData,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: inner.resource_span,
})
}
}
impl<'a, T: ?Sized> Drop for MappedMutexGuard<'a, T> {
fn drop(&mut self) {
self.s.release(1)
self.s.release(1);
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
locked = false,
);
});
}
}
@ -1075,3 +1259,111 @@ impl<'a, T: ?Sized + fmt::Display> fmt::Display for MappedMutexGuard<'a, T> {
fmt::Display::fmt(&**self, f)
}
}
// === impl OwnedMappedMutexGuard ===
impl<T: ?Sized, U: ?Sized> OwnedMappedMutexGuard<T, U> {
fn skip_drop(self) -> OwnedMappedMutexGuardInner<T, U> {
let me = mem::ManuallyDrop::new(self);
// SAFETY: This duplicates the values in every field of the guard, then
// forgets the originals, so in the end no value is duplicated.
unsafe {
OwnedMappedMutexGuardInner {
data: me.data,
lock: ptr::read(&me.lock),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: ptr::read(&me.resource_span),
}
}
}
/// Makes a new [`OwnedMappedMutexGuard`] for a component of the locked data.
///
/// This operation cannot fail as the [`OwnedMappedMutexGuard`] passed in already locked the mutex.
///
/// This is an associated function that needs to be used as `OwnedMappedMutexGuard::map(...)`. A method
/// would interfere with methods of the same name on the contents of the locked data.
///
/// [`OwnedMappedMutexGuard`]: struct@OwnedMappedMutexGuard
#[inline]
pub fn map<S, F>(mut this: Self, f: F) -> OwnedMappedMutexGuard<T, S>
where
F: FnOnce(&mut U) -> &mut S,
{
let data = f(&mut *this) as *mut S;
let inner = this.skip_drop();
OwnedMappedMutexGuard {
data,
lock: inner.lock,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: inner.resource_span,
}
}
/// Attempts to make a new [`OwnedMappedMutexGuard`] for a component of the locked data. The
/// original guard is returned if the closure returns `None`.
///
/// This operation cannot fail as the [`OwnedMutexGuard`] passed in already locked the mutex.
///
/// This is an associated function that needs to be used as `OwnedMutexGuard::try_map(...)`. A
/// method would interfere with methods of the same name on the contents of the locked data.
///
/// [`OwnedMutexGuard`]: struct@OwnedMutexGuard
/// [`OwnedMappedMutexGuard`]: struct@OwnedMappedMutexGuard
#[inline]
pub fn try_map<S, F>(mut this: Self, f: F) -> Result<OwnedMappedMutexGuard<T, S>, Self>
where
F: FnOnce(&mut U) -> Option<&mut S>,
{
let data = match f(&mut *this) {
Some(data) => data as *mut S,
None => return Err(this),
};
let inner = this.skip_drop();
Ok(OwnedMappedMutexGuard {
data,
lock: inner.lock,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: inner.resource_span,
})
}
}
impl<T: ?Sized, U: ?Sized> Drop for OwnedMappedMutexGuard<T, U> {
fn drop(&mut self) {
self.lock.s.release(1);
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
locked = false,
);
});
}
}
impl<T: ?Sized, U: ?Sized> Deref for OwnedMappedMutexGuard<T, U> {
type Target = U;
fn deref(&self) -> &Self::Target {
unsafe { &*self.data }
}
}
impl<T: ?Sized, U: ?Sized> DerefMut for OwnedMappedMutexGuard<T, U> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.data }
}
}
impl<T: ?Sized, U: ?Sized + fmt::Debug> fmt::Debug for OwnedMappedMutexGuard<T, U> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
}
impl<T: ?Sized, U: ?Sized + fmt::Display> fmt::Display for OwnedMappedMutexGuard<T, U> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&**self, f)
}
}

View File

@ -334,6 +334,15 @@ assert_value!(tokio::sync::OnceCell<YY>: Send & Sync & Unpin);
assert_value!(tokio::sync::OwnedMutexGuard<NN>: !Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMutexGuard<YN>: Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMutexGuard<YY>: Send & Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<NN,NN>: !Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<NN,YN>: !Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<NN,YY>: !Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<YN,NN>: !Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<YN,YN>: Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<YN,YY>: Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<YY,NN>: !Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<YY,YN>: Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedMappedMutexGuard<YY,YY>: Send & Sync & Unpin);
assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<NN>: !Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<YN>: !Send & !Sync & Unpin);
assert_value!(tokio::sync::OwnedRwLockMappedWriteGuard<YY>: Send & Sync & Unpin);