tokio/tokio-codec/tests/framed_read.rs
jesskfullwood 6b9e7bdace codec: update to use std-future (#1214)
Strategy was to

- copy the old codec code that was temporarily being stashed in `tokio-io`
- modify all the type signatures to use Pin, as literal a translation as possible
- fix up the tests likewise

This is intended just to get things compiling and passing tests. Beyond that there is surely
lots of refactoring that can be done to make things more idiomatic. The docs are unchanged.

Closes #1189
2019-06-27 10:10:29 -07:00

303 lines
7.8 KiB
Rust

#![deny(warnings, rust_2018_idioms)]
use std::collections::VecDeque;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use bytes::{Buf, BytesMut, IntoBuf};
use futures::Stream;
use tokio_codec::{Decoder, FramedRead};
use tokio_io::AsyncRead;
use tokio_test::assert_ready;
use tokio_test::task::MockTask;
/// Builds a `Mock` whose queued read results are the given expressions, in order.
///
/// Every expression must be followed by a comma; `mock!()` produces an empty queue.
macro_rules! mock {
    ($($call:expr,)*) => {{
        Mock {
            calls: VecDeque::from(vec![$($call),*]),
        }
    }};
}
/// Asserts that the poll result `$poll` is ready with `Some(Ok(frame))` and that
/// `frame` equals `$expected`.
macro_rules! assert_read {
    ($poll:expr, $expected:expr) => {{
        let frame = assert_ready!($poll).unwrap().unwrap();
        assert_eq!(frame, $expected);
    }};
}
/// Wraps a mutable borrow of the named local variable in a `Pin`.
macro_rules! pin {
    ($target:ident) => {
        ::std::pin::Pin::new(&mut $target)
    };
}
/// Decoder that turns every four buffered bytes into one big-endian `u32`.
struct U32Decoder;

impl Decoder for U32Decoder {
    type Item = u32;
    type Error = io::Error;

    /// Returns `Ok(None)` while fewer than four bytes are buffered; otherwise
    /// splits four bytes off the front of `buf` and decodes them.
    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
        const FRAME_LEN: usize = 4;

        if buf.len() >= FRAME_LEN {
            let mut frame = buf.split_to(FRAME_LEN).into_buf();
            Ok(Some(frame.get_u32_be()))
        } else {
            Ok(None)
        }
    }
}
/// One read carrying three complete frames yields 0, 1 and 2, then end-of-stream.
#[test]
fn read_multi_frame_in_packet() {
    let io = mock! {
        Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut stream = FramedRead::new(io, U32Decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        for expected in 0..3 {
            assert_read!(pin!(stream).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(stream).poll_next(cx));
        assert!(end.is_none());
    });
}
/// Three reads of one frame each yield 0, 1 and 2, then end-of-stream.
#[test]
fn read_multi_frame_across_packets() {
    let io = mock! {
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
        Ok(b"\x00\x00\x00\x02".to_vec()),
    };
    let mut stream = FramedRead::new(io, U32Decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        for expected in 0..=2 {
            assert_read!(pin!(stream).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(stream).poll_next(cx));
        assert!(end.is_none());
    });
}
/// A `WouldBlock` on the first read makes the first poll pending; the following
/// polls yield 0 and 1, then end-of-stream.
#[test]
fn read_not_ready() {
    let io = mock! {
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
    };
    let mut stream = FramedRead::new(io, U32Decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        let first = pin!(stream).poll_next(cx);
        assert!(first.is_pending());

        for expected in 0..2 {
            assert_read!(pin!(stream).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(stream).poll_next(cx));
        assert!(end.is_none());
    });
}
/// Half a frame followed by `WouldBlock` leaves the first poll pending; once the
/// remaining bytes arrive the polls yield 0, 1 and 2, then end-of-stream.
#[test]
fn read_partial_then_not_ready() {
    let io = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut stream = FramedRead::new(io, U32Decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        let first = pin!(stream).poll_next(cx);
        assert!(first.is_pending());

        for expected in 0..3 {
            assert_read!(pin!(stream).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(stream).poll_next(cx));
        assert!(end.is_none());
    });
}
/// An I/O error on the first read is surfaced as the first stream item.
#[test]
fn read_err() {
    let io = mock! {
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut stream = FramedRead::new(io, U32Decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        let item = assert_ready!(pin!(stream).poll_next(cx));
        let err = item.unwrap().unwrap_err();
        assert_eq!(io::ErrorKind::Other, err.kind());
    });
}
/// Half a frame followed by an I/O error makes the first poll yield that error.
#[test]
fn read_partial_then_err() {
    let io = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut stream = FramedRead::new(io, U32Decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        let item = assert_ready!(pin!(stream).poll_next(cx));
        let err = item.unwrap().unwrap_err();
        assert_eq!(io::ErrorKind::Other, err.kind());
    });
}
/// Half a frame, then `WouldBlock`, then an I/O error: the first poll is pending
/// and the second poll yields the error.
#[test]
fn read_partial_would_block_then_err() {
    let io = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut stream = FramedRead::new(io, U32Decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        let first = pin!(stream).poll_next(cx);
        assert!(first.is_pending());

        let item = assert_ready!(pin!(stream).poll_next(cx));
        let err = item.unwrap().unwrap_err();
        assert_eq!(io::ErrorKind::Other, err.kind());
    });
}
/// A single 32 KiB frame read from an in-memory slice is delivered as one item,
/// followed by end-of-stream.
#[test]
fn huge_size() {
    const FRAME_LEN: usize = 32 * 1024;

    // Decoder that waits for `FRAME_LEN` buffered bytes, consumes them and
    // reports the frame as `0`.
    struct BigDecoder;

    impl Decoder for BigDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
            if buf.len() >= FRAME_LEN {
                buf.split_to(FRAME_LEN);
                Ok(Some(0))
            } else {
                Ok(None)
            }
        }
    }

    let data = [0; FRAME_LEN];
    let mut stream = FramedRead::new(Slice(&data[..]), BigDecoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        assert_read!(pin!(stream).poll_next(cx), 0);

        let end = assert_ready!(pin!(stream).poll_next(cx));
        assert!(end.is_none());
    });
}
/// Five input bytes give one complete frame; the byte left over at end of input
/// is reported as an error item.
#[test]
fn data_remaining_is_error() {
    let bytes = [0u8; 5];
    let mut stream = FramedRead::new(Slice(&bytes), U32Decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        assert_read!(pin!(stream).poll_next(cx), 0);

        let leftover = assert_ready!(pin!(stream).poll_next(cx)).unwrap();
        assert!(leftover.is_err());
    });
}
/// With no input at all, `decode_eof` is polled repeatedly and every frame it
/// returns is yielded before end-of-stream.
#[test]
fn multi_frames_on_eof() {
    // Decoder that ignores its buffer and hands out its stored values, front
    // first, from `decode_eof`; `decode` must never be called.
    struct MyDecoder(Vec<u32>);

    impl Decoder for MyDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            unreachable!()
        }

        fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            let next = if self.0.is_empty() {
                None
            } else {
                Some(self.0.remove(0))
            };
            Ok(next)
        }
    }

    let decoder = MyDecoder(vec![0, 1, 2, 3]);
    let mut stream = FramedRead::new(mock!(), decoder);
    let mut task = MockTask::new();

    task.enter(|cx| {
        for expected in 0..4 {
            assert_read!(pin!(stream).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(stream).poll_next(cx));
        assert!(end.is_none());
    });
}
// ===== Mock ======
/// Scripted reader: each `read` call consumes the next queued result.
struct Mock {
    /// Results still to be returned, front first.
    calls: VecDeque<io::Result<Vec<u8>>>,
}

impl Read for Mock {
    /// Pops the next scripted result. Queued bytes are copied to the front of
    /// `dst` and their count returned; a queued error is returned as-is; an
    /// empty queue reads as `Ok(0)`.
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        let data = match self.calls.pop_front() {
            None => return Ok(0),
            Some(next) => next?,
        };

        let len = data.len();
        debug_assert!(dst.len() >= len);
        dst[..len].copy_from_slice(&data);
        Ok(len)
    }
}
// Adapts the blocking-style `Read` impl of `Mock` to `AsyncRead`.
impl AsyncRead for Mock {
    /// Performs one `read`; a `WouldBlock` error becomes `Pending`, and every
    /// other outcome is returned as `Ready`.
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        match self.get_mut().read(buf) {
            Ok(n) => Ready(Ok(n)),
            Err(e) => {
                if e.kind() == io::ErrorKind::WouldBlock {
                    Pending
                } else {
                    Ready(Err(e))
                }
            }
        }
    }
}
// TODO: this newtype is only needed because `&[u8]` does not currently
// implement `AsyncRead`.
/// Byte slice wrapper that can be read through `AsyncRead`.
struct Slice<'a>(&'a [u8]);

impl AsyncRead for Slice<'_> {
    /// Reads from the wrapped slice via its `Read` impl; the result is always
    /// returned as `Ready`.
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        let outcome = this.0.read(buf);
        Ready(outcome)
    }
}