From 55f891e64f9a71a91843d1d3c8c63be95412e92f Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Tue, 31 Oct 2017 21:26:02 +0100 Subject: [PATCH 01/12] use atomics where available cc #5 --- src/lib.rs | 2 + src/ring_buffer/mod.rs | 84 +++++++++++++++++++++++++++++++++-------- src/ring_buffer/spsc.rs | 59 ++++++++++++++++++++++++++++- 3 files changed, 128 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c6228e53..d79b4994 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -144,7 +144,9 @@ //! is_send::>(); //! ``` +#![cfg_attr(target_has_atomic = "ptr", feature(const_atomic_usize_new))] #![deny(missing_docs)] +#![feature(cfg_target_has_atomic)] #![feature(const_fn)] #![feature(shared)] #![feature(unsize)] diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index 2f9cb621..8311a555 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -2,6 +2,8 @@ use core::marker::{PhantomData, Unsize}; use core::ptr; +#[cfg(target_has_atomic = "ptr")] +use core::sync::atomic::{AtomicUsize, Ordering}; use untagged_option::UntaggedOption; @@ -18,11 +20,15 @@ where A: Unsize<[T]>, { _marker: PhantomData<[T]>, - buffer: UntaggedOption, + // this is from where we dequeue items - head: usize, + #[cfg(target_has_atomic = "ptr")] head: AtomicUsize, + #[cfg(not(target_has_atomic = "ptr"))] head: usize, + // this is where we enqueue new items - tail: usize, + #[cfg(target_has_atomic = "ptr")] tail: AtomicUsize, + #[cfg(not(target_has_atomic = "ptr"))] tail: usize, + buffer: UntaggedOption, } impl RingBuffer @@ -35,7 +41,13 @@ where RingBuffer { _marker: PhantomData, buffer: UntaggedOption::none(), + #[cfg(target_has_atomic = "ptr")] + head: AtomicUsize::new(0), + #[cfg(not(target_has_atomic = "ptr"))] head: 0, + #[cfg(target_has_atomic = "ptr")] + tail: AtomicUsize::new(0), + #[cfg(not(target_has_atomic = "ptr"))] tail: 0, } } @@ -49,11 +61,22 @@ where /// Returns the item in the front of the queue, or `None` if the queue is empty pub fn dequeue(&mut self) -> Option { let n = self.capacity() + 1; + + #[cfg(target_has_atomic = "ptr")] + let head = self.head.get_mut(); + #[cfg(not(target_has_atomic = "ptr"))] + let head = &mut self.head; + + #[cfg(target_has_atomic = "ptr")] + let tail = self.tail.get_mut(); + #[cfg(not(target_has_atomic = "ptr"))] + let tail = &mut self.tail; + let buffer: &[T] = unsafe { self.buffer.as_ref() }; - if self.head != self.tail { - let item = unsafe { ptr::read(buffer.get_unchecked(self.head)) }; - self.head = (self.head + 1) % n; + if *head != *tail { + let item = unsafe { ptr::read(buffer.get_unchecked(*head)) }; + *head = (*head + 1) % n; Some(item) } else { None @@ -65,14 +88,25 @@ where /// Returns `BufferFullError` if the queue is full pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { let n = self.capacity() + 1; + + #[cfg(target_has_atomic = "ptr")] + let head = self.head.get_mut(); + #[cfg(not(target_has_atomic = "ptr"))] + let head = &mut self.head; + + #[cfg(target_has_atomic = "ptr")] + let tail = self.tail.get_mut(); + #[cfg(not(target_has_atomic = "ptr"))] + let tail = &mut self.tail; + let buffer: &mut [T] = unsafe { self.buffer.as_mut() }; - let next_tail = (self.tail + 1) % n; - if next_tail != self.head { + let next_tail = (*tail + 1) % n; + if next_tail != *head { // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory - unsafe { ptr::write(buffer.get_unchecked_mut(self.tail), item) } - self.tail = next_tail; + unsafe { ptr::write(buffer.get_unchecked_mut(*tail), item) } + *tail = next_tail; Ok(()) } else { Err(BufferFullError) @@ -81,10 +115,20 @@ where /// Returns the number of elements in the queue pub fn len(&self) -> usize { - if self.head > self.tail { - self.head - self.tail + #[cfg(target_has_atomic = "ptr")] + let head = self.head.load(Ordering::Relaxed); + #[cfg(not(target_has_atomic = "ptr"))] + let head = self.head; + + #[cfg(target_has_atomic = "ptr")] + let tail = self.tail.load(Ordering::Relaxed); + #[cfg(not(target_has_atomic = "ptr"))] + let tail = self.tail; + + if head > tail { + head - tail } else { - self.tail - self.head + tail - head } } @@ -176,9 +220,14 @@ where fn next(&mut self) -> Option<&'a T> { if self.index < self.len { + #[cfg(not(target_has_atomic = "ptr"))] + let head = self.rb.head; + #[cfg(target_has_atomic = "ptr")] + let head = self.rb.head.load(Ordering::Relaxed); + let buffer: &[T] = unsafe { self.rb.buffer.as_ref() }; let ptr = buffer.as_ptr(); - let i = (self.rb.head + self.index) % (self.rb.capacity() + 1); + let i = (head + self.index) % (self.rb.capacity() + 1); self.index += 1; Some(unsafe { &*ptr.offset(i as isize) }) } else { @@ -196,10 +245,15 @@ where fn next(&mut self) -> Option<&'a mut T> { if self.index < self.len { + #[cfg(not(target_has_atomic = "ptr"))] + let head = self.rb.head; + #[cfg(target_has_atomic = "ptr")] + let head = self.rb.head.load(Ordering::Relaxed); + let capacity = self.rb.capacity() + 1; let buffer: &mut [T] = unsafe { self.rb.buffer.as_mut() }; let ptr: *mut T = buffer.as_mut_ptr(); - let i = (self.rb.head + self.index) % capacity; + let i = (head + self.index) % capacity; self.index += 1; Some(unsafe { &mut *ptr.offset(i as isize) }) } else { diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 5c6fa0ed..b31ef0fb 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -1,5 +1,7 @@ use core::ptr::{self, Shared}; use core::marker::Unsize; +#[cfg(target_has_atomic = "ptr")] +use core::sync::atomic::Ordering; use BufferFullError; use ring_buffer::RingBuffer; @@ -10,8 +12,11 @@ where { /// Splits a statically allocated ring buffer into producer and consumer end points /// - /// *Warning* the current implementation only supports single core processors. It's also fine to - /// use both end points on the same core of a multi-core processor. + /// **Warning** the current single producer single consumer implementation only supports + /// multi-core systems where `cfg(target_has_atomic = "ptr")` holds for all the cores. For + /// example, a dual core system where one core is Cortex-M0 core and the other is Cortex-M3 core + /// is not supported because Cortex-M0 (`thumbv6m-none-eabi`) doesn't satisfy + /// `cfg(target_has_atomic = "ptr")`. All single core systems are supported. pub fn split(&'static mut self) -> (Producer, Consumer) { ( Producer { @@ -39,8 +44,30 @@ where A: Unsize<[T]>, { /// Returns the item in the front of the queue, or `None` if the queue is empty + #[cfg(target_has_atomic = "ptr")] + pub fn dequeue(&mut self) -> Option { + let rb = unsafe { self.rb.as_ref() }; + + let tail = rb.tail.load(Ordering::Relaxed); + let head = rb.head.load(Ordering::Acquire); + + let n = rb.capacity() + 1; + let buffer: &[T] = unsafe { rb.buffer.as_ref() }; + + if head != tail { + let item = unsafe { ptr::read(buffer.get_unchecked(head)) }; + rb.head.store((head + 1) % n, Ordering::Release); + Some(item) + } else { + None + } + } + + /// Returns the item in the front of the queue, or `None` if the queue is empty + #[cfg(not(target_has_atomic = "ptr"))] pub fn dequeue(&mut self) -> Option { let rb = unsafe { self.rb.as_mut() }; + let n = rb.capacity() + 1; let buffer: &[T] = unsafe { rb.buffer.as_ref() }; @@ -80,8 +107,36 @@ where /// Adds an `item` to the end of the queue /// /// Returns `BufferFullError` if the queue is full + #[cfg(target_has_atomic = "ptr")] pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { let rb = unsafe { self.rb.as_mut() }; + + let head = rb.head.load(Ordering::Relaxed); + let tail = rb.tail.load(Ordering::Acquire); + + let n = rb.capacity() + 1; + let next_tail = (tail + 1) % n; + + let buffer: &mut [T] = unsafe { rb.buffer.as_mut() }; + + if next_tail != head { + // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We + // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory + unsafe { ptr::write(buffer.get_unchecked_mut(tail), item) } + rb.tail.store(next_tail, Ordering::Release); + Ok(()) + } else { + Err(BufferFullError) + } + } + + /// Adds an `item` to the end of the queue + /// + /// Returns `BufferFullError` if the queue is full + #[cfg(not(target_has_atomic = "ptr"))] + pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { + let rb = unsafe { self.rb.as_mut() }; + let n = rb.capacity() + 1; let buffer: &mut [T] = unsafe { rb.buffer.as_mut() }; From 4a6bf95f19448475fc05df2b124113a788616100 Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Tue, 31 Oct 2017 21:30:43 +0100 Subject: [PATCH 02/12] add tsan test --- blacklist.txt | 6 ++++++ ci/install.sh | 4 ++++ ci/script.sh | 13 ++++++++++++- tests/tsan.rs | 20 ++++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 blacklist.txt create mode 100644 tests/tsan.rs diff --git a/blacklist.txt b/blacklist.txt new file mode 100644 index 00000000..7a73a277 --- /dev/null +++ b/blacklist.txt @@ -0,0 +1,6 @@ +# false positives from thread::spawn (?) +race:>::drop_slow +race:__GI___call_tls_dtors +race:alloc::heap::{{impl}}::dealloc +race:core::ptr::drop_in_place>>> +race:core::ptr::drop_in_place>> diff --git a/ci/install.sh b/ci/install.sh index 4d5d56a4..8724552d 100644 --- a/ci/install.sh +++ b/ci/install.sh @@ -11,7 +11,11 @@ main() { rustup component list | grep 'rust-src.*installed' || \ rustup component add rust-src ;; + x86_64-unknown-linux-gnu) + ;; *) + # unhandled case + exit 1 ;; esac } diff --git a/ci/script.sh b/ci/script.sh index e9ff7e38..5bf4a8b0 100644 --- a/ci/script.sh +++ b/ci/script.sh @@ -5,8 +5,19 @@ main() { thumb*m-none-eabi) xargo check --target $TARGET ;; - *) + x86_64-unknown-linux-gnu) cargo check --target $TARGET + cargo test --target $TARGET + + export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt" + export RUSTFLAGS="-Z sanitizer=thread" + + cargo test --test tsan --target $TARGET + cargo test --test tsan --target $TARGET --release + ;; + *) + # unhandled case + exit 1 ;; esac } diff --git a/tests/tsan.rs b/tests/tsan.rs new file mode 100644 index 00000000..edee9011 --- /dev/null +++ b/tests/tsan.rs @@ -0,0 +1,20 @@ +extern crate heapless; + +use std::thread; + +use heapless::RingBuffer; + +#[test] +fn tsan() { + static mut RB: RingBuffer = RingBuffer::new(); + + unsafe { RB.split() }.0.enqueue(0).unwrap(); + + thread::spawn(|| { + unsafe { RB.split() }.0.enqueue(1).unwrap(); + }); + + thread::spawn(|| { + unsafe { RB.split() }.1.dequeue().unwrap(); + }); +} From f9a3dfcc891605e8a8dbb5bb2ccbe24bd7b6b775 Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Tue, 31 Oct 2017 21:42:53 +0100 Subject: [PATCH 03/12] also test in release --- ci/script.sh | 2 ++ src/ring_buffer/mod.rs | 1 + src/ring_buffer/spsc.rs | 3 +-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ci/script.sh b/ci/script.sh index 5bf4a8b0..c988b650 100644 --- a/ci/script.sh +++ b/ci/script.sh @@ -7,7 +7,9 @@ main() { ;; x86_64-unknown-linux-gnu) cargo check --target $TARGET + cargo test --target $TARGET + cargo test --target $TARGET --release export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt" export RUSTFLAGS="-Z sanitizer=thread" diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index 8311a555..321af205 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -28,6 +28,7 @@ where // this is where we enqueue new items #[cfg(target_has_atomic = "ptr")] tail: AtomicUsize, #[cfg(not(target_has_atomic = "ptr"))] tail: usize, + buffer: UntaggedOption, } diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index b31ef0fb..05e44357 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -144,8 +144,7 @@ where // NOTE(volatile) the value of `head` can change at any time in the execution context of the // producer so we inform this to the compiler using a volatile load if next_tail != unsafe { ptr::read_volatile(&rb.head) } { - // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We - // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory + // NOTE(ptr::write) see the other `enqueue` implementation above for details unsafe { ptr::write(buffer.get_unchecked_mut(rb.tail), item) } rb.tail = next_tail; Ok(()) From 158d19b45a12077a180f8507c1407af3076fd2b5 Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Tue, 31 Oct 2017 22:01:25 +0100 Subject: [PATCH 04/12] actually execute ci/script.sh --- ci/script.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ci/script.sh b/ci/script.sh index c988b650..9137e63b 100644 --- a/ci/script.sh +++ b/ci/script.sh @@ -23,3 +23,5 @@ main() { ;; esac } + +main From 978f0ee2decad23e1a3923bcfeda7e0e33ac7c14 Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Wed, 8 Nov 2017 00:50:26 +0100 Subject: [PATCH 05/12] add a compiler barrier --- src/lib.rs | 1 + src/ring_buffer/spsc.rs | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index d79b4994..53e58ecd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -144,6 +144,7 @@ //! is_send::>(); //! ``` +#![cfg_attr(not(target_has_atomic = "ptr"), feature(asm))] #![cfg_attr(target_has_atomic = "ptr", feature(const_atomic_usize_new))] #![deny(missing_docs)] #![feature(cfg_target_has_atomic)] diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 05e44357..6ea265ff 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -6,6 +6,14 @@ use core::sync::atomic::Ordering; use BufferFullError; use ring_buffer::RingBuffer; +// Compiler barrier +#[cfg(not(target_has_atomic = "ptr"))] +macro_rules! barrier { + () => { + unsafe { asm!("" ::: "memory") } + } +} + impl RingBuffer where A: Unsize<[T]>, @@ -75,6 +83,12 @@ where // consumer so we inform this to the compiler using a volatile load if rb.head != unsafe { ptr::read_volatile(&rb.tail) } { let item = unsafe { ptr::read(buffer.get_unchecked(rb.head)) }; + + // NOTE(barrier!) this ensures that the compiler won't place the instructions to read + // the data *before* the instructions to increment the `head` pointer -- note that this + // won't be enough on architectures that allow out of order execution + barrier!(); + rb.head = (rb.head + 1) % n; Some(item) } else { @@ -146,6 +160,10 @@ where if next_tail != unsafe { ptr::read_volatile(&rb.head) } { // NOTE(ptr::write) see the other `enqueue` implementation above for details unsafe { ptr::write(buffer.get_unchecked_mut(rb.tail), item) } + + // NOTE(barrier!) see the NOTE(barrier!) above + barrier!(); + rb.tail = next_tail; Ok(()) } else { From 37c8b5b63780ed8811173dc1ec8859cd99efa9ad Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Wed, 8 Nov 2017 22:12:24 +0100 Subject: [PATCH 06/12] create our own AtomicUsize which works on thumbv6m-none-eabi and probably other targets with max-atomic-width = 0 --- src/lib.rs | 5 +-- src/ring_buffer/mod.rs | 82 ++++++++++++++++++-------------------- src/ring_buffer/spsc.rs | 87 ++++------------------------------------- 3 files changed, 47 insertions(+), 127 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 53e58ecd..e8f83cbd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -144,11 +144,10 @@ //! is_send::>(); //! ``` -#![cfg_attr(not(target_has_atomic = "ptr"), feature(asm))] -#![cfg_attr(target_has_atomic = "ptr", feature(const_atomic_usize_new))] #![deny(missing_docs)] -#![feature(cfg_target_has_atomic)] #![feature(const_fn)] +#![feature(const_unsafe_cell_new)] +#![feature(core_intrinsics)] #![feature(shared)] #![feature(unsize)] #![no_std] diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index 321af205..37471944 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -1,9 +1,8 @@ //! Ring buffer +use core::cell::UnsafeCell; use core::marker::{PhantomData, Unsize}; -use core::ptr; -#[cfg(target_has_atomic = "ptr")] -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::{intrinsics, ptr}; use untagged_option::UntaggedOption; @@ -13,6 +12,36 @@ pub use self::spsc::{Consumer, Producer}; mod spsc; +// AtomicUsize with no CAS operations that works on targets that have "no atomic support" according +// to their specification +struct AtomicUsize { + v: UnsafeCell, +} + +impl AtomicUsize { + pub const fn new(v: usize) -> AtomicUsize { + AtomicUsize { + v: UnsafeCell::new(v), + } + } + + pub fn get_mut(&mut self) -> &mut usize { + unsafe { &mut *self.v.get() } + } + + pub fn load_acquire(&self) -> usize { + unsafe { intrinsics::atomic_load_acq(self.v.get()) } + } + + pub fn load_relaxed(&self) -> usize { + unsafe { intrinsics::atomic_load_relaxed(self.v.get()) } + } + + pub fn store_release(&self, val: usize) { + unsafe { intrinsics::atomic_store_rel(self.v.get(), val) } + } +} + /// An statically allocated ring buffer backed by an array `A` pub struct RingBuffer where @@ -22,12 +51,10 @@ where _marker: PhantomData<[T]>, // this is from where we dequeue items - #[cfg(target_has_atomic = "ptr")] head: AtomicUsize, - #[cfg(not(target_has_atomic = "ptr"))] head: usize, + head: AtomicUsize, // this is where we enqueue new items - #[cfg(target_has_atomic = "ptr")] tail: AtomicUsize, - #[cfg(not(target_has_atomic = "ptr"))] tail: usize, + tail: AtomicUsize, buffer: UntaggedOption, } @@ -42,14 +69,8 @@ where RingBuffer { _marker: PhantomData, buffer: UntaggedOption::none(), - #[cfg(target_has_atomic = "ptr")] head: AtomicUsize::new(0), - #[cfg(not(target_has_atomic = "ptr"))] - head: 0, - #[cfg(target_has_atomic = "ptr")] tail: AtomicUsize::new(0), - #[cfg(not(target_has_atomic = "ptr"))] - tail: 0, } } @@ -63,15 +84,8 @@ where pub fn dequeue(&mut self) -> Option { let n = self.capacity() + 1; - #[cfg(target_has_atomic = "ptr")] let head = self.head.get_mut(); - #[cfg(not(target_has_atomic = "ptr"))] - let head = &mut self.head; - - #[cfg(target_has_atomic = "ptr")] let tail = self.tail.get_mut(); - #[cfg(not(target_has_atomic = "ptr"))] - let tail = &mut self.tail; let buffer: &[T] = unsafe { self.buffer.as_ref() }; @@ -90,15 +104,8 @@ where pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { let n = self.capacity() + 1; - #[cfg(target_has_atomic = "ptr")] let head = self.head.get_mut(); - #[cfg(not(target_has_atomic = "ptr"))] - let head = &mut self.head; - - #[cfg(target_has_atomic = "ptr")] let tail = self.tail.get_mut(); - #[cfg(not(target_has_atomic = "ptr"))] - let tail = &mut self.tail; let buffer: &mut [T] = unsafe { self.buffer.as_mut() }; @@ -116,15 +123,8 @@ where /// Returns the number of elements in the queue pub fn len(&self) -> usize { - #[cfg(target_has_atomic = "ptr")] - let head = self.head.load(Ordering::Relaxed); - #[cfg(not(target_has_atomic = "ptr"))] - let head = self.head; - - #[cfg(target_has_atomic = "ptr")] - let tail = self.tail.load(Ordering::Relaxed); - #[cfg(not(target_has_atomic = "ptr"))] - let tail = self.tail; + let head = self.head.load_relaxed(); + let tail = self.tail.load_relaxed(); if head > tail { head - tail @@ -221,10 +221,7 @@ where fn next(&mut self) -> Option<&'a T> { if self.index < self.len { - #[cfg(not(target_has_atomic = "ptr"))] - let head = self.rb.head; - #[cfg(target_has_atomic = "ptr")] - let head = self.rb.head.load(Ordering::Relaxed); + let head = self.rb.head.load_relaxed(); let buffer: &[T] = unsafe { self.rb.buffer.as_ref() }; let ptr = buffer.as_ptr(); @@ -246,10 +243,7 @@ where fn next(&mut self) -> Option<&'a mut T> { if self.index < self.len { - #[cfg(not(target_has_atomic = "ptr"))] - let head = self.rb.head; - #[cfg(target_has_atomic = "ptr")] - let head = self.rb.head.load(Ordering::Relaxed); + let head = self.rb.head.load_relaxed(); let capacity = self.rb.capacity() + 1; let buffer: &mut [T] = unsafe { self.rb.buffer.as_mut() }; diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 6ea265ff..0c384977 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -1,30 +1,14 @@ use core::ptr::{self, Shared}; use core::marker::Unsize; -#[cfg(target_has_atomic = "ptr")] -use core::sync::atomic::Ordering; use BufferFullError; use ring_buffer::RingBuffer; -// Compiler barrier -#[cfg(not(target_has_atomic = "ptr"))] -macro_rules! barrier { - () => { - unsafe { asm!("" ::: "memory") } - } -} - impl RingBuffer where A: Unsize<[T]>, { /// Splits a statically allocated ring buffer into producer and consumer end points - /// - /// **Warning** the current single producer single consumer implementation only supports - /// multi-core systems where `cfg(target_has_atomic = "ptr")` holds for all the cores. For - /// example, a dual core system where one core is Cortex-M0 core and the other is Cortex-M3 core - /// is not supported because Cortex-M0 (`thumbv6m-none-eabi`) doesn't satisfy - /// `cfg(target_has_atomic = "ptr")`. All single core systems are supported. pub fn split(&'static mut self) -> (Producer, Consumer) { ( Producer { @@ -52,44 +36,17 @@ where A: Unsize<[T]>, { /// Returns the item in the front of the queue, or `None` if the queue is empty - #[cfg(target_has_atomic = "ptr")] pub fn dequeue(&mut self) -> Option { let rb = unsafe { self.rb.as_ref() }; - let tail = rb.tail.load(Ordering::Relaxed); - let head = rb.head.load(Ordering::Acquire); - let n = rb.capacity() + 1; let buffer: &[T] = unsafe { rb.buffer.as_ref() }; + let tail = rb.tail.load_relaxed(); + let head = rb.head.load_acquire(); if head != tail { let item = unsafe { ptr::read(buffer.get_unchecked(head)) }; - rb.head.store((head + 1) % n, Ordering::Release); - Some(item) - } else { - None - } - } - - /// Returns the item in the front of the queue, or `None` if the queue is empty - #[cfg(not(target_has_atomic = "ptr"))] - pub fn dequeue(&mut self) -> Option { - let rb = unsafe { self.rb.as_mut() }; - - let n = rb.capacity() + 1; - let buffer: &[T] = unsafe { rb.buffer.as_ref() }; - - // NOTE(volatile) the value of `tail` can change at any time in the execution context of the - // consumer so we inform this to the compiler using a volatile load - if rb.head != unsafe { ptr::read_volatile(&rb.tail) } { - let item = unsafe { ptr::read(buffer.get_unchecked(rb.head)) }; - - // NOTE(barrier!) this ensures that the compiler won't place the instructions to read - // the data *before* the instructions to increment the `head` pointer -- note that this - // won't be enough on architectures that allow out of order execution - barrier!(); - - rb.head = (rb.head + 1) % n; + rb.head.store_release((head + 1) % n); Some(item) } else { None @@ -121,50 +78,20 @@ where /// Adds an `item` to the end of the queue /// /// Returns `BufferFullError` if the queue is full - #[cfg(target_has_atomic = "ptr")] pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { let rb = unsafe { self.rb.as_mut() }; - let head = rb.head.load(Ordering::Relaxed); - let tail = rb.tail.load(Ordering::Acquire); - let n = rb.capacity() + 1; - let next_tail = (tail + 1) % n; - let buffer: &mut [T] = unsafe { rb.buffer.as_mut() }; + let head = rb.head.load_relaxed(); + let tail = rb.tail.load_acquire(); + let next_tail = (tail + 1) % n; if next_tail != head { // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory unsafe { ptr::write(buffer.get_unchecked_mut(tail), item) } - rb.tail.store(next_tail, Ordering::Release); - Ok(()) - } else { - Err(BufferFullError) - } - } - - /// Adds an `item` to the end of the queue - /// - /// Returns `BufferFullError` if the queue is full - #[cfg(not(target_has_atomic = "ptr"))] - pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { - let rb = unsafe { self.rb.as_mut() }; - - let n = rb.capacity() + 1; - let buffer: &mut [T] = unsafe { rb.buffer.as_mut() }; - - let next_tail = (rb.tail + 1) % n; - // NOTE(volatile) the value of `head` can change at any time in the execution context of the - // producer so we inform this to the compiler using a volatile load - if next_tail != unsafe { ptr::read_volatile(&rb.head) } { - // NOTE(ptr::write) see the other `enqueue` implementation above for details - unsafe { ptr::write(buffer.get_unchecked_mut(rb.tail), item) } - - // NOTE(barrier!) see the NOTE(barrier!) above - barrier!(); - - rb.tail = next_tail; + rb.tail.store_release(next_tail); Ok(()) } else { Err(BufferFullError) From 9533e27f44b21eb331188074771610eb9dae661b Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Wed, 8 Nov 2017 22:51:55 +0100 Subject: [PATCH 07/12] test two consecutive operations --- tests/tsan.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/tsan.rs b/tests/tsan.rs index edee9011..6685bd23 100644 --- a/tests/tsan.rs +++ b/tests/tsan.rs @@ -5,7 +5,7 @@ use std::thread; use heapless::RingBuffer; #[test] -fn tsan() { +fn once() { static mut RB: RingBuffer = RingBuffer::new(); unsafe { RB.split() }.0.enqueue(0).unwrap(); @@ -18,3 +18,21 @@ fn tsan() { unsafe { RB.split() }.1.dequeue().unwrap(); }); } + +#[test] +fn twice() { + static mut RB: RingBuffer = RingBuffer::new(); + + unsafe { RB.split() }.0.enqueue(0).unwrap(); + unsafe { RB.split() }.0.enqueue(1).unwrap(); + + thread::spawn(|| { + unsafe { RB.split() }.0.enqueue(2).unwrap(); + unsafe { RB.split() }.0.enqueue(3).unwrap(); + }); + + thread::spawn(|| { + unsafe { RB.split() }.1.dequeue().unwrap(); + unsafe { RB.split() }.1.dequeue().unwrap(); + }); +} From 9faea687d12c79f42217efdc7b31b49ff3830021 Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Wed, 8 Nov 2017 23:06:25 +0100 Subject: [PATCH 08/12] load_acquire -> load_relaxed --- src/ring_buffer/mod.rs | 4 ---- src/ring_buffer/spsc.rs | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index 37471944..3f53646d 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -29,10 +29,6 @@ impl AtomicUsize { unsafe { &mut *self.v.get() } } - pub fn load_acquire(&self) -> usize { - unsafe { intrinsics::atomic_load_acq(self.v.get()) } - } - pub fn load_relaxed(&self) -> usize { unsafe { intrinsics::atomic_load_relaxed(self.v.get()) } } diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 0c384977..4c93f429 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -43,7 +43,7 @@ where let buffer: &[T] = unsafe { rb.buffer.as_ref() }; let tail = rb.tail.load_relaxed(); - let head = rb.head.load_acquire(); + let head = rb.head.load_relaxed(); if head != tail { let item = unsafe { ptr::read(buffer.get_unchecked(head)) }; rb.head.store_release((head + 1) % n); @@ -85,7 +85,7 @@ where let buffer: &mut [T] = unsafe { rb.buffer.as_mut() }; let head = rb.head.load_relaxed(); - let tail = rb.tail.load_acquire(); + let tail = rb.tail.load_relaxed(); let next_tail = (tail + 1) % n; if next_tail != head { // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We From 9398aafe61c24723932a3e5b85f3f0d2e8612891 Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Thu, 9 Nov 2017 00:25:53 +0100 Subject: [PATCH 09/12] work around rust-lang/rust#45802 --- .travis.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 2e740cfa..99758624 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,8 @@ matrix: rust: nightly - env: TARGET=thumbv6m-none-eabi - rust: nightly + # work around rust-lang/rust#45802 + rust: nightly-2017-11-01 addons: apt: sources: From 5ff961c3ad0a41c577bfcdb3f8c904414bcbabb5 Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Thu, 9 Nov 2017 01:37:48 +0100 Subject: [PATCH 10/12] tsan: deal with the mangled names --- blacklist.txt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/blacklist.txt b/blacklist.txt index 7a73a277..54defd9d 100644 --- a/blacklist.txt +++ b/blacklist.txt @@ -1,6 +1,4 @@ # false positives from thread::spawn (?) -race:>::drop_slow -race:__GI___call_tls_dtors -race:alloc::heap::{{impl}}::dealloc -race:core::ptr::drop_in_place>>> -race:core::ptr::drop_in_place>> +race:*dealloc +race:*drop_slow* +race:__call_tls_dtors From 731e8ae150cb0de62e536a3fd71e8d53e28018ef Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Thu, 9 Nov 2017 02:09:38 +0100 Subject: [PATCH 11/12] rewrite the test for less unsafety --- tests/tsan.rs | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/tests/tsan.rs b/tests/tsan.rs index 6685bd23..dcef81cf 100644 --- a/tests/tsan.rs +++ b/tests/tsan.rs @@ -1,3 +1,5 @@ +#![deny(warnings)] + extern crate heapless; use std::thread; @@ -8,14 +10,20 @@ use heapless::RingBuffer; fn once() { static mut RB: RingBuffer = RingBuffer::new(); - unsafe { RB.split() }.0.enqueue(0).unwrap(); + let rb = unsafe { &mut RB }; - thread::spawn(|| { - unsafe { RB.split() }.0.enqueue(1).unwrap(); + rb.enqueue(0).unwrap(); + + let (mut p, mut c) = rb.split(); + + p.enqueue(1).unwrap(); + + thread::spawn(move || { + p.enqueue(1).unwrap(); }); - thread::spawn(|| { - unsafe { RB.split() }.1.dequeue().unwrap(); + thread::spawn(move || { + c.dequeue().unwrap(); }); } @@ -23,16 +31,20 @@ fn once() { fn twice() { static mut RB: RingBuffer = RingBuffer::new(); - unsafe { RB.split() }.0.enqueue(0).unwrap(); - unsafe { RB.split() }.0.enqueue(1).unwrap(); + let rb = unsafe { &mut RB }; - thread::spawn(|| { - unsafe { RB.split() }.0.enqueue(2).unwrap(); - unsafe { RB.split() }.0.enqueue(3).unwrap(); + rb.enqueue(0).unwrap(); + rb.enqueue(1).unwrap(); + + let (mut p, mut c) = rb.split(); + + thread::spawn(move || { + p.enqueue(2).unwrap(); + p.enqueue(3).unwrap(); }); - thread::spawn(|| { - unsafe { RB.split() }.1.dequeue().unwrap(); - unsafe { RB.split() }.1.dequeue().unwrap(); + thread::spawn(move || { + c.dequeue().unwrap(); + c.dequeue().unwrap(); }); } From 30ea33c349f44c1903ae8302341f7aa37614dddb Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Thu, 9 Nov 2017 02:27:01 +0100 Subject: [PATCH 12/12] relax the lifetime constraint of RingBuffer.split also - add a "`split` freezes the ring buffer" compile fail test - hide compile-fail doc tests - add scoped threads tests --- Cargo.toml | 3 ++ blacklist.txt | 5 ++- src/cfail.rs | 80 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 71 ++---------------------------------- src/ring_buffer/spsc.rs | 20 ++++++----- tests/tsan.rs | 25 +++++++++++++ 6 files changed, 126 insertions(+), 78 deletions(-) create mode 100644 src/cfail.rs diff --git a/Cargo.toml b/Cargo.toml index 13b4ab79..983d302e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,6 @@ version = "0.2.0" [dependencies] untagged-option = "0.1.1" + +[dev-dependencies] +scoped_threadpool = "0.1.8" diff --git a/blacklist.txt b/blacklist.txt index 54defd9d..bf051e53 100644 --- a/blacklist.txt +++ b/blacklist.txt @@ -1,4 +1,7 @@ -# false positives from thread::spawn (?) +# false positives in thread::spawn (?) race:*dealloc race:*drop_slow* race:__call_tls_dtors + +# false positives in scoped_threadpool (?) +race:*drop* diff --git a/src/cfail.rs b/src/cfail.rs new file mode 100644 index 00000000..50c3a920 --- /dev/null +++ b/src/cfail.rs @@ -0,0 +1,80 @@ +//! Compile fail tests +//! +//! # `Send`-ness +//! +//! Collections of `Send`-able things are `Send` +//! +//! ``` +//! use heapless::{RingBuffer, Vec}; +//! use heapless::ring_buffer::{Consumer, Producer}; +//! +//! struct IsSend; +//! +//! unsafe impl Send for IsSend {} +//! +//! fn is_send() where T: Send {} +//! +//! is_send::>(); +//! is_send::>(); +//! is_send::>(); +//! is_send::>(); +//! ``` +//! +//! Collections of non-`Send`-able things are *not* `Send` +//! +//! ``` compile_fail +//! use std::marker::PhantomData; +//! use heapless::ring_buffer::Consumer; +//! +//! type NotSend = PhantomData<*const ()>; +//! +//! fn is_send() where T: Send {} +//! +//! is_send::>(); +//! ``` +//! +//! ``` compile_fail +//! use std::marker::PhantomData; +//! use heapless::ring_buffer::Producer; +//! +//! type NotSend = PhantomData<*const ()>; +//! +//! fn is_send() where T: Send {} +//! +//! is_send::>(); +//! ``` +//! +//! ``` compile_fail +//! use std::marker::PhantomData; +//! use heapless::RingBuffer; +//! +//! type NotSend = PhantomData<*const ()>; +//! +//! fn is_send() where T: Send {} +//! +//! is_send::>(); +//! ``` +//! +//! ``` compile_fail +//! use std::marker::PhantomData; +//! use heapless::Vec; +//! +//! type NotSend = PhantomData<*const ()>; +//! +//! fn is_send() where T: Send {} +//! +//! is_send::>(); +//! ``` +//! +//! # Freeze +//! +//! Splitting a `RingBuffer` should invalidate the original reference. +//! +//! ``` compile_fail +//! use heapless::RingBuffer; +//! +//! let mut rb: RingBuffer = RingBuffer::new(); +//! +//! let (p, c) = rb.split(); +//! rb.enqueue(0).unwrap(); +//! ``` diff --git a/src/lib.rs b/src/lib.rs index e8f83cbd..f36ef00d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,8 +36,6 @@ //! //! ### Single producer single consumer mode //! -//! For use in *single core* systems like microcontrollers -//! //! ``` //! use heapless::RingBuffer; //! @@ -77,72 +75,6 @@ //! // .. //! } //! ``` -//! -//! # `Send`-ness -//! -//! Collections of `Send`-able things are `Send` -//! -//! ``` -//! use heapless::{RingBuffer, Vec}; -//! use heapless::ring_buffer::{Consumer, Producer}; -//! -//! struct IsSend; -//! -//! unsafe impl Send for IsSend {} -//! -//! fn is_send() where T: Send {} -//! -//! is_send::>(); -//! is_send::>(); -//! is_send::>(); -//! is_send::>(); -//! ``` -//! -//! Collections of not `Send`-able things are *not* `Send` -//! -//! ``` compile_fail -//! use std::marker::PhantomData; -//! use heapless::ring_buffer::Consumer; -//! -//! type NotSend = PhantomData<*const ()>; -//! -//! fn is_send() where T: Send {} -//! -//! is_send::>(); -//! ``` -//! -//! ``` compile_fail -//! use std::marker::PhantomData; -//! use heapless::ring_buffer::Producer; -//! -//! type NotSend = PhantomData<*const ()>; -//! -//! fn is_send() where T: Send {} -//! -//! is_send::>(); -//! ``` -//! -//! ``` compile_fail -//! use std::marker::PhantomData; -//! use heapless::RingBuffer; -//! -//! type NotSend = PhantomData<*const ()>; -//! -//! fn is_send() where T: Send {} -//! -//! is_send::>(); -//! ``` -//! -//! ``` compile_fail -//! use std::marker::PhantomData; -//! use heapless::Vec; -//! -//! type NotSend = PhantomData<*const ()>; -//! -//! fn is_send() where T: Send {} -//! -//! is_send::>(); -//! ``` #![deny(missing_docs)] #![feature(const_fn)] @@ -157,8 +89,9 @@ extern crate untagged_option; pub use vec::Vec; pub use ring_buffer::RingBuffer; -pub mod ring_buffer; +mod cfail; mod vec; +pub mod ring_buffer; /// Error raised when the buffer is full #[derive(Clone, Copy, Debug, Eq, PartialEq)] diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 4c93f429..488c07a8 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -1,5 +1,5 @@ use core::ptr::{self, Shared}; -use core::marker::Unsize; +use core::marker::{PhantomData, Unsize}; use BufferFullError; use ring_buffer::RingBuffer; @@ -9,13 +9,15 @@ where A: Unsize<[T]>, { /// Splits a statically allocated ring buffer into producer and consumer end points - pub fn split(&'static mut self) -> (Producer, Consumer) { + pub fn split(&mut self) -> (Producer, Consumer) { ( Producer { rb: unsafe { Shared::new_unchecked(self) }, + _marker: PhantomData, }, Consumer { rb: unsafe { Shared::new_unchecked(self) }, + _marker: PhantomData, }, ) } @@ -23,15 +25,16 @@ where /// A ring buffer "consumer"; it can dequeue items from the ring buffer // NOTE the consumer semantically owns the `head` pointer of the ring buffer -pub struct Consumer +pub struct Consumer<'a, T, A> where A: Unsize<[T]>, { // XXX do we need to use `Shared` (for soundness) here? rb: Shared>, + _marker: PhantomData<&'a ()>, } -impl Consumer +impl<'a, T, A> Consumer<'a, T, A> where A: Unsize<[T]>, { @@ -54,7 +57,7 @@ where } } -unsafe impl Send for Consumer +unsafe impl<'a, T, A> Send for Consumer<'a, T, A> where A: Unsize<[T]>, T: Send, @@ -63,15 +66,16 @@ where /// A ring buffer "producer"; it can enqueue items into the ring buffer // NOTE the producer semantically owns the `tail` pointer of the ring buffer -pub struct Producer +pub struct Producer<'a, T, A> where A: Unsize<[T]>, { // XXX do we need to use `Shared` (for soundness) here? rb: Shared>, + _marker: PhantomData<&'a ()>, } -impl Producer +impl<'a, T, A> Producer<'a, T, A> where A: Unsize<[T]>, { @@ -99,7 +103,7 @@ where } } -unsafe impl Send for Producer +unsafe impl<'a, T, A> Send for Producer<'a, T, A> where A: Unsize<[T]>, T: Send, diff --git a/tests/tsan.rs b/tests/tsan.rs index dcef81cf..6680cd24 100644 --- a/tests/tsan.rs +++ b/tests/tsan.rs @@ -1,10 +1,12 @@ #![deny(warnings)] extern crate heapless; +extern crate scoped_threadpool; use std::thread; use heapless::RingBuffer; +use scoped_threadpool::Pool; #[test] fn once() { @@ -48,3 +50,26 @@ fn twice() { c.dequeue().unwrap(); }); } + +#[test] +fn scoped() { + let mut rb: RingBuffer = RingBuffer::new(); + + rb.enqueue(0).unwrap(); + + { + let (mut p, mut c) = rb.split(); + + Pool::new(2).scoped(move |scope| { + scope.execute(move || { + p.enqueue(1).unwrap(); + }); + + scope.execute(move || { + c.dequeue().unwrap(); + }); + }); + } + + rb.dequeue().unwrap(); +}