rt: track loom changes + tweak queue (#2315)

Loom is undergoing a major refresh to improve performance and tighten up
the concurrency model. This diff tracks those changes.

Included in the changes is the removal of `CausalCell` deferred checks,
as they are technically undefined behavior under the C++11 memory model.
To avoid relying on that behavior, the work-stealing queue is updated to
allow at most one concurrent stealer.
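The single-stealer limit is enforced by packing two `u8` indices into the
queue's `AtomicU16` head. A minimal sketch mirroring the `pack`/`unpack`
helpers added in `queue.rs` (the `main` wrapper is illustrative only): the
"steal" byte and the "real" byte are equal exactly when no stealer is active.

fn pack(steal: u8, real: u8) -> u16 {
    (real as u16) | ((steal as u16) << 8)
}

fn unpack(n: u16) -> (u8, u8) {
    ((n >> 8) as u8, (n & u8::max_value() as u16) as u8)
}

fn main() {
    // No active stealer: both halves hold the same position.
    assert_eq!(unpack(pack(5, 5)), (5, 5));

    // A stealer has claimed positions 5 and 6 but has not yet finished
    // copying them out: "real" advances to 7 while "steal" stays at 5,
    // which blocks any other stealer until the two halves match again.
    assert_eq!(unpack(pack(5, 7)), (5, 7));
}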
Carl Lerche 2020-03-26 12:23:12 -07:00 committed by GitHub
parent 186196b911
commit 1cb1e291c1
31 changed files with 525 additions and 261 deletions

View File

@ -25,7 +25,7 @@ jobs:
# Run with all crate features
- script: cargo test --all-features
env:
LOOM_MAX_PREEMPTIONS: 2
RUST_BACKTRACE: 1
CI: 'True'
displayName: ${{ crate }} - cargo test --all-features
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
@ -41,7 +41,7 @@ jobs:
# Run with all crate features
- script: cargo test --all-features
env:
LOOM_MAX_PREEMPTIONS: 2
RUST_BACKTRACE: 1
CI: 'True'
displayName: ${{ crate }} - cargo test --all-features
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}

View File

@ -129,7 +129,7 @@ tempfile = "3.1.0"
# loom is currently not compiling on windows.
# See: https://github.com/Xudong-Huang/generator-rs/issues/19
[target.'cfg(not(windows))'.dev-dependencies]
loom = { version = "0.2.13", features = ["futures", "checkpoint"] }
loom = { version = "0.3.0", features = ["futures", "checkpoint"] }
[package.metadata.docs.rs]
all-features = true

View File

@ -0,0 +1,32 @@
use std::fmt;
use std::ops::Deref;
/// `AtomicPtr` providing an additional `with_mut` function.
pub(crate) struct AtomicPtr<T> {
inner: std::sync::atomic::AtomicPtr<T>,
}
impl<T> AtomicPtr<T> {
pub(crate) fn new(ptr: *mut T) -> AtomicPtr<T> {
let inner = std::sync::atomic::AtomicPtr::new(ptr);
AtomicPtr { inner }
}
pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut *mut T) -> R) -> R {
f(self.inner.get_mut())
}
}
impl<T> Deref for AtomicPtr<T> {
type Target = std::sync::atomic::AtomicPtr<T>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> fmt::Debug for AtomicPtr<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.deref().fmt(fmt)
}
}

View File

@ -0,0 +1,44 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::Deref;
/// `AtomicU8` providing an additional `unsync_load` function.
pub(crate) struct AtomicU8 {
inner: UnsafeCell<std::sync::atomic::AtomicU8>,
}
unsafe impl Send for AtomicU8 {}
unsafe impl Sync for AtomicU8 {}
impl AtomicU8 {
pub(crate) fn new(val: u8) -> AtomicU8 {
let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val));
AtomicU8 { inner }
}
/// Performs an unsynchronized load.
///
/// # Safety
///
/// All mutations must have happened before the unsynchronized load.
/// Additionally, there must be no concurrent mutations.
pub(crate) unsafe fn unsync_load(&self) -> u8 {
*(*self.inner.get()).get_mut()
}
}
impl Deref for AtomicU8 {
type Target = std::sync::atomic::AtomicU8;
fn deref(&self) -> &Self::Target {
// safety: it is always safe to access `&self` fns on the inner value as
// we never perform unsafe mutations.
unsafe { &*self.inner.get() }
}
}
impl fmt::Debug for AtomicU8 {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.deref().fmt(fmt)
}
}
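The `unsync_load` shim above exists because a queue's producer is the only
thread that ever writes its own `tail`. A hedged illustration of that
single-writer pattern using plain std atomics and hypothetical names (the
loom shim relaxes the `&mut self` below to an `unsafe` `&self` method for
the owning thread):

use std::sync::atomic::{AtomicU8, Ordering::Release};

struct Tail(AtomicU8);

impl Tail {
    // Only the producer thread ever stores to this cell, so it can read the
    // current value with no synchronization. std requires `&mut self` for
    // this; `unsync_load(&self)` makes the same single-writer argument
    // without demanding exclusive access.
    fn current(&mut self) -> u8 {
        *self.0.get_mut()
    }

    // Publishing a new tail still uses `Release` so that stealers loading
    // the tail with `Acquire` observe the task writes that preceded it.
    fn publish(&self, next: u8) {
        self.0.store(next, Release);
    }
}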

View File

@ -25,6 +25,11 @@ impl AtomicUsize {
pub(crate) unsafe fn unsync_load(&self) -> usize {
*(*self.inner.get()).get_mut()
}
pub(crate) fn with_mut<R>(&mut self, f: impl FnOnce(&mut usize) -> R) -> R {
// safety: we have mutable access
f(unsafe { (*self.inner.get()).get_mut() })
}
}
impl ops::Deref for AtomicUsize {

View File

@ -1,40 +0,0 @@
use std::cell::UnsafeCell;
#[derive(Debug)]
pub(crate) struct CausalCell<T>(UnsafeCell<T>);
#[derive(Default)]
pub(crate) struct CausalCheck(());
impl<T> CausalCell<T> {
pub(crate) fn new(data: T) -> CausalCell<T> {
CausalCell(UnsafeCell::new(data))
}
pub(crate) fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(*const T) -> R,
{
f(self.0.get())
}
pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck)
where
F: FnOnce(*const T) -> R,
{
(f(self.0.get()), CausalCheck::default())
}
pub(crate) fn with_mut<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut T) -> R,
{
f(self.0.get())
}
}
impl CausalCheck {
pub(crate) fn check(self) {}
pub(crate) fn join(&mut self, _other: CausalCheck) {}
}

View File

@ -1,12 +1,13 @@
#![cfg_attr(any(not(feature = "full"), loom), allow(unused_imports, dead_code))]
mod atomic_u32;
mod atomic_ptr;
mod atomic_u64;
mod atomic_u8;
mod atomic_usize;
mod causal_cell;
mod unsafe_cell;
pub(crate) mod cell {
pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
pub(crate) use super::unsafe_cell::UnsafeCell;
}
#[cfg(any(feature = "sync", feature = "io-driver"))]
@ -58,12 +59,12 @@ pub(crate) mod sync {
pub(crate) use std::sync::{Condvar, Mutex, MutexGuard, WaitTimeoutResult};
pub(crate) mod atomic {
pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
pub(crate) use crate::loom::std::atomic_ptr::AtomicPtr;
pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
pub(crate) use std::sync::atomic::AtomicU8;
pub(crate) use std::sync::atomic::{fence, AtomicPtr};
pub(crate) use std::sync::atomic::AtomicU16;
pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
}
}

View File

@ -0,0 +1,16 @@
#[derive(Debug)]
pub(crate) struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
impl<T> UnsafeCell<T> {
pub(crate) fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(std::cell::UnsafeCell::new(data))
}
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}
}

View File

@ -230,6 +230,8 @@ use self::spawner::Spawner;
mod time;
cfg_rt_threaded! {
mod queue;
pub(crate) mod thread_pool;
use self::thread_pool::ThreadPool;
}

View File

@ -1,14 +1,14 @@
//! Run-queue structures to support a work-stealing scheduler
use crate::loom::cell::{CausalCell, CausalCheck};
use crate::loom::sync::atomic::{self, AtomicU32, AtomicUsize};
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU8, AtomicUsize};
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Release};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
/// Producer handle. May only be used from a single thread.
pub(super) struct Local<T: 'static> {
@ -36,13 +36,21 @@ pub(super) struct Inject<T: 'static> {
pub(super) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
head: AtomicU32,
///
/// Contains two `u8` values. The least-significant byte is the "real" head
/// of the queue. The most-significant byte is set by a stealer in the
/// process of stealing values.
/// It represents the first value being stolen in the batch.
///
/// When both `u8` values are the same, there is no active stealer.
///
/// Tracking an in-progress stealer prevents a wrapping scenario.
head: AtomicU16,
/// Only updated by producer thread but read by many threads.
tail: AtomicU32,
tail: AtomicU8,
/// Elements
buffer: Box<[CausalCell<MaybeUninit<task::Notified<T>>>]>,
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
}
struct Pointers {
@ -68,23 +76,21 @@ const LOCAL_QUEUE_CAPACITY: usize = 256;
// logic, but allows loom to test more edge cases in a reasonable amount of
// time.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 2;
const LOCAL_QUEUE_CAPACITY: usize = 4;
const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
/// Create a new local run-queue
pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
debug_assert!(LOCAL_QUEUE_CAPACITY >= 2 && LOCAL_QUEUE_CAPACITY.is_power_of_two());
let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
for _ in 0..LOCAL_QUEUE_CAPACITY {
buffer.push(CausalCell::new(MaybeUninit::uninit()));
buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
}
let inner = Arc::new(Inner {
head: AtomicU32::new(0),
tail: AtomicU32::new(0),
head: AtomicU16::new(0),
tail: AtomicU8::new(0),
buffer: buffer.into(),
});
@ -126,44 +132,51 @@ impl<T> Local<T> {
/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
loop {
let tail = loop {
let head = self.inner.head.load(Acquire);
let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
if tail.wrapping_sub(head) < LOCAL_QUEUE_CAPACITY as u32 {
// Map the position to a slot index.
let idx = tail as usize & MASK;
self.inner.buffer[idx].with_mut(|ptr| {
// Write the task to the slot
//
// Safety: There is only one producer and the above `if`
// condition ensures we don't touch a cell if there is a
// value, thus no consumer.
unsafe {
ptr::write((*ptr).as_mut_ptr(), task);
}
});
// Make the task available. Synchronizes with a load in
// `steal_into2`.
self.inner.tail.store(tail.wrapping_add(1), Release);
if steal as usize & MASK != tail.wrapping_add(1) as usize & MASK {
// There is capacity for the task
break tail;
} else if steal != real {
// Concurrently stealing, this will free up capacity, so
// only push the new task onto the inject queue
inject.push(task);
return;
} else {
// Push the current task and half of the queue into the
// inject queue.
match self.push_overflow(task, real, tail, inject) {
Ok(_) => return,
// Lost the race, try again
Err(v) => {
task = v;
}
}
}
};
// The local buffer is full. Push a batch of work to the inject
// queue.
match self.push_overflow(task, head, tail, inject) {
Ok(_) => return,
// Lost the race, try again
Err(v) => task = v,
// Map the position to a slot index.
let idx = tail as usize & MASK;
self.inner.buffer[idx].with_mut(|ptr| {
// Write the task to the slot
//
// Safety: There is only one producer and the above `if`
// condition ensures we don't touch a cell if there is a
// value, thus no consumer.
unsafe {
ptr::write((*ptr).as_mut_ptr(), task);
}
});
atomic::spin_loop_hint();
}
// Make the task available. Synchronizes with a load in
// `steal_into2`.
self.inner.tail.store(tail.wrapping_add(1), Release);
}
/// Moves a batch of tasks into the inject queue.
@ -176,14 +189,22 @@ impl<T> Local<T> {
fn push_overflow(
&mut self,
task: task::Notified<T>,
head: u32,
tail: u32,
head: u8,
tail: u8,
inject: &Inject<T>,
) -> Result<(), task::Notified<T>> {
const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;
let n = tail.wrapping_sub(head) / 2;
debug_assert_eq!(n as usize, LOCAL_QUEUE_CAPACITY / 2, "queue is not full");
let n = (LOCAL_QUEUE_CAPACITY / 2) as u8;
assert_eq!(
tail.wrapping_sub(head) as usize,
LOCAL_QUEUE_CAPACITY - 1,
"queue is not full; tail = {}; head = {}",
tail,
head
);
let prev = pack(head, head);
// Claim a bunch of tasks
//
@ -195,8 +216,13 @@ impl<T> Local<T> {
// work. This is because all tasks are pushed into the queue from the
// current thread (or memory has been acquired if the local queue handle
// moved).
let actual = self.inner.head.compare_and_swap(head, head + n, Release);
if actual != head {
let actual = self.inner.head.compare_and_swap(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
Release,
);
if actual != prev {
// We failed to claim the tasks, losing the race. Return out of
// this function and try the full `push` routine again. The queue
// may not be full anymore.
@ -227,7 +253,7 @@ impl<T> Local<T> {
// tasks and we are the only producer.
self.inner.buffer[i_idx].with_mut(|ptr| unsafe {
let ptr = (*ptr).as_ptr();
*(*ptr).header().queue_next.get() = Some(next);
(*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next));
});
}
@ -249,42 +275,41 @@ impl<T> Local<T> {
return Some(task);
}
loop {
let head = self.inner.head.load(Acquire);
let mut head = self.inner.head.load(Acquire);
let idx = loop {
let (steal, real) = unpack(head);
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
if head == tail {
if real == tail {
// queue is empty
return None;
}
// Map the head position to a slot index.
let idx = head as usize & MASK;
let next_real = real.wrapping_add(1);
let task = self.inner.buffer[idx].with(|ptr| {
// Tentatively read the task at the head position. Note that we
// have not yet claimed the task.
//
// safety: reading this as uninitialized memory.
unsafe { ptr::read(ptr) }
});
// Only update `steal` component if it differs from `real`.
let next = if steal == real {
pack(next_real, next_real)
} else {
pack(steal, next_real)
};
// Attempt to claim the task read above.
let actual = self
// Attempt to claim a task.
let res = self
.inner
.head
.compare_and_swap(head, head.wrapping_add(1), Release);
.compare_exchange(head, next, AcqRel, Acquire);
if actual == head {
// safety: we claimed the task and the data we read is
// initialized memory.
return Some(unsafe { task.assume_init() });
match res {
Ok(_) => break real as usize & MASK,
Err(actual) => head = actual,
}
};
atomic::spin_loop_hint();
}
Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
}
}
@ -324,9 +349,8 @@ impl<T> Steal<T> {
}
// Synchronize with stealers
let dst_head = dst.inner.head.load(Acquire);
assert!(dst_tail.wrapping_sub(dst_head) + n <= LOCAL_QUEUE_CAPACITY as u32);
let (dst_steal, dst_real) = unpack(dst.inner.head.load(Acquire));
assert_eq!(dst_steal, dst_real);
// Make the stolen items available to consumers
dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
@ -334,70 +358,104 @@ impl<T> Steal<T> {
Some(ret)
}
fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u32) -> u32 {
loop {
let src_head = self.0.head.load(Acquire);
// Steal tasks from `self`, placing them into `dst`. Returns the number of
// tasks that were stolen.
fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u8) -> u8 {
let mut prev_packed = self.0.head.load(Acquire);
let mut next_packed;
let n = loop {
let (src_head_steal, src_head_real) = unpack(prev_packed);
let src_tail = self.0.tail.load(Acquire);
// Number of available tasks to steal
let n = src_tail.wrapping_sub(src_head);
let n = n - n / 2;
if n == 0 {
// If these two do not match, another thread is concurrently
// stealing from the queue.
if src_head_steal != src_head_real {
return 0;
}
if n > LOCAL_QUEUE_CAPACITY as u32 / 2 {
atomic::spin_loop_hint();
// inconsistent, try again
continue;
// Number of available tasks to steal
let n = src_tail.wrapping_sub(src_head_real);
let n = n - n / 2;
if n == 0 {
// No tasks available to steal
return 0;
}
// Track CausalCell causality checks. The check is deferred until
// the compare_and_swap claims ownership of the tasks.
let mut check = CausalCheck::default();
// Update the real head index to acquire the tasks.
let steal_to = src_head_real.wrapping_add(n);
next_packed = pack(src_head_steal, steal_to);
for i in 0..n {
// Compute the positions
let src_pos = src_head.wrapping_add(i);
let dst_pos = dst_tail.wrapping_add(i);
// Map to slots
let src_idx = src_pos as usize & MASK;
let dst_idx = dst_pos as usize & MASK;
// Read the task
//
// safety: this is being read as MaybeUninit -- potentially
// uninitialized memory (in the case a producer wraps). We don't
// assume it is initialized, but will just write the
// `MaybeUninit` in our slot below.
let (task, ch) = self.0.buffer[src_idx]
.with_deferred(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
check.join(ch);
// Write the task to the new slot
//
// safety: `dst` queue is empty and we are the only producer to
// this queue.
dst.inner.buffer[dst_idx]
.with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
}
// Claim all of those tasks!
let actual = self
// Claim all those tasks. This is done by incrementing the "real"
// head but not the steal. By doing this, no other thread is able to
// steal from this queue until the current thread completes.
let res = self
.0
.head
.compare_and_swap(src_head, src_head.wrapping_add(n), Release);
.compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
if actual == src_head {
check.check();
return n;
match res {
Ok(_) => break n,
Err(actual) => prev_packed = actual,
}
};
atomic::spin_loop_hint();
let (first, _) = unpack(next_packed);
// Take all the tasks
for i in 0..n {
// Compute the positions
let src_pos = first.wrapping_add(i);
let dst_pos = dst_tail.wrapping_add(i);
// Map to slots
let src_idx = src_pos as usize & MASK;
let dst_idx = dst_pos as usize & MASK;
// Read the task
//
// safety: We acquired the task with the atomic exchange above.
let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
// Write the task to the new slot
//
// safety: `dst` queue is empty and we are the only producer to
// this queue.
dst.inner.buffer[dst_idx]
.with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
}
let mut prev_packed = next_packed;
// Update `src_head_steal` to match `src_head_real` signalling that the
// stealing routine is complete.
loop {
let head = unpack(prev_packed).1;
next_packed = pack(head, head);
let res = self
.0
.head
.compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
match res {
Ok(_) => return n,
Err(actual) => {
let (actual_steal, actual_real) = unpack(actual);
assert_ne!(actual_steal, actual_real);
prev_packed = actual;
}
}
}
}
}
impl<T> Clone for Steal<T> {
fn clone(&self) -> Steal<T> {
Steal(self.0.clone())
}
}
@ -411,7 +469,7 @@ impl<T> Drop for Local<T> {
impl<T> Inner<T> {
fn is_empty(&self) -> bool {
let head = self.head.load(Acquire);
let (_, head) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);
head == tail
@ -452,7 +510,7 @@ impl<T: 'static> Inject<T> {
self.pointers.lock().unwrap().is_closed
}
fn len(&self) -> usize {
pub(super) fn len(&self) -> usize {
self.len.load(Acquire)
}
@ -558,11 +616,30 @@ impl<T: 'static> Drop for Inject<T> {
}
fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
unsafe { *header.as_ref().queue_next.get() }
unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
}
fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
unsafe {
*header.as_ref().queue_next.get() = val;
header.as_ref().queue_next.with_mut(|ptr| *ptr = val);
}
}
/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: u16) -> (u8, u8) {
let real = n & u8::max_value() as u16;
let steal = n >> 8;
(steal as u8, real as u8)
}
/// Join the two head values
fn pack(steal: u8, real: u8) -> u16 {
(real as u16) | ((steal as u16) << 8)
}
#[test]
fn test_local_queue_capacity() {
assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::max_value() as usize);
}
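To make the wrapping position arithmetic used throughout the queue above
concrete, a small illustrative example (values chosen arbitrarily): because
at most `LOCAL_QUEUE_CAPACITY - 1` tasks are ever buffered,
`tail.wrapping_sub(head)` recovers the number of buffered tasks even after
the `u8` positions wrap.

fn main() {
    let head: u8 = 250;
    let tail: u8 = 4; // the tail position has wrapped past u8::MAX
    assert_eq!(tail.wrapping_sub(head), 10); // 10 tasks are buffered
}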

View File

@ -1,11 +1,10 @@
use crate::loom::cell::CausalCell;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Notified, Schedule, Task};
use crate::util::linked_list;
use std::cell::UnsafeCell;
use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
@ -32,10 +31,10 @@ pub(super) struct Cell<T: Future, S> {
/// Holds the future or output, depending on the stage of execution.
pub(super) struct Core<T: Future, S> {
/// Scheduler used to drive this future
pub(super) scheduler: CausalCell<Option<S>>,
pub(super) scheduler: UnsafeCell<Option<S>>,
/// Either the future or the output
pub(super) stage: CausalCell<Stage<T>>,
pub(super) stage: UnsafeCell<Stage<T>>,
}
/// Crate public as this is also needed by the pool.
@ -62,7 +61,7 @@ unsafe impl Sync for Header {}
/// Cold data is stored after the future.
pub(super) struct Trailer {
/// Consumer task waiting on completion of this task.
pub(super) waker: CausalCell<Option<Waker>>,
pub(super) waker: UnsafeCell<Option<Waker>>,
}
/// Either the future or the output.
@ -85,11 +84,11 @@ impl<T: Future, S: Schedule> Cell<T, S> {
vtable: raw::vtable::<T, S>(),
},
core: Core {
scheduler: CausalCell::new(None),
stage: CausalCell::new(Stage::Running(future)),
scheduler: UnsafeCell::new(None),
stage: UnsafeCell::new(Stage::Running(future)),
},
trailer: Trailer {
waker: CausalCell::new(None),
waker: UnsafeCell::new(None),
},
})
}

View File

@ -124,6 +124,9 @@ where
if snapshot.is_notified() {
// Signal yield
self.core().yield_now(Notified(self.to_task()));
// The ref-count was incremented as part of
// `transition_to_idle`.
self.drop_reference();
}
}
Err(_) => self.cancel_task(),

View File

@ -214,6 +214,7 @@ unsafe impl<S> linked_list::Link for Task<S> {
}
unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
NonNull::from(&mut *target.as_ref().owned.get())
// Not super great as it avoids some of loom's checking...
NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr))
}
}

View File

@ -31,8 +31,10 @@ impl<T: 'static> TransferStack<T> {
loop {
unsafe {
*task.as_ref().stack_next.get() = NonNull::new(curr);
}
task.as_ref()
.stack_next
.with_mut(|ptr| *ptr = NonNull::new(curr))
};
let res = self
.head
@ -57,7 +59,7 @@ impl<T: 'static> TransferStack<T> {
let task = self.0?;
// Move the cursor forward
self.0 = unsafe { *task.as_ref().stack_next.get() };
self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) };
// Return the task
unsafe { Some(Task::from_raw(task)) }

View File

@ -120,6 +120,13 @@ impl State {
let mut next = curr;
next.unset_running();
if next.is_notified() {
// The caller needs to schedule the task. To do this, it needs a
// waker. The waker requires a ref count.
next.ref_inc();
}
Some(next)
})
}
@ -306,16 +313,8 @@ impl State {
/// Returns `true` if the task should be released.
pub(super) fn ref_dec(&self) -> bool {
use crate::loom::sync::atomic;
let prev = Snapshot(self.val.fetch_sub(REF_ONE, Release));
let is_final_ref = prev.ref_count() == 1;
if is_final_ref {
atomic::fence(Acquire);
}
is_final_ref
let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
prev.ref_count() == 1
}
fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
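For context on the `ref_dec` change above, a hedged illustration (standalone
functions, not Tokio's exact types) of the two equivalent shapes of a
final-reference check: the removed code pairs a `Release` decrement with an
`Acquire` fence on the last reference, while the new code folds both into a
single `AcqRel` read-modify-write.

use std::sync::atomic::{fence, AtomicUsize, Ordering::{AcqRel, Acquire, Release}};

// Fence-based form (mirrors the removed code).
fn ref_dec_fence(count: &AtomicUsize) -> bool {
    let prev = count.fetch_sub(1, Release);
    let is_final_ref = prev == 1;
    if is_final_ref {
        // Acquire the writes released by every earlier decrement before the
        // caller drops the shared state.
        fence(Acquire);
    }
    is_final_ref
}

// Single-RMW form (mirrors the added code): the `AcqRel` ordering on the
// decrement itself supplies the acquire edge on the final reference.
fn ref_dec_acqrel(count: &AtomicUsize) -> bool {
    count.fetch_sub(1, AcqRel) == 1
}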

View File

@ -230,25 +230,24 @@ mod group_c {
#[test]
fn shutdown_with_notification() {
use crate::stream::StreamExt;
use crate::sync::{mpsc, oneshot};
use crate::sync::oneshot;
loom::model(|| {
let rt = mk_pool(2);
let (done_tx, done_rx) = oneshot::channel::<()>();
rt.spawn(track(async move {
let (mut tx, mut rx) = mpsc::channel::<()>(10);
let (tx, rx) = oneshot::channel::<()>();
crate::spawn(async move {
crate::task::spawn_blocking(move || {
let _ = tx.try_send(());
let _ = tx.send(());
});
let _ = done_rx.await;
});
while let Some(_) = rx.next().await {}
let _ = rx.await;
let _ = done_tx.send(());
}));

View File

@ -0,0 +1,75 @@
use crate::runtime::queue;
use crate::runtime::task::{self, Schedule, Task};
use loom::thread;
#[test]
fn multi_stealer() {
const NUM_TASKS: usize = 5;
fn steal_tasks(steal: queue::Steal<Runtime>) -> usize {
let (_, mut local) = queue::local();
if steal.steal_into(&mut local).is_none() {
return 0;
}
let mut n = 1;
while local.pop().is_some() {
n += 1;
}
n
}
loom::model(|| {
let (steal, mut local) = queue::local();
let inject = queue::Inject::new();
// Push work
for _ in 0..NUM_TASKS {
let (task, _) = task::joinable::<_, Runtime>(async {});
local.push_back(task, &inject);
}
let th1 = {
let steal = steal.clone();
thread::spawn(move || steal_tasks(steal))
};
let th2 = thread::spawn(move || steal_tasks(steal));
let mut n = 0;
while local.pop().is_some() {
n += 1;
}
while inject.pop().is_some() {
n += 1;
}
n += th1.join().unwrap();
n += th2.join().unwrap();
assert_eq!(n, NUM_TASKS);
});
}
struct Runtime;
impl Schedule for Runtime {
fn bind(task: Task<Self>) -> Runtime {
std::mem::forget(task);
Runtime
}
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}
fn schedule(&self, _task: task::Notified<Self>) {
unreachable!();
}
}

View File

@ -2,7 +2,12 @@ cfg_loom! {
mod loom_blocking;
mod loom_oneshot;
mod loom_pool;
mod loom_queue;
}
#[cfg(miri)]
mod task;
cfg_not_loom! {
mod queue;
#[cfg(miri)]
mod task;
}

View File

@ -0,0 +1,45 @@
use crate::runtime::queue;
use crate::runtime::task::{self, Schedule, Task};
#[test]
fn steal_batch() {
let (steal1, mut local1) = queue::local();
let (_, mut local2) = queue::local();
let inject = queue::Inject::new();
for _ in 0..4 {
let (task, _) = task::joinable::<_, Runtime>(async {});
local1.push_back(task, &inject);
}
assert!(steal1.steal_into(&mut local2).is_some());
for _ in 0..1 {
assert!(local2.pop().is_some());
}
assert!(local2.pop().is_none());
for _ in 0..2 {
assert!(local1.pop().is_some());
}
assert!(local1.pop().is_none());
}
struct Runtime;
impl Schedule for Runtime {
fn bind(task: Task<Self>) -> Runtime {
std::mem::forget(task);
Runtime
}
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}
fn schedule(&self, _task: task::Notified<Self>) {
unreachable!();
}
}

View File

@ -6,8 +6,6 @@ use atomic_cell::AtomicCell;
mod idle;
use self::idle::Idle;
mod queue;
mod worker;
pub(crate) use worker::Launch;

View File

@ -9,8 +9,8 @@ use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::task;
use crate::runtime::thread_pool::{queue, AtomicCell, Idle};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::util::linked_list::LinkedList;
use crate::util::FastRand;

View File

@ -14,8 +14,9 @@
//! tasks are acquiring smaller numbers of permits. This means that in a
//! use-case like tokio's read-write lock, writers will not be starved by
//! readers.
use crate::loom::cell::CausalCell;
use crate::loom::sync::{atomic::AtomicUsize, Mutex, MutexGuard};
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::util::linked_list::{self, LinkedList};
use std::future::Future;
@ -69,7 +70,7 @@ struct Waiter {
/// # Safety
///
/// This may only be accessed while the wait queue is locked.
waker: CausalCell<Option<Waker>>,
waker: UnsafeCell<Option<Waker>>,
/// Intrusive linked-list pointers.
///
@ -79,10 +80,10 @@ struct Waiter {
///
/// TODO: Ideally, we would be able to use loom to enforce that
/// this isn't accessed concurrently. However, it is difficult to
/// use a `CausalCell` here, since the `Link` trait requires _returning_
/// references to `Pointers`, and `CausalCell` requires that checked access
/// use an `UnsafeCell` here, since the `Link` trait requires _returning_
/// references to `Pointers`, and `UnsafeCell` requires that checked access
/// take place inside a closure. We should consider changing `Pointers` to
/// use `CausalCell` internally.
/// use `UnsafeCell` internally.
pointers: linked_list::Pointers<Waiter>,
/// Should not be `Unpin`.
@ -368,7 +369,7 @@ fn notify_all(mut list: LinkedList<Waiter>) {
impl Waiter {
fn new(num_permits: u16) -> Self {
Waiter {
waker: CausalCell::new(None),
waker: UnsafeCell::new(None),
state: AtomicUsize::new(num_permits as usize),
pointers: linked_list::Pointers::new(),
_p: PhantomPinned,

View File

@ -108,7 +108,7 @@
//! assert_eq!(30, rx.recv().await.unwrap());
//! }
use crate::loom::cell::CausalCell;
use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize};
use crate::loom::sync::{Arc, Condvar, Mutex};
@ -292,10 +292,10 @@ struct Slot<T> {
/// A write in the buffer
struct Write<T> {
/// Uniquely identifies this write
pos: CausalCell<u64>,
pos: UnsafeCell<u64>,
/// The written value
val: CausalCell<Option<T>>,
val: UnsafeCell<Option<T>>,
}
/// Tracks a waiting receiver
@ -308,7 +308,7 @@ struct WaitNode {
waker: AtomicWaker,
/// Next pointer in the stack of waiting senders.
next: CausalCell<*const WaitNode>,
next: UnsafeCell<*const WaitNode>,
}
struct RecvGuard<'a, T> {
@ -379,8 +379,8 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
rem: AtomicUsize::new(0),
lock: AtomicUsize::new(0),
write: Write {
pos: CausalCell::new((i as u64).wrapping_sub(capacity as u64)),
val: CausalCell::new(None),
pos: UnsafeCell::new((i as u64).wrapping_sub(capacity as u64)),
val: UnsafeCell::new(None),
},
});
}
@ -400,7 +400,7 @@ pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
wait: Arc::new(WaitNode {
queued: AtomicBool::new(false),
waker: AtomicWaker::new(),
next: CausalCell::new(ptr::null()),
next: UnsafeCell::new(ptr::null()),
}),
};
@ -514,7 +514,7 @@ impl<T> Sender<T> {
wait: Arc::new(WaitNode {
queued: AtomicBool::new(false),
waker: AtomicWaker::new(),
next: CausalCell::new(ptr::null()),
next: UnsafeCell::new(ptr::null()),
}),
}
}
@ -924,7 +924,7 @@ impl<T> Drop for Receiver<T> {
impl<T> Drop for Shared<T> {
fn drop(&mut self) {
// Clear the wait stack
let mut curr = *self.wait_stack.get_mut() as *const WaitNode;
let mut curr = self.wait_stack.with_mut(|ptr| *ptr as *const WaitNode);
while !curr.is_null() {
let waiter = unsafe { Arc::from_raw(curr) };

View File

@ -1,5 +1,5 @@
use crate::loom::{
cell::CausalCell,
cell::UnsafeCell,
sync::atomic::{AtomicPtr, AtomicUsize},
thread,
};
@ -26,7 +26,7 @@ pub(crate) struct Block<T> {
/// The observed `tail_position` value *after* the block has been passed by
/// `block_tail`.
observed_tail_position: CausalCell<usize>,
observed_tail_position: UnsafeCell<usize>,
/// Array containing values pushed into the block. Values are stored in a
/// continuous array in order to improve cache line behavior when reading.
@ -39,7 +39,7 @@ pub(crate) enum Read<T> {
Closed,
}
struct Values<T>([CausalCell<MaybeUninit<T>>; BLOCK_CAP]);
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
use super::BLOCK_CAP;
@ -85,7 +85,7 @@ impl<T> Block<T> {
ready_slots: AtomicUsize::new(0),
observed_tail_position: CausalCell::new(0),
observed_tail_position: UnsafeCell::new(0),
// Value storage
values: unsafe { Values::uninitialized() },
@ -365,12 +365,12 @@ impl<T> Values<T> {
unsafe fn uninitialized() -> Values<T> {
let mut vals = MaybeUninit::uninit();
// When fuzzing, `CausalCell` needs to be initialized.
// When fuzzing, `UnsafeCell` needs to be initialized.
if_loom! {
let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>;
let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)
.write(CausalCell::new(MaybeUninit::uninit()));
.write(UnsafeCell::new(MaybeUninit::uninit()));
}
}
@ -379,7 +379,7 @@ impl<T> Values<T> {
}
impl<T> ops::Index<usize> for Values<T> {
type Output = CausalCell<MaybeUninit<T>>;
type Output = UnsafeCell<MaybeUninit<T>>;
fn index(&self, index: usize) -> &Self::Output {
self.0.index(index)

View File

@ -1,4 +1,4 @@
use crate::loom::cell::CausalCell;
use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
@ -114,7 +114,7 @@ struct Chan<T, S> {
tx_count: AtomicUsize,
/// Only accessed by `Rx` handle.
rx_fields: CausalCell<RxFields<T>>,
rx_fields: UnsafeCell<RxFields<T>>,
}
impl<T, S> fmt::Debug for Chan<T, S>
@ -164,7 +164,7 @@ where
semaphore,
rx_waker: AtomicWaker::new(),
tx_count: AtomicUsize::new(1),
rx_fields: CausalCell::new(RxFields {
rx_fields: UnsafeCell::new(RxFields {
list: rx,
rx_closed: false,
}),

View File

@ -2,7 +2,7 @@
//! A channel for sending a single message between asynchronous tasks.
use crate::loom::cell::CausalCell;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
@ -81,13 +81,13 @@ struct Inner<T> {
/// The value. This is set by `Sender` and read by `Receiver`. The state of
/// the cell is tracked by `state`.
value: CausalCell<Option<T>>,
value: UnsafeCell<Option<T>>,
/// The task to notify when the receiver drops without consuming the value.
tx_task: CausalCell<MaybeUninit<Waker>>,
tx_task: UnsafeCell<MaybeUninit<Waker>>,
/// The task to notify when the value is sent.
rx_task: CausalCell<MaybeUninit<Waker>>,
rx_task: UnsafeCell<MaybeUninit<Waker>>,
}
#[derive(Clone, Copy)]
@ -127,9 +127,9 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
#[allow(deprecated)]
let inner = Arc::new(Inner {
state: AtomicUsize::new(State::new().as_usize()),
value: CausalCell::new(None),
tx_task: CausalCell::new(MaybeUninit::uninit()),
rx_task: CausalCell::new(MaybeUninit::uninit()),
value: UnsafeCell::new(None),
tx_task: UnsafeCell::new(MaybeUninit::uninit()),
rx_task: UnsafeCell::new(MaybeUninit::uninit()),
});
let tx = Sender {
@ -675,7 +675,7 @@ unsafe impl<T: Send> Sync for Inner<T> {}
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
let state = State(*self.state.get_mut());
let state = State(self.state.with_mut(|v| *v));
if state.is_rx_task_set() {
unsafe {

View File

@ -10,7 +10,7 @@
//! section. If no permits are available, then acquiring the semaphore returns
//! `Pending`. The task is woken once a permit becomes available.
use crate::loom::cell::CausalCell;
use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
use crate::loom::thread;
@ -30,7 +30,7 @@ pub(crate) struct Semaphore {
state: AtomicUsize,
/// waiter queue head pointer.
head: CausalCell<NonNull<Waiter>>,
head: UnsafeCell<NonNull<Waiter>>,
/// Coordinates access to the queue head.
rx_lock: AtomicUsize,
@ -165,7 +165,7 @@ impl Semaphore {
Semaphore {
state: AtomicUsize::new(state.to_usize()),
head: CausalCell::new(ptr),
head: UnsafeCell::new(ptr),
rx_lock: AtomicUsize::new(0),
stub,
}

View File

@ -1,6 +1,6 @@
#![cfg_attr(any(loom, not(feature = "sync")), allow(dead_code, unreachable_pub))]
use crate::loom::cell::CausalCell;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{self, AtomicUsize};
use std::fmt;
@ -24,7 +24,7 @@ use std::task::Waker;
/// `wake`.
pub(crate) struct AtomicWaker {
state: AtomicUsize,
waker: CausalCell<Option<Waker>>,
waker: UnsafeCell<Option<Waker>>,
}
// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell
@ -137,7 +137,7 @@ impl AtomicWaker {
pub(crate) fn new() -> AtomicWaker {
AtomicWaker {
state: AtomicUsize::new(WAITING),
waker: CausalCell::new(None),
waker: UnsafeCell::new(None),
}
}

View File

@ -1,11 +1,11 @@
use crate::loom::cell::CausalCell;
use crate::loom::cell::UnsafeCell;
use crate::util::slab::{Address, Entry, Slot, TransferStack, INITIAL_PAGE_SIZE};
use std::fmt;
/// Data accessed only by the thread that owns the shard.
pub(crate) struct Local {
head: CausalCell<usize>,
head: UnsafeCell<usize>,
}
/// Data accessed by any thread.
@ -13,7 +13,7 @@ pub(crate) struct Shared<T> {
remote: TransferStack,
size: usize,
prev_sz: usize,
slab: CausalCell<Option<Box<[Slot<T>]>>>,
slab: UnsafeCell<Option<Box<[Slot<T>]>>>,
}
/// Returns the size of the page at index `n`
@ -24,7 +24,7 @@ pub(super) fn size(n: usize) -> usize {
impl Local {
pub(crate) fn new() -> Self {
Self {
head: CausalCell::new(0),
head: UnsafeCell::new(0),
}
}
@ -45,7 +45,7 @@ impl<T: Entry> Shared<T> {
prev_sz,
size,
remote: TransferStack::new(),
slab: CausalCell::new(None),
slab: UnsafeCell::new(None),
}
}

View File

@ -1,9 +1,9 @@
use crate::loom::cell::CausalCell;
use crate::loom::cell::UnsafeCell;
use crate::util::slab::{Entry, Generation};
/// Stores an entry in the slab.
pub(super) struct Slot<T> {
next: CausalCell<usize>,
next: UnsafeCell<usize>,
entry: T,
}
@ -13,7 +13,7 @@ impl<T: Entry> Slot<T> {
/// The entry is initialized to a default value.
pub(super) fn new(next: usize) -> Slot<T> {
Slot {
next: CausalCell::new(next),
next: UnsafeCell::new(next),
entry: T::default(),
}
}

View File

@ -1,13 +1,13 @@
use crate::util::slab::TransferStack;
use loom::cell::CausalCell;
use loom::cell::UnsafeCell;
use loom::sync::Arc;
use loom::thread;
#[test]
fn transfer_stack() {
loom::model(|| {
let causalities = [CausalCell::new(None), CausalCell::new(None)];
let causalities = [UnsafeCell::new(None), UnsafeCell::new(None)];
let shared = Arc::new((causalities, TransferStack::new()));
let shared1 = shared.clone();
let shared2 = shared.clone();
@ -45,7 +45,7 @@ fn transfer_stack() {
let val = unsafe { *val };
assert!(
val.is_some(),
"CausalCell write must happen-before index is pushed to the stack!",
"UnsafeCell write must happen-before index is pushed to the stack!",
);
// were there two entries in the stack? if so, check that
// both saw a write.
@ -54,7 +54,7 @@ fn transfer_stack() {
let val = unsafe { *val };
assert!(
val.is_some(),
"CausalCell write must happen-before index is pushed to the stack!",
"UnsafeCell write must happen-before index is pushed to the stack!",
);
});
true
@ -77,7 +77,7 @@ fn transfer_stack() {
let val = unsafe { *val };
assert!(
val.is_some(),
"CausalCell write must happen-before index is pushed to the stack!",
"UnsafeCell write must happen-before index is pushed to the stack!",
);
});
}