x86_64: "practically" thread-safe Pool

This commit is contained in:
Jorge Aparicio 2018-08-20 20:19:19 +01:00
parent da154963e2
commit e4c8f1f75b
7 changed files with 453 additions and 77 deletions

View File

@ -17,7 +17,6 @@ main() {
if [ $TRAVIS_RUST_VERSION = nightly ]; then
export RUSTFLAGS="-Z sanitizer=thread"
export RUST_TEST_THREADS=1
export TSAN_OPTIONS="suppressions=$(pwd)/suppressions.txt"
cargo test --test tsan --target $TARGET

View File

@ -73,7 +73,8 @@
#![deny(missing_docs)]
#![deny(rust_2018_compatibility)]
#![deny(rust_2018_idioms)]
#![deny(warnings)]
// #![deny(warnings)]
#![allow(warnings)] // FIXME
pub use binary_heap::BinaryHeap;
pub use generic_array::typenum::consts;

212
src/pool/cas.rs Normal file
View File

@ -0,0 +1,212 @@
//! Stack based on CAS atomics
//!
//! To reduce the chance of hitting the ABA problem we use a 32-bit offset + a 32-bit version tag
//! instead of a 64-bit pointer. The version tag will be bumped on each successful `pop` operation.
use core::{
cell::UnsafeCell,
convert::TryFrom,
marker::PhantomData,
mem::{self, MaybeUninit},
num::NonZeroUsize,
ptr::NonNull,
sync::atomic::{AtomicUsize, Ordering},
};
/// Unfortunate implementation detail required to use the
/// [`Pool.grow_exact`](struct.Pool.html#method.grow_exact) method
pub struct Node<T> {
next: Atomic<Node<T>>,
pub(crate) data: UnsafeCell<T>,
}
impl<T> Node<T> {
fn next(&self) -> &Atomic<Node<T>> {
&self.next
}
}
pub struct Stack<T> {
head: Atomic<Node<T>>,
}
impl<T> Stack<T> {
pub const fn new() -> Self {
Self {
head: Atomic::null(),
}
}
pub fn push(&self, new_head: Ptr<Node<T>>) {
let mut head = self.head.load(Ordering::Relaxed);
loop {
unsafe {
new_head
.as_raw()
.as_ref()
.next()
.store(head, Ordering::Relaxed);
}
if let Err(p) = self.head.compare_and_exchange_weak(
head,
Some(new_head),
Ordering::Release,
Ordering::Relaxed,
) {
head = p;
} else {
return;
}
}
}
pub fn try_pop(&self) -> Option<Ptr<Node<T>>> {
loop {
if let Some(mut head) = self.head.load(Ordering::Acquire) {
let next = unsafe { head.as_raw().as_ref().next().load(Ordering::Relaxed) };
if self
.head
.compare_and_exchange_weak(
Some(head),
next,
Ordering::Release,
Ordering::Relaxed,
)
.is_ok()
{
head.incr_tag();
return Some(head);
}
} else {
// stack observed empty
return None;
}
}
}
}
fn anchor<T>() -> *mut T {
static mut ANCHOR: u8 = 0;
(unsafe { &mut ANCHOR } as *mut u8 as usize & !(mem::align_of::<T>() - 1)) as *mut T
}
/// Anchored pointer. This is a (signed) 32-bit offset from `anchor` plus a 32-bit tag
pub struct Ptr<T> {
inner: NonZeroUsize,
_marker: PhantomData<*mut T>,
}
impl<T> Clone for Ptr<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T> Copy for Ptr<T> {}
impl<T> Ptr<T> {
pub fn new(p: *mut T) -> Option<Self> {
i32::try_from((p as isize).wrapping_sub(anchor::<T>() as isize))
.ok()
.map(|offset| unsafe { Ptr::from_parts(0, offset) })
}
unsafe fn from_parts(tag: u32, offset: i32) -> Self {
Self {
inner: NonZeroUsize::new_unchecked((tag as usize) << 32 | (offset as u32 as usize)),
_marker: PhantomData,
}
}
fn from_usize(p: usize) -> Option<Self> {
NonZeroUsize::new(p).map(|inner| Self {
inner,
_marker: PhantomData,
})
}
fn into_usize(&self) -> usize {
self.inner.get()
}
fn tag(&self) -> u32 {
(self.inner.get() >> 32) as u32
}
fn incr_tag(&mut self) {
let tag = self.tag().wrapping_add(1);
let offset = self.offset();
*self = unsafe { Ptr::from_parts(tag, offset) };
}
fn offset(&self) -> i32 {
self.inner.get() as i32
}
fn as_raw(&self) -> NonNull<T> {
unsafe {
NonNull::new_unchecked(
anchor::<T>()
.cast::<u8>()
.offset(self.offset() as isize)
.cast(),
)
}
}
pub fn dangling() -> Self {
unsafe { Self::from_parts(0, 1) }
}
pub unsafe fn as_ref(&self) -> &T {
&*self.as_raw().as_ptr()
}
}
struct Atomic<T> {
inner: AtomicUsize,
_marker: PhantomData<*mut T>,
}
impl<T> Atomic<T> {
const fn null() -> Self {
Self {
inner: AtomicUsize::new(0),
_marker: PhantomData,
}
}
fn compare_and_exchange_weak(
&self,
current: Option<Ptr<T>>,
new: Option<Ptr<T>>,
succ: Ordering,
fail: Ordering,
) -> Result<(), Option<Ptr<T>>> {
self.inner
.compare_exchange_weak(
current.map(|p| p.into_usize()).unwrap_or(0),
new.map(|p| p.into_usize()).unwrap_or(0),
succ,
fail,
)
.map(drop)
.map_err(Ptr::from_usize)
}
fn load(&self, ord: Ordering) -> Option<Ptr<T>> {
NonZeroUsize::new(self.inner.load(ord)).map(|inner| Ptr {
inner,
_marker: PhantomData,
})
}
fn store(&self, val: Option<Ptr<T>>, ord: Ordering) {
self.inner
.store(val.map(|p| p.into_usize()).unwrap_or(0), ord)
}
}

76
src/pool/llsc.rs Normal file
View File

@ -0,0 +1,76 @@
//! Stack based on LL/SC atomics
pub use core::ptr::NonNull as Ptr;
use core::{
cell::UnsafeCell,
ptr,
sync::atomic::{self, AtomicPtr, Ordering},
};
pub struct Node<T> {
next: AtomicPtr<Node<T>>,
pub(crate) data: UnsafeCell<T>,
}
impl<T> Node<T> {
fn next(&self) -> &AtomicPtr<Node<T>> {
&self.next
}
}
pub struct Stack<T> {
head: AtomicPtr<Node<T>>,
}
impl<T> Stack<T> {
pub const fn new() -> Self {
Self {
head: AtomicPtr::new(ptr::null_mut()),
}
}
pub fn push(&self, new_head: Ptr<Node<T>>) {
// NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack`
let mut head = self.head.load(Ordering::Relaxed);
loop {
unsafe { new_head.as_ref().next().store(head, Ordering::Relaxed) }
match self.head.compare_exchange_weak(
head,
new_head.as_ptr(),
Ordering::Release, // success
Ordering::Relaxed, // failure
) {
Ok(_) => return,
// interrupt occurred or other core made a successful STREX op on the head
Err(p) => head = p,
}
}
}
pub fn try_pop(&self) -> Option<Ptr<Node<T>>> {
// NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack`
loop {
let head = self.head.load(Ordering::Acquire);
if let Some(nn_head) = Ptr::new(head) {
let next = unsafe { nn_head.as_ref().next().load(Ordering::Relaxed) };
match self.head.compare_exchange_weak(
head,
next,
Ordering::Release, // success
Ordering::Relaxed, // failure
) {
Ok(_) => break Some(nn_head),
// interrupt occurred or other core made a successful STREX op on the head
Err(_) => continue,
}
} else {
// stack is observed as empty
break None;
}
}
}
}

View File

@ -76,10 +76,11 @@
//! let set_order = ..;
//!
//! // `self.head` has type `AtomicPtr<Node<T>>`
//! // where `struct Node<T> { next: AtomicPtr<Node<T>>, data: UnsafeCell<T> }`
//! let mut head = self.head.load(fetch_order);
//! loop {
//! if let Some(nn_head) = NonNull::new(head) {
//! let next = unsafe { (*head).next };
//! let next = unsafe { (*head).next.load(Ordering::Relaxed) };
//!
//! // <~ preempted
//!
@ -150,6 +151,63 @@
//! while the current core is somewhere between LDREX and STREX then the current core will fail its
//! STREX operation.
//!
//! # x86_64 support / limitations
//!
//! x86_64 support is a gamble. Yes, a gamble. Do you feel lucky enough to use `Pool` on x86_64?
//!
//! As it's not possible to implement *ideal* LL/SC semantics (\*) on x86_64 the architecture is
//! susceptible to the ABA problem described above. To *reduce the chances* of ABA occurring in
//! practice we use version tags (keyword: IBM ABA-prevention tags). Again, this approach does
//! *not* fix / prevent / avoid the ABA problem; it only reduces the chance of it occurring in
//! practice but the chances of it occurring are not reduced to zero.
//!
//! How we have implemented version tags: instead of using an `AtomicPtr` to link the stack `Node`s
//! we use an `AtomicUsize` where the 64-bit `usize` is always comprised of a monotonically
//! increasing 32-bit tag (higher bits) and a 32-bit signed address offset. The address of a node is
//! computed by adding the 32-bit offset to an "anchor" address (the address of a static variable
//! that lives somewhere in the `.bss` linker section). The tag is increased every time a node is
//! popped (removed) from the stack.
//!
//! To see how version tags can prevent ABA consider the example from the previous section. Let's
//! start with a stack in this state: `(~A, 0) -> (~B, 1) -> (~C, 2)`, where `~A` represents the
//! address of node A as a 32-bit offset from the "anchor" and the second tuple element (e.g. `0`)
//! indicates the version of the node. For simplicity, assume a single core system: thread T1 is
//! performing `pop` and before `CAS(&self.head, (~A, 0), (~B, 1))` is executed a context switch
//! occurs and the core resumes T2. T2 pops the stack twice and pushes A back into the stack;
//! because the `pop` operation increases the version the stack ends in the following state: `(~A,
//! 1) -> (~C, 2)`. Now if T1 is resumed the CAS operation will fail because `self.head` is `(~A,
//! 1)` and not `(~A, 0)`.
//!
//! When can version tags fail to prevent ABA? Using the previous example: if T2 performs a `push`
//! followed by a `pop` `(1 << 32) - 1` times before doing its original `pop` - `pop` - `push`
//! operation then ABA will occur because the version tag of node `A` will wraparound to its
//! original value of `0` and the CAS operation in T1 will succeed and corrupt the stack.
//!
//! It does seem unlikely that (1) a thread will perform the above operation and (2) that the above
//! operation will complete within one time slice, assuming time sliced threads. If you have thread
//! priorities then the above operation could occur during the lifetime of many high priorities
//! threads if T1 is running at low priority.
//!
//! Other implementations of version tags use more than 32 bits in their tags (e.g. "Scalable
//! Lock-Free Dynamic Memory Allocation" uses 42-bit tags in its super blocks). In theory, one could
//! use double-word CAS on x86_64 to pack a 64-bit tag and a 64-bit pointer in a double-word but
//! this CAS operation is not exposed in the standard library (and I think it's not available on
//! older x86_64 processors?)
//!
//! (\*) Apparently one can emulate proper LL/SC semantics on x86_64 using hazard pointers (?) --
//! the technique appears to be documented in "ABA Prevention Using Single-Word Instructions", which
//! is not public AFAICT -- but hazard pointers require Thread Local Storage (TLS), which is a
//! non-starter for a `no_std` library like `heapless`.
//!
//! ## x86_64 Limitations
//!
//! Because stack nodes must be located within +- 2 GB of the hidden `ANCHOR` variable, which
//! lives in the `.bss` section, `Pool` may not be able to manage static references created using
//! `Box::leak` -- these heap allocated chunks of memory may live in a very different address space.
//! When the `Pool` is unable to manage a node because of its address it will simply discard it:
//! `Pool::grow*` methods return the number of new memory blocks added to the pool; if these methods
//! return `0` it means the `Pool` is unable to manage the memory given to them.
//!
//! # References
//!
//! 1. [Cortex-M3 Devices Generic User Guide (DUI 0552A)][0], Section 2.2.7 "Synchronization
@ -161,6 +219,10 @@
//! semaphores"
//!
//! [1]: https://static.docs.arm.com/ddi0403/eb/DDI0403E_B_armv7m_arm.pdf
//!
//! 3. "Scalable Lock-Free Dynamic Memory Allocation" Michael, Maged M.
//!
//! 4. "Hazard pointers: Safe memory reclamation for lock-free objects." Michael, Maged M.
use core::{any::TypeId, mem, sync::atomic::Ordering};
use core::{
@ -170,27 +232,30 @@ use core::{
marker::PhantomData,
mem::MaybeUninit,
ops::{Deref, DerefMut},
ptr::{self, NonNull},
ptr,
sync::atomic::AtomicPtr,
};
use as_slice::{AsMutSlice, AsSlice};
pub use stack::Node;
use stack::{Ptr, Stack};
pub mod singleton;
#[cfg_attr(target_arch = "x86_64", path = "cas.rs")]
#[cfg_attr(not(target_arch = "x86_64"), path = "llsc.rs")]
mod stack;
/// A lock-free memory pool
pub struct Pool<T> {
head: AtomicPtr<Node<T>>,
stack: Stack<T>,
// Current implementation is unsound on architectures that don't have LL/SC semantics so this
// struct is not `Sync` on those platforms
_not_send_or_sync: PhantomData<*const ()>,
}
// NOTE: Here we lie about `Pool` implementing `Sync` on x86_64. This is not true but it lets us
// test the `pool!` and `singleton::Pool` abstractions. We just have to be careful not to use the
// pool in a multi-threaded context
#[cfg(any(armv7a, armv7r, armv7m, armv8m_main, test))]
#[cfg(any(armv7a, armv7r, armv7m, armv8m_main, target_arch = "x86_64"))]
unsafe impl<T> Sync for Pool<T> {}
unsafe impl<T> Send for Pool<T> {}
@ -199,7 +264,7 @@ impl<T> Pool<T> {
/// Creates a new empty pool
pub const fn new() -> Self {
Pool {
head: AtomicPtr::new(ptr::null_mut()),
stack: Stack::new(),
_not_send_or_sync: PhantomData,
}
@ -211,7 +276,14 @@ impl<T> Pool<T> {
///
/// *NOTE:* This method does *not* have bounded execution time because it contains a CAS loop
pub fn alloc(&self) -> Option<Box<T, Uninit>> {
if let Some(node) = self.pop() {
if mem::size_of::<T>() == 0 {
return Some(Box {
node: Ptr::dangling(),
_state: PhantomData,
});
}
if let Some(node) = self.stack.try_pop() {
Some(Box {
node,
_state: PhantomData,
@ -236,7 +308,12 @@ impl<T> Pool<T> {
}
}
self.push(value.node)
// no operation
if mem::size_of::<T>() == 0 {
return;
}
self.stack.push(value.node)
}
/// Increases the capacity of the pool
@ -245,12 +322,17 @@ impl<T> Pool<T> {
///
/// This method returns the number of *new* blocks that can be allocated.
pub fn grow(&self, memory: &'static mut [u8]) -> usize {
let sz = mem::size_of::<Node<T>>();
if sz == 0 {
// SZT use no memory so a pool of SZT always has maximum capacity
return usize::max_value();
}
let mut p = memory.as_mut_ptr();
let mut len = memory.len();
let align = mem::align_of::<Node<T>>();
let sz = mem::size_of::<Node<T>>();
let rem = (p as usize) % align;
if rem != 0 {
let offset = align - rem;
@ -266,7 +348,19 @@ impl<T> Pool<T> {
let mut n = 0;
while len >= sz {
self.push(unsafe { NonNull::new_unchecked(p as *mut _) });
match () {
#[cfg(target_arch = "x86_64")]
() => {
if let Some(p) = Ptr::new(p as *mut _) {
self.stack.push(p);
}
}
#[cfg(not(target_arch = "x86_64"))]
() => {
self.stack.push(unsafe { Ptr::new_unchecked(p as *mut _) });
}
}
n += 1;
p = unsafe { p.add(sz) };
@ -284,71 +378,33 @@ impl<T> Pool<T> {
where
A: AsMutSlice<Element = Node<T>>,
{
if mem::size_of::<T>() == 0 {
return usize::max_value();
}
let nodes = unsafe { (*memory.as_mut_ptr()).as_mut_slice() };
let cap = nodes.len();
for p in nodes {
self.push(NonNull::from(p))
match () {
#[cfg(target_arch = "x86_64")]
() => {
if let Some(p) = Ptr::new(p) {
self.stack.push(p);
}
}
#[cfg(not(target_arch = "x86_64"))]
() => self.stack.push(NonNull::from(p)),
}
}
cap
}
fn pop(&self) -> Option<NonNull<Node<T>>> {
// NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack`
loop {
let head = self.head.load(Ordering::Acquire);
if let Some(nn_head) = NonNull::new(head) {
let next = unsafe { (*head).next };
match self.head.compare_exchange_weak(
head,
next,
Ordering::Release, // success
Ordering::Relaxed, // failure
) {
Ok(_) => break Some(nn_head),
// interrupt occurred or other core made a successful STREX op on the head
Err(_) => continue,
}
} else {
// stack is observed as empty
break None;
}
}
}
fn push(&self, mut new_head: NonNull<Node<T>>) {
// NOTE `Ordering`s come from crossbeam's (v0.6.0) `TreiberStack`
let mut head = self.head.load(Ordering::Relaxed);
loop {
unsafe { new_head.as_mut().next = head }
match self.head.compare_exchange_weak(
head,
new_head.as_ptr(),
Ordering::Release, // success
Ordering::Relaxed, // failure
) {
Ok(_) => return,
// interrupt occurred or other core made a successful STREX op on the head
Err(p) => head = p,
}
}
}
}
/// Unfortunate implementation detail required to use the
/// [`Pool.grow_exact`](struct.Pool.html#method.grow_exact) method
pub struct Node<T> {
data: UnsafeCell<T>,
next: *mut Node<T>,
}
/// A memory block
pub struct Box<T, STATE = Init> {
_state: PhantomData<STATE>,
node: NonNull<Node<T>>,
node: Ptr<Node<T>>,
}
impl<T> Box<T, Uninit> {
@ -513,7 +569,8 @@ mod tests {
#[test]
fn sanity() {
static mut MEMORY: [u8; 31] = [0; 31];
const SZ: usize = 2 * mem::size_of::<Node<u8>>() - 1;
static mut MEMORY: [u8; SZ] = [0; SZ];
static POOL: Pool<u8> = Pool::new();

View File

@ -15,7 +15,7 @@ use as_slice::{AsMutSlice, AsSlice};
use super::{Init, Node, Uninit};
/// Instantiates a pool as a global singleton
#[cfg(any(armv7a, armv7r, armv7m, armv8m_main, test))]
#[cfg(any(armv7a, armv7r, armv7m, armv8m_main, target_arch = "x86_64"))]
#[macro_export]
macro_rules! pool {
($(#[$($attr:tt)*])* $ident:ident: $ty:ty) => {
@ -194,7 +194,9 @@ where
}
}
P::ptr().push(self.inner.node)
if mem::size_of::<P::Data>() != 0 {
P::ptr().stack.push(self.inner.node)
}
}
}
@ -291,11 +293,12 @@ mod tests {
sync::atomic::{AtomicUsize, Ordering},
};
use super::Pool;
use super::{super::Node, Pool};
#[test]
fn sanity() {
static mut MEMORY: [u8; 31] = [0; 31];
const SZ: usize = 2 * mem::size_of::<Node<u8>>() - 1;
static mut MEMORY: [u8; SZ] = [0; SZ];
pool!(A: u8);
@ -336,9 +339,6 @@ mod tests {
}
pool!(A: X);
static mut MEMORY: [u8; 23] = [0; 23];
A::grow(unsafe { &mut MEMORY });
let x = A::alloc().unwrap().init(X::new());
let y = A::alloc().unwrap().init(X::new());

View File

@ -232,3 +232,34 @@ fn iterator_properly_wraps() {
}
assert_eq!(expected, actual)
}
#[test]
fn pool() {
use heapless::pool::singleton::Pool as _;
static mut M: [u8; (N + 1) * 8] = [0; (N + 1) * 8];
const N: usize = 16 * 1024;
heapless::pool!(A: [u8; 8]);
A::grow(unsafe { &mut M });
Pool::new(2).scoped(move |scope| {
scope.execute(move || {
for _ in 0..N / 4 {
let a = A::alloc().unwrap();
let b = A::alloc().unwrap();
drop(a);
let b = b.init([1; 8]);
drop(b);
}
});
scope.execute(move || {
for _ in 0..N / 2 {
let a = A::alloc().unwrap();
let a = a.init([2; 8]);
drop(a);
}
});
});
}