sync: make const_new methods always available (#5885)

Since MSRV is bumped to 1.63, `Mutex::new` is now usable in const context.

Also use `assert!` in const function to ensure correctness instead of
silently truncating the value and remove cfg `tokio_no_const_mutex_new`.

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2023-07-28 21:46:21 +10:00 committed by GitHub
parent fb08591b43
commit efe3ab679a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 253 additions and 285 deletions

View File

@ -442,7 +442,6 @@ jobs:
# Run a platform without AtomicU64 and no const Mutex::new # Run a platform without AtomicU64 and no const Mutex::new
- target: armv5te-unknown-linux-gnueabi - target: armv5te-unknown-linux-gnueabi
rustflags: --cfg tokio_no_const_mutex_new
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- name: Install Rust stable - name: Install Rust stable
@ -485,7 +484,6 @@ jobs:
# Run a platform without AtomicU64 and no const Mutex::new # Run a platform without AtomicU64 and no const Mutex::new
- target: armv5te-unknown-linux-gnueabi - target: armv5te-unknown-linux-gnueabi
rustflags: --cfg tokio_no_const_mutex_new
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- name: Install Rust stable - name: Install Rust stable
@ -568,10 +566,6 @@ jobs:
# https://github.com/tokio-rs/tokio/pull/5356 # https://github.com/tokio-rs/tokio/pull/5356
# https://github.com/tokio-rs/tokio/issues/5373 # https://github.com/tokio-rs/tokio/issues/5373
- name: Check without const_mutex_new
run: cargo hack check -p tokio --feature-powerset --depth 2 --keep-going
env:
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings --cfg tokio_no_atomic_u64 --cfg tokio_no_const_mutex_new
- name: Check with const_mutex_new - name: Check with const_mutex_new
run: cargo hack check -p tokio --feature-powerset --depth 2 --keep-going run: cargo hack check -p tokio --feature-powerset --depth 2 --keep-going
env: env:

View File

@ -13,7 +13,6 @@ impl<T> Mutex<T> {
} }
#[inline] #[inline]
#[cfg(not(tokio_no_const_mutex_new))]
pub(crate) const fn const_new(t: T) -> Mutex<T> { pub(crate) const fn const_new(t: T) -> Mutex<T> {
Mutex(sync::Mutex::new(t)) Mutex(sync::Mutex::new(t))
} }

View File

@ -541,13 +541,7 @@ macro_rules! cfg_not_has_atomic_u64 {
macro_rules! cfg_has_const_mutex_new { macro_rules! cfg_has_const_mutex_new {
($($item:item)*) => { ($($item:item)*) => {
$( $(
#[cfg(all( #[cfg(not(all(loom, test)))]
not(all(loom, test)),
any(
feature = "parking_lot",
not(tokio_no_const_mutex_new)
)
))]
$item $item
)* )*
} }
@ -556,13 +550,7 @@ macro_rules! cfg_has_const_mutex_new {
macro_rules! cfg_not_has_const_mutex_new { macro_rules! cfg_not_has_const_mutex_new {
($($item:item)*) => { ($($item:item)*) => {
$( $(
#[cfg(not(all( #[cfg(all(loom, test))]
not(all(loom, test)),
any(
feature = "parking_lot",
not(tokio_no_const_mutex_new)
)
)))]
$item $item
)* )*
} }

View File

@ -178,14 +178,9 @@ impl Semaphore {
/// Creates a new semaphore with the initial number of permits. /// Creates a new semaphore with the initial number of permits.
/// ///
/// Maximum number of permits on 32-bit platforms is `1<<29`. /// Maximum number of permits on 32-bit platforms is `1<<29`.
/// #[cfg(not(all(loom, test)))]
/// If the specified number of permits exceeds the maximum permit amount pub(crate) const fn const_new(permits: usize) -> Self {
/// Then the value will get clamped to the maximum number of permits. assert!(permits <= Self::MAX_PERMITS);
#[cfg(all(feature = "parking_lot", not(all(loom, test))))]
pub(crate) const fn const_new(mut permits: usize) -> Self {
// NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925
// currently we just clamp the permit count when it exceeds the max
permits &= Self::MAX_PERMITS;
Self { Self {
permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
@ -198,6 +193,19 @@ impl Semaphore {
} }
} }
/// Creates a new closed semaphore with 0 permits.
pub(crate) fn new_closed() -> Self {
Self {
permits: AtomicUsize::new(Self::CLOSED),
waiters: Mutex::new(Waitlist {
queue: LinkedList::new(),
closed: true,
}),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span::none(),
}
}
/// Returns the current number of available permits. /// Returns the current number of available permits.
pub(crate) fn available_permits(&self) -> usize { pub(crate) fn available_permits(&self) -> usize {
self.permits.load(Acquire) >> Self::PERMIT_SHIFT self.permits.load(Acquire) >> Self::PERMIT_SHIFT

View File

@ -378,8 +378,7 @@ impl<T: ?Sized> Mutex<T> {
/// ///
/// static LOCK: Mutex<i32> = Mutex::const_new(5); /// static LOCK: Mutex<i32> = Mutex::const_new(5);
/// ``` /// ```
#[cfg(all(feature = "parking_lot", not(all(loom, test)),))] #[cfg(not(all(loom, test)))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new(t: T) -> Self pub const fn const_new(t: T) -> Self
where where
T: Sized, T: Sized,

View File

@ -443,8 +443,7 @@ impl Notify {
/// ///
/// static NOTIFY: Notify = Notify::const_new(); /// static NOTIFY: Notify = Notify::const_new();
/// ``` /// ```
#[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg(not(all(loom, test)))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new() -> Notify { pub const fn const_new() -> Notify {
Notify { Notify {
state: AtomicUsize::new(0), state: AtomicUsize::new(0),

View File

@ -114,12 +114,10 @@ impl<T> Drop for OnceCell<T> {
impl<T> From<T> for OnceCell<T> { impl<T> From<T> for OnceCell<T> {
fn from(value: T) -> Self { fn from(value: T) -> Self {
let semaphore = Semaphore::new(0);
semaphore.close();
OnceCell { OnceCell {
value_set: AtomicBool::new(true), value_set: AtomicBool::new(true),
value: UnsafeCell::new(MaybeUninit::new(value)), value: UnsafeCell::new(MaybeUninit::new(value)),
semaphore, semaphore: Semaphore::new_closed(),
} }
} }
} }
@ -139,6 +137,10 @@ impl<T> OnceCell<T> {
/// If the `Option` is `None`, this is equivalent to `OnceCell::new`. /// If the `Option` is `None`, this is equivalent to `OnceCell::new`.
/// ///
/// [`OnceCell::new`]: crate::sync::OnceCell::new /// [`OnceCell::new`]: crate::sync::OnceCell::new
// Once https://github.com/rust-lang/rust/issues/73255 lands
// and tokio MSRV is bumped to the rustc version with it stablised,
// we can made this function available in const context,
// by creating `Semaphore::const_new_closed`.
pub fn new_with(value: Option<T>) -> Self { pub fn new_with(value: Option<T>) -> Self {
if let Some(v) = value { if let Some(v) = value {
OnceCell::from(v) OnceCell::from(v)
@ -171,8 +173,7 @@ impl<T> OnceCell<T> {
/// assert_eq!(*result, 2); /// assert_eq!(*result, 2);
/// } /// }
/// ``` /// ```
#[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg(not(all(loom, test)))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new() -> Self { pub const fn const_new() -> Self {
OnceCell { OnceCell {
value_set: AtomicBool::new(false), value_set: AtomicBool::new(false),

View File

@ -334,8 +334,7 @@ impl<T: ?Sized> RwLock<T> {
/// ///
/// static LOCK: RwLock<i32> = RwLock::const_new(5); /// static LOCK: RwLock<i32> = RwLock::const_new(5);
/// ``` /// ```
#[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg(not(all(loom, test)))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new(value: T) -> RwLock<T> pub const fn const_new(value: T) -> RwLock<T>
where where
T: Sized, T: Sized,
@ -359,13 +358,13 @@ impl<T: ?Sized> RwLock<T> {
/// ///
/// static LOCK: RwLock<i32> = RwLock::const_with_max_readers(5, 1024); /// static LOCK: RwLock<i32> = RwLock::const_with_max_readers(5, 1024);
/// ``` /// ```
#[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg(not(all(loom, test)))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] pub const fn const_with_max_readers(value: T, max_reads: u32) -> RwLock<T>
pub const fn const_with_max_readers(value: T, mut max_reads: u32) -> RwLock<T>
where where
T: Sized, T: Sized,
{ {
max_reads &= MAX_READS; assert!(max_reads <= MAX_READS);
RwLock { RwLock {
mr: max_reads, mr: max_reads,
c: UnsafeCell::new(value), c: UnsafeCell::new(value),

View File

@ -172,20 +172,22 @@ impl Semaphore {
/// ///
/// static SEM: Semaphore = Semaphore::const_new(10); /// static SEM: Semaphore = Semaphore::const_new(10);
/// ``` /// ```
/// #[cfg(not(all(loom, test)))]
#[cfg(all(feature = "parking_lot", not(all(loom, test))))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new(permits: usize) -> Self { pub const fn const_new(permits: usize) -> Self {
#[cfg(all(tokio_unstable, feature = "tracing"))] Self {
return Self {
ll_sem: ll::Semaphore::const_new(permits), ll_sem: ll::Semaphore::const_new(permits),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span::none(), resource_span: tracing::Span::none(),
}; }
}
#[cfg(any(not(tokio_unstable), not(feature = "tracing")))] /// Creates a new closed semaphore with 0 permits.
return Self { pub(crate) fn new_closed() -> Self {
ll_sem: ll::Semaphore::const_new(permits), Self {
}; ll_sem: ll::Semaphore::new_closed(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span::none(),
}
} }
/// Returns the current number of available permits. /// Returns the current number of available permits.

View File

@ -5,12 +5,7 @@ cfg_io_driver! {
#[cfg(feature = "rt")] #[cfg(feature = "rt")]
pub(crate) mod atomic_cell; pub(crate) mod atomic_cell;
#[cfg(any( #[cfg(any(feature = "rt", feature = "signal", feature = "process"))]
feature = "rt",
feature = "signal",
feature = "process",
tokio_no_const_mutex_new,
))]
pub(crate) mod once_cell; pub(crate) mod once_cell;
#[cfg(any( #[cfg(any(

View File

@ -4,7 +4,11 @@
use std::mem; use std::mem;
use std::ops::Drop; use std::ops::Drop;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio::runtime;
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use tokio::sync::SetError;
use tokio::time;
#[test] #[test]
fn drop_cell() { fn drop_cell() {
@ -102,184 +106,170 @@ fn from() {
assert_eq!(*cell.get().unwrap(), 2); assert_eq!(*cell.get().unwrap(), 2);
} }
#[cfg(feature = "parking_lot")] async fn func1() -> u32 {
mod parking_lot { 5
use super::*; }
use tokio::runtime; async fn func2() -> u32 {
use tokio::sync::SetError; time::sleep(Duration::from_millis(1)).await;
use tokio::time; 10
}
use std::time::Duration;
async fn func_err() -> Result<u32, ()> {
async fn func1() -> u32 { Err(())
5 }
}
async fn func_ok() -> Result<u32, ()> {
async fn func2() -> u32 { Ok(10)
time::sleep(Duration::from_millis(1)).await; }
10
} async fn func_panic() -> u32 {
time::sleep(Duration::from_millis(1)).await;
async fn func_err() -> Result<u32, ()> { panic!();
Err(()) }
}
async fn sleep_and_set() -> u32 {
async fn func_ok() -> Result<u32, ()> { // Simulate sleep by pausing time and waiting for another thread to
Ok(10) // resume clock when calling `set`, then finding the cell being initialized
} // by this call
time::sleep(Duration::from_millis(2)).await;
async fn func_panic() -> u32 { 5
time::sleep(Duration::from_millis(1)).await; }
panic!();
} async fn advance_time_and_set(cell: &'static OnceCell<u32>, v: u32) -> Result<(), SetError<u32>> {
time::advance(Duration::from_millis(1)).await;
async fn sleep_and_set() -> u32 { cell.set(v)
// Simulate sleep by pausing time and waiting for another thread to }
// resume clock when calling `set`, then finding the cell being initialized
// by this call #[test]
time::sleep(Duration::from_millis(2)).await; fn get_or_init() {
5 let rt = runtime::Builder::new_current_thread()
} .enable_time()
.start_paused(true)
async fn advance_time_and_set( .build()
cell: &'static OnceCell<u32>, .unwrap();
v: u32,
) -> Result<(), SetError<u32>> { static ONCE: OnceCell<u32> = OnceCell::const_new();
time::advance(Duration::from_millis(1)).await;
cell.set(v) rt.block_on(async {
} let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await });
#[test]
fn get_or_init() { time::advance(Duration::from_millis(1)).await;
let rt = runtime::Builder::new_current_thread() time::resume();
.enable_time()
.start_paused(true) let result1 = handle1.await.unwrap();
.build() let result2 = handle2.await.unwrap();
.unwrap();
assert_eq!(*result1, 5);
static ONCE: OnceCell<u32> = OnceCell::const_new(); assert_eq!(*result2, 5);
});
rt.block_on(async { }
let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); #[test]
fn get_or_init_panic() {
time::advance(Duration::from_millis(1)).await; let rt = runtime::Builder::new_current_thread()
time::resume(); .enable_time()
.build()
let result1 = handle1.await.unwrap(); .unwrap();
let result2 = handle2.await.unwrap();
static ONCE: OnceCell<u32> = OnceCell::const_new();
assert_eq!(*result1, 5);
assert_eq!(*result2, 5); rt.block_on(async {
}); time::pause();
}
let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
#[test] let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await });
fn get_or_init_panic() {
let rt = runtime::Builder::new_current_thread() time::advance(Duration::from_millis(1)).await;
.enable_time()
.build() let result1 = handle1.await.unwrap();
.unwrap(); let result2 = handle2.await.unwrap();
static ONCE: OnceCell<u32> = OnceCell::const_new(); assert_eq!(*result1, 5);
assert_eq!(*result2, 5);
rt.block_on(async { });
time::pause(); }
let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); #[test]
let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await }); fn set_and_get() {
let rt = runtime::Builder::new_current_thread()
time::advance(Duration::from_millis(1)).await; .enable_time()
.build()
let result1 = handle1.await.unwrap(); .unwrap();
let result2 = handle2.await.unwrap();
static ONCE: OnceCell<u32> = OnceCell::const_new();
assert_eq!(*result1, 5);
assert_eq!(*result2, 5); rt.block_on(async {
}); let _ = rt.spawn(async { ONCE.set(5) }).await;
} let value = ONCE.get().unwrap();
assert_eq!(*value, 5);
#[test] });
fn set_and_get() { }
let rt = runtime::Builder::new_current_thread()
.enable_time() #[test]
.build() fn get_uninit() {
.unwrap(); static ONCE: OnceCell<u32> = OnceCell::const_new();
let uninit = ONCE.get();
static ONCE: OnceCell<u32> = OnceCell::const_new(); assert!(uninit.is_none());
}
rt.block_on(async {
let _ = rt.spawn(async { ONCE.set(5) }).await; #[test]
let value = ONCE.get().unwrap(); fn set_twice() {
assert_eq!(*value, 5); static ONCE: OnceCell<u32> = OnceCell::const_new();
});
} let first = ONCE.set(5);
assert_eq!(first, Ok(()));
#[test] let second = ONCE.set(6);
fn get_uninit() { assert!(second.err().unwrap().is_already_init_err());
static ONCE: OnceCell<u32> = OnceCell::const_new(); }
let uninit = ONCE.get();
assert!(uninit.is_none()); #[test]
} fn set_while_initializing() {
let rt = runtime::Builder::new_current_thread()
#[test] .enable_time()
fn set_twice() { .build()
static ONCE: OnceCell<u32> = OnceCell::const_new(); .unwrap();
let first = ONCE.set(5); static ONCE: OnceCell<u32> = OnceCell::const_new();
assert_eq!(first, Ok(()));
let second = ONCE.set(6); rt.block_on(async {
assert!(second.err().unwrap().is_already_init_err()); time::pause();
}
let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await });
#[test] let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await });
fn set_while_initializing() {
let rt = runtime::Builder::new_current_thread() time::advance(Duration::from_millis(2)).await;
.enable_time()
.build() let result1 = handle1.await.unwrap();
.unwrap(); let result2 = handle2.await.unwrap();
static ONCE: OnceCell<u32> = OnceCell::const_new(); assert_eq!(*result1, 5);
assert!(result2.err().unwrap().is_initializing_err());
rt.block_on(async { });
time::pause(); }
let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await }); #[test]
let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); fn get_or_try_init() {
let rt = runtime::Builder::new_current_thread()
time::advance(Duration::from_millis(2)).await; .enable_time()
.start_paused(true)
let result1 = handle1.await.unwrap(); .build()
let result2 = handle2.await.unwrap(); .unwrap();
assert_eq!(*result1, 5); static ONCE: OnceCell<u32> = OnceCell::const_new();
assert!(result2.err().unwrap().is_initializing_err());
}); rt.block_on(async {
} let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await });
let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await });
#[test]
fn get_or_try_init() { time::advance(Duration::from_millis(1)).await;
let rt = runtime::Builder::new_current_thread() time::resume();
.enable_time()
.start_paused(true) let result1 = handle1.await.unwrap();
.build() assert!(result1.is_err());
.unwrap();
let result2 = handle2.await.unwrap();
static ONCE: OnceCell<u32> = OnceCell::const_new(); assert_eq!(*result2.unwrap(), 10);
});
rt.block_on(async {
let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await });
let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await });
time::advance(Duration::from_millis(1)).await;
time::resume();
let result1 = handle1.await.unwrap();
assert!(result1.is_err());
let result2 = handle2.await.unwrap();
assert_eq!(*result2.unwrap(), 10);
});
}
} }

View File

@ -1,6 +1,7 @@
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
#![cfg(all(feature = "full"))] #![cfg(feature = "full")]
use futures::future::FutureExt;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio::time::Duration; use tokio::time::Duration;
@ -184,52 +185,45 @@ async fn abort_all() {
assert_eq!(set.len(), 0); assert_eq!(set.len(), 0);
} }
#[cfg(feature = "parking_lot")] // This ensures that `join_next` works correctly when the coop budget is
mod parking_lot { // exhausted.
use super::*; #[tokio::test(flavor = "current_thread")]
async fn join_set_coop() {
// Large enough to trigger coop.
const TASK_NUM: u32 = 1000;
use futures::future::FutureExt; static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0);
// This ensures that `join_next` works correctly when the coop budget is let mut set = JoinSet::new();
// exhausted.
#[tokio::test(flavor = "current_thread")]
async fn join_set_coop() {
// Large enough to trigger coop.
const TASK_NUM: u32 = 1000;
static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); for _ in 0..TASK_NUM {
set.spawn(async {
let mut set = JoinSet::new(); SEM.add_permits(1);
});
for _ in 0..TASK_NUM {
set.spawn(async {
SEM.add_permits(1);
});
}
// Wait for all tasks to complete.
//
// Since this is a `current_thread` runtime, there's no race condition
// between the last permit being added and the task completing.
let _ = SEM.acquire_many(TASK_NUM).await.unwrap();
let mut count = 0;
let mut coop_count = 0;
loop {
match set.join_next().now_or_never() {
Some(Some(Ok(()))) => {}
Some(Some(Err(err))) => panic!("failed: {}", err),
None => {
coop_count += 1;
tokio::task::yield_now().await;
continue;
}
Some(None) => break,
}
count += 1;
}
assert!(coop_count >= 1);
assert_eq!(count, TASK_NUM);
} }
// Wait for all tasks to complete.
//
// Since this is a `current_thread` runtime, there's no race condition
// between the last permit being added and the task completing.
let _ = SEM.acquire_many(TASK_NUM).await.unwrap();
let mut count = 0;
let mut coop_count = 0;
loop {
match set.join_next().now_or_never() {
Some(Some(Ok(()))) => {}
Some(Some(Err(err))) => panic!("failed: {}", err),
None => {
coop_count += 1;
tokio::task::yield_now().await;
continue;
}
Some(None) => break,
}
count += 1;
}
assert!(coop_count >= 1);
assert_eq!(count, TASK_NUM);
} }