RingBuffer: support smaller index types

Jorge Aparicio 2018-04-19 21:51:39 +02:00
parent a8d404fbe8
commit 78f4f10933
2 changed files with 287 additions and 167 deletions


@@ -11,38 +11,88 @@ use BufferFullError;
mod spsc;
// AtomicUsize with no CAS operations that works on targets that have "no atomic support" according
// to their specification
struct AtomicUsize {
v: UnsafeCell<usize>,
/// Types that can be used as `RingBuffer` indices: `u8`, `u16` and `usize`
///
/// Do not implement this trait yourself
pub unsafe trait Uxx: Into<usize> + Send {
#[doc(hidden)]
fn truncate(x: usize) -> Self;
}
impl AtomicUsize {
pub const fn new(v: usize) -> AtomicUsize {
AtomicUsize {
unsafe impl Uxx for u8 {
fn truncate(x: usize) -> Self {
let max = ::core::u8::MAX;
if x >= usize::from(max) {
max - 1
} else {
x as u8
}
}
}
unsafe impl Uxx for u16 {
fn truncate(x: usize) -> Self {
let max = ::core::u16::MAX;
if x >= usize::from(max) {
max - 1
} else {
x as u16
}
}
}
unsafe impl Uxx for usize {
fn truncate(x: usize) -> Self {
x
}
}
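An aside on `truncate`: it clamps an over-large `usize` to one *below* the index type's maximum, presumably so that computing `index + 1` during enqueue and dequeue can never overflow the index type. A standalone sketch of the `u8` case (not part of the crate; `truncate_u8` is a hypothetical free function mirroring the impl above):

```rust
// Hypothetical free function mirroring `<u8 as Uxx>::truncate`.
fn truncate_u8(x: usize) -> u8 {
    let max = ::core::u8::MAX; // 255
    if x >= usize::from(max) {
        max - 1 // clamp to 254, so `index + 1` still fits in a `u8`
    } else {
        x as u8
    }
}

fn main() {
    assert_eq!(truncate_u8(42), 42);
    assert_eq!(truncate_u8(255), 254); // anything >= 255 is clamped
    assert_eq!(truncate_u8(1_000), 254);
}
```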
// Atomic{U8, U16, Usize} with no CAS operations that works on targets that have "no atomic support"
// according to their specification
struct Atomic<U>
where
U: Uxx,
{
v: UnsafeCell<U>,
}
impl<U> Atomic<U>
where
U: Uxx,
{
const fn new(v: U) -> Atomic<U> {
Atomic {
v: UnsafeCell::new(v),
}
}
pub fn get_mut(&mut self) -> &mut usize {
fn get_mut(&mut self) -> &mut U {
unsafe { &mut *self.v.get() }
}
pub fn load_acquire(&self) -> usize {
fn load_acquire(&self) -> U {
unsafe { intrinsics::atomic_load_acq(self.v.get()) }
}
pub fn load_relaxed(&self) -> usize {
fn load_relaxed(&self) -> U {
unsafe { intrinsics::atomic_load_relaxed(self.v.get()) }
}
pub fn store_release(&self, val: usize) {
fn store_release(&self, val: U) {
unsafe { intrinsics::atomic_store_rel(self.v.get(), val) }
}
}
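Note that the wrapper only ever loads and stores; it deliberately avoids compare-and-swap, which is what lets it work on targets that report no atomic support. On targets that *do* have native atomics, the intrinsics used above correspond to the stable acquire/release orderings. A rough `usize`-only stand-in in terms of the stable `core::sync::atomic` API (for comparison only; the crate uses the unstable intrinsics so it can also be generic over `u8` and `u16`):

```rust
use core::sync::atomic::{AtomicUsize, Ordering};

// Stable-API stand-in for `Atomic<usize>`: acquire loads pair with
// release stores to order the head/tail handoff between the two ends.
struct AtomicIndex {
    v: AtomicUsize,
}

impl AtomicIndex {
    const fn new(v: usize) -> AtomicIndex {
        AtomicIndex { v: AtomicUsize::new(v) }
    }

    fn load_acquire(&self) -> usize {
        self.v.load(Ordering::Acquire)
    }

    fn load_relaxed(&self) -> usize {
        self.v.load(Ordering::Relaxed)
    }

    fn store_release(&self, val: usize) {
        self.v.store(val, Ordering::Release);
    }
}

fn main() {
    let idx = AtomicIndex::new(0);
    idx.store_release(1);
    assert_eq!(idx.load_acquire(), 1);
}
```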
/// A statically allocated ring buffer backed by an array `A`
///
/// By default `RingBuffer` uses `usize` integers to hold the indices of its head and tail. For
/// small ring buffers `usize` may be overkill. However, `RingBuffer`'s index type is generic and
/// can be changed to `u8` or `u16` to reduce its footprint. The easiest way to construct a
/// `RingBuffer` with a smaller index type is to use the [`u8`] and [`u16`] constructors.
///
/// [`u8`]: struct.RingBuffer.html#method.u8
/// [`u16`]: struct.RingBuffer.html#method.u16
///
/// # Examples
///
/// ```
@@ -99,88 +149,62 @@ impl AtomicUsize {
/// // ..
/// }
/// ```
pub struct RingBuffer<T, A>
pub struct RingBuffer<T, A, U = usize>
where
// FIXME(rust-lang/rust#44580) use "const generics" instead of `Unsize`
A: Unsize<[T]>,
U: Uxx,
{
_marker: PhantomData<[T]>,
// this is from where we dequeue items
head: AtomicUsize,
head: Atomic<U>,
// this is where we enqueue new items
tail: AtomicUsize,
tail: Atomic<U>,
buffer: UntaggedOption<A>,
}
impl<T, A> RingBuffer<T, A>
impl<T, A, U> RingBuffer<T, A, U>
where
A: Unsize<[T]>,
U: Uxx,
{
/// Creates an empty ring buffer with a capacity equal to the length of the array `A` *minus
/// one*.
pub const fn new() -> Self {
RingBuffer {
_marker: PhantomData,
buffer: UntaggedOption::none(),
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
}
}
/// Returns the maximum number of elements the ring buffer can hold
pub fn capacity(&self) -> usize {
pub fn capacity(&self) -> U {
let buffer: &[T] = unsafe { self.buffer.as_ref() };
buffer.len() - 1
U::truncate(buffer.len() - 1)
}
/// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&mut self) -> Option<T> {
let n = self.capacity() + 1;
/// Returns `true` if the ring buffer has a length of 0
pub fn is_empty(&self) -> bool {
self.len_usize() == 0
}
let head = self.head.get_mut();
let tail = self.tail.get_mut();
let buffer: &[T] = unsafe { self.buffer.as_ref() };
if *head != *tail {
let item = unsafe { ptr::read(buffer.get_unchecked(*head)) };
*head = (*head + 1) % n;
Some(item)
} else {
None
/// Iterates from the front of the queue to the back
pub fn iter(&self) -> Iter<T, A, U> {
Iter {
rb: self,
index: 0,
len: self.len_usize(),
}
}
/// Adds an `item` to the end of the queue
///
/// Returns `BufferFullError` if the queue is full
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
let n = self.capacity() + 1;
let head = self.head.get_mut();
let tail = self.tail.get_mut();
let buffer: &mut [T] = unsafe { self.buffer.as_mut() };
let next_tail = (*tail + 1) % n;
if next_tail != *head {
// NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
// use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
unsafe { ptr::write(buffer.get_unchecked_mut(*tail), item) }
*tail = next_tail;
Ok(())
} else {
Err(BufferFullError)
/// Returns an iterator that allows modifying each value.
pub fn iter_mut(&mut self) -> IterMut<T, A, U> {
let len = self.len_usize();
IterMut {
rb: self,
index: 0,
len,
}
}
/// Returns the number of elements in the queue
pub fn len(&self) -> usize {
let head = self.head.load_relaxed();
let tail = self.tail.load_relaxed();
fn len_usize(&self) -> usize {
let head = self.head.load_relaxed().into();
let tail = self.tail.load_relaxed().into();
if head > tail {
head - tail
@@ -188,35 +212,12 @@ where
tail - head
}
}
/// Returns `true` if the ring buffer has a length of 0
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Iterates from the front of the queue to the back
pub fn iter(&self) -> Iter<T, A> {
Iter {
rb: self,
index: 0,
len: self.len(),
}
}
/// Returns an iterator that allows modifying each value.
pub fn iter_mut(&mut self) -> IterMut<T, A> {
let len = self.len();
IterMut {
rb: self,
index: 0,
len,
}
}
}
impl<T, A> Drop for RingBuffer<T, A>
impl<T, A, U> Drop for RingBuffer<T, A, U>
where
A: Unsize<[T]>,
U: Uxx,
{
fn drop(&mut self) {
for item in self {
@@ -227,66 +228,159 @@ where
}
}
impl<'a, T, A> IntoIterator for &'a RingBuffer<T, A>
impl<'a, T, A, U> IntoIterator for &'a RingBuffer<T, A, U>
where
A: Unsize<[T]>,
U: Uxx,
{
type Item = &'a T;
type IntoIter = Iter<'a, T, A>;
type IntoIter = Iter<'a, T, A, U>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<'a, T, A> IntoIterator for &'a mut RingBuffer<T, A>
impl<'a, T, A, U> IntoIterator for &'a mut RingBuffer<T, A, U>
where
A: Unsize<[T]>,
U: Uxx,
{
type Item = &'a mut T;
type IntoIter = IterMut<'a, T, A>;
type IntoIter = IterMut<'a, T, A, U>;
fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}
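These impls make the ring buffer directly usable in `for` loops, borrowing it either shared or exclusively. A small usage sketch, assuming the types above are in scope (the helper functions are hypothetical):

```rust
// Hypothetical helpers exercising the `IntoIterator` impls above
// (default `usize` indices).
fn sum(rb: &RingBuffer<i32, [i32; 4]>) -> i32 {
    let mut total = 0;
    for x in rb {
        // `x: &i32`, yielded from the front of the queue to the back
        total += *x;
    }
    total
}

fn double_all(rb: &mut RingBuffer<i32, [i32; 4]>) {
    for x in rb {
        // `x: &mut i32`
        *x *= 2;
    }
}
```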
macro_rules! impl_ {
($uxx:ident) => {
impl<T, A> RingBuffer<T, A, $uxx>
where
A: Unsize<[T]>,
{
/// Creates an empty ring buffer with a capacity equal to the length of the array `A`
/// *minus one*.
pub const fn $uxx() -> Self {
RingBuffer {
_marker: PhantomData,
buffer: UntaggedOption::none(),
head: Atomic::new(0),
tail: Atomic::new(0),
}
}
/// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&mut self) -> Option<T> {
let n = self.capacity() + 1;
let head = self.head.get_mut();
let tail = self.tail.get_mut();
let buffer: &[T] = unsafe { self.buffer.as_ref() };
if *head != *tail {
let item = unsafe { ptr::read(buffer.get_unchecked(usize::from(*head))) };
*head = (*head + 1) % n;
Some(item)
} else {
None
}
}
/// Adds an `item` to the end of the queue
///
/// Returns `BufferFullError` if the queue is full
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
let n = self.capacity() + 1;
let head = self.head.get_mut();
let tail = self.tail.get_mut();
let buffer: &mut [T] = unsafe { self.buffer.as_mut() };
let next_tail = (*tail + 1) % n;
if next_tail != *head {
// NOTE(ptr::write) the memory slot that we are about to write to is
// uninitialized. We use `ptr::write` to avoid running `T`'s destructor on the
// uninitialized memory
unsafe { ptr::write(buffer.get_unchecked_mut(usize::from(*tail)), item) }
*tail = next_tail;
Ok(())
} else {
Err(BufferFullError)
}
}
/// Returns the number of elements in the queue
pub fn len(&self) -> $uxx {
let head = self.head.load_relaxed();
let tail = self.tail.load_relaxed();
if head > tail {
head - tail
} else {
tail - head
}
}
}
}
}
impl<T, A> RingBuffer<T, A, usize>
where
A: Unsize<[T]>,
{
/// Alias for [`RingBuffer::usize`](struct.RingBuffer.html#method.usize)
pub const fn new() -> Self {
RingBuffer::usize()
}
}
impl_!(u8);
impl_!(u16);
impl_!(usize);
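Because each generated constructor is `const`, a buffer with a narrow index type can still be placed in a `static`, which is the crate's main use case. A hedged usage sketch (note that `capacity` is `truncate(len - 1)`, so a 256-element array addressed by `u8` indices holds at most 254 items, as the new `u8` test below exercises):

```rust
// Hypothetical: `u8` indices shrink the head/tail fields from
// 2 * size_of::<usize>() down to 2 bytes.
static mut RB: RingBuffer<u8, [u8; 256], u8> = RingBuffer::u8();

fn demo() {
    let rb = unsafe { &mut RB };
    rb.enqueue(42).unwrap();
    assert_eq!(rb.dequeue(), Some(42));
}
```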
/// An iterator over the items of a ring buffer
pub struct Iter<'a, T, A>
pub struct Iter<'a, T, A, U>
where
A: Unsize<[T]> + 'a,
T: 'a,
U: 'a + Uxx,
{
rb: &'a RingBuffer<T, A>,
rb: &'a RingBuffer<T, A, U>,
index: usize,
len: usize,
}
/// A mutable iterator over the items of a ring buffer
pub struct IterMut<'a, T, A>
pub struct IterMut<'a, T, A, U>
where
A: Unsize<[T]> + 'a,
T: 'a,
U: 'a + Uxx,
{
rb: &'a mut RingBuffer<T, A>,
rb: &'a mut RingBuffer<T, A, U>,
index: usize,
len: usize,
}
macro_rules! iterator {
(struct $name:ident -> $elem:ty, $buffer:ty, $ptr:ty, $asref:ident, $asptr:ident, $mkref:ident) => {
impl<'a, T, A> Iterator for $name<'a, T, A>
where
impl<'a, T, A, U> Iterator for $name<'a, T, A, U>
where
A: Unsize<[T]> + 'a,
T: 'a,
U: 'a + Uxx,
{
type Item = $elem;
fn next(&mut self) -> Option<$elem> {
if self.index < self.len {
let head = self.rb.head.load_relaxed();
let head = self.rb.head.load_relaxed().into();
let capacity = self.rb.capacity() + 1;
let capacity = self.rb.capacity().into() + 1;
let buffer: $buffer = unsafe { self.rb.buffer.$asref() };
let ptr: $ptr = buffer.$asptr();
let i = (head + self.index) % capacity;
@@ -410,6 +504,17 @@ mod tests {
assert_eq!(rb.dequeue(), None);
}
#[test]
fn u8() {
let mut rb: RingBuffer<u8, [u8; 256], _> = RingBuffer::u8();
for _ in 0..254 {
rb.enqueue(0).unwrap();
}
assert!(rb.enqueue(0).is_err());
}
#[test]
fn wrap_around() {
let mut rb: RingBuffer<i32, [i32; 4]> = RingBuffer::new();


@@ -1,15 +1,16 @@
use core::marker::{PhantomData, Unsize};
use core::ptr::{self, NonNull};
use ring_buffer::RingBuffer;
use ring_buffer::{RingBuffer, Uxx};
use BufferFullError;
impl<T, A> RingBuffer<T, A>
impl<T, A, U> RingBuffer<T, A, U>
where
A: Unsize<[T]>,
U: Uxx,
{
/// Splits a statically allocated ring buffer into producer and consumer endpoints
pub fn split<'rb>(&'rb mut self) -> (Producer<'rb, T, A>, Consumer<'rb, T, A>) {
pub fn split<'rb>(&'rb mut self) -> (Producer<'rb, T, A, U>, Consumer<'rb, T, A, U>) {
(
Producer {
rb: unsafe { NonNull::new_unchecked(self) },
@@ -25,95 +26,109 @@ where
/// A ring buffer "consumer"; it can dequeue items from the ring buffer
// NOTE the consumer semantically owns the `head` pointer of the ring buffer
pub struct Consumer<'a, T, A>
pub struct Consumer<'a, T, A, U = usize>
where
A: Unsize<[T]>,
U: Uxx,
{
// XXX do we need to use `NonNull` (for soundness) here?
rb: NonNull<RingBuffer<T, A>>,
rb: NonNull<RingBuffer<T, A, U>>,
_marker: PhantomData<&'a ()>,
}
impl<'a, T, A> Consumer<'a, T, A>
where
A: Unsize<[T]>,
{
/// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&mut self) -> Option<T> {
let rb = unsafe { self.rb.as_ref() };
let n = rb.capacity() + 1;
let buffer: &[T] = unsafe { rb.buffer.as_ref() };
let tail = rb.tail.load_acquire();
let head = rb.head.load_relaxed();
if head != tail {
let item = unsafe { ptr::read(buffer.get_unchecked(head)) };
rb.head.store_release((head + 1) % n);
Some(item)
} else {
None
}
}
}
unsafe impl<'a, T, A> Send for Consumer<'a, T, A>
unsafe impl<'a, T, A, U> Send for Consumer<'a, T, A, U>
where
A: Unsize<[T]>,
T: Send,
U: Uxx,
{
}
/// A ring buffer "producer"; it can enqueue items into the ring buffer
// NOTE the producer semantically owns the `tail` pointer of the ring buffer
pub struct Producer<'a, T, A>
pub struct Producer<'a, T, A, U = usize>
where
A: Unsize<[T]>,
U: Uxx,
{
// XXX do we need to use `NonNull` (for soundness) here?
rb: NonNull<RingBuffer<T, A>>,
rb: NonNull<RingBuffer<T, A, U>>,
_marker: PhantomData<&'a ()>,
}
impl<'a, T, A> Producer<'a, T, A>
where
A: Unsize<[T]>,
{
/// Adds an `item` to the end of the queue
///
/// Returns `BufferFullError` if the queue is full
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
let rb = unsafe { self.rb.as_mut() };
let n = rb.capacity() + 1;
let buffer: &mut [T] = unsafe { rb.buffer.as_mut() };
let tail = rb.tail.load_relaxed();
// NOTE we could replace this `load_acquire` with a `load_relaxed` and this method would be
// sound on most architectures but that change would result in UB according to the C++
// memory model, which is what Rust currently uses, so we err on the side of caution and
// stick to `load_acquire`. Check issue google#sanitizers#882 for more details.
let head = rb.head.load_acquire();
let next_tail = (tail + 1) % n;
if next_tail != head {
// NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
// use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
unsafe { ptr::write(buffer.get_unchecked_mut(tail), item) }
rb.tail.store_release(next_tail);
Ok(())
} else {
Err(BufferFullError)
}
}
}
unsafe impl<'a, T, A> Send for Producer<'a, T, A>
unsafe impl<'a, T, A, U> Send for Producer<'a, T, A, U>
where
A: Unsize<[T]>,
T: Send,
U: Uxx,
{
}
macro_rules! impl_ {
($uxx:ident) => {
impl<'a, T, A> Consumer<'a, T, A, $uxx>
where
A: Unsize<[T]>,
{
/// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&mut self) -> Option<T> {
let rb = unsafe { self.rb.as_ref() };
let n = rb.capacity() + 1;
let buffer: &[T] = unsafe { rb.buffer.as_ref() };
let tail = rb.tail.load_acquire();
let head = rb.head.load_relaxed();
if head != tail {
let item = unsafe { ptr::read(buffer.get_unchecked(usize::from(head))) };
rb.head.store_release((head + 1) % n);
Some(item)
} else {
None
}
}
}
impl<'a, T, A> Producer<'a, T, A, $uxx>
where
A: Unsize<[T]>,
{
/// Adds an `item` to the end of the queue
///
/// Returns `BufferFullError` if the queue is full
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
let rb = unsafe { self.rb.as_mut() };
let n = rb.capacity() + 1;
let buffer: &mut [T] = unsafe { rb.buffer.as_mut() };
let tail = rb.tail.load_relaxed();
// NOTE we could replace this `load_acquire` with a `load_relaxed` and this method
// would be sound on most architectures but that change would result in UB according
// to the C++ memory model, which is what Rust currently uses, so we err on the side
// of caution and stick to `load_acquire`. Check issue google#sanitizers#882 for
// more details.
let head = rb.head.load_acquire();
let next_tail = (tail + 1) % n;
if next_tail != head {
// NOTE(ptr::write) the memory slot that we are about to write to is
// uninitialized. We use `ptr::write` to avoid running `T`'s destructor on the
// uninitialized memory
unsafe { ptr::write(buffer.get_unchecked_mut(usize::from(tail)), item) }
rb.tail.store_release(next_tail);
Ok(())
} else {
Err(BufferFullError)
}
}
}
};
}
impl_!(u8);
impl_!(u16);
impl_!(usize);
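Combined with the constructors from the other file, this gives a producer/consumer pair whose indices are a single byte each. A single-threaded sketch of the intended flow (in practice the two halves would live in, say, `main` and an interrupt handler; the wiring here is hypothetical):

```rust
// Hypothetical single-threaded exercise of `split` with `u8` indices.
static mut RB: RingBuffer<u16, [u16; 8], u8> = RingBuffer::u8();

fn demo() {
    let rb = unsafe { &mut RB };
    let (mut producer, mut consumer) = rb.split();

    // The producer half only enqueues; the consumer half only dequeues.
    producer.enqueue(1).unwrap();
    producer.enqueue(2).unwrap();
    assert_eq!(consumer.dequeue(), Some(1));
    assert_eq!(consumer.dequeue(), Some(2));
    assert_eq!(consumer.dequeue(), None);
}
```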
#[cfg(test)]
mod tests {
use RingBuffer;