From 78f4f109333c1f90f3f08a56c8663e5d5e3d877b Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Thu, 19 Apr 2018 21:51:39 +0200 Subject: [PATCH] RingBuffer: support smaller index types --- src/ring_buffer/mod.rs | 311 +++++++++++++++++++++++++++------------- src/ring_buffer/spsc.rs | 143 +++++++++--------- 2 files changed, 287 insertions(+), 167 deletions(-) diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index fb9fd5e1..46016e5e 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -11,38 +11,88 @@ use BufferFullError; mod spsc; -// AtomicUsize with no CAS operations that works on targets that have "no atomic support" according -// to their specification -struct AtomicUsize { - v: UnsafeCell, +/// Types that can be used as `RingBuffer` indices: `u8`, `u16` and `usize +/// +/// Do not implement this trait yourself +pub unsafe trait Uxx: Into + Send { + #[doc(hidden)] + fn truncate(x: usize) -> Self; } -impl AtomicUsize { - pub const fn new(v: usize) -> AtomicUsize { - AtomicUsize { +unsafe impl Uxx for u8 { + fn truncate(x: usize) -> Self { + let max = ::core::u8::MAX; + if x >= usize::from(max) { + max - 1 + } else { + x as u8 + } + } +} + +unsafe impl Uxx for u16 { + fn truncate(x: usize) -> Self { + let max = ::core::u16::MAX; + if x >= usize::from(max) { + max - 1 + } else { + x as u16 + } + } +} + +unsafe impl Uxx for usize { + fn truncate(x: usize) -> Self { + x + } +} + +// Atomic{U8,U16, Usize} with no CAS operations that works on targets that have "no atomic support" +// according to their specification +struct Atomic +where + U: Uxx, +{ + v: UnsafeCell, +} + +impl Atomic +where + U: Uxx, +{ + const fn new(v: U) -> Atomic { + Atomic { v: UnsafeCell::new(v), } } - pub fn get_mut(&mut self) -> &mut usize { + fn get_mut(&mut self) -> &mut U { unsafe { &mut *self.v.get() } } - pub fn load_acquire(&self) -> usize { + fn load_acquire(&self) -> U { unsafe { intrinsics::atomic_load_acq(self.v.get()) } } - pub fn load_relaxed(&self) -> usize { + fn load_relaxed(&self) -> U { unsafe { intrinsics::atomic_load_relaxed(self.v.get()) } } - pub fn store_release(&self, val: usize) { + fn store_release(&self, val: U) { unsafe { intrinsics::atomic_store_rel(self.v.get(), val) } } } /// An statically allocated ring buffer backed by an array `A` /// +/// By default `RingBuffer` will use `usize` integers to hold the indices to its head and tail. For +/// small ring buffers `usize` may be overkill. However, `RingBuffer`'s index type is generic and +/// can be changed to `u8` or `u16` to reduce its footprint. The easiest to construct a `RingBuffer` +/// with a smaller index type is to use the [`u8`] and [`u16`] constructors. +/// +/// [`u8`]: struct.RingBuffer.html#method.u8 +/// [`u16`]: struct.RingBuffer.html#method.u16 +/// /// # Examples /// /// ``` @@ -99,88 +149,62 @@ impl AtomicUsize { /// // .. /// } /// ``` -pub struct RingBuffer +pub struct RingBuffer where // FIXME(rust-lang/rust#44580) use "const generics" instead of `Unsize` A: Unsize<[T]>, + U: Uxx, { _marker: PhantomData<[T]>, // this is from where we dequeue items - head: AtomicUsize, + head: Atomic, // this is where we enqueue new items - tail: AtomicUsize, + tail: Atomic, buffer: UntaggedOption, } -impl RingBuffer +impl RingBuffer where A: Unsize<[T]>, + U: Uxx, { - /// Creates an empty ring buffer with capacity equals to the length of the array `A` *minus - /// one*. - pub const fn new() -> Self { - RingBuffer { - _marker: PhantomData, - buffer: UntaggedOption::none(), - head: AtomicUsize::new(0), - tail: AtomicUsize::new(0), - } - } - /// Returns the maximum number of elements the ring buffer can hold - pub fn capacity(&self) -> usize { + pub fn capacity(&self) -> U { let buffer: &[T] = unsafe { self.buffer.as_ref() }; - buffer.len() - 1 + + U::truncate(buffer.len() - 1) } - /// Returns the item in the front of the queue, or `None` if the queue is empty - pub fn dequeue(&mut self) -> Option { - let n = self.capacity() + 1; + /// Returns `true` if the ring buffer has a length of 0 + pub fn is_empty(&self) -> bool { + self.len_usize() == 0 + } - let head = self.head.get_mut(); - let tail = self.tail.get_mut(); - - let buffer: &[T] = unsafe { self.buffer.as_ref() }; - - if *head != *tail { - let item = unsafe { ptr::read(buffer.get_unchecked(*head)) }; - *head = (*head + 1) % n; - Some(item) - } else { - None + /// Iterates from the front of the queue to the back + pub fn iter(&self) -> Iter { + Iter { + rb: self, + index: 0, + len: self.len_usize(), } } - /// Adds an `item` to the end of the queue - /// - /// Returns `BufferFullError` if the queue is full - pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { - let n = self.capacity() + 1; - - let head = self.head.get_mut(); - let tail = self.tail.get_mut(); - - let buffer: &mut [T] = unsafe { self.buffer.as_mut() }; - - let next_tail = (*tail + 1) % n; - if next_tail != *head { - // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We - // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory - unsafe { ptr::write(buffer.get_unchecked_mut(*tail), item) } - *tail = next_tail; - Ok(()) - } else { - Err(BufferFullError) + /// Returns an iterator that allows modifying each value. + pub fn iter_mut(&mut self) -> IterMut { + let len = self.len_usize(); + IterMut { + rb: self, + index: 0, + len, } } - /// Returns the number of elements in the queue - pub fn len(&self) -> usize { - let head = self.head.load_relaxed(); - let tail = self.tail.load_relaxed(); + fn len_usize(&self) -> usize { + let head = self.head.load_relaxed().into(); + let tail = self.tail.load_relaxed().into(); if head > tail { head - tail @@ -188,35 +212,12 @@ where tail - head } } - - /// Returns `true` if the ring buffer has a length of 0 - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Iterates from the front of the queue to the back - pub fn iter(&self) -> Iter { - Iter { - rb: self, - index: 0, - len: self.len(), - } - } - - /// Returns an iterator that allows modifying each value. - pub fn iter_mut(&mut self) -> IterMut { - let len = self.len(); - IterMut { - rb: self, - index: 0, - len, - } - } } -impl Drop for RingBuffer +impl Drop for RingBuffer where A: Unsize<[T]>, + U: Uxx, { fn drop(&mut self) { for item in self { @@ -227,66 +228,159 @@ where } } -impl<'a, T, A> IntoIterator for &'a RingBuffer +impl<'a, T, A, U> IntoIterator for &'a RingBuffer where A: Unsize<[T]>, + U: Uxx, { type Item = &'a T; - type IntoIter = Iter<'a, T, A>; + type IntoIter = Iter<'a, T, A, U>; fn into_iter(self) -> Self::IntoIter { self.iter() } } -impl<'a, T, A> IntoIterator for &'a mut RingBuffer +impl<'a, T, A, U> IntoIterator for &'a mut RingBuffer where A: Unsize<[T]>, + U: Uxx, { type Item = &'a mut T; - type IntoIter = IterMut<'a, T, A>; + type IntoIter = IterMut<'a, T, A, U>; fn into_iter(self) -> Self::IntoIter { self.iter_mut() } } +macro_rules! impl_ { + ($uxx:ident) => { + impl RingBuffer + where + A: Unsize<[T]>, + { + /// Creates an empty ring buffer with capacity equals to the length of the array `A` + /// *minus one*. + pub const fn $uxx() -> Self { + RingBuffer { + _marker: PhantomData, + buffer: UntaggedOption::none(), + head: Atomic::new(0), + tail: Atomic::new(0), + } + } + + /// Returns the item in the front of the queue, or `None` if the queue is empty + pub fn dequeue(&mut self) -> Option { + let n = self.capacity() + 1; + + let head = self.head.get_mut(); + let tail = self.tail.get_mut(); + + let buffer: &[T] = unsafe { self.buffer.as_ref() }; + + if *head != *tail { + let item = unsafe { ptr::read(buffer.get_unchecked(usize::from(*head))) }; + *head = (*head + 1) % n; + Some(item) + } else { + None + } + } + + /// Adds an `item` to the end of the queue + /// + /// Returns `BufferFullError` if the queue is full + pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { + let n = self.capacity() + 1; + + let head = self.head.get_mut(); + let tail = self.tail.get_mut(); + + let buffer: &mut [T] = unsafe { self.buffer.as_mut() }; + + let next_tail = (*tail + 1) % n; + if next_tail != *head { + // NOTE(ptr::write) the memory slot that we are about to write to is + // uninitialized. We use `ptr::write` to avoid running `T`'s destructor on the + // uninitialized memory + unsafe { ptr::write(buffer.get_unchecked_mut(usize::from(*tail)), item) } + *tail = next_tail; + Ok(()) + } else { + Err(BufferFullError) + } + } + + /// Returns the number of elements in the queue + pub fn len(&self) -> $uxx { + let head = self.head.load_relaxed(); + let tail = self.tail.load_relaxed(); + + if head > tail { + head - tail + } else { + tail - head + } + } + } + } +} + +impl RingBuffer +where + A: Unsize<[T]>, +{ + /// Alias for [`RingBuffer::usize`](struct.RingBuffer.html#method.usize) + pub const fn new() -> Self { + RingBuffer::usize() + } +} + +impl_!(u8); +impl_!(u16); +impl_!(usize); + /// An iterator over a ring buffer items -pub struct Iter<'a, T, A> +pub struct Iter<'a, T, A, U> where A: Unsize<[T]> + 'a, T: 'a, + U: 'a + Uxx, { - rb: &'a RingBuffer, + rb: &'a RingBuffer, index: usize, len: usize, } /// A mutable iterator over a ring buffer items -pub struct IterMut<'a, T, A> +pub struct IterMut<'a, T, A, U> where A: Unsize<[T]> + 'a, T: 'a, + U: 'a + Uxx, { - rb: &'a mut RingBuffer, + rb: &'a mut RingBuffer, index: usize, len: usize, } macro_rules! iterator { (struct $name:ident -> $elem:ty, $buffer:ty, $ptr:ty, $asref:ident, $asptr:ident, $mkref:ident)=> { - impl<'a, T, A> Iterator for $name<'a, T, A> - where + impl<'a, T, A, U> Iterator for $name<'a, T, A, U> + where A: Unsize<[T]> + 'a, T: 'a, + U: 'a + Uxx, { type Item = $elem; fn next(&mut self) -> Option<$elem> { if self.index < self.len { - let head = self.rb.head.load_relaxed(); + let head = self.rb.head.load_relaxed().into(); - let capacity = self.rb.capacity() + 1; + let capacity = self.rb.capacity().into() + 1; let buffer: $buffer = unsafe { self.rb.buffer.$asref() }; let ptr: $ptr = buffer.$asptr(); let i = (head + self.index) % capacity; @@ -410,6 +504,17 @@ mod tests { assert_eq!(rb.dequeue(), None); } + #[test] + fn u8() { + let mut rb: RingBuffer = RingBuffer::u8(); + + for _ in 0..254 { + rb.enqueue(0).unwrap(); + } + + assert!(rb.enqueue(0).is_err()); + } + #[test] fn wrap_around() { let mut rb: RingBuffer = RingBuffer::new(); diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 7f8800d9..98f18b01 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -1,15 +1,16 @@ use core::marker::{PhantomData, Unsize}; use core::ptr::{self, NonNull}; -use ring_buffer::RingBuffer; +use ring_buffer::{RingBuffer, Uxx}; use BufferFullError; -impl RingBuffer +impl RingBuffer where A: Unsize<[T]>, + U: Uxx, { /// Splits a statically allocated ring buffer into producer and consumer end points - pub fn split<'rb>(&'rb mut self) -> (Producer<'rb, T, A>, Consumer<'rb, T, A>) { + pub fn split<'rb>(&'rb mut self) -> (Producer<'rb, T, A, U>, Consumer<'rb, T, A, U>) { ( Producer { rb: unsafe { NonNull::new_unchecked(self) }, @@ -25,95 +26,109 @@ where /// A ring buffer "consumer"; it can dequeue items from the ring buffer // NOTE the consumer semantically owns the `head` pointer of the ring buffer -pub struct Consumer<'a, T, A> +pub struct Consumer<'a, T, A, U = usize> where A: Unsize<[T]>, + U: Uxx, { // XXX do we need to use `NonNull` (for soundness) here? - rb: NonNull>, + rb: NonNull>, _marker: PhantomData<&'a ()>, } -impl<'a, T, A> Consumer<'a, T, A> -where - A: Unsize<[T]>, -{ - /// Returns the item in the front of the queue, or `None` if the queue is empty - pub fn dequeue(&mut self) -> Option { - let rb = unsafe { self.rb.as_ref() }; - - let n = rb.capacity() + 1; - let buffer: &[T] = unsafe { rb.buffer.as_ref() }; - - let tail = rb.tail.load_acquire(); - let head = rb.head.load_relaxed(); - if head != tail { - let item = unsafe { ptr::read(buffer.get_unchecked(head)) }; - rb.head.store_release((head + 1) % n); - Some(item) - } else { - None - } - } -} - -unsafe impl<'a, T, A> Send for Consumer<'a, T, A> +unsafe impl<'a, T, A, U> Send for Consumer<'a, T, A, U> where A: Unsize<[T]>, T: Send, + U: Uxx, { } /// A ring buffer "producer"; it can enqueue items into the ring buffer // NOTE the producer semantically owns the `tail` pointer of the ring buffer -pub struct Producer<'a, T, A> +pub struct Producer<'a, T, A, U = usize> where A: Unsize<[T]>, + U: Uxx, { // XXX do we need to use `NonNull` (for soundness) here? - rb: NonNull>, + rb: NonNull>, _marker: PhantomData<&'a ()>, } -impl<'a, T, A> Producer<'a, T, A> -where - A: Unsize<[T]>, -{ - /// Adds an `item` to the end of the queue - /// - /// Returns `BufferFullError` if the queue is full - pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { - let rb = unsafe { self.rb.as_mut() }; - - let n = rb.capacity() + 1; - let buffer: &mut [T] = unsafe { rb.buffer.as_mut() }; - - let tail = rb.tail.load_relaxed(); - // NOTE we could replace this `load_acquire` with a `load_relaxed` and this method would be - // sound on most architectures but that change would result in UB according to the C++ - // memory model, which is what Rust currently uses, so we err on the side of caution and - // stick to `load_acquire`. Check issue google#sanitizers#882 for more details. - let head = rb.head.load_acquire(); - let next_tail = (tail + 1) % n; - if next_tail != head { - // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We - // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory - unsafe { ptr::write(buffer.get_unchecked_mut(tail), item) } - rb.tail.store_release(next_tail); - Ok(()) - } else { - Err(BufferFullError) - } - } -} - -unsafe impl<'a, T, A> Send for Producer<'a, T, A> +unsafe impl<'a, T, A, U> Send for Producer<'a, T, A, U> where A: Unsize<[T]>, T: Send, + U: Uxx, { } +macro_rules! impl_ { + ($uxx:ident) => { + impl<'a, T, A> Consumer<'a, T, A, $uxx> + where + A: Unsize<[T]>, + { + /// Returns the item in the front of the queue, or `None` if the queue is empty + pub fn dequeue(&mut self) -> Option { + let rb = unsafe { self.rb.as_ref() }; + + let n = rb.capacity() + 1; + let buffer: &[T] = unsafe { rb.buffer.as_ref() }; + + let tail = rb.tail.load_acquire(); + let head = rb.head.load_relaxed(); + if head != tail { + let item = unsafe { ptr::read(buffer.get_unchecked(usize::from(head))) }; + rb.head.store_release((head + 1) % n); + Some(item) + } else { + None + } + } + } + + impl<'a, T, A> Producer<'a, T, A, $uxx> + where + A: Unsize<[T]>, + { + /// Adds an `item` to the end of the queue + /// + /// Returns `BufferFullError` if the queue is full + pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { + let rb = unsafe { self.rb.as_mut() }; + + let n = rb.capacity() + 1; + let buffer: &mut [T] = unsafe { rb.buffer.as_mut() }; + + let tail = rb.tail.load_relaxed(); + // NOTE we could replace this `load_acquire` with a `load_relaxed` and this method + // would be sound on most architectures but that change would result in UB according + // to the C++ memory model, which is what Rust currently uses, so we err on the side + // of caution and stick to `load_acquire`. Check issue google#sanitizers#882 for + // more details. + let head = rb.head.load_acquire(); + let next_tail = (tail + 1) % n; + if next_tail != head { + // NOTE(ptr::write) the memory slot that we are about to write to is + // uninitialized. We use `ptr::write` to avoid running `T`'s destructor on the + // uninitialized memory + unsafe { ptr::write(buffer.get_unchecked_mut(usize::from(tail)), item) } + rb.tail.store_release(next_tail); + Ok(()) + } else { + Err(BufferFullError) + } + } + } + }; +} + +impl_!(u8); +impl_!(u16); +impl_!(usize); + #[cfg(test)] mod tests { use RingBuffer;