From 40994962e2c9756c8ebc28029ad2b7c3e14f0c0d Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Thu, 9 Nov 2017 15:33:54 +0100 Subject: [PATCH 1/2] add contention test --- ci/script.sh | 3 ++- src/ring_buffer/mod.rs | 5 +++++ tests/tsan.rs | 45 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/ci/script.sh b/ci/script.sh index 9137e63b..07260f43 100644 --- a/ci/script.sh +++ b/ci/script.sh @@ -11,8 +11,9 @@ main() { cargo test --target $TARGET cargo test --target $TARGET --release - export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt" export RUSTFLAGS="-Z sanitizer=thread" + export RUST_TEST_THREADS=1 + export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt" cargo test --test tsan --target $TARGET cargo test --test tsan --target $TARGET --release diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index 3f53646d..52bbb04a 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -129,6 +129,11 @@ where } } + /// Returns `true` if the ring buffer has a length of 0 + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Iterates from the front of the queue to the back pub fn iter(&self) -> Iter { Iter { diff --git a/tests/tsan.rs b/tests/tsan.rs index 6680cd24..9fbcb8d5 100644 --- a/tests/tsan.rs +++ b/tests/tsan.rs @@ -73,3 +73,48 @@ fn scoped() { rb.dequeue().unwrap(); } + +#[test] +fn contention() { + const N: usize = 1024; + + let mut rb: RingBuffer = RingBuffer::new(); + + { + let (mut p, mut c) = rb.split(); + + Pool::new(2).scoped(move |scope| { + scope.execute(move || { + let mut sum: u32 = 0; + + for i in 0..N { + let i = i as u8; + sum = sum.wrapping_add(i as u32); + while let Err(_) = p.enqueue(i) {} + } + + println!("producer: {}", sum); + }); + + scope.execute(move || { + let mut sum: u32 = 0; + + for _ in 0..N { + loop { + match c.dequeue() { + Some(v) => { + sum = sum.wrapping_add(v as u32); + break; + } + _ => {} + } + } + } + + println!("consumer: {}", sum); + }); + }); + } + + assert!(rb.is_empty()); +} From ffcd423e6765eb508d8ae7a305955428ddb7e573 Mon Sep 17 00:00:00 2001 From: Jorge Aparicio Date: Thu, 9 Nov 2017 16:54:30 +0100 Subject: [PATCH 2/2] fix the contention test patch by @pftbest :heart: --- src/ring_buffer/mod.rs | 4 ++++ src/ring_buffer/spsc.rs | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index 52bbb04a..0ae68a7c 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -29,6 +29,10 @@ impl AtomicUsize { unsafe { &mut *self.v.get() } } + pub fn load_acquire(&self) -> usize { + unsafe { intrinsics::atomic_load_acq(self.v.get()) } + } + pub fn load_relaxed(&self) -> usize { unsafe { intrinsics::atomic_load_relaxed(self.v.get()) } } diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 488c07a8..082aab71 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -45,7 +45,7 @@ where let n = rb.capacity() + 1; let buffer: &[T] = unsafe { rb.buffer.as_ref() }; - let tail = rb.tail.load_relaxed(); + let tail = rb.tail.load_acquire(); let head = rb.head.load_relaxed(); if head != tail { let item = unsafe { ptr::read(buffer.get_unchecked(head)) };