From 3d787b16c7a6372d901f7869b59d94fe0ebaa194 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 20 Feb 2019 10:05:56 -0800 Subject: [PATCH] sync: add loom test for mpsc (#903) This patch updates tokio_sync::mpsc to support using loom for fuzz testing. It includes a basic fuzz test. --- tokio-sync/Cargo.toml | 2 +- tokio-sync/src/loom.rs | 1 + tokio-sync/src/mpsc/chan.rs | 115 +++++++++++++++++++--------------- tokio-sync/src/mpsc/rx.rs | 1 - tokio-sync/src/mpsc/tx.rs | 0 tokio-sync/tests/fuzz_mpsc.rs | 39 ++++++++++++ 6 files changed, 104 insertions(+), 54 deletions(-) delete mode 100644 tokio-sync/src/mpsc/rx.rs delete mode 100644 tokio-sync/src/mpsc/tx.rs create mode 100644 tokio-sync/tests/fuzz_mpsc.rs diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index df21ffaaa..193fe18ef 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -25,4 +25,4 @@ futures = "0.1.19" env_logger = { version = "0.5", default-features = false } tokio = { path = ".." } tokio-mock-task = "0.1.1" -loom = { version = "0.1.0", features = ["futures"] } +loom = { version = "0.1.1", features = ["futures"] } diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs index 7214307a4..5de8c1107 100644 --- a/tokio-sync/src/loom.rs +++ b/tokio-sync/src/loom.rs @@ -5,6 +5,7 @@ pub(crate) mod futures { pub(crate) mod sync { pub(crate) use std::sync::atomic; + pub(crate) use std::sync::Arc; use std::cell::UnsafeCell; diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs index de7ba0f9a..8a8f2bea2 100644 --- a/tokio-sync/src/mpsc/chan.rs +++ b/tokio-sync/src/mpsc/chan.rs @@ -1,12 +1,14 @@ -use loom::futures::AtomicTask; use super::list; use futures::Poll; -use std::cell::UnsafeCell; +use ::loom::{ + futures::AtomicTask, + sync::{Arc, CausalCell}, + sync::atomic::AtomicUsize, +}; + use std::process; use std::fmt; -use std::sync::Arc; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{AcqRel, Relaxed}; /// Channel sender @@ -48,7 +50,7 @@ pub(crate) enum TrySendError { NoPermits, } -pub(crate) trait Semaphore: Sync { +pub(crate) trait Semaphore { type Permit; fn new_permit() -> Self::Permit; @@ -90,7 +92,7 @@ struct Chan { tx_count: AtomicUsize, /// Only accessed by `Rx` handle. - rx_fields: UnsafeCell>, + rx_fields: CausalCell>, } impl fmt::Debug for Chan where S: fmt::Debug @@ -101,7 +103,7 @@ impl fmt::Debug for Chan where S: fmt::Debug .field("semaphore", &self.semaphore) .field("rx_task", &self.rx_task) .field("tx_count", &self.tx_count) - .field("rx_fields", &self.rx_fields) + .field("rx_fields", &"...") .finish() } } @@ -139,7 +141,7 @@ where semaphore, rx_task: AtomicTask::new(), tx_count: AtomicUsize::new(1), - rx_fields: UnsafeCell::new(RxFields { + rx_fields: CausalCell::new(RxFields { list: rx, rx_closed: false, }), @@ -231,13 +233,16 @@ where } pub(crate) fn close(&mut self) { - let rx_fields = unsafe { &mut *self.inner.rx_fields.get() }; + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; - if rx_fields.rx_closed { - return; - } + if rx_fields.rx_closed { + return; + } + + rx_fields.rx_closed = true; + }); - rx_fields.rx_closed = true; self.inner.semaphore.close(); } @@ -246,47 +251,49 @@ where use super::block::Read::*; use futures::Async::*; - let rx_fields = unsafe { &mut *self.inner.rx_fields.get() }; + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; - macro_rules! try_recv { - () => { - match rx_fields.list.pop(&self.inner.tx) { - Some(Value(value)) => { - self.inner.semaphore.add_permit(); - return Ok(Ready(Some(value))); + macro_rules! try_recv { + () => { + match rx_fields.list.pop(&self.inner.tx) { + Some(Value(value)) => { + self.inner.semaphore.add_permit(); + return Ok(Ready(Some(value))); + } + Some(Closed) => { + // TODO: This check may not be required as it most + // likely can only return `true` at this point. A + // channel is closed when all tx handles are + // dropped. Dropping a tx handle releases memory, + // which ensures that if dropping the tx handle is + // visible, then all messages sent are also visible. + assert!(self.inner.semaphore.is_idle()); + return Ok(Ready(None)); + } + None => {} // fall through } - Some(Closed) => { - // TODO: This check may not be required as it most - // likely can only return `true` at this point. A - // channel is closed when all tx handles are dropped. - // Dropping a tx handle releases memory, which ensures - // that if dropping the tx handle is visible, then all - // messages sent are also visible. - assert!(self.inner.semaphore.is_idle()); - return Ok(Ready(None)); - } - None => {} // fall through } } - } - try_recv!(); + try_recv!(); - self.inner.rx_task.register(); + self.inner.rx_task.register(); - // It is possible that a value was pushed between attempting to read and - // registering the task, so we have to check the channel a second time - // here. - try_recv!(); + // It is possible that a value was pushed between attempting to read + // and registering the task, so we have to check the channel a + // second time here. + try_recv!(); - debug!("recv; rx_closed = {:?}; is_idle = {:?}", - rx_fields.rx_closed, self.inner.semaphore.is_idle()); + debug!("recv; rx_closed = {:?}; is_idle = {:?}", + rx_fields.rx_closed, self.inner.semaphore.is_idle()); - if rx_fields.rx_closed && self.inner.semaphore.is_idle() { - Ok(Ready(None)) - } else { - Ok(NotReady) - } + if rx_fields.rx_closed && self.inner.semaphore.is_idle() { + Ok(Ready(None)) + } else { + Ok(NotReady) + } + }) } } @@ -299,11 +306,13 @@ where self.close(); - let rx_fields = unsafe { &mut *self.inner.rx_fields.get() }; + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; - while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { - self.inner.semaphore.add_permit(); - } + while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { + self.inner.semaphore.add_permit(); + } + }) } } @@ -313,10 +322,12 @@ impl Drop for Chan { fn drop(&mut self) { use super::block::Read::Value; - let rx_fields = unsafe { &mut *self.rx_fields.get() }; + self.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; - while let Some(Value(_)) = rx_fields.list.pop(&self.tx) { - } + while let Some(Value(_)) = rx_fields.list.pop(&self.tx) { + } + }); } } diff --git a/tokio-sync/src/mpsc/rx.rs b/tokio-sync/src/mpsc/rx.rs deleted file mode 100644 index 8b1378917..000000000 --- a/tokio-sync/src/mpsc/rx.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/tokio-sync/src/mpsc/tx.rs b/tokio-sync/src/mpsc/tx.rs deleted file mode 100644 index e69de29bb..000000000 diff --git a/tokio-sync/tests/fuzz_mpsc.rs b/tokio-sync/tests/fuzz_mpsc.rs new file mode 100644 index 000000000..a3c7309cf --- /dev/null +++ b/tokio-sync/tests/fuzz_mpsc.rs @@ -0,0 +1,39 @@ +extern crate futures; +#[macro_use] +extern crate loom; + +macro_rules! if_fuzz { + ($($t:tt)*) => { + $($t)* + } +} + +#[path = "../src/mpsc/mod.rs"] +#[allow(warnings)] +mod mpsc; + +#[path = "../src/semaphore.rs"] +#[allow(warnings)] +mod semaphore; + +use futures::{Stream, future::poll_fn}; +use loom::futures::block_on; +use loom::thread; + +#[test] +fn closing_tx() { + loom::fuzz(|| { + let (mut tx, mut rx) = mpsc::channel(16); + + thread::spawn(move || { + tx.try_send(()).unwrap(); + drop(tx); + }); + + let v = block_on(poll_fn(|| rx.poll())).unwrap(); + assert!(v.is_some()); + + let v = block_on(poll_fn(|| rx.poll())).unwrap(); + assert!(v.is_none()); + }); +}