mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
sync: allow configuring RwLock max reads (#3644)
This commit is contained in:
parent
bf8c77bea1
commit
e89c8981f1
@ -271,6 +271,7 @@ impl Semaphore {
|
||||
Self::MAX_PERMITS
|
||||
);
|
||||
let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
|
||||
let prev = prev >> Self::PERMIT_SHIFT;
|
||||
assert!(
|
||||
prev + permits <= Self::MAX_PERMITS,
|
||||
"number of added permits ({}) would overflow MAX_PERMITS ({})",
|
||||
|
@ -20,10 +20,10 @@ pub(crate) use write_guard::RwLockWriteGuard;
|
||||
pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;
|
||||
|
||||
#[cfg(not(loom))]
|
||||
const MAX_READS: usize = 32;
|
||||
const MAX_READS: u32 = std::u32::MAX >> 3;
|
||||
|
||||
#[cfg(loom)]
|
||||
const MAX_READS: usize = 10;
|
||||
const MAX_READS: u32 = 10;
|
||||
|
||||
/// An asynchronous reader-writer lock.
|
||||
///
|
||||
@ -86,6 +86,9 @@ const MAX_READS: usize = 10;
|
||||
/// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies
|
||||
#[derive(Debug)]
|
||||
pub struct RwLock<T: ?Sized> {
|
||||
// maximum number of concurrent readers
|
||||
mr: u32,
|
||||
|
||||
//semaphore to coordinate read and write access to T
|
||||
s: Semaphore,
|
||||
|
||||
@ -199,8 +202,39 @@ impl<T: ?Sized> RwLock<T> {
|
||||
T: Sized,
|
||||
{
|
||||
RwLock {
|
||||
mr: MAX_READS,
|
||||
c: UnsafeCell::new(value),
|
||||
s: Semaphore::new(MAX_READS),
|
||||
s: Semaphore::new(MAX_READS as usize),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new instance of an `RwLock<T>` which is unlocked
|
||||
/// and allows a maximum of `max_reads` concurrent readers.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::sync::RwLock;
|
||||
///
|
||||
/// let lock = RwLock::new_with_max_reads(5, 1024);
|
||||
/// ```
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `max_reads` is more than `u32::MAX >> 3`.
|
||||
pub fn new_with_max_reads(value: T, max_reads: u32) -> RwLock<T>
|
||||
where
|
||||
T: Sized,
|
||||
{
|
||||
assert!(
|
||||
max_reads <= MAX_READS,
|
||||
"a RwLock may not be created with more than {} readers",
|
||||
MAX_READS
|
||||
);
|
||||
RwLock {
|
||||
mr: max_reads,
|
||||
c: UnsafeCell::new(value),
|
||||
s: Semaphore::new(max_reads as usize),
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,8 +254,33 @@ impl<T: ?Sized> RwLock<T> {
|
||||
T: Sized,
|
||||
{
|
||||
RwLock {
|
||||
mr: MAX_READS,
|
||||
c: UnsafeCell::new(value),
|
||||
s: Semaphore::const_new(MAX_READS),
|
||||
s: Semaphore::const_new(MAX_READS as usize),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new instance of an `RwLock<T>` which is unlocked
|
||||
/// and allows a maximum of `max_reads` concurrent readers.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::sync::RwLock;
|
||||
///
|
||||
/// static LOCK: RwLock<i32> = RwLock::const_new_with_max_reads(5, 1024);
|
||||
/// ```
|
||||
#[cfg(all(feature = "parking_lot", not(all(loom, test))))]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
|
||||
pub const fn const_new_with_max_reads(value: T, mut max_reads: u32) -> RwLock<T>
|
||||
where
|
||||
T: Sized,
|
||||
{
|
||||
max_reads &= MAX_READS;
|
||||
RwLock {
|
||||
mr: max_reads,
|
||||
c: UnsafeCell::new(value),
|
||||
s: Semaphore::const_new(max_reads as usize),
|
||||
}
|
||||
}
|
||||
|
||||
@ -456,12 +515,13 @@ impl<T: ?Sized> RwLock<T> {
|
||||
///}
|
||||
/// ```
|
||||
pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
|
||||
self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| {
|
||||
self.s.acquire(self.mr).await.unwrap_or_else(|_| {
|
||||
// The semaphore was closed. but, we never explicitly close it, and we have a
|
||||
// handle to it through the Arc, which means that this can never happen.
|
||||
unreachable!()
|
||||
});
|
||||
RwLockWriteGuard {
|
||||
permits_acquired: self.mr,
|
||||
s: &self.s,
|
||||
data: self.c.get(),
|
||||
marker: marker::PhantomData,
|
||||
@ -498,12 +558,13 @@ impl<T: ?Sized> RwLock<T> {
|
||||
///}
|
||||
/// ```
|
||||
pub async fn write_owned(self: Arc<Self>) -> OwnedRwLockWriteGuard<T> {
|
||||
self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| {
|
||||
self.s.acquire(self.mr).await.unwrap_or_else(|_| {
|
||||
// The semaphore was closed. but, we never explicitly close it, and we have a
|
||||
// handle to it through the Arc, which means that this can never happen.
|
||||
unreachable!()
|
||||
});
|
||||
OwnedRwLockWriteGuard {
|
||||
permits_acquired: self.mr,
|
||||
data: self.c.get(),
|
||||
lock: ManuallyDrop::new(self),
|
||||
_p: PhantomData,
|
||||
@ -534,13 +595,14 @@ impl<T: ?Sized> RwLock<T> {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> {
|
||||
match self.s.try_acquire(MAX_READS as u32) {
|
||||
match self.s.try_acquire(self.mr) {
|
||||
Ok(permit) => permit,
|
||||
Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
|
||||
Err(TryAcquireError::Closed) => unreachable!(),
|
||||
}
|
||||
|
||||
Ok(RwLockWriteGuard {
|
||||
permits_acquired: self.mr,
|
||||
s: &self.s,
|
||||
data: self.c.get(),
|
||||
marker: marker::PhantomData,
|
||||
@ -578,13 +640,14 @@ impl<T: ?Sized> RwLock<T> {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn try_write_owned(self: Arc<Self>) -> Result<OwnedRwLockWriteGuard<T>, TryLockError> {
|
||||
match self.s.try_acquire(MAX_READS as u32) {
|
||||
match self.s.try_acquire(self.mr) {
|
||||
Ok(permit) => permit,
|
||||
Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
|
||||
Err(TryAcquireError::Closed) => unreachable!(),
|
||||
}
|
||||
|
||||
Ok(OwnedRwLockWriteGuard {
|
||||
permits_acquired: self.mr,
|
||||
data: self.c.get(),
|
||||
lock: ManuallyDrop::new(self),
|
||||
_p: PhantomData,
|
||||
|
@ -16,6 +16,7 @@ use std::sync::Arc;
|
||||
/// [`write_owned`]: method@crate::sync::RwLock::write_owned
|
||||
/// [`RwLock`]: struct@crate::sync::RwLock
|
||||
pub struct OwnedRwLockWriteGuard<T: ?Sized> {
|
||||
pub(super) permits_acquired: u32,
|
||||
// ManuallyDrop allows us to destructure into this field without running the destructor.
|
||||
pub(super) lock: ManuallyDrop<Arc<RwLock<T>>>,
|
||||
pub(super) data: *mut T,
|
||||
@ -62,9 +63,11 @@ impl<T: ?Sized> OwnedRwLockWriteGuard<T> {
|
||||
{
|
||||
let data = f(&mut *this) as *mut U;
|
||||
let lock = unsafe { ManuallyDrop::take(&mut this.lock) };
|
||||
let permits_acquired = this.permits_acquired;
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(this);
|
||||
OwnedRwLockMappedWriteGuard {
|
||||
permits_acquired,
|
||||
lock: ManuallyDrop::new(lock),
|
||||
data,
|
||||
_p: PhantomData,
|
||||
@ -118,10 +121,12 @@ impl<T: ?Sized> OwnedRwLockWriteGuard<T> {
|
||||
Some(data) => data as *mut U,
|
||||
None => return Err(this),
|
||||
};
|
||||
let permits_acquired = this.permits_acquired;
|
||||
let lock = unsafe { ManuallyDrop::take(&mut this.lock) };
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(this);
|
||||
Ok(OwnedRwLockMappedWriteGuard {
|
||||
permits_acquired,
|
||||
lock: ManuallyDrop::new(lock),
|
||||
data,
|
||||
_p: PhantomData,
|
||||
@ -178,7 +183,7 @@ impl<T: ?Sized> OwnedRwLockWriteGuard<T> {
|
||||
let data = self.data;
|
||||
|
||||
// Release all but one of the permits held by the write guard
|
||||
lock.s.release(super::MAX_READS - 1);
|
||||
lock.s.release((self.permits_acquired - 1) as usize);
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(self);
|
||||
OwnedRwLockReadGuard {
|
||||
@ -223,7 +228,7 @@ where
|
||||
|
||||
impl<T: ?Sized> Drop for OwnedRwLockWriteGuard<T> {
|
||||
fn drop(&mut self) {
|
||||
self.lock.s.release(super::MAX_READS);
|
||||
self.lock.s.release(self.permits_acquired as usize);
|
||||
unsafe { ManuallyDrop::drop(&mut self.lock) };
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ use std::sync::Arc;
|
||||
/// [mapping]: method@crate::sync::OwnedRwLockWriteGuard::map
|
||||
/// [`OwnedRwLockWriteGuard`]: struct@crate::sync::OwnedRwLockWriteGuard
|
||||
pub struct OwnedRwLockMappedWriteGuard<T: ?Sized, U: ?Sized = T> {
|
||||
pub(super) permits_acquired: u32,
|
||||
// ManuallyDrop allows us to destructure into this field without running the destructor.
|
||||
pub(super) lock: ManuallyDrop<Arc<RwLock<T>>>,
|
||||
pub(super) data: *mut U,
|
||||
@ -61,9 +62,11 @@ impl<T: ?Sized, U: ?Sized> OwnedRwLockMappedWriteGuard<T, U> {
|
||||
{
|
||||
let data = f(&mut *this) as *mut V;
|
||||
let lock = unsafe { ManuallyDrop::take(&mut this.lock) };
|
||||
let permits_acquired = this.permits_acquired;
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(this);
|
||||
OwnedRwLockMappedWriteGuard {
|
||||
permits_acquired,
|
||||
lock: ManuallyDrop::new(lock),
|
||||
data,
|
||||
_p: PhantomData,
|
||||
@ -116,9 +119,11 @@ impl<T: ?Sized, U: ?Sized> OwnedRwLockMappedWriteGuard<T, U> {
|
||||
None => return Err(this),
|
||||
};
|
||||
let lock = unsafe { ManuallyDrop::take(&mut this.lock) };
|
||||
let permits_acquired = this.permits_acquired;
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(this);
|
||||
Ok(OwnedRwLockMappedWriteGuard {
|
||||
permits_acquired,
|
||||
lock: ManuallyDrop::new(lock),
|
||||
data,
|
||||
_p: PhantomData,
|
||||
@ -160,7 +165,7 @@ where
|
||||
|
||||
impl<T: ?Sized, U: ?Sized> Drop for OwnedRwLockMappedWriteGuard<T, U> {
|
||||
fn drop(&mut self) {
|
||||
self.lock.s.release(super::MAX_READS);
|
||||
self.lock.s.release(self.permits_acquired as usize);
|
||||
unsafe { ManuallyDrop::drop(&mut self.lock) };
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ use std::ops;
|
||||
/// [`write`]: method@crate::sync::RwLock::write
|
||||
/// [`RwLock`]: struct@crate::sync::RwLock
|
||||
pub struct RwLockWriteGuard<'a, T: ?Sized> {
|
||||
pub(super) permits_acquired: u32,
|
||||
pub(super) s: &'a Semaphore,
|
||||
pub(super) data: *mut T,
|
||||
pub(super) marker: marker::PhantomData<&'a mut T>,
|
||||
@ -64,9 +65,11 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
|
||||
{
|
||||
let data = f(&mut *this) as *mut U;
|
||||
let s = this.s;
|
||||
let permits_acquired = this.permits_acquired;
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(this);
|
||||
RwLockMappedWriteGuard {
|
||||
permits_acquired,
|
||||
s,
|
||||
data,
|
||||
marker: marker::PhantomData,
|
||||
@ -125,9 +128,11 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
|
||||
None => return Err(this),
|
||||
};
|
||||
let s = this.s;
|
||||
let permits_acquired = this.permits_acquired;
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(this);
|
||||
Ok(RwLockMappedWriteGuard {
|
||||
permits_acquired,
|
||||
s,
|
||||
data,
|
||||
marker: marker::PhantomData,
|
||||
@ -185,7 +190,7 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
|
||||
let RwLockWriteGuard { s, data, .. } = self;
|
||||
|
||||
// Release all but one of the permits held by the write guard
|
||||
s.release(super::MAX_READS - 1);
|
||||
s.release((self.permits_acquired - 1) as usize);
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(self);
|
||||
RwLockReadGuard {
|
||||
@ -230,6 +235,6 @@ where
|
||||
|
||||
impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> {
|
||||
fn drop(&mut self) {
|
||||
self.s.release(super::MAX_READS);
|
||||
self.s.release(self.permits_acquired as usize);
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ use std::ops;
|
||||
/// [mapping]: method@crate::sync::RwLockWriteGuard::map
|
||||
/// [`RwLockWriteGuard`]: struct@crate::sync::RwLockWriteGuard
|
||||
pub struct RwLockMappedWriteGuard<'a, T: ?Sized> {
|
||||
pub(super) permits_acquired: u32,
|
||||
pub(super) s: &'a Semaphore,
|
||||
pub(super) data: *mut T,
|
||||
pub(super) marker: marker::PhantomData<&'a mut T>,
|
||||
@ -62,9 +63,11 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> {
|
||||
{
|
||||
let data = f(&mut *this) as *mut U;
|
||||
let s = this.s;
|
||||
let permits_acquired = this.permits_acquired;
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(this);
|
||||
RwLockMappedWriteGuard {
|
||||
permits_acquired,
|
||||
s,
|
||||
data,
|
||||
marker: marker::PhantomData,
|
||||
@ -122,9 +125,11 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> {
|
||||
None => return Err(this),
|
||||
};
|
||||
let s = this.s;
|
||||
let permits_acquired = this.permits_acquired;
|
||||
// NB: Forget to avoid drop impl from being called.
|
||||
mem::forget(this);
|
||||
Ok(RwLockMappedWriteGuard {
|
||||
permits_acquired,
|
||||
s,
|
||||
data,
|
||||
marker: marker::PhantomData,
|
||||
@ -166,6 +171,6 @@ where
|
||||
|
||||
impl<'a, T: ?Sized> Drop for RwLockMappedWriteGuard<'a, T> {
|
||||
fn drop(&mut self) {
|
||||
self.s.release(super::MAX_READS);
|
||||
self.s.release(self.permits_acquired as usize);
|
||||
}
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ fn read_exclusive_pending() {
|
||||
// should be made available when one of the shared acesses is dropped
|
||||
#[test]
|
||||
fn exhaust_reading() {
|
||||
let rwlock = RwLock::new(100);
|
||||
let rwlock = RwLock::new_with_max_reads(100, 1024);
|
||||
let mut reads = Vec::new();
|
||||
loop {
|
||||
let mut t = spawn(rwlock.read());
|
||||
|
Loading…
x
Reference in New Issue
Block a user