mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
564 lines
14 KiB
Rust
564 lines
14 KiB
Rust
use futures::stream::iter;
|
|
use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
|
|
use tokio_test::{assert_ok, assert_pending, assert_ready, task};
|
|
|
|
use std::future::{poll_fn, Future};
|
|
use std::pin::{pin, Pin};
|
|
use std::task::Poll;
|
|
|
|
mod support {
|
|
pub(crate) mod mpsc;
|
|
}
|
|
|
|
use support::mpsc;
|
|
|
|
macro_rules! assert_ready_some {
|
|
($($t:tt)*) => {
|
|
match assert_ready!($($t)*) {
|
|
Some(v) => v,
|
|
None => panic!("expected `Some`, got `None`"),
|
|
}
|
|
};
|
|
}
|
|
|
|
macro_rules! assert_ready_none {
|
|
($($t:tt)*) => {
|
|
match assert_ready!($($t)*) {
|
|
None => {}
|
|
Some(v) => panic!("expected `None`, got `Some({:?})`", v),
|
|
}
|
|
};
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn empty() {
|
|
let mut map = StreamMap::<&str, stream::Pending<()>>::new();
|
|
|
|
assert_eq!(map.len(), 0);
|
|
assert!(map.is_empty());
|
|
|
|
assert!(map.next().await.is_none());
|
|
assert!(map.next().await.is_none());
|
|
|
|
assert!(map.remove("foo").is_none());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn single_entry() {
|
|
let mut map = task::spawn(StreamMap::new());
|
|
let (tx, rx) = mpsc::unbounded_channel_stream();
|
|
let rx = Box::pin(rx);
|
|
|
|
assert_ready_none!(map.poll_next());
|
|
|
|
assert!(map.insert("foo", rx).is_none());
|
|
assert!(map.contains_key("foo"));
|
|
assert!(!map.contains_key("bar"));
|
|
|
|
assert_eq!(map.len(), 1);
|
|
assert!(!map.is_empty());
|
|
|
|
assert_pending!(map.poll_next());
|
|
|
|
assert_ok!(tx.send(1));
|
|
|
|
assert!(map.is_woken());
|
|
let (k, v) = assert_ready_some!(map.poll_next());
|
|
assert_eq!(k, "foo");
|
|
assert_eq!(v, 1);
|
|
|
|
assert_pending!(map.poll_next());
|
|
|
|
assert_ok!(tx.send(2));
|
|
|
|
assert!(map.is_woken());
|
|
let (k, v) = assert_ready_some!(map.poll_next());
|
|
assert_eq!(k, "foo");
|
|
assert_eq!(v, 2);
|
|
|
|
assert_pending!(map.poll_next());
|
|
drop(tx);
|
|
assert!(map.is_woken());
|
|
assert_ready_none!(map.poll_next());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn multiple_entries() {
|
|
let mut map = task::spawn(StreamMap::new());
|
|
let (tx1, rx1) = mpsc::unbounded_channel_stream();
|
|
let (tx2, rx2) = mpsc::unbounded_channel_stream();
|
|
|
|
let rx1 = Box::pin(rx1);
|
|
let rx2 = Box::pin(rx2);
|
|
|
|
map.insert("foo", rx1);
|
|
map.insert("bar", rx2);
|
|
|
|
assert_pending!(map.poll_next());
|
|
|
|
assert_ok!(tx1.send(1));
|
|
|
|
assert!(map.is_woken());
|
|
let (k, v) = assert_ready_some!(map.poll_next());
|
|
assert_eq!(k, "foo");
|
|
assert_eq!(v, 1);
|
|
|
|
assert_pending!(map.poll_next());
|
|
|
|
assert_ok!(tx2.send(2));
|
|
|
|
assert!(map.is_woken());
|
|
let (k, v) = assert_ready_some!(map.poll_next());
|
|
assert_eq!(k, "bar");
|
|
assert_eq!(v, 2);
|
|
|
|
assert_pending!(map.poll_next());
|
|
|
|
assert_ok!(tx1.send(3));
|
|
assert_ok!(tx2.send(4));
|
|
|
|
assert!(map.is_woken());
|
|
|
|
// Given the randomization, there is no guarantee what order the values will
|
|
// be received in.
|
|
let mut v = (0..2)
|
|
.map(|_| assert_ready_some!(map.poll_next()))
|
|
.collect::<Vec<_>>();
|
|
|
|
assert_pending!(map.poll_next());
|
|
|
|
v.sort_unstable();
|
|
assert_eq!(v[0].0, "bar");
|
|
assert_eq!(v[0].1, 4);
|
|
assert_eq!(v[1].0, "foo");
|
|
assert_eq!(v[1].1, 3);
|
|
|
|
drop(tx1);
|
|
assert!(map.is_woken());
|
|
assert_pending!(map.poll_next());
|
|
drop(tx2);
|
|
|
|
assert_ready_none!(map.poll_next());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn insert_remove() {
|
|
let mut map = task::spawn(StreamMap::new());
|
|
let (tx, rx) = mpsc::unbounded_channel_stream();
|
|
|
|
let rx = Box::pin(rx);
|
|
|
|
assert_ready_none!(map.poll_next());
|
|
|
|
assert!(map.insert("foo", rx).is_none());
|
|
let rx = map.remove("foo").unwrap();
|
|
|
|
assert_ok!(tx.send(1));
|
|
|
|
assert!(!map.is_woken());
|
|
assert_ready_none!(map.poll_next());
|
|
|
|
assert!(map.insert("bar", rx).is_none());
|
|
|
|
let v = assert_ready_some!(map.poll_next());
|
|
assert_eq!(v.0, "bar");
|
|
assert_eq!(v.1, 1);
|
|
|
|
assert!(map.remove("bar").is_some());
|
|
assert_ready_none!(map.poll_next());
|
|
|
|
assert!(map.is_empty());
|
|
assert_eq!(0, map.len());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn replace() {
|
|
let mut map = task::spawn(StreamMap::new());
|
|
let (tx1, rx1) = mpsc::unbounded_channel_stream();
|
|
let (tx2, rx2) = mpsc::unbounded_channel_stream();
|
|
|
|
let rx1 = Box::pin(rx1);
|
|
let rx2 = Box::pin(rx2);
|
|
|
|
assert!(map.insert("foo", rx1).is_none());
|
|
|
|
assert_pending!(map.poll_next());
|
|
|
|
let _rx1 = map.insert("foo", rx2).unwrap();
|
|
|
|
assert_pending!(map.poll_next());
|
|
|
|
tx1.send(1).unwrap();
|
|
assert_pending!(map.poll_next());
|
|
|
|
tx2.send(2).unwrap();
|
|
assert!(map.is_woken());
|
|
let v = assert_ready_some!(map.poll_next());
|
|
assert_eq!(v.0, "foo");
|
|
assert_eq!(v.1, 2);
|
|
}
|
|
|
|
#[test]
|
|
fn size_hint_with_upper() {
|
|
let mut map = StreamMap::new();
|
|
|
|
map.insert("a", stream::iter(vec![1]));
|
|
map.insert("b", stream::iter(vec![1, 2]));
|
|
map.insert("c", stream::iter(vec![1, 2, 3]));
|
|
|
|
assert_eq!(3, map.len());
|
|
assert!(!map.is_empty());
|
|
|
|
let size_hint = map.size_hint();
|
|
assert_eq!(size_hint, (6, Some(6)));
|
|
}
|
|
|
|
#[test]
|
|
fn size_hint_without_upper() {
|
|
let mut map = StreamMap::new();
|
|
|
|
map.insert("a", pin_box(stream::iter(vec![1])));
|
|
map.insert("b", pin_box(stream::iter(vec![1, 2])));
|
|
map.insert("c", pin_box(pending()));
|
|
|
|
let size_hint = map.size_hint();
|
|
assert_eq!(size_hint, (3, None));
|
|
}
|
|
|
|
#[test]
|
|
fn new_capacity_zero() {
|
|
let map = StreamMap::<&str, stream::Pending<()>>::new();
|
|
assert_eq!(0, map.capacity());
|
|
|
|
assert!(map.keys().next().is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn with_capacity() {
|
|
let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
|
|
assert!(10 <= map.capacity());
|
|
|
|
assert!(map.keys().next().is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn iter_keys() {
|
|
let mut map = StreamMap::new();
|
|
|
|
map.insert("a", pending::<i32>());
|
|
map.insert("b", pending());
|
|
map.insert("c", pending());
|
|
|
|
let mut keys = map.keys().collect::<Vec<_>>();
|
|
keys.sort_unstable();
|
|
|
|
assert_eq!(&keys[..], &[&"a", &"b", &"c"]);
|
|
}
|
|
|
|
#[test]
|
|
fn iter_values() {
|
|
let mut map = StreamMap::new();
|
|
|
|
map.insert("a", stream::iter(vec![1]));
|
|
map.insert("b", stream::iter(vec![1, 2]));
|
|
map.insert("c", stream::iter(vec![1, 2, 3]));
|
|
|
|
let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>();
|
|
|
|
size_hints.sort_unstable();
|
|
|
|
assert_eq!(&size_hints[..], &[1, 2, 3]);
|
|
}
|
|
|
|
#[test]
|
|
fn iter_values_mut() {
|
|
let mut map = StreamMap::new();
|
|
|
|
map.insert("a", stream::iter(vec![1]));
|
|
map.insert("b", stream::iter(vec![1, 2]));
|
|
map.insert("c", stream::iter(vec![1, 2, 3]));
|
|
|
|
let mut size_hints = map
|
|
.values_mut()
|
|
.map(|s: &mut _| s.size_hint().0)
|
|
.collect::<Vec<_>>();
|
|
|
|
size_hints.sort_unstable();
|
|
|
|
assert_eq!(&size_hints[..], &[1, 2, 3]);
|
|
}
|
|
|
|
#[test]
|
|
fn clear() {
|
|
let mut map = task::spawn(StreamMap::new());
|
|
|
|
map.insert("a", stream::iter(vec![1]));
|
|
map.insert("b", stream::iter(vec![1, 2]));
|
|
map.insert("c", stream::iter(vec![1, 2, 3]));
|
|
|
|
assert_ready_some!(map.poll_next());
|
|
|
|
map.clear();
|
|
|
|
assert_ready_none!(map.poll_next());
|
|
assert!(map.is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn contains_key_borrow() {
|
|
let mut map = StreamMap::new();
|
|
map.insert("foo".to_string(), pending::<()>());
|
|
|
|
assert!(map.contains_key("foo"));
|
|
}
|
|
|
|
#[test]
|
|
fn one_ready_many_none() {
|
|
// Run a few times because of randomness
|
|
for _ in 0..100 {
|
|
let mut map = task::spawn(StreamMap::new());
|
|
|
|
map.insert(0, pin_box(stream::empty()));
|
|
map.insert(1, pin_box(stream::empty()));
|
|
map.insert(2, pin_box(stream::once("hello")));
|
|
map.insert(3, pin_box(stream::pending()));
|
|
|
|
let v = assert_ready_some!(map.poll_next());
|
|
assert_eq!(v, (2, "hello"));
|
|
}
|
|
}
|
|
|
|
fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
|
|
Box::pin(s)
|
|
}
|
|
|
|
type UsizeStream = Pin<Box<dyn Stream<Item = usize> + Send>>;
|
|
|
|
#[tokio::test]
|
|
async fn poll_next_many_zero() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(pending()) as UsizeStream);
|
|
|
|
let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut vec![], 0)).await;
|
|
|
|
assert_eq!(n, 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn poll_next_many_empty() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut vec![], 1)).await;
|
|
|
|
assert_eq!(n, 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn poll_next_many_pending() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(pending()) as UsizeStream);
|
|
|
|
let mut is_pending = false;
|
|
poll_fn(|cx| {
|
|
let poll = stream_map.poll_next_many(cx, &mut vec![], 1);
|
|
|
|
is_pending = poll.is_pending();
|
|
|
|
Poll::Ready(())
|
|
})
|
|
.await;
|
|
|
|
assert!(is_pending);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn poll_next_many_not_enough() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
|
|
stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
|
|
|
|
let mut buffer = vec![];
|
|
let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 3)).await;
|
|
|
|
assert_eq!(n, 2);
|
|
assert_eq!(buffer.len(), 2);
|
|
assert!(buffer.contains(&(0, 0)));
|
|
assert!(buffer.contains(&(1, 1)));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn poll_next_many_enough() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
|
|
stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
|
|
|
|
let mut buffer = vec![];
|
|
let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 2)).await;
|
|
|
|
assert_eq!(n, 2);
|
|
assert_eq!(buffer.len(), 2);
|
|
assert!(buffer.contains(&(0, 0)));
|
|
assert!(buffer.contains(&(1, 1)));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn poll_next_many_correctly_loops_around() {
|
|
for _ in 0..10 {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
|
|
stream_map.insert(1, Box::pin(iter([0usize, 1].into_iter())) as UsizeStream);
|
|
stream_map.insert(2, Box::pin(iter([0usize, 1, 2].into_iter())) as UsizeStream);
|
|
|
|
let mut buffer = vec![];
|
|
|
|
let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 3)).await;
|
|
assert_eq!(n, 3);
|
|
assert_eq!(
|
|
std::mem::take(&mut buffer)
|
|
.into_iter()
|
|
.map(|(_, v)| v)
|
|
.collect::<Vec<_>>(),
|
|
vec![0, 0, 0]
|
|
);
|
|
|
|
let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 2)).await;
|
|
assert_eq!(n, 2);
|
|
assert_eq!(
|
|
std::mem::take(&mut buffer)
|
|
.into_iter()
|
|
.map(|(_, v)| v)
|
|
.collect::<Vec<_>>(),
|
|
vec![1, 1]
|
|
);
|
|
|
|
let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 1)).await;
|
|
assert_eq!(n, 1);
|
|
assert_eq!(
|
|
std::mem::take(&mut buffer)
|
|
.into_iter()
|
|
.map(|(_, v)| v)
|
|
.collect::<Vec<_>>(),
|
|
vec![2]
|
|
);
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn next_many_zero() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(pending()) as UsizeStream);
|
|
|
|
let n = poll_fn(|cx| pin!(stream_map.next_many(&mut vec![], 0)).poll(cx)).await;
|
|
|
|
assert_eq!(n, 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn next_many_empty() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
let n = stream_map.next_many(&mut vec![], 1).await;
|
|
|
|
assert_eq!(n, 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn next_many_pending() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(pending()) as UsizeStream);
|
|
|
|
let mut is_pending = false;
|
|
poll_fn(|cx| {
|
|
let poll = pin!(stream_map.next_many(&mut vec![], 1)).poll(cx);
|
|
|
|
is_pending = poll.is_pending();
|
|
|
|
Poll::Ready(())
|
|
})
|
|
.await;
|
|
|
|
assert!(is_pending);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn next_many_not_enough() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
|
|
stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
|
|
|
|
let mut buffer = vec![];
|
|
let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 3)).poll(cx)).await;
|
|
|
|
assert_eq!(n, 2);
|
|
assert_eq!(buffer.len(), 2);
|
|
assert!(buffer.contains(&(0, 0)));
|
|
assert!(buffer.contains(&(1, 1)));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn next_many_enough() {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
|
|
stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
|
|
|
|
let mut buffer = vec![];
|
|
let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 2)).poll(cx)).await;
|
|
|
|
assert_eq!(n, 2);
|
|
assert_eq!(buffer.len(), 2);
|
|
assert!(buffer.contains(&(0, 0)));
|
|
assert!(buffer.contains(&(1, 1)));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn next_many_correctly_loops_around() {
|
|
for _ in 0..10 {
|
|
let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
|
|
|
|
stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
|
|
stream_map.insert(1, Box::pin(iter([0usize, 1].into_iter())) as UsizeStream);
|
|
stream_map.insert(2, Box::pin(iter([0usize, 1, 2].into_iter())) as UsizeStream);
|
|
|
|
let mut buffer = vec![];
|
|
|
|
let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 3)).poll(cx)).await;
|
|
assert_eq!(n, 3);
|
|
assert_eq!(
|
|
std::mem::take(&mut buffer)
|
|
.into_iter()
|
|
.map(|(_, v)| v)
|
|
.collect::<Vec<_>>(),
|
|
vec![0, 0, 0]
|
|
);
|
|
|
|
let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 2)).poll(cx)).await;
|
|
assert_eq!(n, 2);
|
|
assert_eq!(
|
|
std::mem::take(&mut buffer)
|
|
.into_iter()
|
|
.map(|(_, v)| v)
|
|
.collect::<Vec<_>>(),
|
|
vec![1, 1]
|
|
);
|
|
|
|
let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 1)).poll(cx)).await;
|
|
assert_eq!(n, 1);
|
|
assert_eq!(
|
|
std::mem::take(&mut buffer)
|
|
.into_iter()
|
|
.map(|(_, v)| v)
|
|
.collect::<Vec<_>>(),
|
|
vec![2]
|
|
);
|
|
}
|
|
}
|