Carl Lerche 13083153aa
Introduce tokio-sync crate containing synchronization primitives. (#839)
Introduce a tokio-sync crate containing useful synchronization primitives for programs
written using Tokio.

The initial release contains:

* An mpsc channel
* A oneshot channel
* A semaphore implementation
* An `AtomicTask` primitive.

The `oneshot` and `mpsc` channels are new implementations providing improved
performance characteristics. In some benchmarks, the new mpsc channel shows
up to 7x improvement over the version provided by the `futures` crate. Unfortunately,
the `oneshot` implementation only provides a slight performance improvement as it
is mostly limited by the `futures` 0.1 task system. Once updated to the `std` version
of `Future` (currently nightly only), much greater performance improvements should
be achievable by `oneshot`.

Additionally, he implementations provided here are checked using
[Loom](http://github.com/carllerche/loom/), which provides greater confidence of
correctness.
2019-01-22 11:37:26 -08:00

289 lines
6.3 KiB
Rust

extern crate futures;
extern crate tokio_mock_task;
extern crate tokio_sync;
use tokio_sync::mpsc;
use tokio_mock_task::*;
use futures::prelude::*;
use std::thread;
use std::sync::Arc;
trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}
macro_rules! assert_ready {
($e:expr) => {{
match $e {
Ok(futures::Async::Ready(v)) => v,
Ok(_) => panic!("not ready"),
Err(e) => panic!("error = {:?}", e),
}
}}
}
macro_rules! assert_not_ready {
($e:expr) => {{
match $e {
Ok(futures::Async::NotReady) => {},
Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v),
Err(e) => panic!("error = {:?}", e),
}
}}
}
#[test]
fn send_recv_with_buffer() {
let (mut tx, mut rx) = mpsc::channel::<i32>(16);
// Using poll_ready / try_send
assert_ready!(tx.poll_ready());
tx.try_send(1).unwrap();
// Without poll_ready
tx.try_send(2).unwrap();
// Sink API
assert!(tx.start_send(3).unwrap().is_ready());
assert_ready!(tx.poll_complete());
assert_ready!(tx.close());
drop(tx);
let val = assert_ready!(rx.poll());
assert_eq!(val, Some(1));
let val = assert_ready!(rx.poll());
assert_eq!(val, Some(2));
let val = assert_ready!(rx.poll());
assert_eq!(val, Some(3));
let val = assert_ready!(rx.poll());
assert!(val.is_none());
}
#[test]
fn send_recv_unbounded() {
let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
// Using `try_send`
tx.try_send(1).unwrap();
// Using `Sink` API
assert!(tx.start_send(2).unwrap().is_ready());
assert_ready!(tx.poll_complete());
let val = assert_ready!(rx.poll());
assert_eq!(val, Some(1));
let val = assert_ready!(rx.poll());
assert_eq!(val, Some(2));
assert_ready!(tx.poll_complete());
assert_ready!(tx.close());
drop(tx);
let val = assert_ready!(rx.poll());
assert!(val.is_none());
}
#[test]
fn send_recv_buffer_limited() {
let (mut tx, mut rx) = mpsc::channel::<i32>(1);
let mut task = MockTask::new();
// Run on a task context
task.enter(|| {
assert!(tx.poll_complete().unwrap().is_ready());
assert!(tx.poll_ready().unwrap().is_ready());
// Send first message
let res = tx.start_send(1).unwrap();
assert!(is_ready(&res));
assert!(tx.poll_ready().unwrap().is_not_ready());
// Send second message
let res = tx.start_send(2).unwrap();
assert!(!is_ready(&res));
// Take the value
assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1)));
assert!(tx.poll_ready().unwrap().is_ready());
let res = tx.start_send(2).unwrap();
assert!(is_ready(&res));
assert!(tx.poll_ready().unwrap().is_not_ready());
// Take the value
assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2)));
assert!(tx.poll_ready().unwrap().is_ready());
});
}
#[test]
fn send_shared_recv() {
let (tx1, rx) = mpsc::channel::<i32>(16);
let tx2 = tx1.clone();
let mut rx = rx.wait();
tx1.send(1).wait().unwrap();
assert_eq!(rx.next().unwrap().unwrap(), 1);
tx2.send(2).wait().unwrap();
assert_eq!(rx.next().unwrap().unwrap(), 2);
}
#[test]
fn send_recv_threads() {
let (tx, rx) = mpsc::channel::<i32>(16);
let mut rx = rx.wait();
thread::spawn(move|| {
tx.send(1).wait().unwrap();
});
assert_eq!(rx.next().unwrap().unwrap(), 1);
}
#[test]
fn recv_close_gets_none_idle() {
let (mut tx, mut rx) = mpsc::channel::<i32>(10);
let mut task = MockTask::new();
rx.close();
task.enter(|| {
let val = assert_ready!(rx.poll());
assert!(val.is_none());
assert!(tx.poll_ready().is_err());
});
}
#[test]
fn recv_close_gets_none_reserved() {
let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
assert_ready!(tx1.poll_ready());
let mut task = MockTask::new();
task.enter(|| {
assert_not_ready!(tx2.poll_ready());
});
rx.close();
assert!(task.is_notified());
task.enter(|| {
assert!(tx2.poll_ready().is_err());
assert_not_ready!(rx.poll());
});
assert!(!task.is_notified());
assert!(tx1.try_send(123).is_ok());
assert!(task.is_notified());
task.enter(|| {
let v = assert_ready!(rx.poll());
assert_eq!(v, Some(123));
let v = assert_ready!(rx.poll());
assert!(v.is_none());
});
}
#[test]
fn tx_close_gets_none() {
let (_, mut rx) = mpsc::channel::<i32>(10);
let mut task = MockTask::new();
// Run on a task context
task.enter(|| {
let v = assert_ready!(rx.poll());
assert!(v.is_none());
});
}
fn is_ready<T>(res: &AsyncSink<T>) -> bool {
match *res {
AsyncSink::Ready => true,
_ => false,
}
}
#[test]
fn try_send_fail() {
let (mut tx, rx) = mpsc::channel(1);
let mut rx = rx.wait();
tx.try_send("hello").unwrap();
// This should fail
assert!(tx.try_send("fail").is_err());
assert_eq!(rx.next().unwrap().unwrap(), "hello");
tx.try_send("goodbye").unwrap();
drop(tx);
assert_eq!(rx.next().unwrap().unwrap(), "goodbye");
assert!(rx.next().is_none());
}
#[test]
fn drop_tx_with_permit_releases_permit() {
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
let (mut tx1, _rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
let mut task = MockTask::new();
assert_ready!(tx1.poll_ready());
task.enter(|| {
assert_not_ready!(tx2.poll_ready());
});
drop(tx1);
assert!(task.is_notified());
assert_ready!(tx2.poll_ready());
}
#[test]
fn dropping_rx_closes_channel() {
let (mut tx, rx) = mpsc::channel(100);
let msg = Arc::new(());
tx.try_send(msg.clone()).unwrap();
drop(rx);
assert!(tx.poll_ready().is_err());
assert_eq!(1, Arc::strong_count(&msg));
}
#[test]
fn unconsumed_messagers_are_dropped() {
let msg = Arc::new(());
let (mut tx, rx) = mpsc::channel(100);
tx.try_send(msg.clone()).unwrap();
assert_eq!(2, Arc::strong_count(&msg));
drop((tx, rx));
assert_eq!(1, Arc::strong_count(&msg));
}