63: spsc::Queue: add single core variant r=japaric a=japaric



Co-authored-by: Jorge Aparicio <jorge@japaric.io>
This commit is contained in:
bors[bot] 2018-12-15 23:57:40 +00:00
commit 6e521fad24
5 changed files with 206 additions and 57 deletions

View File

@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased] ## [Unreleased]
## [v0.4.1] - 2018-12-16
### Changed
- Add a new type parameter to `spsc::Queue` that indicates whether the queue is
only single-core safe, or multi-core safe. By default the queue is multi-core
safe; this preserves the current semantics. New `unsafe` constructors have
been added to create the single-core variant.
## [v0.4.0] - 2018-10-19 ## [v0.4.0] - 2018-10-19
### Changed ### Changed
@ -168,8 +177,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Initial release - Initial release
[Unreleased]: https://github.com/japaric/heapless/compare/v0.4.0...HEAD [Unreleased]: https://github.com/japaric/heapless/compare/v0.4.1...HEAD
[v0.3.8]: https://github.com/japaric/heapless/compare/v0.3.7...v0.4.0 [v0.4.1]: https://github.com/japaric/heapless/compare/v0.4.0...v0.4.1
[v0.4.0]: https://github.com/japaric/heapless/compare/v0.3.7...v0.4.0
[v0.3.7]: https://github.com/japaric/heapless/compare/v0.3.6...v0.3.7 [v0.3.7]: https://github.com/japaric/heapless/compare/v0.3.6...v0.3.7
[v0.3.6]: https://github.com/japaric/heapless/compare/v0.3.5...v0.3.6 [v0.3.6]: https://github.com/japaric/heapless/compare/v0.3.5...v0.3.6
[v0.3.5]: https://github.com/japaric/heapless/compare/v0.3.4...v0.3.5 [v0.3.5]: https://github.com/japaric/heapless/compare/v0.3.4...v0.3.5

View File

@ -16,7 +16,7 @@ keywords = [
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
name = "heapless" name = "heapless"
repository = "https://github.com/japaric/heapless" repository = "https://github.com/japaric/heapless"
version = "0.4.0" version = "0.4.1"
[features] [features]
const-fn = [] const-fn = []

View File

@ -1,19 +1,41 @@
use core::sync::atomic::{self, AtomicUsize, Ordering};
#[cfg(feature = "smaller-atomics")] #[cfg(feature = "smaller-atomics")]
use core::sync::atomic::{AtomicU16, AtomicU8}; use core::sync::atomic::{AtomicU16, AtomicU8};
use core::sync::atomic::{AtomicUsize, Ordering};
use spsc::{MultiCore, SingleCore};
pub unsafe trait XCore {
fn is_multi_core() -> bool;
}
unsafe impl XCore for SingleCore {
fn is_multi_core() -> bool {
false
}
}
unsafe impl XCore for MultiCore {
fn is_multi_core() -> bool {
true
}
}
pub unsafe trait Uxx: Into<usize> + Send { pub unsafe trait Uxx: Into<usize> + Send {
#[doc(hidden)] #[doc(hidden)]
fn truncate(x: usize) -> Self; fn truncate(x: usize) -> Self;
#[doc(hidden)] #[doc(hidden)]
fn load_acquire(x: *const Self) -> Self; unsafe fn load_acquire<C>(x: *const Self) -> Self
where
C: XCore;
#[doc(hidden)] #[doc(hidden)]
fn load_relaxed(x: *const Self) -> Self; fn load_relaxed(x: *const Self) -> Self;
#[doc(hidden)] #[doc(hidden)]
fn store_release(x: *const Self, val: Self); unsafe fn store_release<C>(x: *const Self, val: Self)
where
C: XCore;
} }
#[cfg(feature = "smaller-atomics")] #[cfg(feature = "smaller-atomics")]
@ -27,16 +49,33 @@ unsafe impl Uxx for u8 {
} }
} }
fn load_acquire(x: *const Self) -> Self { unsafe fn load_acquire<C>(x: *const Self) -> Self
unsafe { (*(x as *const AtomicU8)).load(Ordering::Acquire) } where
C: XCore,
{
if C::is_multi_core() {
(*(x as *const AtomicU8)).load(Ordering::Acquire)
} else {
let y = (*(x as *const AtomicU8)).load(Ordering::Relaxed);
atomic::compiler_fence(Ordering::Acquire);
y
}
} }
fn load_relaxed(x: *const Self) -> Self { fn load_relaxed(x: *const Self) -> Self {
unsafe { (*(x as *const AtomicU8)).load(Ordering::Relaxed) } unsafe { (*(x as *const AtomicU8)).load(Ordering::Relaxed) }
} }
fn store_release(x: *const Self, val: Self) { unsafe fn store_release<C>(x: *const Self, val: Self)
unsafe { (*(x as *const AtomicU8)).store(val, Ordering::Release) } where
C: XCore,
{
if C::is_multi_core() {
(*(x as *const AtomicU8)).store(val, Ordering::Release)
} else {
atomic::compiler_fence(Ordering::Release);
(*(x as *const AtomicU8)).store(val, Ordering::Relaxed)
}
} }
} }
@ -51,16 +90,33 @@ unsafe impl Uxx for u16 {
} }
} }
fn load_acquire(x: *const Self) -> Self { unsafe fn load_acquire<C>(x: *const Self) -> Self
unsafe { (*(x as *const AtomicU16)).load(Ordering::Acquire) } where
C: XCore,
{
if C::is_multi_core() {
(*(x as *const AtomicU16)).load(Ordering::Acquire)
} else {
let y = (*(x as *const AtomicU16)).load(Ordering::Relaxed);
atomic::compiler_fence(Ordering::Acquire);
y
}
} }
fn load_relaxed(x: *const Self) -> Self { fn load_relaxed(x: *const Self) -> Self {
unsafe { (*(x as *const AtomicU16)).load(Ordering::Relaxed) } unsafe { (*(x as *const AtomicU16)).load(Ordering::Relaxed) }
} }
fn store_release(x: *const Self, val: Self) { unsafe fn store_release<C>(x: *const Self, val: Self)
unsafe { (*(x as *const AtomicU16)).store(val, Ordering::Release) } where
C: XCore,
{
if C::is_multi_core() {
(*(x as *const AtomicU16)).store(val, Ordering::Release)
} else {
atomic::compiler_fence(Ordering::Release);
(*(x as *const AtomicU16)).store(val, Ordering::Relaxed)
}
} }
} }
@ -69,15 +125,32 @@ unsafe impl Uxx for usize {
x x
} }
fn load_acquire(x: *const Self) -> Self { unsafe fn load_acquire<C>(x: *const Self) -> Self
unsafe { (*(x as *const AtomicUsize)).load(Ordering::Acquire) } where
C: XCore,
{
if C::is_multi_core() {
(*(x as *const AtomicUsize)).load(Ordering::Acquire)
} else {
let y = (*(x as *const AtomicUsize)).load(Ordering::Relaxed);
atomic::compiler_fence(Ordering::Acquire);
y
}
} }
fn load_relaxed(x: *const Self) -> Self { fn load_relaxed(x: *const Self) -> Self {
unsafe { (*(x as *const AtomicUsize)).load(Ordering::Relaxed) } unsafe { (*(x as *const AtomicUsize)).load(Ordering::Relaxed) }
} }
fn store_release(x: *const Self, val: Self) { unsafe fn store_release<C>(x: *const Self, val: Self)
unsafe { (*(x as *const AtomicUsize)).store(val, Ordering::Release) } where
C: XCore,
{
if C::is_multi_core() {
(*(x as *const AtomicUsize)).store(val, Ordering::Release)
} else {
atomic::compiler_fence(Ordering::Release);
(*(x as *const AtomicUsize)).store(val, Ordering::Relaxed);
}
} }
} }

View File

@ -1,6 +1,7 @@
//! Single producer single consumer queue //! Single producer single consumer queue
use core::cell::UnsafeCell; use core::cell::UnsafeCell;
use core::marker::PhantomData;
use core::ptr; use core::ptr;
use generic_array::{ArrayLength, GenericArray}; use generic_array::{ArrayLength, GenericArray};
@ -11,23 +12,33 @@ use sealed;
mod split; mod split;
/// Multi core synchronization - a memory barrier is used for synchronization
pub struct MultiCore;
/// Single core synchronization - no memory barrier synchronization, just a compiler fence
pub struct SingleCore;
// Atomic{U8,U16, Usize} with no CAS operations that works on targets that have "no atomic support" // Atomic{U8,U16, Usize} with no CAS operations that works on targets that have "no atomic support"
// according to their specification // according to their specification
struct Atomic<U> struct Atomic<U, C>
where where
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
v: UnsafeCell<U>, v: UnsafeCell<U>,
c: PhantomData<C>,
} }
impl<U> Atomic<U> impl<U, C> Atomic<U, C>
where where
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
const_fn! { const_fn! {
const fn new(v: U) -> Atomic<U> { const fn new(v: U) -> Self {
Atomic { Atomic {
v: UnsafeCell::new(v), v: UnsafeCell::new(v),
c: PhantomData,
} }
} }
} }
@ -37,7 +48,7 @@ where
} }
fn load_acquire(&self) -> U { fn load_acquire(&self) -> U {
U::load_acquire(self.v.get()) unsafe { U::load_acquire::<C>(self.v.get()) }
} }
fn load_relaxed(&self) -> U { fn load_relaxed(&self) -> U {
@ -45,7 +56,7 @@ where
} }
fn store_release(&self, val: U) { fn store_release(&self, val: U) {
U::store_release(self.v.get(), val) unsafe { U::store_release::<C>(self.v.get(), val) }
} }
} }
@ -65,6 +76,11 @@ where
/// *IMPORTANT*: `spsc::Queue<_, _, u8>` has a maximum capacity of 255 elements; `spsc::Queue<_, _, /// *IMPORTANT*: `spsc::Queue<_, _, u8>` has a maximum capacity of 255 elements; `spsc::Queue<_, _,
/// u16>` has a maximum capacity of 65535 elements. /// u16>` has a maximum capacity of 65535 elements.
/// ///
/// `spsc::Queue` also comes in a single core variant. This variant can be created using the
/// following constructors: `u8_sc`, `u16_sc`, `usize_sc` and `new_sc`. This variant is `unsafe` to
/// create because the programmer must make sure that the queue's consumer and producer endpoints
/// (if split) are kept on a single core for their entire lifetime.
///
/// # Examples /// # Examples
/// ///
/// ``` /// ```
@ -127,24 +143,26 @@ where
/// // .. /// // ..
/// } /// }
/// ``` /// ```
pub struct Queue<T, N, U = usize> pub struct Queue<T, N, U = usize, C = MultiCore>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
// this is from where we dequeue items // this is from where we dequeue items
head: Atomic<U>, head: Atomic<U, C>,
// this is where we enqueue new items // this is where we enqueue new items
tail: Atomic<U>, tail: Atomic<U, C>,
buffer: MaybeUninit<GenericArray<T, N>>, buffer: MaybeUninit<GenericArray<T, N>>,
} }
impl<T, N, U> Queue<T, N, U> impl<T, N, U, C> Queue<T, N, U, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
/// Returns the maximum number of elements the queue can hold /// Returns the maximum number of elements the queue can hold
pub fn capacity(&self) -> U { pub fn capacity(&self) -> U {
@ -157,7 +175,7 @@ where
} }
/// Iterates from the front of the queue to the back /// Iterates from the front of the queue to the back
pub fn iter(&self) -> Iter<T, N, U> { pub fn iter(&self) -> Iter<T, N, U, C> {
Iter { Iter {
rb: self, rb: self,
index: 0, index: 0,
@ -166,7 +184,7 @@ where
} }
/// Returns an iterator that allows modifying each value. /// Returns an iterator that allows modifying each value.
pub fn iter_mut(&mut self) -> IterMut<T, N, U> { pub fn iter_mut(&mut self) -> IterMut<T, N, U, C> {
let len = self.len_usize(); let len = self.len_usize();
IterMut { IterMut {
rb: self, rb: self,
@ -183,10 +201,11 @@ where
} }
} }
impl<T, N, U> Drop for Queue<T, N, U> impl<T, N, U, C> Drop for Queue<T, N, U, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
fn drop(&mut self) { fn drop(&mut self) {
for item in self { for item in self {
@ -197,26 +216,28 @@ where
} }
} }
impl<'a, T, N, U> IntoIterator for &'a Queue<T, N, U> impl<'a, T, N, U, C> IntoIterator for &'a Queue<T, N, U, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
type Item = &'a T; type Item = &'a T;
type IntoIter = Iter<'a, T, N, U>; type IntoIter = Iter<'a, T, N, U, C>;
fn into_iter(self) -> Self::IntoIter { fn into_iter(self) -> Self::IntoIter {
self.iter() self.iter()
} }
} }
impl<'a, T, N, U> IntoIterator for &'a mut Queue<T, N, U> impl<'a, T, N, U, C> IntoIterator for &'a mut Queue<T, N, U, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
type Item = &'a mut T; type Item = &'a mut T;
type IntoIter = IterMut<'a, T, N, U>; type IntoIter = IterMut<'a, T, N, U, C>;
fn into_iter(self) -> Self::IntoIter { fn into_iter(self) -> Self::IntoIter {
self.iter_mut() self.iter_mut()
@ -224,8 +245,8 @@ where
} }
macro_rules! impl_ { macro_rules! impl_ {
($uxx:ident) => { ($uxx:ident, $uxx_sc:ident) => {
impl<T, N> Queue<T, N, $uxx> impl<T, N> Queue<T, N, $uxx, MultiCore>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
{ {
@ -239,7 +260,29 @@ macro_rules! impl_ {
} }
} }
} }
}
impl<T, N> Queue<T, N, $uxx, SingleCore>
where
N: ArrayLength<T>,
{
const_fn! {
/// Creates an empty queue with a fixed capacity of `N` (single core variant)
pub const unsafe fn $uxx_sc() -> Self {
Queue {
buffer: MaybeUninit::uninitialized(),
head: Atomic::new(0),
tail: Atomic::new(0),
}
}
}
}
impl<T, N, C> Queue<T, N, $uxx, C>
where
N: ArrayLength<T>,
C: sealed::XCore,
{
/// Returns the item in the front of the queue, or `None` if the queue is empty /// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&mut self) -> Option<T> { pub fn dequeue(&mut self) -> Option<T> {
let cap = self.capacity(); let cap = self.capacity();
@ -310,7 +353,7 @@ macro_rules! impl_ {
}; };
} }
impl<T, N> Queue<T, N, usize> impl<T, N> Queue<T, N, usize, MultiCore>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
{ {
@ -322,43 +365,58 @@ where
} }
} }
impl<T, N> Queue<T, N, usize, SingleCore>
where
N: ArrayLength<T>,
{
const_fn! {
/// Alias for [`spsc::Queue::usize_sc`](struct.Queue.html#method.usize_sc)
pub const unsafe fn new_sc() -> Self {
Queue::usize_sc()
}
}
}
#[cfg(feature = "smaller-atomics")] #[cfg(feature = "smaller-atomics")]
impl_!(u8); impl_!(u8, u8_sc);
#[cfg(feature = "smaller-atomics")] #[cfg(feature = "smaller-atomics")]
impl_!(u16); impl_!(u16, u16_sc);
impl_!(usize); impl_!(usize, usize_sc);
/// An iterator over the items of a queue /// An iterator over the items of a queue
pub struct Iter<'a, T, N, U> pub struct Iter<'a, T, N, U, C>
where where
N: ArrayLength<T> + 'a, N: ArrayLength<T> + 'a,
T: 'a, T: 'a,
U: 'a + sealed::Uxx, U: 'a + sealed::Uxx,
C: 'a + sealed::XCore,
{ {
rb: &'a Queue<T, N, U>, rb: &'a Queue<T, N, U, C>,
index: usize, index: usize,
len: usize, len: usize,
} }
/// A mutable iterator over the items of a queue /// A mutable iterator over the items of a queue
pub struct IterMut<'a, T, N, U> pub struct IterMut<'a, T, N, U, C>
where where
N: ArrayLength<T> + 'a, N: ArrayLength<T> + 'a,
T: 'a, T: 'a,
U: 'a + sealed::Uxx, U: 'a + sealed::Uxx,
C: 'a + sealed::XCore,
{ {
rb: &'a mut Queue<T, N, U>, rb: &'a mut Queue<T, N, U, C>,
index: usize, index: usize,
len: usize, len: usize,
} }
macro_rules! iterator { macro_rules! iterator {
(struct $name:ident -> $elem:ty, $ptr:ty, $asref:ident, $asptr:ident, $mkref:ident) => { (struct $name:ident -> $elem:ty, $ptr:ty, $asref:ident, $asptr:ident, $mkref:ident) => {
impl<'a, T, N, U> Iterator for $name<'a, T, N, U> impl<'a, T, N, U, C> Iterator for $name<'a, T, N, U, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
T: 'a, T: 'a,
U: 'a + sealed::Uxx, U: 'a + sealed::Uxx,
C: 'a + sealed::XCore,
{ {
type Item = $elem; type Item = $elem;

View File

@ -4,15 +4,16 @@ use core::ptr::{self, NonNull};
use generic_array::ArrayLength; use generic_array::ArrayLength;
use sealed; use sealed;
use spsc::Queue; use spsc::{MultiCore, Queue};
impl<T, N, U> Queue<T, N, U> impl<T, N, U, C> Queue<T, N, U, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
/// Splits a statically allocated queue into producer and consumer end points /// Splits a statically allocated queue into producer and consumer end points
pub fn split<'rb>(&'rb mut self) -> (Producer<'rb, T, N, U>, Consumer<'rb, T, N, U>) { pub fn split<'rb>(&'rb mut self) -> (Producer<'rb, T, N, U, C>, Consumer<'rb, T, N, U, C>) {
( (
Producer { Producer {
rb: unsafe { NonNull::new_unchecked(self) }, rb: unsafe { NonNull::new_unchecked(self) },
@ -28,30 +29,34 @@ where
/// A queue "consumer"; it can dequeue items from the queue /// A queue "consumer"; it can dequeue items from the queue
// NOTE the consumer semantically owns the `head` pointer of the queue // NOTE the consumer semantically owns the `head` pointer of the queue
pub struct Consumer<'a, T, N, U = usize> pub struct Consumer<'a, T, N, U = usize, C = MultiCore>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
rb: NonNull<Queue<T, N, U>>, rb: NonNull<Queue<T, N, U, C>>,
_marker: PhantomData<&'a ()>, _marker: PhantomData<&'a ()>,
} }
unsafe impl<'a, T, N, U> Send for Consumer<'a, T, N, U> unsafe impl<'a, T, N, U, C> Send for Consumer<'a, T, N, U, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
T: Send, T: Send,
U: sealed::Uxx, U: sealed::Uxx,
{} C: sealed::XCore,
{
}
/// A queue "producer"; it can enqueue items into the queue /// A queue "producer"; it can enqueue items into the queue
// NOTE the producer semantically owns the `tail` pointer of the queue // NOTE the producer semantically owns the `tail` pointer of the queue
pub struct Producer<'a, T, N, U = usize> pub struct Producer<'a, T, N, U = usize, C = MultiCore>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
U: sealed::Uxx, U: sealed::Uxx,
C: sealed::XCore,
{ {
rb: NonNull<Queue<T, N, U>>, rb: NonNull<Queue<T, N, U, C>>,
_marker: PhantomData<&'a ()>, _marker: PhantomData<&'a ()>,
} }
@ -60,13 +65,15 @@ where
N: ArrayLength<T>, N: ArrayLength<T>,
T: Send, T: Send,
U: sealed::Uxx, U: sealed::Uxx,
{} {
}
macro_rules! impl_ { macro_rules! impl_ {
($uxx:ident) => { ($uxx:ident) => {
impl<'a, T, N> Consumer<'a, T, N, $uxx> impl<'a, T, N, C> Consumer<'a, T, N, $uxx, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
C: sealed::XCore,
{ {
/// Returns if there are any items to dequeue. When this returns true, at least the /// Returns if there are any items to dequeue. When this returns true, at least the
/// first subsequent dequeue will succeed. /// first subsequent dequeue will succeed.
@ -111,9 +118,10 @@ macro_rules! impl_ {
} }
} }
impl<'a, T, N> Producer<'a, T, N, $uxx> impl<'a, T, N, C> Producer<'a, T, N, $uxx, C>
where where
N: ArrayLength<T>, N: ArrayLength<T>,
C: sealed::XCore,
{ {
/// Returns if there is any space to enqueue a new item. When this returns true, at /// Returns if there is any space to enqueue a new item. When this returns true, at
/// least the first subsequent enqueue will succeed. /// least the first subsequent enqueue will succeed.