From c07a7b26d31cce4ef40c51cca4048a7b5230250d Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 5 Jun 2018 15:31:01 -0700 Subject: [PATCH] Cleanup FramedParts in new tokio-codec (#394) --- tokio-codec/tests/framed.rs | 43 +++++------ tokio-io/src/_tokio_codec/framed.rs | 114 ++++++++++++++++------------ tokio-io/src/_tokio_codec/mod.rs | 2 +- tokio-io/src/codec/decoder.rs | 4 +- 4 files changed, 87 insertions(+), 76 deletions(-) diff --git a/tokio-codec/tests/framed.rs b/tokio-codec/tests/framed.rs index fdedd560e..f7dd9cdf7 100644 --- a/tokio-codec/tests/framed.rs +++ b/tokio-codec/tests/framed.rs @@ -7,10 +7,11 @@ use futures::{Stream, Future}; use std::io::{self, Read}; use tokio_codec::{Framed, FramedParts, Decoder, Encoder}; use tokio_io::AsyncRead; -use bytes::{BytesMut, Buf, BufMut, IntoBuf, BigEndian}; +use bytes::{BytesMut, Buf, BufMut, IntoBuf}; const INITIAL_CAPACITY: usize = 8 * 1024; +/// Encode and decode u32 values. struct U32Codec; impl Decoder for U32Codec { @@ -39,6 +40,7 @@ impl Encoder for U32Codec { } } +/// This value should never be used struct DontReadIntoThis; impl Read for DontReadIntoThis { @@ -52,12 +54,10 @@ impl AsyncRead for DontReadIntoThis {} #[test] fn can_read_from_existing_buf() { - let parts = FramedParts { - inner: DontReadIntoThis, - readbuf: vec![0, 0, 0, 42].into(), - writebuf: BytesMut::with_capacity(0), - }; - let framed = Framed::from_parts(parts, U32Codec); + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0, 0, 0, 42].into(); + + let framed = Framed::from_parts(parts); let num = framed .into_future() @@ -67,31 +67,28 @@ fn can_read_from_existing_buf() { .wait() .map_err(|e| e.0) .unwrap(); + assert_eq!(num, 42); } #[test] fn external_buf_grows_to_init() { - let parts = FramedParts { - inner: DontReadIntoThis, - readbuf: vec![0, 0, 0, 42].into(), - writebuf: BytesMut::with_capacity(0), - }; - let framed = Framed::from_parts(parts, U32Codec); - let FramedParts { readbuf, .. } = framed.into_parts(); + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0, 0, 0, 42].into(); - assert_eq!(readbuf.capacity(), INITIAL_CAPACITY); + let framed = Framed::from_parts(parts); + let FramedParts { read_buf, .. } = framed.into_parts(); + + assert_eq!(read_buf.capacity(), INITIAL_CAPACITY); } #[test] fn external_buf_does_not_shrink() { - let parts = FramedParts { - inner: DontReadIntoThis, - readbuf: vec![0; INITIAL_CAPACITY * 2].into(), - writebuf: BytesMut::with_capacity(0), - }; - let framed = Framed::from_parts(parts, U32Codec); - let FramedParts { readbuf, .. } = framed.into_parts(); + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0; INITIAL_CAPACITY * 2].into(); - assert_eq!(readbuf.capacity(), INITIAL_CAPACITY * 2); + let framed = Framed::from_parts(parts); + let FramedParts { read_buf, .. } = framed.into_parts(); + + assert_eq!(read_buf.capacity(), INITIAL_CAPACITY * 2); } diff --git a/tokio-io/src/_tokio_codec/framed.rs b/tokio-io/src/_tokio_codec/framed.rs index de360dba9..666bf9471 100644 --- a/tokio-io/src/_tokio_codec/framed.rs +++ b/tokio-io/src/_tokio_codec/framed.rs @@ -21,29 +21,31 @@ pub struct Framed { pub struct Fuse(pub T, pub U); -/// Provides a `Stream` and `Sink` interface for reading and writing to this -/// `Io` object, using `Decode` and `Encode` to read and write the raw data. -/// -/// Raw I/O objects work with byte sequences, but higher-level code usually -/// wants to batch these into meaningful chunks, called "frames". This -/// method layers framing on top of an I/O object, by using the `Codec` -/// traits to handle encoding and decoding of messages frames. Note that -/// the incoming and outgoing frame types may be distinct. -/// -/// This function returns a *single* object that is both `Stream` and -/// `Sink`; grouping this into a single object is often useful for layering -/// things like gzip or TLS, which require both read and write access to the -/// underlying object. -/// -/// If you want to work more directly with the streams and sink, consider -/// calling `split` on the `Framed` returned by this method, which will -/// break them into separate objects, allowing them to interact more easily. -pub fn framed(inner: T, codec: U) -> Framed - where T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, +impl Framed +where T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, { - Framed { - inner: framed_read2(framed_write2(Fuse(inner, codec))), + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn new(inner: T, codec: U) -> Framed { + Framed { + inner: framed_read2(framed_write2(Fuse(inner, codec))), + } } } @@ -68,10 +70,10 @@ impl Framed { /// If you want to work more directly with the streams and sink, consider /// calling `split` on the `Framed` returned by this method, which will /// break them into separate objects, allowing them to interact more easily. - pub fn from_parts(parts: FramedParts, codec: U) -> Framed + pub fn from_parts(parts: FramedParts) -> Framed { Framed { - inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), parts.readbuf), + inner: framed_read2_with_buffer(framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf), parts.read_buf), } } @@ -104,32 +106,23 @@ impl Framed { self.inner.into_inner().into_inner().0 } - /// Consumes the `Frame`, returning its underlying I/O stream and the buffer - /// with unprocessed data. + /// Consumes the `Frame`, returning its underlying I/O stream, the buffer + /// with unprocessed data, and the codec. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. - pub fn into_parts(self) -> FramedParts { - let (inner, readbuf) = self.inner.into_parts(); - let (inner, writebuf) = inner.into_parts(); - FramedParts { inner: inner.0, readbuf: readbuf, writebuf: writebuf } - } + pub fn into_parts(self) -> FramedParts { + let (inner, read_buf) = self.inner.into_parts(); + let (inner, write_buf) = inner.into_parts(); - /// Consumes the `Frame`, returning its underlying I/O stream and the buffer - /// with unprocessed data, and also the current codec state. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - /// - /// Note that this function will be removed once the codec has been - /// integrated into `FramedParts` in a new version (see - /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)). - pub fn into_parts_and_codec(self) -> (FramedParts, U) { - let (inner, readbuf) = self.inner.into_parts(); - let (inner, writebuf) = inner.into_parts(); - (FramedParts { inner: inner.0, readbuf: readbuf, writebuf: writebuf }, inner.1) + FramedParts { + io: inner.0, + codec: inner.1, + read_buf: read_buf, + write_buf: write_buf, + _priv: (), + } } } @@ -237,12 +230,33 @@ impl Encoder for Fuse { /// It can be used to construct a new `Framed` with a different codec. /// It contains all current buffers and the inner transport. #[derive(Debug)] -pub struct FramedParts -{ +pub struct FramedParts { /// The inner transport used to read bytes to and write bytes to - pub inner: T, + pub io: T, + + /// The codec + pub codec: U, + /// The buffer with read but unprocessed data. - pub readbuf: BytesMut, + pub read_buf: BytesMut, + /// A buffer with unprocessed data which are not written yet. - pub writebuf: BytesMut + pub write_buf: BytesMut, + + /// This private field allows us to add additional fields in the future in a + /// backwards compatible way. + _priv: (), +} + +impl FramedParts { + /// Create a new, default, `FramedParts` + pub fn new(io: T, codec: U) -> FramedParts { + FramedParts { + io, + codec, + read_buf: BytesMut::new(), + write_buf: BytesMut::new(), + _priv: (), + } + } } diff --git a/tokio-io/src/_tokio_codec/mod.rs b/tokio-io/src/_tokio_codec/mod.rs index 7bee1f17c..1c703f3a4 100644 --- a/tokio-io/src/_tokio_codec/mod.rs +++ b/tokio-io/src/_tokio_codec/mod.rs @@ -31,6 +31,6 @@ mod framed_write; pub use self::decoder::Decoder; pub use self::encoder::Encoder; -pub use self::framed::{framed, Framed, FramedParts}; +pub use self::framed::{Framed, FramedParts}; pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; diff --git a/tokio-io/src/codec/decoder.rs b/tokio-io/src/codec/decoder.rs index a3c0b8c1c..9797fb0d0 100644 --- a/tokio-io/src/codec/decoder.rs +++ b/tokio-io/src/codec/decoder.rs @@ -4,7 +4,7 @@ use bytes::BytesMut; use {AsyncWrite, AsyncRead}; use super::encoder::Encoder; -use ::_tokio_codec::{framed, Framed}; +use ::_tokio_codec::Framed; /// Decoding of frames via buffers. /// @@ -112,6 +112,6 @@ pub trait Decoder { fn framed(self, io: T) -> Framed where Self: Encoder + Sized, { - framed(io, self) + Framed::new(io, self) } }