mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00

Following from https://github.com/tokio-rs/tokio/pull/865, this PR removes `#[derive(Debug)]` on `mpsc` sender and receiver types in favor of explicit `impl fmt::Debug` blocks that don't have a `T: fmt::Debug` bound.
317 lines
7.0 KiB
Rust
317 lines
7.0 KiB
Rust
extern crate futures;
|
|
extern crate tokio_mock_task;
|
|
extern crate tokio_sync;
|
|
|
|
use tokio_sync::mpsc;
|
|
use tokio_mock_task::*;
|
|
|
|
use futures::prelude::*;
|
|
|
|
use std::thread;
|
|
use std::sync::Arc;
|
|
|
|
trait AssertSend: Send {}
|
|
impl AssertSend for mpsc::Sender<i32> {}
|
|
impl AssertSend for mpsc::Receiver<i32> {}
|
|
|
|
macro_rules! assert_ready {
|
|
($e:expr) => {{
|
|
match $e {
|
|
Ok(futures::Async::Ready(v)) => v,
|
|
Ok(_) => panic!("not ready"),
|
|
Err(e) => panic!("error = {:?}", e),
|
|
}
|
|
}}
|
|
}
|
|
|
|
macro_rules! assert_not_ready {
|
|
($e:expr) => {{
|
|
match $e {
|
|
Ok(futures::Async::NotReady) => {},
|
|
Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v),
|
|
Err(e) => panic!("error = {:?}", e),
|
|
}
|
|
}}
|
|
}
|
|
|
|
#[test]
|
|
fn send_recv_with_buffer() {
|
|
let (mut tx, mut rx) = mpsc::channel::<i32>(16);
|
|
|
|
// Using poll_ready / try_send
|
|
assert_ready!(tx.poll_ready());
|
|
tx.try_send(1).unwrap();
|
|
|
|
// Without poll_ready
|
|
tx.try_send(2).unwrap();
|
|
|
|
// Sink API
|
|
assert!(tx.start_send(3).unwrap().is_ready());
|
|
assert_ready!(tx.poll_complete());
|
|
assert_ready!(tx.close());
|
|
|
|
drop(tx);
|
|
|
|
let val = assert_ready!(rx.poll());
|
|
assert_eq!(val, Some(1));
|
|
|
|
let val = assert_ready!(rx.poll());
|
|
assert_eq!(val, Some(2));
|
|
|
|
let val = assert_ready!(rx.poll());
|
|
assert_eq!(val, Some(3));
|
|
|
|
let val = assert_ready!(rx.poll());
|
|
assert!(val.is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn send_recv_unbounded() {
|
|
let (mut tx, mut rx) = mpsc::unbounded_channel::<i32>();
|
|
|
|
// Using `try_send`
|
|
tx.try_send(1).unwrap();
|
|
|
|
// Using `Sink` API
|
|
assert!(tx.start_send(2).unwrap().is_ready());
|
|
assert_ready!(tx.poll_complete());
|
|
|
|
let val = assert_ready!(rx.poll());
|
|
assert_eq!(val, Some(1));
|
|
|
|
let val = assert_ready!(rx.poll());
|
|
assert_eq!(val, Some(2));
|
|
|
|
assert_ready!(tx.poll_complete());
|
|
assert_ready!(tx.close());
|
|
|
|
drop(tx);
|
|
|
|
let val = assert_ready!(rx.poll());
|
|
assert!(val.is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn no_t_bounds_buffer() {
|
|
struct NoImpls;
|
|
let (tx, mut rx) = mpsc::channel(100);
|
|
|
|
// sender should be Debug even though T isn't Debug
|
|
println!("{:?}", tx);
|
|
// same with Receiver
|
|
println!("{:?}", rx);
|
|
// and sender should be Clone even though T isn't Clone
|
|
assert!(tx.clone().try_send(NoImpls).is_ok());
|
|
assert!(assert_ready!(rx.poll()).is_some());
|
|
}
|
|
|
|
#[test]
|
|
fn no_t_bounds_unbounded() {
|
|
struct NoImpls;
|
|
let (tx, mut rx) = mpsc::unbounded_channel();
|
|
|
|
// sender should be Debug even though T isn't Debug
|
|
println!("{:?}", tx);
|
|
// same with Receiver
|
|
println!("{:?}", rx);
|
|
// and sender should be Clone even though T isn't Clone
|
|
assert!(tx.clone().try_send(NoImpls).is_ok());
|
|
assert!(assert_ready!(rx.poll()).is_some());
|
|
}
|
|
|
|
#[test]
|
|
fn send_recv_buffer_limited() {
|
|
let (mut tx, mut rx) = mpsc::channel::<i32>(1);
|
|
let mut task = MockTask::new();
|
|
|
|
// Run on a task context
|
|
task.enter(|| {
|
|
assert!(tx.poll_complete().unwrap().is_ready());
|
|
assert!(tx.poll_ready().unwrap().is_ready());
|
|
|
|
// Send first message
|
|
let res = tx.start_send(1).unwrap();
|
|
assert!(is_ready(&res));
|
|
assert!(tx.poll_ready().unwrap().is_not_ready());
|
|
|
|
// Send second message
|
|
let res = tx.start_send(2).unwrap();
|
|
assert!(!is_ready(&res));
|
|
|
|
// Take the value
|
|
assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1)));
|
|
assert!(tx.poll_ready().unwrap().is_ready());
|
|
|
|
let res = tx.start_send(2).unwrap();
|
|
assert!(is_ready(&res));
|
|
assert!(tx.poll_ready().unwrap().is_not_ready());
|
|
|
|
// Take the value
|
|
assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2)));
|
|
assert!(tx.poll_ready().unwrap().is_ready());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn send_shared_recv() {
|
|
let (tx1, rx) = mpsc::channel::<i32>(16);
|
|
let tx2 = tx1.clone();
|
|
let mut rx = rx.wait();
|
|
|
|
tx1.send(1).wait().unwrap();
|
|
assert_eq!(rx.next().unwrap().unwrap(), 1);
|
|
|
|
tx2.send(2).wait().unwrap();
|
|
assert_eq!(rx.next().unwrap().unwrap(), 2);
|
|
}
|
|
|
|
#[test]
|
|
fn send_recv_threads() {
|
|
let (tx, rx) = mpsc::channel::<i32>(16);
|
|
let mut rx = rx.wait();
|
|
|
|
thread::spawn(move|| {
|
|
tx.send(1).wait().unwrap();
|
|
});
|
|
|
|
assert_eq!(rx.next().unwrap().unwrap(), 1);
|
|
}
|
|
|
|
#[test]
|
|
fn recv_close_gets_none_idle() {
|
|
let (mut tx, mut rx) = mpsc::channel::<i32>(10);
|
|
let mut task = MockTask::new();
|
|
|
|
rx.close();
|
|
|
|
task.enter(|| {
|
|
let val = assert_ready!(rx.poll());
|
|
assert!(val.is_none());
|
|
assert!(tx.poll_ready().is_err());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn recv_close_gets_none_reserved() {
|
|
let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
|
|
let mut tx2 = tx1.clone();
|
|
|
|
assert_ready!(tx1.poll_ready());
|
|
|
|
let mut task = MockTask::new();
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(tx2.poll_ready());
|
|
});
|
|
|
|
rx.close();
|
|
|
|
assert!(task.is_notified());
|
|
|
|
task.enter(|| {
|
|
assert!(tx2.poll_ready().is_err());
|
|
assert_not_ready!(rx.poll());
|
|
});
|
|
|
|
assert!(!task.is_notified());
|
|
|
|
assert!(tx1.try_send(123).is_ok());
|
|
|
|
assert!(task.is_notified());
|
|
|
|
task.enter(|| {
|
|
let v = assert_ready!(rx.poll());
|
|
assert_eq!(v, Some(123));
|
|
|
|
let v = assert_ready!(rx.poll());
|
|
assert!(v.is_none());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn tx_close_gets_none() {
|
|
let (_, mut rx) = mpsc::channel::<i32>(10);
|
|
let mut task = MockTask::new();
|
|
|
|
// Run on a task context
|
|
task.enter(|| {
|
|
let v = assert_ready!(rx.poll());
|
|
assert!(v.is_none());
|
|
});
|
|
}
|
|
|
|
fn is_ready<T>(res: &AsyncSink<T>) -> bool {
|
|
match *res {
|
|
AsyncSink::Ready => true,
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn try_send_fail() {
|
|
let (mut tx, rx) = mpsc::channel(1);
|
|
let mut rx = rx.wait();
|
|
|
|
tx.try_send("hello").unwrap();
|
|
|
|
// This should fail
|
|
assert!(tx.try_send("fail").is_err());
|
|
|
|
assert_eq!(rx.next().unwrap().unwrap(), "hello");
|
|
|
|
tx.try_send("goodbye").unwrap();
|
|
drop(tx);
|
|
|
|
assert_eq!(rx.next().unwrap().unwrap(), "goodbye");
|
|
assert!(rx.next().is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn drop_tx_with_permit_releases_permit() {
|
|
// poll_ready reserves capacity, ensure that the capacity is released if tx
|
|
// is dropped w/o sending a value.
|
|
let (mut tx1, _rx) = mpsc::channel::<i32>(1);
|
|
let mut tx2 = tx1.clone();
|
|
let mut task = MockTask::new();
|
|
|
|
assert_ready!(tx1.poll_ready());
|
|
|
|
task.enter(|| {
|
|
assert_not_ready!(tx2.poll_ready());
|
|
});
|
|
|
|
drop(tx1);
|
|
|
|
assert!(task.is_notified());
|
|
|
|
assert_ready!(tx2.poll_ready());
|
|
}
|
|
|
|
#[test]
|
|
fn dropping_rx_closes_channel() {
|
|
let (mut tx, rx) = mpsc::channel(100);
|
|
|
|
let msg = Arc::new(());
|
|
tx.try_send(msg.clone()).unwrap();
|
|
|
|
drop(rx);
|
|
assert!(tx.poll_ready().is_err());
|
|
|
|
assert_eq!(1, Arc::strong_count(&msg));
|
|
}
|
|
|
|
#[test]
|
|
fn unconsumed_messagers_are_dropped() {
|
|
let msg = Arc::new(());
|
|
|
|
let (mut tx, rx) = mpsc::channel(100);
|
|
|
|
tx.try_send(msg.clone()).unwrap();
|
|
|
|
assert_eq!(2, Arc::strong_count(&msg));
|
|
|
|
drop((tx, rx));
|
|
|
|
assert_eq!(1, Arc::strong_count(&msg));
|
|
}
|