sync: add loom test for mpsc (#903)

This patch updates tokio_sync::mpsc to support using loom for fuzz
testing. It includes a basic fuzz test.
This commit is contained in:
Carl Lerche 2019-02-20 10:05:56 -08:00 committed by GitHub
parent f513558076
commit 3d787b16c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 104 additions and 54 deletions

View File

@ -25,4 +25,4 @@ futures = "0.1.19"
env_logger = { version = "0.5", default-features = false }
tokio = { path = ".." }
tokio-mock-task = "0.1.1"
loom = { version = "0.1.0", features = ["futures"] }
loom = { version = "0.1.1", features = ["futures"] }

View File

@ -5,6 +5,7 @@ pub(crate) mod futures {
pub(crate) mod sync {
pub(crate) use std::sync::atomic;
pub(crate) use std::sync::Arc;
use std::cell::UnsafeCell;

View File

@ -1,12 +1,14 @@
use loom::futures::AtomicTask;
use super::list;
use futures::Poll;
use std::cell::UnsafeCell;
use ::loom::{
futures::AtomicTask,
sync::{Arc, CausalCell},
sync::atomic::AtomicUsize,
};
use std::process;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
/// Channel sender
@ -48,7 +50,7 @@ pub(crate) enum TrySendError {
NoPermits,
}
pub(crate) trait Semaphore: Sync {
pub(crate) trait Semaphore {
type Permit;
fn new_permit() -> Self::Permit;
@ -90,7 +92,7 @@ struct Chan<T, S> {
tx_count: AtomicUsize,
/// Only accessed by `Rx` handle.
rx_fields: UnsafeCell<RxFields<T>>,
rx_fields: CausalCell<RxFields<T>>,
}
impl<T, S> fmt::Debug for Chan<T, S> where S: fmt::Debug
@ -101,7 +103,7 @@ impl<T, S> fmt::Debug for Chan<T, S> where S: fmt::Debug
.field("semaphore", &self.semaphore)
.field("rx_task", &self.rx_task)
.field("tx_count", &self.tx_count)
.field("rx_fields", &self.rx_fields)
.field("rx_fields", &"...")
.finish()
}
}
@ -139,7 +141,7 @@ where
semaphore,
rx_task: AtomicTask::new(),
tx_count: AtomicUsize::new(1),
rx_fields: UnsafeCell::new(RxFields {
rx_fields: CausalCell::new(RxFields {
list: rx,
rx_closed: false,
}),
@ -231,13 +233,16 @@ where
}
pub(crate) fn close(&mut self) {
let rx_fields = unsafe { &mut *self.inner.rx_fields.get() };
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
if rx_fields.rx_closed {
return;
}
if rx_fields.rx_closed {
return;
}
rx_fields.rx_closed = true;
});
rx_fields.rx_closed = true;
self.inner.semaphore.close();
}
@ -246,47 +251,49 @@ where
use super::block::Read::*;
use futures::Async::*;
let rx_fields = unsafe { &mut *self.inner.rx_fields.get() };
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
macro_rules! try_recv {
() => {
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
return Ok(Ready(Some(value)));
macro_rules! try_recv {
() => {
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
return Ok(Ready(Some(value)));
}
Some(Closed) => {
// TODO: This check may not be required as it most
// likely can only return `true` at this point. A
// channel is closed when all tx handles are
// dropped. Dropping a tx handle releases memory,
// which ensures that if dropping the tx handle is
// visible, then all messages sent are also visible.
assert!(self.inner.semaphore.is_idle());
return Ok(Ready(None));
}
None => {} // fall through
}
Some(Closed) => {
// TODO: This check may not be required as it most
// likely can only return `true` at this point. A
// channel is closed when all tx handles are dropped.
// Dropping a tx handle releases memory, which ensures
// that if dropping the tx handle is visible, then all
// messages sent are also visible.
assert!(self.inner.semaphore.is_idle());
return Ok(Ready(None));
}
None => {} // fall through
}
}
}
try_recv!();
try_recv!();
self.inner.rx_task.register();
self.inner.rx_task.register();
// It is possible that a value was pushed between attempting to read and
// registering the task, so we have to check the channel a second time
// here.
try_recv!();
// It is possible that a value was pushed between attempting to read
// and registering the task, so we have to check the channel a
// second time here.
try_recv!();
debug!("recv; rx_closed = {:?}; is_idle = {:?}",
rx_fields.rx_closed, self.inner.semaphore.is_idle());
debug!("recv; rx_closed = {:?}; is_idle = {:?}",
rx_fields.rx_closed, self.inner.semaphore.is_idle());
if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
Ok(Ready(None))
} else {
Ok(NotReady)
}
if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
Ok(Ready(None))
} else {
Ok(NotReady)
}
})
}
}
@ -299,11 +306,13 @@ where
self.close();
let rx_fields = unsafe { &mut *self.inner.rx_fields.get() };
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
self.inner.semaphore.add_permit();
}
while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
self.inner.semaphore.add_permit();
}
})
}
}
@ -313,10 +322,12 @@ impl<T, S> Drop for Chan<T, S> {
fn drop(&mut self) {
use super::block::Read::Value;
let rx_fields = unsafe { &mut *self.rx_fields.get() };
self.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {
}
while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {
}
});
}
}

View File

@ -1 +0,0 @@

View File

@ -0,0 +1,39 @@
extern crate futures;
#[macro_use]
extern crate loom;
macro_rules! if_fuzz {
($($t:tt)*) => {
$($t)*
}
}
#[path = "../src/mpsc/mod.rs"]
#[allow(warnings)]
mod mpsc;
#[path = "../src/semaphore.rs"]
#[allow(warnings)]
mod semaphore;
use futures::{Stream, future::poll_fn};
use loom::futures::block_on;
use loom::thread;
#[test]
fn closing_tx() {
loom::fuzz(|| {
let (mut tx, mut rx) = mpsc::channel(16);
thread::spawn(move || {
tx.try_send(()).unwrap();
drop(tx);
});
let v = block_on(poll_fn(|| rx.poll())).unwrap();
assert!(v.is_some());
let v = block_on(poll_fn(|| rx.poll())).unwrap();
assert!(v.is_none());
});
}