diff --git a/Cargo.toml b/Cargo.toml index f1a0cb11..73be0c7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,8 @@ version = "0.5.4" default = ["cas"] cas = [] ufmt-impl = ["ufmt-write"] +# read the docs before enabling: makes `Pool` Sync on x86_64 +x86-sync-pool = [] # only for tests __trybuild = [] diff --git a/ci/script.sh b/ci/script.sh index e235fa26..1d1f2666 100644 --- a/ci/script.sh +++ b/ci/script.sh @@ -5,8 +5,8 @@ main() { cargo check --target $TARGET --features 'serde' if [ $TARGET = x86_64-unknown-linux-gnu ]; then - cargo test --target $TARGET --features 'serde' - cargo test --target $TARGET --release --features 'serde' + cargo test --test cpass --target $TARGET --features 'serde' + cargo test --test cpass --target $TARGET --release --features 'serde' if [ $MSRV = 1 ]; then cd cfail @@ -17,11 +17,10 @@ main() { if [ $TRAVIS_RUST_VERSION = nightly ]; then export RUSTFLAGS="-Z sanitizer=thread" - export RUST_TEST_THREADS=1 export TSAN_OPTIONS="suppressions=$(pwd)/suppressions.txt" - cargo test --test tsan --target $TARGET - cargo test --test tsan --target $TARGET --release + cargo test --test tsan --features x86-sync-pool --target $TARGET + cargo test --test tsan --features x86-sync-pool --target $TARGET --release fi fi } diff --git a/src/pool/cas.rs b/src/pool/cas.rs new file mode 100644 index 00000000..46323975 --- /dev/null +++ b/src/pool/cas.rs @@ -0,0 +1,209 @@ +//! Stack based on CAS atomics +//! +//! To reduce the chance of hitting the ABA problem we use a 32-bit offset + a 32-bit version tag +//! instead of a 64-bit pointer. The version tag will be bumped on each successful `pop` operation. + +use core::{ + cell::UnsafeCell, + convert::TryFrom, + marker::PhantomData, + mem, + num::NonZeroUsize, + ptr::NonNull, + sync::atomic::{AtomicUsize, Ordering}, +}; + +/// Unfortunate implementation detail required to use the +/// [`Pool.grow_exact`](struct.Pool.html#method.grow_exact) method +pub struct Node { + next: Atomic>, + pub(crate) data: UnsafeCell, +} + +impl Node { + fn next(&self) -> &Atomic> { + &self.next + } +} + +pub struct Stack { + head: Atomic>, +} + +impl Stack { + pub const fn new() -> Self { + Self { + head: Atomic::null(), + } + } + + pub fn push(&self, new_head: Ptr>) { + let mut head = self.head.load(Ordering::Relaxed); + + loop { + unsafe { + new_head + .as_raw() + .as_ref() + .next() + .store(head, Ordering::Relaxed); + } + + if let Err(p) = self.head.compare_and_exchange_weak( + head, + Some(new_head), + Ordering::Release, + Ordering::Relaxed, + ) { + head = p; + } else { + return; + } + } + } + + pub fn try_pop(&self) -> Option>> { + loop { + if let Some(mut head) = self.head.load(Ordering::Acquire) { + let next = unsafe { head.as_raw().as_ref().next().load(Ordering::Relaxed) }; + + if self + .head + .compare_and_exchange_weak( + Some(head), + next, + Ordering::Release, + Ordering::Relaxed, + ) + .is_ok() + { + head.incr_tag(); + return Some(head); + } + } else { + // stack observed empty + return None; + } + } + } +} + +fn anchor() -> *mut T { + static mut ANCHOR: u8 = 0; + (unsafe { &mut ANCHOR } as *mut u8 as usize & !(mem::align_of::() - 1)) as *mut T +} + +/// Anchored pointer. 
This is a (signed) 32-bit offset from `anchor` plus a 32-bit tag +pub struct Ptr { + inner: NonZeroUsize, + _marker: PhantomData<*mut T>, +} + +impl Clone for Ptr { + fn clone(&self) -> Self { + *self + } +} + +impl Copy for Ptr {} + +impl Ptr { + pub fn new(p: *mut T) -> Option { + i32::try_from((p as isize).wrapping_sub(anchor::() as isize)) + .ok() + .map(|offset| unsafe { Ptr::from_parts(0, offset) }) + } + + unsafe fn from_parts(tag: u32, offset: i32) -> Self { + Self { + inner: NonZeroUsize::new_unchecked((tag as usize) << 32 | (offset as u32 as usize)), + _marker: PhantomData, + } + } + + fn from_usize(p: usize) -> Option { + NonZeroUsize::new(p).map(|inner| Self { + inner, + _marker: PhantomData, + }) + } + + fn into_usize(&self) -> usize { + self.inner.get() + } + + fn tag(&self) -> u32 { + (self.inner.get() >> 32) as u32 + } + + fn incr_tag(&mut self) { + let tag = self.tag().wrapping_add(1); + let offset = self.offset(); + + *self = unsafe { Ptr::from_parts(tag, offset) }; + } + + fn offset(&self) -> i32 { + self.inner.get() as i32 + } + + fn as_raw(&self) -> NonNull { + unsafe { + NonNull::new_unchecked( + (anchor::() as *mut u8).offset(self.offset() as isize) as *mut T + ) + } + } + + pub fn dangling() -> Self { + unsafe { Self::from_parts(0, 1) } + } + + pub unsafe fn as_ref(&self) -> &T { + &*self.as_raw().as_ptr() + } +} + +struct Atomic { + inner: AtomicUsize, + _marker: PhantomData<*mut T>, +} + +impl Atomic { + const fn null() -> Self { + Self { + inner: AtomicUsize::new(0), + _marker: PhantomData, + } + } + + fn compare_and_exchange_weak( + &self, + current: Option>, + new: Option>, + succ: Ordering, + fail: Ordering, + ) -> Result<(), Option>> { + self.inner + .compare_exchange_weak( + current.map(|p| p.into_usize()).unwrap_or(0), + new.map(|p| p.into_usize()).unwrap_or(0), + succ, + fail, + ) + .map(drop) + .map_err(Ptr::from_usize) + } + + fn load(&self, ord: Ordering) -> Option> { + NonZeroUsize::new(self.inner.load(ord)).map(|inner| Ptr { + inner, + _marker: PhantomData, + }) + } + + fn store(&self, val: Option>, ord: Ordering) { + self.inner + .store(val.map(|p| p.into_usize()).unwrap_or(0), ord) + } +} diff --git a/src/pool/llsc.rs b/src/pool/llsc.rs new file mode 100644 index 00000000..1aec5276 --- /dev/null +++ b/src/pool/llsc.rs @@ -0,0 +1,78 @@ +//! 
Stack based on LL/SC atomics + +pub use core::ptr::NonNull as Ptr; +use core::{ + cell::UnsafeCell, + ptr, + sync::atomic::{AtomicPtr, Ordering}, +}; + +/// Unfortunate implementation detail required to use the +/// [`Pool.grow_exact`](struct.Pool.html#method.grow_exact) method +pub struct Node { + next: AtomicPtr>, + pub(crate) data: UnsafeCell, +} + +impl Node { + fn next(&self) -> &AtomicPtr> { + &self.next + } +} + +pub struct Stack { + head: AtomicPtr>, +} + +impl Stack { + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(ptr::null_mut()), + } + } + + pub fn push(&self, new_head: Ptr>) { + // NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack` + + let mut head = self.head.load(Ordering::Relaxed); + loop { + unsafe { new_head.as_ref().next().store(head, Ordering::Relaxed) } + + match self.head.compare_exchange_weak( + head, + new_head.as_ptr(), + Ordering::Release, // success + Ordering::Relaxed, // failure + ) { + Ok(_) => return, + // interrupt occurred or other core made a successful STREX op on the head + Err(p) => head = p, + } + } + } + + pub fn try_pop(&self) -> Option>> { + // NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack` + + loop { + let head = self.head.load(Ordering::Acquire); + if let Some(nn_head) = Ptr::new(head) { + let next = unsafe { nn_head.as_ref().next().load(Ordering::Relaxed) }; + + match self.head.compare_exchange_weak( + head, + next, + Ordering::Release, // success + Ordering::Relaxed, // failure + ) { + Ok(_) => break Some(nn_head), + // interrupt occurred or other core made a successful STREX op on the head + Err(_) => continue, + } + } else { + // stack is observed as empty + break None; + } + } + } +} diff --git a/src/pool/mod.rs b/src/pool/mod.rs index a8dafdfd..739165f0 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -76,10 +76,11 @@ //! let set_order = ..; //! //! // `self.head` has type `AtomicPtr>` +//! // where `struct Node { next: AtomicPtr>, data: UnsafeCell }` //! let mut head = self.head.load(fetch_order); //! loop { //! if let Some(nn_head) = NonNull::new(head) { -//! let next = unsafe { (*head).next }; +//! let next = unsafe { (*head).next.load(Ordering::Relaxed) }; //! //! // <~ preempted //! @@ -150,6 +151,65 @@ //! while the current core is somewhere between LDREX and STREX then the current core will fail its //! STREX operation. //! +//! # x86_64 support / limitations +//! +//! *NOTE* `Pool` is only `Sync` on `x86_64` if the Cargo feature "x86-sync-pool" is enabled +//! +//! x86_64 support is a gamble. Yes, a gamble. Do you feel lucky enough to use `Pool` on x86_64? +//! +//! As it's not possible to implement *ideal* LL/SC semantics (\*) on x86_64 the architecture is +//! susceptible to the ABA problem described above. To *reduce the chances* of ABA occurring in +//! practice we use version tags (keyword: IBM ABA-prevention tags). Again, this approach does +//! *not* fix / prevent / avoid the ABA problem; it only reduces the chance of it occurring in +//! practice but the chances of it occurring are not reduced to zero. +//! +//! How we have implemented version tags: instead of using an `AtomicPtr` to link the stack `Node`s +//! we use an `AtomicUsize` where the 64-bit `usize` is always comprised of a monotonically +//! increasing 32-bit tag (higher bits) and a 32-bit signed address offset. The address of a node is +//! computed by adding the 32-bit offset to an "anchor" address (the address of a static variable +//! that lives somewhere in the `.bss` linker section). 
The tag is increased every time a node is +//! popped (removed) from the stack. +//! +//! To see how version tags can prevent ABA consider the example from the previous section. Let's +//! start with a stack in this state: `(~A, 0) -> (~B, 1) -> (~C, 2)`, where `~A` represents the +//! address of node A as a 32-bit offset from the "anchor" and the second tuple element (e.g. `0`) +//! indicates the version of the node. For simplicity, assume a single core system: thread T1 is +//! performing `pop` and before `CAS(&self.head, (~A, 0), (~B, 1))` is executed a context switch +//! occurs and the core resumes T2. T2 pops the stack twice and pushes A back into the stack; +//! because the `pop` operation increases the version the stack ends in the following state: `(~A, +//! 1) -> (~C, 2)`. Now if T1 is resumed the CAS operation will fail because `self.head` is `(~A, +//! 1)` and not `(~A, 0)`. +//! +//! When can version tags fail to prevent ABA? Using the previous example: if T2 performs a `push` +//! followed by a `pop` `(1 << 32) - 1` times before doing its original `pop` - `pop` - `push` +//! operation then ABA will occur because the version tag of node `A` will wraparound to its +//! original value of `0` and the CAS operation in T1 will succeed and corrupt the stack. +//! +//! It does seem unlikely that (1) a thread will perform the above operation and (2) that the above +//! operation will complete within one time slice, assuming time sliced threads. If you have thread +//! priorities then the above operation could occur during the lifetime of many high priorities +//! threads if T1 is running at low priority. +//! +//! Other implementations of version tags use more than 32 bits in their tags (e.g. "Scalable +//! Lock-Free Dynamic Memory Allocation" uses 42-bit tags in its super blocks). In theory, one could +//! use double-word CAS on x86_64 to pack a 64-bit tag and a 64-bit pointer in a double-word but +//! this CAS operation is not exposed in the standard library (and I think it's not available on +//! older x86_64 processors?) +//! +//! (\*) Apparently one can emulate proper LL/SC semantics on x86_64 using hazard pointers (?) -- +//! the technique appears to be documented in "ABA Prevention Using Single-Word Instructions", which +//! is not public AFAICT -- but hazard pointers require Thread Local Storage (TLS), which is a +//! non-starter for a `no_std` library like `heapless`. +//! +//! ## x86_64 Limitations +//! +//! Because stack nodes must be located within +- 2 GB of the hidden `ANCHOR` variable, which +//! lives in the `.bss` section, `Pool` may not be able to manage static references created using +//! `Box::leak` -- these heap allocated chunks of memory may live in a very different address space. +//! When the `Pool` is unable to manage a node because of its address it will simply discard it: +//! `Pool::grow*` methods return the number of new memory blocks added to the pool; if these methods +//! return `0` it means the `Pool` is unable to manage the memory given to them. +//! //! # References //! //! 1. [Cortex-M3 Devices Generic User Guide (DUI 0552A)][0], Section 2.2.7 "Synchronization @@ -161,36 +221,49 @@ //! semaphores" //! //! [1]: https://static.docs.arm.com/ddi0403/eb/DDI0403E_B_armv7m_arm.pdf +//! +//! 3. "Scalable Lock-Free Dynamic Memory Allocation" Michael, Maged M. +//! +//! 4. "Hazard pointers: Safe memory reclamation for lock-free objects." Michael, Maged M. 
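+//!
+//! # Appendix: tag and offset packing (sketch)
+//!
+//! The snippet below is a minimal, illustrative sketch of the packing scheme described in the
+//! x86_64 section above: a 32-bit version tag in the high half of a 64-bit word and a 32-bit
+//! signed offset from the anchor in the low half. `pack` and `unpack` are made-up helper names
+//! used only for illustration; they are *not* the exact items implemented in `cas.rs`.
+//!
+//! ```
+//! fn pack(tag: u32, offset: i32) -> u64 {
+//!     ((tag as u64) << 32) | (offset as u32 as u64)
+//! }
+//!
+//! fn unpack(word: u64) -> (u32, i32) {
+//!     ((word >> 32) as u32, word as u32 as i32)
+//! }
+//!
+//! // a node located 128 bytes *below* the anchor that has never been popped
+//! let word = pack(0, -128);
+//! assert_eq!(unpack(word), (0, -128));
+//!
+//! // every successful `pop` bumps the tag, so when the node is pushed back later the stale
+//! // head observed in the ABA scenario no longer compares equal and the CAS fails as intended
+//! let bumped = pack(1, -128);
+//! assert_ne!(word, bumped);
+//! assert_eq!(unpack(bumped), (1, -128));
+//! ```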
-use core::{any::TypeId, mem, sync::atomic::Ordering}; +use core::{any::TypeId, mem}; use core::{ - cell::UnsafeCell, cmp, fmt, hash::{Hash, Hasher}, marker::PhantomData, mem::MaybeUninit, ops::{Deref, DerefMut}, - ptr::{self, NonNull}, - sync::atomic::AtomicPtr, + ptr, }; use as_slice::{AsMutSlice, AsSlice}; +pub use stack::Node; +use stack::{Ptr, Stack}; + pub mod singleton; +#[cfg_attr(target_arch = "x86_64", path = "cas.rs")] +#[cfg_attr(not(target_arch = "x86_64"), path = "llsc.rs")] +mod stack; /// A lock-free memory pool pub struct Pool { - head: AtomicPtr>, + stack: Stack, // Current implementation is unsound on architectures that don't have LL/SC semantics so this // struct is not `Sync` on those platforms _not_send_or_sync: PhantomData<*const ()>, } -// NOTE: Here we lie about `Pool` implementing `Sync` on x86_64. This is not true but it lets us -// test the `pool!` and `singleton::Pool` abstractions. We just have to be careful not to use the -// pool in a multi-threaded context -#[cfg(any(armv7a, armv7r, armv7m, armv8m_main, test))] +// NOTE(any(test)) makes testing easier (no need to enable Cargo features for testing) +#[cfg(any( + armv7a, + armv7r, + armv7m, + armv8m_main, + all(target_arch = "x86_64", feature = "x86-sync-pool"), + test +))] unsafe impl Sync for Pool {} unsafe impl Send for Pool {} @@ -199,7 +272,7 @@ impl Pool { /// Creates a new empty pool pub const fn new() -> Self { Pool { - head: AtomicPtr::new(ptr::null_mut()), + stack: Stack::new(), _not_send_or_sync: PhantomData, } @@ -211,7 +284,14 @@ impl Pool { /// /// *NOTE:* This method does *not* have bounded execution time because it contains a CAS loop pub fn alloc(&self) -> Option> { - if let Some(node) = self.pop() { + if mem::size_of::() == 0 { + return Some(Box { + node: Ptr::dangling(), + _state: PhantomData, + }); + } + + if let Some(node) = self.stack.try_pop() { Some(Box { node, _state: PhantomData, @@ -236,7 +316,12 @@ impl Pool { } } - self.push(value.node) + // no operation + if mem::size_of::() == 0 { + return; + } + + self.stack.push(value.node) } /// Increases the capacity of the pool @@ -245,12 +330,17 @@ impl Pool { /// /// This method returns the number of *new* blocks that can be allocated. 
pub fn grow(&self, memory: &'static mut [u8]) -> usize { + let sz = mem::size_of::>(); + + if sz == 0 { + // SZT use no memory so a pool of SZT always has maximum capacity + return usize::max_value(); + } + let mut p = memory.as_mut_ptr(); let mut len = memory.len(); let align = mem::align_of::>(); - let sz = mem::size_of::>(); - let rem = (p as usize) % align; if rem != 0 { let offset = align - rem; @@ -266,7 +356,19 @@ impl Pool { let mut n = 0; while len >= sz { - self.push(unsafe { NonNull::new_unchecked(p as *mut _) }); + match () { + #[cfg(target_arch = "x86_64")] + () => { + if let Some(p) = Ptr::new(p as *mut _) { + self.stack.push(p); + } + } + + #[cfg(not(target_arch = "x86_64"))] + () => { + self.stack.push(unsafe { Ptr::new_unchecked(p as *mut _) }); + } + } n += 1; p = unsafe { p.add(sz) }; @@ -284,71 +386,33 @@ impl Pool { where A: AsMutSlice>, { + if mem::size_of::() == 0 { + return usize::max_value(); + } + let nodes = unsafe { (*memory.as_mut_ptr()).as_mut_slice() }; let cap = nodes.len(); for p in nodes { - self.push(NonNull::from(p)) + match () { + #[cfg(target_arch = "x86_64")] + () => { + if let Some(p) = Ptr::new(p) { + self.stack.push(p); + } + } + + #[cfg(not(target_arch = "x86_64"))] + () => self.stack.push(core::ptr::NonNull::from(p)), + } } cap } - - fn pop(&self) -> Option>> { - // NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack` - - loop { - let head = self.head.load(Ordering::Acquire); - if let Some(nn_head) = NonNull::new(head) { - let next = unsafe { (*head).next }; - - match self.head.compare_exchange_weak( - head, - next, - Ordering::Release, // success - Ordering::Relaxed, // failure - ) { - Ok(_) => break Some(nn_head), - // interrupt occurred or other core made a successful STREX op on the head - Err(_) => continue, - } - } else { - // stack is observed as empty - break None; - } - } - } - - fn push(&self, mut new_head: NonNull>) { - // NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack` - - let mut head = self.head.load(Ordering::Relaxed); - loop { - unsafe { new_head.as_mut().next = head } - - match self.head.compare_exchange_weak( - head, - new_head.as_ptr(), - Ordering::Release, // success - Ordering::Relaxed, // failure - ) { - Ok(_) => return, - // interrupt occurred or other core made a successful STREX op on the head - Err(p) => head = p, - } - } - } -} - -/// Unfortunate implementation detail required to use the -/// [`Pool.grow_exact`](struct.Pool.html#method.grow_exact) method -pub struct Node { - data: UnsafeCell, - next: *mut Node, } /// A memory block pub struct Box { _state: PhantomData, - node: NonNull>, + node: Ptr>, } impl Box { @@ -513,7 +577,8 @@ mod tests { #[test] fn sanity() { - static mut MEMORY: [u8; 31] = [0; 31]; + const SZ: usize = 2 * mem::size_of::>() - 1; + static mut MEMORY: [u8; SZ] = [0; SZ]; static POOL: Pool = Pool::new(); diff --git a/src/pool/singleton.rs b/src/pool/singleton.rs index 785c9000..b3e36305 100644 --- a/src/pool/singleton.rs +++ b/src/pool/singleton.rs @@ -15,7 +15,13 @@ use as_slice::{AsMutSlice, AsSlice}; use super::{Init, Node, Uninit}; /// Instantiates a pool as a global singleton -#[cfg(any(armv7a, armv7r, armv7m, armv8m_main, test))] +#[cfg(any( + armv7a, + armv7r, + armv7m, + armv8m_main, + all(target_arch = "x86_64", feature = "x86-sync-pool"), +))] #[macro_export] macro_rules! 
pool { ($(#[$($attr:tt)*])* $ident:ident: $ty:ty) => { @@ -194,7 +200,9 @@ where } } - P::ptr().push(self.inner.node) + if mem::size_of::() != 0 { + P::ptr().stack.push(self.inner.node) + } } } @@ -291,11 +299,12 @@ mod tests { sync::atomic::{AtomicUsize, Ordering}, }; - use super::Pool; + use super::{super::Node, Pool}; #[test] fn sanity() { - static mut MEMORY: [u8; 31] = [0; 31]; + const SZ: usize = 2 * mem::size_of::>() - 1; + static mut MEMORY: [u8; SZ] = [0; SZ]; pool!(A: u8); @@ -336,9 +345,6 @@ mod tests { } pool!(A: X); - static mut MEMORY: [u8; 23] = [0; 23]; - - A::grow(unsafe { &mut MEMORY }); let x = A::alloc().unwrap().init(X::new()); let y = A::alloc().unwrap().init(X::new()); diff --git a/tests/tsan.rs b/tests/tsan.rs index 890d0008..cdef4df2 100644 --- a/tests/tsan.rs +++ b/tests/tsan.rs @@ -232,3 +232,34 @@ fn iterator_properly_wraps() { } assert_eq!(expected, actual) } + +#[test] +fn pool() { + use heapless::pool::singleton::Pool as _; + + static mut M: [u8; (N + 1) * 8] = [0; (N + 1) * 8]; + const N: usize = 16 * 1024; + heapless::pool!(A: [u8; 8]); + + A::grow(unsafe { &mut M }); + + Pool::new(2).scoped(move |scope| { + scope.execute(move || { + for _ in 0..N / 4 { + let a = A::alloc().unwrap(); + let b = A::alloc().unwrap(); + drop(a); + let b = b.init([1; 8]); + drop(b); + } + }); + + scope.execute(move || { + for _ in 0..N / 2 { + let a = A::alloc().unwrap(); + let a = a.init([2; 8]); + drop(a); + } + }); + }); +}