streams: create StreamMock for testing Streams (#5915)

Introduce a new mock type to tests streams and eventually
sinks. Only includes next() and wait() for now. Fixes #4106
This commit is contained in:
joe thomas 2023-08-12 12:18:23 -07:00 committed by GitHub
parent 8b8005ebdd
commit 197757d440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 219 additions and 0 deletions

View File

@ -12,6 +12,7 @@
//! Tokio and Futures based testing utilities
pub mod io;
pub mod stream_mock;
mod macros;
pub mod task;

View File

@ -0,0 +1,168 @@
#![cfg(not(loom))]
//! A mock stream implementing [`Stream`].
//!
//! # Overview
//! This crate provides a `StreamMock` that can be used to test code that interacts with streams.
//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
//! intervals between items.
//!
//! # Usage
//! To use the `StreamMock`, you need to create a builder using[`StreamMockBuilder`]. The builder
//! allows you to enqueue actions such as returning items or waiting for a certain duration.
//!
//! # Example
//! ```rust
//!
//! use futures_util::StreamExt;
//! use std::time::Duration;
//! use tokio_test::stream_mock::StreamMockBuilder;
//!
//! async fn test_stream_mock_wait() {
//! let mut stream_mock = StreamMockBuilder::new()
//! .next(1)
//! .wait(Duration::from_millis(300))
//! .next(2)
//! .build();
//!
//! assert_eq!(stream_mock.next().await, Some(1));
//! let start = std::time::Instant::now();
//! assert_eq!(stream_mock.next().await, Some(2));
//! let elapsed = start.elapsed();
//! assert!(elapsed >= Duration::from_millis(300));
//! assert_eq!(stream_mock.next().await, None);
//! }
//! ```
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use futures_core::{ready, Stream};
use std::future::Future;
use tokio::time::{sleep_until, Instant, Sleep};
#[derive(Debug, Clone)]
enum Action<T: Unpin> {
Next(T),
Wait(Duration),
}
/// A builder for [`StreamMock`]
#[derive(Debug, Clone)]
pub struct StreamMockBuilder<T: Unpin> {
actions: VecDeque<Action<T>>,
}
impl<T: Unpin> StreamMockBuilder<T> {
/// Create a new empty [`StreamMockBuilder`]
pub fn new() -> Self {
StreamMockBuilder::default()
}
/// Queue an item to be returned by the stream
pub fn next(mut self, value: T) -> Self {
self.actions.push_back(Action::Next(value));
self
}
// Queue an item to be consumed by the sink,
// commented out until Sink is implemented.
//
// pub fn consume(mut self, value: T) -> Self {
// self.actions.push_back(Action::Consume(value));
// self
// }
/// Queue the stream to wait for a duration
pub fn wait(mut self, duration: Duration) -> Self {
self.actions.push_back(Action::Wait(duration));
self
}
/// Build the [`StreamMock`]
pub fn build(self) -> StreamMock<T> {
StreamMock {
actions: self.actions,
sleep: None,
}
}
}
impl<T: Unpin> Default for StreamMockBuilder<T> {
fn default() -> Self {
StreamMockBuilder {
actions: VecDeque::new(),
}
}
}
/// A mock stream implementing [`Stream`]
///
/// See [`StreamMockBuilder`] for more information.
#[derive(Debug)]
pub struct StreamMock<T: Unpin> {
actions: VecDeque<Action<T>>,
sleep: Option<Pin<Box<Sleep>>>,
}
impl<T: Unpin> StreamMock<T> {
fn next_action(&mut self) -> Option<Action<T>> {
self.actions.pop_front()
}
}
impl<T: Unpin> Stream for StreamMock<T> {
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// Try polling the sleep future first
if let Some(ref mut sleep) = self.sleep {
ready!(Pin::new(sleep).poll(cx));
// Since we're ready, discard the sleep future
self.sleep.take();
}
match self.next_action() {
Some(action) => match action {
Action::Next(item) => Poll::Ready(Some(item)),
Action::Wait(duration) => {
// Set up a sleep future and schedule this future to be polled again for it.
self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration)));
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}
impl<T: Unpin> Drop for StreamMock<T> {
fn drop(&mut self) {
// Avoid double panicking to make debugging easier.
if std::thread::panicking() {
return;
}
let undropped_count = self
.actions
.iter()
.filter(|action| match action {
Action::Next(_) => true,
Action::Wait(_) => false,
})
.count();
assert!(
undropped_count == 0,
"StreamMock was dropped before all actions were consumed, {} actions were not consumed",
undropped_count
);
}
}

View File

@ -0,0 +1,50 @@
use futures_util::StreamExt;
use std::time::Duration;
use tokio_test::stream_mock::StreamMockBuilder;
#[tokio::test]
async fn test_stream_mock_empty() {
let mut stream_mock = StreamMockBuilder::<u32>::new().build();
assert_eq!(stream_mock.next().await, None);
assert_eq!(stream_mock.next().await, None);
}
#[tokio::test]
async fn test_stream_mock_items() {
let mut stream_mock = StreamMockBuilder::new().next(1).next(2).build();
assert_eq!(stream_mock.next().await, Some(1));
assert_eq!(stream_mock.next().await, Some(2));
assert_eq!(stream_mock.next().await, None);
}
#[tokio::test]
async fn test_stream_mock_wait() {
let mut stream_mock = StreamMockBuilder::new()
.next(1)
.wait(Duration::from_millis(300))
.next(2)
.build();
assert_eq!(stream_mock.next().await, Some(1));
let start = std::time::Instant::now();
assert_eq!(stream_mock.next().await, Some(2));
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(300));
assert_eq!(stream_mock.next().await, None);
}
#[tokio::test]
#[should_panic(expected = "StreamMock was dropped before all actions were consumed")]
async fn test_stream_mock_drop_without_consuming_all() {
let stream_mock = StreamMockBuilder::new().next(1).next(2).build();
drop(stream_mock);
}
#[tokio::test]
#[should_panic(expected = "test panic was not masked")]
async fn test_stream_mock_drop_during_panic_doesnt_mask_panic() {
let _stream_mock = StreamMockBuilder::new().next(1).next(2).build();
panic!("test panic was not masked");
}