diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml
index bc93febd0..ce22c942f 100644
--- a/ci/azure-test-stable.yml
+++ b/ci/azure-test-stable.yml
@@ -25,7 +25,7 @@ jobs:
     # Run with all crate features
     - script: cargo test --all-features
       env:
-        LOOM_MAX_PREEMPTIONS: 2
+        RUST_BACKTRACE: 1
         CI: 'True'
       displayName: ${{ crate }} - cargo test --all-features
       workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
@@ -41,7 +41,7 @@ jobs:
     # Run with all crate features
     - script: cargo test --all-features
       env:
-        LOOM_MAX_PREEMPTIONS: 2
+        RUST_BACKTRACE: 1
         CI: 'True'
       displayName: ${{ crate }} - cargo test --all-features
       workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 322f59081..1b18e6ad8 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -129,7 +129,7 @@ tempfile = "3.1.0"
 # loom is currently not compiling on windows.
 # See: https://github.com/Xudong-Huang/generator-rs/issues/19
 [target.'cfg(not(windows))'.dev-dependencies]
-loom = { version = "0.2.13", features = ["futures", "checkpoint"] }
+loom = { version = "0.3.0", features = ["futures", "checkpoint"] }
 
 [package.metadata.docs.rs]
 all-features = true
diff --git a/tokio/src/loom/std/atomic_ptr.rs b/tokio/src/loom/std/atomic_ptr.rs
new file mode 100644
index 000000000..eb8e47557
--- /dev/null
+++ b/tokio/src/loom/std/atomic_ptr.rs
@@ -0,0 +1,32 @@
+use std::fmt;
+use std::ops::Deref;
+
+/// `AtomicPtr` providing an additional `with_mut` function.
+pub(crate) struct AtomicPtr<T> {
+    inner: std::sync::atomic::AtomicPtr<T>,
+}
+
+impl<T> AtomicPtr<T> {
+    pub(crate) fn new(ptr: *mut T) -> AtomicPtr<T> {
+        let inner = std::sync::atomic::AtomicPtr::new(ptr);
+        AtomicPtr { inner }
+    }
+
+    pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut *mut T) -> R) -> R {
+        f(self.inner.get_mut())
+    }
+}
+
+impl<T> Deref for AtomicPtr<T> {
+    type Target = std::sync::atomic::AtomicPtr<T>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<T> fmt::Debug for AtomicPtr<T> {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(fmt)
+    }
+}
diff --git a/tokio/src/loom/std/atomic_u8.rs b/tokio/src/loom/std/atomic_u8.rs
new file mode 100644
index 000000000..9f394a79b
--- /dev/null
+++ b/tokio/src/loom/std/atomic_u8.rs
@@ -0,0 +1,44 @@
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ops::Deref;
+
+/// `AtomicU8` providing an additional `unsync_load` function.
+pub(crate) struct AtomicU8 {
+    inner: UnsafeCell<std::sync::atomic::AtomicU8>,
+}
+
+unsafe impl Send for AtomicU8 {}
+unsafe impl Sync for AtomicU8 {}
+
+impl AtomicU8 {
+    pub(crate) fn new(val: u8) -> AtomicU8 {
+        let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val));
+        AtomicU8 { inner }
+    }
+
+    /// Performs an unsynchronized load.
+    ///
+    /// # Safety
+    ///
+    /// All mutations must have happened before the unsynchronized load.
+    /// Additionally, there must be no concurrent mutations.
+    pub(crate) unsafe fn unsync_load(&self) -> u8 {
+        *(*self.inner.get()).get_mut()
+    }
+}
+
+impl Deref for AtomicU8 {
+    type Target = std::sync::atomic::AtomicU8;
+
+    fn deref(&self) -> &Self::Target {
+        // safety: it is always safe to access `&self` fns on the inner value as
+        // we never perform unsafe mutations.
+        unsafe { &*self.inner.get() }
+    }
+}
+
+impl fmt::Debug for AtomicU8 {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.deref().fmt(fmt)
+    }
+}
diff --git a/tokio/src/loom/std/atomic_usize.rs b/tokio/src/loom/std/atomic_usize.rs
index 78644b054..0fe998f1f 100644
--- a/tokio/src/loom/std/atomic_usize.rs
+++ b/tokio/src/loom/std/atomic_usize.rs
@@ -25,6 +25,11 @@ impl AtomicUsize {
     pub(crate) unsafe fn unsync_load(&self) -> usize {
         *(*self.inner.get()).get_mut()
     }
+
+    pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut usize) -> R) -> R {
+        // safety: we have mutable access
+        f(unsafe { (*self.inner.get()).get_mut() })
+    }
 }
 
 impl ops::Deref for AtomicUsize {
diff --git a/tokio/src/loom/std/causal_cell.rs b/tokio/src/loom/std/causal_cell.rs
deleted file mode 100644
index 8300437a1..000000000
--- a/tokio/src/loom/std/causal_cell.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::cell::UnsafeCell;
-
-#[derive(Debug)]
-pub(crate) struct CausalCell<T>(UnsafeCell<T>);
-
-#[derive(Default)]
-pub(crate) struct CausalCheck(());
-
-impl<T> CausalCell<T> {
-    pub(crate) fn new(data: T) -> CausalCell<T> {
-        CausalCell(UnsafeCell::new(data))
-    }
-
-    pub(crate) fn with<F, R>(&self, f: F) -> R
-    where
-        F: FnOnce(*const T) -> R,
-    {
-        f(self.0.get())
-    }
-
-    pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck)
-    where
-        F: FnOnce(*const T) -> R,
-    {
-        (f(self.0.get()), CausalCheck::default())
-    }
-
-    pub(crate) fn with_mut<F, R>(&self, f: F) -> R
-    where
-        F: FnOnce(*mut T) -> R,
-    {
-        f(self.0.get())
-    }
-}
-
-impl CausalCheck {
-    pub(crate) fn check(self) {}
-
-    pub(crate) fn join(&mut self, _other: CausalCheck) {}
-}
diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs
index a56d778ab..6d7bcee14 100644
--- a/tokio/src/loom/std/mod.rs
+++ b/tokio/src/loom/std/mod.rs
@@ -1,12 +1,13 @@
 #![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))]
 
-mod atomic_u32;
+mod atomic_ptr;
 mod atomic_u64;
+mod atomic_u8;
 mod atomic_usize;
-mod causal_cell;
+mod unsafe_cell;
 
 pub(crate) mod cell {
-    pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
+    pub(crate) use super::unsafe_cell::UnsafeCell;
 }
 
 #[cfg(any(feature = "sync", feature = "io-driver"))]
@@ -58,12 +59,12 @@ pub(crate) mod sync {
     pub(crate) use std::sync::{Condvar, Mutex, MutexGuard, WaitTimeoutResult};
 
     pub(crate) mod atomic {
-        pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
+        pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
         pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
+        pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
         pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
 
-        pub(crate) use std::sync::atomic::AtomicU8;
-        pub(crate) use std::sync::atomic::{fence, AtomicPtr};
+        pub(crate) use std::sync::atomic::AtomicU16;
         pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
     }
 }
diff --git a/tokio/src/loom/std/unsafe_cell.rs b/tokio/src/loom/std/unsafe_cell.rs
new file mode 100644
index 000000000..f2b03d8dc
--- /dev/null
+++ b/tokio/src/loom/std/unsafe_cell.rs
@@ -0,0 +1,16 @@
+#[derive(Debug)]
+pub(crate) struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
+
+impl<T> UnsafeCell<T> {
+    pub(crate) fn new(data: T) -> UnsafeCell<T> {
+        UnsafeCell(std::cell::UnsafeCell::new(data))
+    }
+
+    pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
+        f(self.0.get())
+    }
+
+    pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
+        f(self.0.get())
+    }
+}
diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index cd8fbb1c3..aedc32805 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs @@ -230,6 +230,8 @@ use self::spawner::Spawner; mod time; cfg_rt_threaded! { + mod queue; + pub(crate) mod thread_pool; use self::thread_pool::ThreadPool; } diff --git a/tokio/src/runtime/thread_pool/queue.rs b/tokio/src/runtime/queue.rs similarity index 62% rename from tokio/src/runtime/thread_pool/queue.rs rename to tokio/src/runtime/queue.rs index 66cec5040..233fe4549 100644 --- a/tokio/src/runtime/thread_pool/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -1,14 +1,14 @@ //! Run-queue structures to support a work-stealing scheduler -use crate::loom::cell::{CausalCell, CausalCheck}; -use crate::loom::sync::atomic::{self, AtomicU32, AtomicUsize}; +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::{AtomicU16, AtomicU8, AtomicUsize}; use crate::loom::sync::{Arc, Mutex}; use crate::runtime::task; use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{Acquire, Release}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; /// Producer handle. May only be used from a single thread. pub(super) struct Local { @@ -36,13 +36,21 @@ pub(super) struct Inject { pub(super) struct Inner { /// Concurrently updated by many threads. - head: AtomicU32, + /// + /// Contains two `u8` values. The LSB byte is the "real" head of the queue. + /// The `u8` in the MSB is set by a stealer in process of stealing values. + /// It represents the first value being stolen in the batch. + /// + /// When both `u8` values are the same, there is no active stealer. + /// + /// Tracking an in-progress stealer prevents a wrapping scenario. + head: AtomicU16, /// Only updated by producer thread but read by many threads. - tail: AtomicU32, + tail: AtomicU8, /// Elements - buffer: Box<[CausalCell>>]>, + buffer: Box<[UnsafeCell>>]>, } struct Pointers { @@ -68,23 +76,21 @@ const LOCAL_QUEUE_CAPACITY: usize = 256; // logic, but allows loom to test more edge cases in a reasonable a mount of // time. #[cfg(loom)] -const LOCAL_QUEUE_CAPACITY: usize = 2; +const LOCAL_QUEUE_CAPACITY: usize = 4; const MASK: usize = LOCAL_QUEUE_CAPACITY - 1; /// Create a new local run-queue pub(super) fn local() -> (Steal, Local) { - debug_assert!(LOCAL_QUEUE_CAPACITY >= 2 && LOCAL_QUEUE_CAPACITY.is_power_of_two()); - let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY); for _ in 0..LOCAL_QUEUE_CAPACITY { - buffer.push(CausalCell::new(MaybeUninit::uninit())); + buffer.push(UnsafeCell::new(MaybeUninit::uninit())); } let inner = Arc::new(Inner { - head: AtomicU32::new(0), - tail: AtomicU32::new(0), + head: AtomicU16::new(0), + tail: AtomicU8::new(0), buffer: buffer.into(), }); @@ -126,44 +132,51 @@ impl Local { /// Pushes a task to the back of the local queue, skipping the LIFO slot. pub(super) fn push_back(&mut self, mut task: task::Notified, inject: &Inject) { - loop { + let tail = loop { let head = self.inner.head.load(Acquire); + let (steal, real) = unpack(head); // safety: this is the **only** thread that updates this cell. let tail = unsafe { self.inner.tail.unsync_load() }; - if tail.wrapping_sub(head) < LOCAL_QUEUE_CAPACITY as u32 { - // Map the position to a slot index. - let idx = tail as usize & MASK; - - self.inner.buffer[idx].with_mut(|ptr| { - // Write the task to the slot - // - // Safety: There is only one producer and the above `if` - // condition ensures we don't touch a cell if there is a - // value, thus no consumer. - unsafe { - ptr::write((*ptr).as_mut_ptr(), task); - } - }); - - // Make the task available. 
Synchronizes with a load in - // `steal_into2`. - self.inner.tail.store(tail.wrapping_add(1), Release); - + if steal as usize & MASK != tail.wrapping_add(1) as usize & MASK { + // There is capacity for the task + break tail; + } else if steal != real { + // Concurrently stealing, this will free up capacity, so + // only push the new task onto the inject queue + inject.push(task); return; + } else { + // Push the current task and half of the queue into the + // inject queue. + match self.push_overflow(task, real, tail, inject) { + Ok(_) => return, + // Lost the race, try again + Err(v) => { + task = v; + } + } } + }; - // The local buffer is full. Push a batch of work to the inject - // queue. - match self.push_overflow(task, head, tail, inject) { - Ok(_) => return, - // Lost the race, try again - Err(v) => task = v, + // Map the position to a slot index. + let idx = tail as usize & MASK; + + self.inner.buffer[idx].with_mut(|ptr| { + // Write the task to the slot + // + // Safety: There is only one producer and the above `if` + // condition ensures we don't touch a cell if there is a + // value, thus no consumer. + unsafe { + ptr::write((*ptr).as_mut_ptr(), task); } + }); - atomic::spin_loop_hint(); - } + // Make the task available. Synchronizes with a load in + // `steal_into2`. + self.inner.tail.store(tail.wrapping_add(1), Release); } /// Moves a batch of tasks into the inject queue. @@ -176,14 +189,22 @@ impl Local { fn push_overflow( &mut self, task: task::Notified, - head: u32, - tail: u32, + head: u8, + tail: u8, inject: &Inject, ) -> Result<(), task::Notified> { const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1; - let n = tail.wrapping_sub(head) / 2; - debug_assert_eq!(n as usize, LOCAL_QUEUE_CAPACITY / 2, "queue is not full"); + let n = (LOCAL_QUEUE_CAPACITY / 2) as u8; + assert_eq!( + tail.wrapping_sub(head) as usize, + LOCAL_QUEUE_CAPACITY - 1, + "queue is not full; tail = {}; head = {}", + tail, + head + ); + + let prev = pack(head, head); // Claim a bunch of tasks // @@ -195,8 +216,13 @@ impl Local { // work. This is because all tasks are pushed into the queue from the // current thread (or memory has been acquired if the local queue handle // moved). - let actual = self.inner.head.compare_and_swap(head, head + n, Release); - if actual != head { + let actual = self.inner.head.compare_and_swap( + prev, + pack(head.wrapping_add(n), head.wrapping_add(n)), + Release, + ); + + if actual != prev { // We failed to claim the tasks, losing the race. Return out of // this function and try the full `push` routine again. The queue // may not be full anymore. @@ -227,7 +253,7 @@ impl Local { // tasks and we are the only producer. self.inner.buffer[i_idx].with_mut(|ptr| unsafe { let ptr = (*ptr).as_ptr(); - *(*ptr).header().queue_next.get() = Some(next); + (*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next)); }); } @@ -249,42 +275,41 @@ impl Local { return Some(task); } - loop { - let head = self.inner.head.load(Acquire); + let mut head = self.inner.head.load(Acquire); + + let idx = loop { + let (steal, real) = unpack(head); // safety: this is the **only** thread that updates this cell. let tail = unsafe { self.inner.tail.unsync_load() }; - if head == tail { + if real == tail { // queue is empty return None; } - // Map the head position to a slot index. - let idx = head as usize & MASK; + let next_real = real.wrapping_add(1); - let task = self.inner.buffer[idx].with(|ptr| { - // Tentatively read the task at the head position. Note that we - // have not yet claimed the task. 
- // - // safety: reading this as uninitialized memory. - unsafe { ptr::read(ptr) } - }); + // Only update `steal` component if it differs from `real`. + let next = if steal == real { + pack(next_real, next_real) + } else { + pack(steal, next_real) + }; - // Attempt to claim the task read above. - let actual = self + // Attempt to claim a task. + let res = self .inner .head - .compare_and_swap(head, head.wrapping_add(1), Release); + .compare_exchange(head, next, AcqRel, Acquire); - if actual == head { - // safety: we claimed the task and the data we read is - // initialized memory. - return Some(unsafe { task.assume_init() }); + match res { + Ok(_) => break real as usize & MASK, + Err(actual) => head = actual, } + }; - atomic::spin_loop_hint(); - } + Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() })) } } @@ -324,9 +349,8 @@ impl Steal { } // Synchronize with stealers - let dst_head = dst.inner.head.load(Acquire); - - assert!(dst_tail.wrapping_sub(dst_head) + n <= LOCAL_QUEUE_CAPACITY as u32); + let (dst_steal, dst_real) = unpack(dst.inner.head.load(Acquire)); + assert_eq!(dst_steal, dst_real); // Make the stolen items available to consumers dst.inner.tail.store(dst_tail.wrapping_add(n), Release); @@ -334,70 +358,104 @@ impl Steal { Some(ret) } - fn steal_into2(&self, dst: &mut Local, dst_tail: u32) -> u32 { - loop { - let src_head = self.0.head.load(Acquire); + // Steal tasks from `self`, placing them into `dst`. Returns the number of + // tasks that were stolen. + fn steal_into2(&self, dst: &mut Local, dst_tail: u8) -> u8 { + let mut prev_packed = self.0.head.load(Acquire); + let mut next_packed; + + let n = loop { + let (src_head_steal, src_head_real) = unpack(prev_packed); let src_tail = self.0.tail.load(Acquire); - // Number of available tasks to steal - let n = src_tail.wrapping_sub(src_head); - let n = n - n / 2; - - if n == 0 { + // If these two do not match, another thread is concurrently + // stealing from the queue. + if src_head_steal != src_head_real { return 0; } - if n > LOCAL_QUEUE_CAPACITY as u32 / 2 { - atomic::spin_loop_hint(); - // inconsistent, try again - continue; + // Number of available tasks to steal + let n = src_tail.wrapping_sub(src_head_real); + let n = n - n / 2; + + if n == 0 { + // No tasks available to steal + return 0; } - // Track CausalCell causality checks. The check is deferred until - // the compare_and_swap claims ownership of the tasks. - let mut check = CausalCheck::default(); + // Update the real head index to acquire the tasks. + let steal_to = src_head_real.wrapping_add(n); + next_packed = pack(src_head_steal, steal_to); - for i in 0..n { - // Compute the positions - let src_pos = src_head.wrapping_add(i); - let dst_pos = dst_tail.wrapping_add(i); - - // Map to slots - let src_idx = src_pos as usize & MASK; - let dst_idx = dst_pos as usize & MASK; - - // Read the task - // - // safety: this is being read as MaybeUninit -- potentially - // uninitialized memory (in the case a producer wraps). We don't - // assume it is initialized, but will just write the - // `MaybeUninit` in our slot below. - let (task, ch) = self.0.buffer[src_idx] - .with_deferred(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); - - check.join(ch); - - // Write the task to the new slot - // - // safety: `dst` queue is empty and we are the only producer to - // this queue. - dst.inner.buffer[dst_idx] - .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) }); - } - - // Claim all of those tasks! - let actual = self + // Claim all those tasks. 
This is done by incrementing the "real" + // head but not the steal. By doing this, no other thread is able to + // steal from this queue until the current thread completes. + let res = self .0 .head - .compare_and_swap(src_head, src_head.wrapping_add(n), Release); + .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); - if actual == src_head { - check.check(); - return n; + match res { + Ok(_) => break n, + Err(actual) => prev_packed = actual, } + }; - atomic::spin_loop_hint(); + let (first, _) = unpack(next_packed); + + // Take all the tasks + for i in 0..n { + // Compute the positions + let src_pos = first.wrapping_add(i); + let dst_pos = dst_tail.wrapping_add(i); + + // Map to slots + let src_idx = src_pos as usize & MASK; + let dst_idx = dst_pos as usize & MASK; + + // Read the task + // + // safety: We acquired the task with the atomic exchange above. + let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); + + // Write the task to the new slot + // + // safety: `dst` queue is empty and we are the only producer to + // this queue. + dst.inner.buffer[dst_idx] + .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) }); } + + let mut prev_packed = next_packed; + + // Update `src_head_steal` to match `src_head_real` signalling that the + // stealing routine is complete. + loop { + let head = unpack(prev_packed).1; + next_packed = pack(head, head); + + let res = self + .0 + .head + .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); + + match res { + Ok(_) => return n, + Err(actual) => { + let (actual_steal, actual_real) = unpack(actual); + + assert_ne!(actual_steal, actual_real); + + prev_packed = actual; + } + } + } + } +} + +impl Clone for Steal { + fn clone(&self) -> Steal { + Steal(self.0.clone()) } } @@ -411,7 +469,7 @@ impl Drop for Local { impl Inner { fn is_empty(&self) -> bool { - let head = self.head.load(Acquire); + let (_, head) = unpack(self.head.load(Acquire)); let tail = self.tail.load(Acquire); head == tail @@ -452,7 +510,7 @@ impl Inject { self.pointers.lock().unwrap().is_closed } - fn len(&self) -> usize { + pub(super) fn len(&self) -> usize { self.len.load(Acquire) } @@ -558,11 +616,30 @@ impl Drop for Inject { } fn get_next(header: NonNull) -> Option> { - unsafe { *header.as_ref().queue_next.get() } + unsafe { header.as_ref().queue_next.with(|ptr| *ptr) } } fn set_next(header: NonNull, val: Option>) { unsafe { - *header.as_ref().queue_next.get() = val; + header.as_ref().queue_next.with_mut(|ptr| *ptr = val); } } + +/// Split the head value into the real head and the index a stealer is working +/// on. 
+fn unpack(n: u16) -> (u8, u8) { + let real = n & u8::max_value() as u16; + let steal = n >> 8; + + (steal as u8, real as u8) +} + +/// Join the two head values +fn pack(steal: u8, real: u8) -> u16 { + (real as u16) | ((steal as u16) << 8) +} + +#[test] +fn test_local_queue_capacity() { + assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::max_value() as usize); +} diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index dee55a54f..2092c0aa3 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -1,11 +1,10 @@ -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::runtime::task::raw::{self, Vtable}; use crate::runtime::task::state::State; use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{Notified, Schedule, Task}; use crate::util::linked_list; -use std::cell::UnsafeCell; use std::future::Future; use std::pin::Pin; use std::ptr::NonNull; @@ -32,10 +31,10 @@ pub(super) struct Cell { /// Holds the future or output, depending on the stage of execution. pub(super) struct Core { /// Scheduler used to drive this future - pub(super) scheduler: CausalCell>, + pub(super) scheduler: UnsafeCell>, /// Either the future or the output - pub(super) stage: CausalCell>, + pub(super) stage: UnsafeCell>, } /// Crate public as this is also needed by the pool. @@ -62,7 +61,7 @@ unsafe impl Sync for Header {} /// Cold data is stored after the future. pub(super) struct Trailer { /// Consumer task waiting on completion of this task. - pub(super) waker: CausalCell>, + pub(super) waker: UnsafeCell>, } /// Either the future or the output. @@ -85,11 +84,11 @@ impl Cell { vtable: raw::vtable::(), }, core: Core { - scheduler: CausalCell::new(None), - stage: CausalCell::new(Stage::Running(future)), + scheduler: UnsafeCell::new(None), + stage: UnsafeCell::new(Stage::Running(future)), }, trailer: Trailer { - waker: CausalCell::new(None), + waker: UnsafeCell::new(None), }, }) } diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index f9cf5e75c..29b231ea8 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -124,6 +124,9 @@ where if snapshot.is_notified() { // Signal yield self.core().yield_now(Notified(self.to_task())); + // The ref-count was incremented as part of + // `transition_to_idle`. + self.drop_reference(); } } Err(_) => self.cancel_task(), diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 1ea60a9b6..17b5157e8 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -214,6 +214,7 @@ unsafe impl linked_list::Link for Task { } unsafe fn pointers(target: NonNull
) -> NonNull> { - NonNull::from(&mut *target.as_ref().owned.get()) + // Not super great as it avoids some of looms checking... + NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr)) } } diff --git a/tokio/src/runtime/task/stack.rs b/tokio/src/runtime/task/stack.rs index b2d50bafd..9dd8d3f43 100644 --- a/tokio/src/runtime/task/stack.rs +++ b/tokio/src/runtime/task/stack.rs @@ -31,8 +31,10 @@ impl TransferStack { loop { unsafe { - *task.as_ref().stack_next.get() = NonNull::new(curr); - } + task.as_ref() + .stack_next + .with_mut(|ptr| *ptr = NonNull::new(curr)) + }; let res = self .head @@ -57,7 +59,7 @@ impl TransferStack { let task = self.0?; // Move the cursor forward - self.0 = unsafe { *task.as_ref().stack_next.get() }; + self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) }; // Return the task unsafe { Some(Task::from_raw(task)) } diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 653c508da..21e90430d 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -120,6 +120,13 @@ impl State { let mut next = curr; next.unset_running(); + + if next.is_notified() { + // The caller needs to schedule the task. To do this, it needs a + // waker. The waker requires a ref count. + next.ref_inc(); + } + Some(next) }) } @@ -306,16 +313,8 @@ impl State { /// Returns `true` if the task should be released. pub(super) fn ref_dec(&self) -> bool { - use crate::loom::sync::atomic; - - let prev = Snapshot(self.val.fetch_sub(REF_ONE, Release)); - let is_final_ref = prev.ref_count() == 1; - - if is_final_ref { - atomic::fence(Acquire); - } - - is_final_ref + let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel)); + prev.ref_count() == 1 } fn fetch_update(&self, mut f: F) -> Result diff --git a/tokio/src/runtime/tests/loom_pool.rs b/tokio/src/runtime/tests/loom_pool.rs index 275e0ff54..c08658cde 100644 --- a/tokio/src/runtime/tests/loom_pool.rs +++ b/tokio/src/runtime/tests/loom_pool.rs @@ -230,25 +230,24 @@ mod group_c { #[test] fn shutdown_with_notification() { - use crate::stream::StreamExt; - use crate::sync::{mpsc, oneshot}; + use crate::sync::oneshot; loom::model(|| { let rt = mk_pool(2); let (done_tx, done_rx) = oneshot::channel::<()>(); rt.spawn(track(async move { - let (mut tx, mut rx) = mpsc::channel::<()>(10); + let (tx, rx) = oneshot::channel::<()>(); crate::spawn(async move { crate::task::spawn_blocking(move || { - let _ = tx.try_send(()); + let _ = tx.send(()); }); let _ = done_rx.await; }); - while let Some(_) = rx.next().await {} + let _ = rx.await; let _ = done_tx.send(()); })); diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs new file mode 100644 index 000000000..4e0b64547 --- /dev/null +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -0,0 +1,75 @@ +use crate::runtime::queue; +use crate::runtime::task::{self, Schedule, Task}; + +use loom::thread; + +#[test] +fn multi_stealer() { + const NUM_TASKS: usize = 5; + + fn steal_tasks(steal: queue::Steal) -> usize { + let (_, mut local) = queue::local(); + + if steal.steal_into(&mut local).is_none() { + return 0; + } + + let mut n = 1; + + while local.pop().is_some() { + n += 1; + } + + n + } + + loom::model(|| { + let (steal, mut local) = queue::local(); + let inject = queue::Inject::new(); + + // Push work + for _ in 0..NUM_TASKS { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + } + + let th1 = { + let steal = steal.clone(); + thread::spawn(move || steal_tasks(steal)) + }; + + let 
th2 = thread::spawn(move || steal_tasks(steal)); + + let mut n = 0; + + while local.pop().is_some() { + n += 1; + } + + while inject.pop().is_some() { + n += 1; + } + + n += th1.join().unwrap(); + n += th2.join().unwrap(); + + assert_eq!(n, NUM_TASKS); + }); +} + +struct Runtime; + +impl Schedule for Runtime { + fn bind(task: Task) -> Runtime { + std::mem::forget(task); + Runtime + } + + fn release(&self, _task: &Task) -> Option> { + None + } + + fn schedule(&self, _task: task::Notified) { + unreachable!(); + } +} diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index b932956ca..123a7e35a 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -2,7 +2,12 @@ cfg_loom! { mod loom_blocking; mod loom_oneshot; mod loom_pool; + mod loom_queue; } -#[cfg(miri)] -mod task; +cfg_not_loom! { + mod queue; + + #[cfg(miri)] + mod task; +} diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs new file mode 100644 index 000000000..912fb56a0 --- /dev/null +++ b/tokio/src/runtime/tests/queue.rs @@ -0,0 +1,45 @@ +use crate::runtime::queue; +use crate::runtime::task::{self, Schedule, Task}; + +#[test] +fn steal_batch() { + let (steal1, mut local1) = queue::local(); + let (_, mut local2) = queue::local(); + let inject = queue::Inject::new(); + + for _ in 0..4 { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local1.push_back(task, &inject); + } + + assert!(steal1.steal_into(&mut local2).is_some()); + + for _ in 0..1 { + assert!(local2.pop().is_some()); + } + + assert!(local2.pop().is_none()); + + for _ in 0..2 { + assert!(local1.pop().is_some()); + } + + assert!(local1.pop().is_none()); +} + +struct Runtime; + +impl Schedule for Runtime { + fn bind(task: Task) -> Runtime { + std::mem::forget(task); + Runtime + } + + fn release(&self, _task: &Task) -> Option> { + None + } + + fn schedule(&self, _task: task::Notified) { + unreachable!(); + } +} diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 8a74fe38b..87a75e3fb 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -6,8 +6,6 @@ use atomic_cell::AtomicCell; mod idle; use self::idle::Idle; -mod queue; - mod worker; pub(crate) use worker::Launch; diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 53c7e5b61..c07aa0541 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -9,8 +9,8 @@ use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::park::{Parker, Unparker}; -use crate::runtime::task; -use crate::runtime::thread_pool::{queue, AtomicCell, Idle}; +use crate::runtime::thread_pool::{AtomicCell, Idle}; +use crate::runtime::{queue, task}; use crate::util::linked_list::LinkedList; use crate::util::FastRand; diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index d89ac6ab8..3656c109f 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -14,8 +14,9 @@ //! tasks are acquiring smaller numbers of permits. This means that in a //! use-case like tokio's read-write lock, writers will not be starved by //! readers. 
-use crate::loom::cell::CausalCell; -use crate::loom::sync::{atomic::AtomicUsize, Mutex, MutexGuard}; +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::{Mutex, MutexGuard}; use crate::util::linked_list::{self, LinkedList}; use std::future::Future; @@ -69,7 +70,7 @@ struct Waiter { /// # Safety /// /// This may only be accessed while the wait queue is locked. - waker: CausalCell>, + waker: UnsafeCell>, /// Intrusive linked-list pointers. /// @@ -79,10 +80,10 @@ struct Waiter { /// /// TODO: Ideally, we would be able to use loom to enforce that /// this isn't accessed concurrently. However, it is difficult to - /// use a `CausalCell` here, since the `Link` trait requires _returning_ - /// references to `Pointers`, and `CausalCell` requires that checked access + /// use a `UnsafeCell` here, since the `Link` trait requires _returning_ + /// references to `Pointers`, and `UnsafeCell` requires that checked access /// take place inside a closure. We should consider changing `Pointers` to - /// use `CausalCell` internally. + /// use `UnsafeCell` internally. pointers: linked_list::Pointers, /// Should not be `Unpin`. @@ -368,7 +369,7 @@ fn notify_all(mut list: LinkedList) { impl Waiter { fn new(num_permits: u16) -> Self { Waiter { - waker: CausalCell::new(None), + waker: UnsafeCell::new(None), state: AtomicUsize::new(num_permits as usize), pointers: linked_list::Pointers::new(), _p: PhantomPinned, diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 3bf7e54ed..5394c496e 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -108,7 +108,7 @@ //! assert_eq!(30, rx.recv().await.unwrap()); //! } -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize}; use crate::loom::sync::{Arc, Condvar, Mutex}; @@ -292,10 +292,10 @@ struct Slot { /// A write in the buffer struct Write { /// Uniquely identifies this write - pos: CausalCell, + pos: UnsafeCell, /// The written value - val: CausalCell>, + val: UnsafeCell>, } /// Tracks a waiting receiver @@ -308,7 +308,7 @@ struct WaitNode { waker: AtomicWaker, /// Next pointer in the stack of waiting senders. 
- next: CausalCell<*const WaitNode>, + next: UnsafeCell<*const WaitNode>, } struct RecvGuard<'a, T> { @@ -379,8 +379,8 @@ pub fn channel(mut capacity: usize) -> (Sender, Receiver) { rem: AtomicUsize::new(0), lock: AtomicUsize::new(0), write: Write { - pos: CausalCell::new((i as u64).wrapping_sub(capacity as u64)), - val: CausalCell::new(None), + pos: UnsafeCell::new((i as u64).wrapping_sub(capacity as u64)), + val: UnsafeCell::new(None), }, }); } @@ -400,7 +400,7 @@ pub fn channel(mut capacity: usize) -> (Sender, Receiver) { wait: Arc::new(WaitNode { queued: AtomicBool::new(false), waker: AtomicWaker::new(), - next: CausalCell::new(ptr::null()), + next: UnsafeCell::new(ptr::null()), }), }; @@ -514,7 +514,7 @@ impl Sender { wait: Arc::new(WaitNode { queued: AtomicBool::new(false), waker: AtomicWaker::new(), - next: CausalCell::new(ptr::null()), + next: UnsafeCell::new(ptr::null()), }), } } @@ -924,7 +924,7 @@ impl Drop for Receiver { impl Drop for Shared { fn drop(&mut self) { // Clear the wait stack - let mut curr = *self.wait_stack.get_mut() as *const WaitNode; + let mut curr = self.wait_stack.with_mut(|ptr| *ptr as *const WaitNode); while !curr.is_null() { let waiter = unsafe { Arc::from_raw(curr) }; diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs index 4af990bf4..7bf161967 100644 --- a/tokio/src/sync/mpsc/block.rs +++ b/tokio/src/sync/mpsc/block.rs @@ -1,5 +1,5 @@ use crate::loom::{ - cell::CausalCell, + cell::UnsafeCell, sync::atomic::{AtomicPtr, AtomicUsize}, thread, }; @@ -26,7 +26,7 @@ pub(crate) struct Block { /// The observed `tail_position` value *after* the block has been passed by /// `block_tail`. - observed_tail_position: CausalCell, + observed_tail_position: UnsafeCell, /// Array containing values pushed into the block. Values are stored in a /// continuous array in order to improve cache line behavior when reading. @@ -39,7 +39,7 @@ pub(crate) enum Read { Closed, } -struct Values([CausalCell>; BLOCK_CAP]); +struct Values([UnsafeCell>; BLOCK_CAP]); use super::BLOCK_CAP; @@ -85,7 +85,7 @@ impl Block { ready_slots: AtomicUsize::new(0), - observed_tail_position: CausalCell::new(0), + observed_tail_position: UnsafeCell::new(0), // Value storage values: unsafe { Values::uninitialized() }, @@ -365,12 +365,12 @@ impl Values { unsafe fn uninitialized() -> Values { let mut vals = MaybeUninit::uninit(); - // When fuzzing, `CausalCell` needs to be initialized. + // When fuzzing, `UnsafeCell` needs to be initialized. if_loom! { - let p = vals.as_mut_ptr() as *mut CausalCell>; + let p = vals.as_mut_ptr() as *mut UnsafeCell>; for i in 0..BLOCK_CAP { p.add(i) - .write(CausalCell::new(MaybeUninit::uninit())); + .write(UnsafeCell::new(MaybeUninit::uninit())); } } @@ -379,7 +379,7 @@ impl Values { } impl ops::Index for Values { - type Output = CausalCell>; + type Output = UnsafeCell>; fn index(&self, index: usize) -> &Self::Output { self.0.index(index) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index dc02dae2e..326286610 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -1,4 +1,4 @@ -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; @@ -114,7 +114,7 @@ struct Chan { tx_count: AtomicUsize, /// Only accessed by `Rx` handle. 
- rx_fields: CausalCell>, + rx_fields: UnsafeCell>, } impl fmt::Debug for Chan @@ -164,7 +164,7 @@ where semaphore, rx_waker: AtomicWaker::new(), tx_count: AtomicUsize::new(1), - rx_fields: CausalCell::new(RxFields { + rx_fields: UnsafeCell::new(RxFields { list: rx, rx_closed: false, }), diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 163a708d1..644832b96 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -2,7 +2,7 @@ //! A channel for sending a single message between asynchronous tasks. -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; @@ -81,13 +81,13 @@ struct Inner { /// The value. This is set by `Sender` and read by `Receiver`. The state of /// the cell is tracked by `state`. - value: CausalCell>, + value: UnsafeCell>, /// The task to notify when the receiver drops without consuming the value. - tx_task: CausalCell>, + tx_task: UnsafeCell>, /// The task to notify when the value is sent. - rx_task: CausalCell>, + rx_task: UnsafeCell>, } #[derive(Clone, Copy)] @@ -127,9 +127,9 @@ pub fn channel() -> (Sender, Receiver) { #[allow(deprecated)] let inner = Arc::new(Inner { state: AtomicUsize::new(State::new().as_usize()), - value: CausalCell::new(None), - tx_task: CausalCell::new(MaybeUninit::uninit()), - rx_task: CausalCell::new(MaybeUninit::uninit()), + value: UnsafeCell::new(None), + tx_task: UnsafeCell::new(MaybeUninit::uninit()), + rx_task: UnsafeCell::new(MaybeUninit::uninit()), }); let tx = Sender { @@ -675,7 +675,7 @@ unsafe impl Sync for Inner {} impl Drop for Inner { fn drop(&mut self) { - let state = State(*self.state.get_mut()); + let state = State(self.state.with_mut(|v| *v)); if state.is_rx_task_set() { unsafe { diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs index b56f21a81..0bdc4e276 100644 --- a/tokio/src/sync/semaphore_ll.rs +++ b/tokio/src/sync/semaphore_ll.rs @@ -10,7 +10,7 @@ //! section. If no permits are available, then acquiring the semaphore returns //! `Pending`. The task is woken once a permit becomes available. -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize}; use crate::loom::thread; @@ -30,7 +30,7 @@ pub(crate) struct Semaphore { state: AtomicUsize, /// waiter queue head pointer. - head: CausalCell>, + head: UnsafeCell>, /// Coordinates access to the queue head. rx_lock: AtomicUsize, @@ -165,7 +165,7 @@ impl Semaphore { Semaphore { state: AtomicUsize::new(state.to_usize()), - head: CausalCell::new(ptr), + head: UnsafeCell::new(ptr), rx_lock: AtomicUsize::new(0), stub, } diff --git a/tokio/src/sync/task/atomic_waker.rs b/tokio/src/sync/task/atomic_waker.rs index 127eed357..73b1745f1 100644 --- a/tokio/src/sync/task/atomic_waker.rs +++ b/tokio/src/sync/task/atomic_waker.rs @@ -1,6 +1,6 @@ #![cfg_attr(any(loom, not(feature = "sync")), allow(dead_code, unreachable_pub))] -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{self, AtomicUsize}; use std::fmt; @@ -24,7 +24,7 @@ use std::task::Waker; /// `wake`. pub(crate) struct AtomicWaker { state: AtomicUsize, - waker: CausalCell>, + waker: UnsafeCell>, } // `AtomicWaker` is a multi-consumer, single-producer transfer cell. 
The cell @@ -137,7 +137,7 @@ impl AtomicWaker { pub(crate) fn new() -> AtomicWaker { AtomicWaker { state: AtomicUsize::new(WAITING), - waker: CausalCell::new(None), + waker: UnsafeCell::new(None), } } diff --git a/tokio/src/util/slab/page.rs b/tokio/src/util/slab/page.rs index 6b1c64c05..0000e934d 100644 --- a/tokio/src/util/slab/page.rs +++ b/tokio/src/util/slab/page.rs @@ -1,11 +1,11 @@ -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::util::slab::{Address, Entry, Slot, TransferStack, INITIAL_PAGE_SIZE}; use std::fmt; /// Data accessed only by the thread that owns the shard. pub(crate) struct Local { - head: CausalCell, + head: UnsafeCell, } /// Data accessed by any thread. @@ -13,7 +13,7 @@ pub(crate) struct Shared { remote: TransferStack, size: usize, prev_sz: usize, - slab: CausalCell]>>>, + slab: UnsafeCell]>>>, } /// Returns the size of the page at index `n` @@ -24,7 +24,7 @@ pub(super) fn size(n: usize) -> usize { impl Local { pub(crate) fn new() -> Self { Self { - head: CausalCell::new(0), + head: UnsafeCell::new(0), } } @@ -45,7 +45,7 @@ impl Shared { prev_sz, size, remote: TransferStack::new(), - slab: CausalCell::new(None), + slab: UnsafeCell::new(None), } } diff --git a/tokio/src/util/slab/slot.rs b/tokio/src/util/slab/slot.rs index 522d2f7ec..0608b2618 100644 --- a/tokio/src/util/slab/slot.rs +++ b/tokio/src/util/slab/slot.rs @@ -1,9 +1,9 @@ -use crate::loom::cell::CausalCell; +use crate::loom::cell::UnsafeCell; use crate::util::slab::{Entry, Generation}; /// Stores an entry in the slab. pub(super) struct Slot { - next: CausalCell, + next: UnsafeCell, entry: T, } @@ -13,7 +13,7 @@ impl Slot { /// The entry is initialized to a default value. pub(super) fn new(next: usize) -> Slot { Slot { - next: CausalCell::new(next), + next: UnsafeCell::new(next), entry: T::default(), } } diff --git a/tokio/src/util/slab/tests/loom_stack.rs b/tokio/src/util/slab/tests/loom_stack.rs index d1121bee3..47ad46d3a 100644 --- a/tokio/src/util/slab/tests/loom_stack.rs +++ b/tokio/src/util/slab/tests/loom_stack.rs @@ -1,13 +1,13 @@ use crate::util::slab::TransferStack; -use loom::cell::CausalCell; +use loom::cell::UnsafeCell; use loom::sync::Arc; use loom::thread; #[test] fn transfer_stack() { loom::model(|| { - let causalities = [CausalCell::new(None), CausalCell::new(None)]; + let causalities = [UnsafeCell::new(None), UnsafeCell::new(None)]; let shared = Arc::new((causalities, TransferStack::new())); let shared1 = shared.clone(); let shared2 = shared.clone(); @@ -45,7 +45,7 @@ fn transfer_stack() { let val = unsafe { *val }; assert!( val.is_some(), - "CausalCell write must happen-before index is pushed to the stack!", + "UnsafeCell write must happen-before index is pushed to the stack!", ); // were there two entries in the stack? if so, check that // both saw a write. @@ -54,7 +54,7 @@ fn transfer_stack() { let val = unsafe { *val }; assert!( val.is_some(), - "CausalCell write must happen-before index is pushed to the stack!", + "UnsafeCell write must happen-before index is pushed to the stack!", ); }); true @@ -77,7 +77,7 @@ fn transfer_stack() { let val = unsafe { *val }; assert!( val.is_some(), - "CausalCell write must happen-before index is pushed to the stack!", + "UnsafeCell write must happen-before index is pushed to the stack!", ); }); }
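
The following is a minimal, self-contained sketch (not part of the patch) of the packed-head scheme that the new runtime/queue.rs uses above: the low byte of the AtomicU16 head is the "real" head, the high byte records where an in-progress steal began, and the two bytes are equal whenever no stealer is active. `pack` and `unpack` mirror the helpers added in the patch; `claim_for_steal` and `finish_steal` are illustrative names for the two CAS phases of `steal_into2`, and the sketch omits the tail check, the buffer copy, and the destination queue.

use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering::{AcqRel, Acquire};

/// Join the two head values: `real` in the low byte, `steal` in the high byte.
fn pack(steal: u8, real: u8) -> u16 {
    (real as u16) | ((steal as u16) << 8)
}

/// Split the packed head back into `(steal, real)`.
fn unpack(n: u16) -> (u8, u8) {
    let real = n & u8::MAX as u16;
    let steal = n >> 8;
    (steal as u8, real as u8)
}

/// Phase one of a steal: advance only the `real` byte, leaving `steal` at the
/// old head. While the two bytes differ, other stealers back off.
fn claim_for_steal(head: &AtomicU16, n: u8) -> Option<u8> {
    let mut prev = head.load(Acquire);
    loop {
        let (steal, real) = unpack(prev);
        if steal != real {
            // Another steal is already in progress.
            return None;
        }
        let next = pack(steal, real.wrapping_add(n));
        match head.compare_exchange(prev, next, AcqRel, Acquire) {
            // The claimed batch starts at the old `real` index.
            Ok(_) => return Some(real),
            Err(actual) => prev = actual,
        }
    }
}

/// Phase two: after the claimed slots have been copied out, set `steal` equal
/// to `real` again to signal that the steal is complete.
fn finish_steal(head: &AtomicU16) {
    let mut prev = head.load(Acquire);
    loop {
        let (_, real) = unpack(prev);
        match head.compare_exchange(prev, pack(real, real), AcqRel, Acquire) {
            Ok(_) => return,
            // `real` may have moved concurrently; retry against the new value.
            Err(actual) => prev = actual,
        }
    }
}

fn main() {
    let head = AtomicU16::new(pack(0, 0));

    // Claim two slots: `real` advances to 2 while `steal` stays at 0.
    assert_eq!(claim_for_steal(&head, 2), Some(0));
    assert_eq!(unpack(head.load(Acquire)), (0, 2));

    // A second stealer backs off while the first is mid-flight.
    assert_eq!(claim_for_steal(&head, 1), None);

    // Completing the steal makes the two bytes equal again.
    finish_steal(&head);
    assert_eq!(unpack(head.load(Acquire)), (2, 2));
}

The producer side in the patch leans on the same encoding: `push_back` treats the buffer as full when advancing the tail would collide with the `steal` byte, and if a steal is in progress at that point it pushes the new task to the inject queue rather than spinning, since capacity is about to be freed. Keeping the in-progress steal index in the head is what prevents the wrapping scenario mentioned in the `Inner::head` comment.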