Cleanup FramedParts in new tokio-codec (#394)

This commit is contained in:
Carl Lerche 2018-06-05 15:31:01 -07:00 committed by GitHub
parent f723d10087
commit c07a7b26d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 87 additions and 76 deletions

View File

@ -7,10 +7,11 @@ use futures::{Stream, Future};
use std::io::{self, Read};
use tokio_codec::{Framed, FramedParts, Decoder, Encoder};
use tokio_io::AsyncRead;
use bytes::{BytesMut, Buf, BufMut, IntoBuf, BigEndian};
use bytes::{BytesMut, Buf, BufMut, IntoBuf};
const INITIAL_CAPACITY: usize = 8 * 1024;
/// Encode and decode u32 values.
struct U32Codec;
impl Decoder for U32Codec {
@ -39,6 +40,7 @@ impl Encoder for U32Codec {
}
}
/// This value should never be used
struct DontReadIntoThis;
impl Read for DontReadIntoThis {
@ -52,12 +54,10 @@ impl AsyncRead for DontReadIntoThis {}
#[test]
fn can_read_from_existing_buf() {
let parts = FramedParts {
inner: DontReadIntoThis,
readbuf: vec![0, 0, 0, 42].into(),
writebuf: BytesMut::with_capacity(0),
};
let framed = Framed::from_parts(parts, U32Codec);
let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
parts.read_buf = vec![0, 0, 0, 42].into();
let framed = Framed::from_parts(parts);
let num = framed
.into_future()
@ -67,31 +67,28 @@ fn can_read_from_existing_buf() {
.wait()
.map_err(|e| e.0)
.unwrap();
assert_eq!(num, 42);
}
#[test]
fn external_buf_grows_to_init() {
let parts = FramedParts {
inner: DontReadIntoThis,
readbuf: vec![0, 0, 0, 42].into(),
writebuf: BytesMut::with_capacity(0),
};
let framed = Framed::from_parts(parts, U32Codec);
let FramedParts { readbuf, .. } = framed.into_parts();
let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
parts.read_buf = vec![0, 0, 0, 42].into();
assert_eq!(readbuf.capacity(), INITIAL_CAPACITY);
let framed = Framed::from_parts(parts);
let FramedParts { read_buf, .. } = framed.into_parts();
assert_eq!(read_buf.capacity(), INITIAL_CAPACITY);
}
#[test]
fn external_buf_does_not_shrink() {
let parts = FramedParts {
inner: DontReadIntoThis,
readbuf: vec![0; INITIAL_CAPACITY * 2].into(),
writebuf: BytesMut::with_capacity(0),
};
let framed = Framed::from_parts(parts, U32Codec);
let FramedParts { readbuf, .. } = framed.into_parts();
let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
parts.read_buf = vec![0; INITIAL_CAPACITY * 2].into();
assert_eq!(readbuf.capacity(), INITIAL_CAPACITY * 2);
let framed = Framed::from_parts(parts);
let FramedParts { read_buf, .. } = framed.into_parts();
assert_eq!(read_buf.capacity(), INITIAL_CAPACITY * 2);
}

View File

@ -21,29 +21,31 @@ pub struct Framed<T, U> {
pub struct Fuse<T, U>(pub T, pub U);
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U>
where T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
impl<T, U> Framed<T, U>
where T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
{
Framed {
inner: framed_read2(framed_write2(Fuse(inner, codec))),
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
pub fn new(inner: T, codec: U) -> Framed<T, U> {
Framed {
inner: framed_read2(framed_write2(Fuse(inner, codec))),
}
}
}
@ -68,10 +70,10 @@ impl<T, U> Framed<T, U> {
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U>
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U>
{
Framed {
inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), parts.readbuf),
inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf), parts.read_buf),
}
}
@ -104,32 +106,23 @@ impl<T, U> Framed<T, U> {
self.inner.into_inner().into_inner().0
}
/// Consumes the `Frame`, returning its underlying I/O stream and the buffer
/// with unprocessed data.
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T> {
let (inner, readbuf) = self.inner.into_parts();
let (inner, writebuf) = inner.into_parts();
FramedParts { inner: inner.0, readbuf: readbuf, writebuf: writebuf }
}
pub fn into_parts(self) -> FramedParts<T, U> {
let (inner, read_buf) = self.inner.into_parts();
let (inner, write_buf) = inner.into_parts();
/// Consumes the `Frame`, returning its underlying I/O stream and the buffer
/// with unprocessed data, and also the current codec state.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
///
/// Note that this function will be removed once the codec has been
/// integrated into `FramedParts` in a new version (see
/// [#53](https://github.com/tokio-rs/tokio-io/pull/53)).
pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) {
let (inner, readbuf) = self.inner.into_parts();
let (inner, writebuf) = inner.into_parts();
(FramedParts { inner: inner.0, readbuf: readbuf, writebuf: writebuf }, inner.1)
FramedParts {
io: inner.0,
codec: inner.1,
read_buf: read_buf,
write_buf: write_buf,
_priv: (),
}
}
}
@ -237,12 +230,33 @@ impl<T, U: Encoder> Encoder for Fuse<T, U> {
/// It can be used to construct a new `Framed` with a different codec.
/// It contains all current buffers and the inner transport.
#[derive(Debug)]
pub struct FramedParts<T>
{
pub struct FramedParts<T, U> {
/// The inner transport used to read bytes to and write bytes to
pub inner: T,
pub io: T,
/// The codec
pub codec: U,
/// The buffer with read but unprocessed data.
pub readbuf: BytesMut,
pub read_buf: BytesMut,
/// A buffer with unprocessed data which are not written yet.
pub writebuf: BytesMut
pub write_buf: BytesMut,
/// This private field allows us to add additional fields in the future in a
/// backwards compatible way.
_priv: (),
}
impl<T, U> FramedParts<T, U> {
/// Create a new, default, `FramedParts`
pub fn new(io: T, codec: U) -> FramedParts<T, U> {
FramedParts {
io,
codec,
read_buf: BytesMut::new(),
write_buf: BytesMut::new(),
_priv: (),
}
}
}

View File

@ -31,6 +31,6 @@ mod framed_write;
pub use self::decoder::Decoder;
pub use self::encoder::Encoder;
pub use self::framed::{framed, Framed, FramedParts};
pub use self::framed::{Framed, FramedParts};
pub use self::framed_read::FramedRead;
pub use self::framed_write::FramedWrite;

View File

@ -4,7 +4,7 @@ use bytes::BytesMut;
use {AsyncWrite, AsyncRead};
use super::encoder::Encoder;
use ::_tokio_codec::{framed, Framed};
use ::_tokio_codec::Framed;
/// Decoding of frames via buffers.
///
@ -112,6 +112,6 @@ pub trait Decoder {
fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self>
where Self: Encoder + Sized,
{
framed(io, self)
Framed::new(io, self)
}
}