Polish tokio-core in prep for overall 0.1 release

This commit makes a few tweaks to the new `easy` module:

- Rename `Parse` to `Decode`, and `Serialize` to `Encode`.

- Don't use `Poll` for the `decode` method; we prefer to reserve
  that type for actual aync events, and in particular for a `NotReady`
  result to imply that some task scheduling has taken place. Instead,
  use an internal `Option`.
This commit is contained in:
Aaron Turon 2016-10-25 11:56:07 -07:00
parent 623ce443d8
commit 503f4a0405
2 changed files with 75 additions and 77 deletions

View File

@ -8,11 +8,11 @@
//!
//! Currently this module primarily contains `EasyFramed`, a struct which
//! implements the `FramedIo` trait in `tokio_core::io`. This structure allows
//! simply defining a parser (via the `Parse` trait) and a serializer (via the
//! `Serialize` trait) and transforming a stream of bytes into a stream of
//! frames. Additionally the `Parse` trait passes an `EasyBuf`, another type
//! here, which primarily supports `drain_to`, to extract bytes without copying
//! them.
//! simply defining a decoder (via the `Decode` trait) and a
//! encoder (via the `Encode` trait) and transforming a stream of bytes
//! into a stream of frames. Additionally the `Decode` trait passes an
//! `EasyBuf`, another type here, which primarily supports `drain_to`, to
//! extract bytes without copying them.
//!
//! For more information see the `EasyFramed` and `EasyBuf` types.
@ -202,7 +202,7 @@ impl<'a> Drop for EasyBufMut<'a> {
}
/// An implementation of the `FramedIo` trait building on instances of the
/// `Parse` and `Serialize` traits.
/// `Decode` and `Encode` traits.
///
/// Many I/O streams are simply a framed protocol on both the inbound and
/// outbound halves. In essence the underlying stream of bytes can be converted
@ -210,124 +210,123 @@ impl<'a> Drop for EasyBufMut<'a> {
/// stream deals with reading and writing frames.
///
/// This struct is essentially a convenience implementation of the `FramedIo`
/// which only requires knowledge of how to parse and serialize types. It is
/// constructed with an arbitrary `Io` instance along with how to parse and
/// serialize the frames that this `EasyFramed` will be yielding.
/// which only requires knowledge of how to (de)encode types. It is
/// constructed with an arbitrary `Io` instance along with a encoder and
/// decoder for the frames that this `EasyFramed` will be yielding.
///
/// This implementation of `FramedIo` uses the `EasyBuf` type from the `bytes`
/// crate for the backing storage, which should allow for zero-copy parsing
/// where possible.
pub struct EasyFramed<T, P, S> {
/// crate for the backing storage, which should allow for zero-copy
/// decoding where possible.
pub struct EasyFramed<T, D, S> {
upstream: T,
parse: P,
serialize: S,
decode: D,
encode: S,
eof: bool,
is_readable: bool,
rd: EasyBuf,
wr: Vec<u8>,
}
/// Implementation of parsing a frame from an internal buffer.
/// Decoding of a frame from an internal buffer.
///
/// This trait is used when constructing an instance of `EasyFramed`. It defines how
/// to parse the incoming bytes on a stream to the specified type of frame for
/// to decode the incoming bytes on a stream to the specified type of frame for
/// that framed I/O stream.
///
/// The primary method of this trait, `parse`, attempts to parse a method from a
/// buffer of bytes. It has the option of returning `NotReady`, indicating that
/// more bytes need to be read before parsing can continue as well.
pub trait Parse {
/// The type that this instance of `Parse` will attempt to be parsing.
/// The primary method of this trait, `decode`, attempts to decode a
/// frame from a buffer of bytes. It has the option of returning `NotReady`,
/// indicating that more bytes need to be read before decoding can
/// continue.
pub trait Decode {
/// The type of frame that this decoder produces.
///
/// This is typically a frame being parsed from an input stream, such as an
/// HTTP request, a Redis command, etc.
type Out;
/// Attempts to parse a frame from the provided buffer of bytes.
/// Attempts to decode a frame from the provided buffer of bytes.
///
/// This method is called by `EasyFramed` whenever bytes are ready to be parsed.
/// The provided buffer of bytes is what's been read so far, and this
/// instance of `Parse` can determine whether an entire frame is in the
/// instance of `Decode` can determine whether an entire frame is in the
/// buffer and is ready to be returned.
///
/// If an entire frame is available, then this instance will remove those
/// bytes from the buffer provided and return them as a parsed frame. Note
/// that removing bytes from the provided buffer doesn't always necessarily
/// copy the bytes, so this should be an efficient operation in most
/// circumstances.
/// bytes from the buffer provided and return them as a decoded
/// frame. Note that removing bytes from the provided buffer doesn't always
/// necessarily copy the bytes, so this should be an efficient operation in
/// most circumstances.
///
/// If the bytes look valid, but a frame isn't fully available yet, then
/// `Async::NotReady` is returned. This indicates to the `EasyFramed` instance
/// that it needs to read some more bytes before calling this method again.
/// `Ok(None)` is returned. This indicates to the `EasyFramed` instance that
/// it needs to read some more bytes before calling this method again.
///
/// Finally, if the bytes in the buffer are malformed then an error is
/// returned indicating why. This informs `EasyFramed` that the stream is now
/// corrupt and should be terminated.
fn parse(&mut self, buf: &mut EasyBuf) -> Poll<Self::Out, io::Error>;
fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::Out>, io::Error>;
/// A default method available to be called when there are no more bytes
/// available to be read from the underlying I/O.
///
/// This method defaults to calling `parse` and returns an error if
/// `NotReady` is returned. Typically this doesn't need to be implemented
/// This method defaults to calling `decode` and returns an error if
/// `Ok(None)` is returned. Typically this doesn't need to be implemented
/// unless the framing protocol differs near the end of the stream.
fn done(&mut self, buf: &mut EasyBuf) -> io::Result<Self::Out> {
match try!(self.parse(buf)) {
Async::Ready(frame) => Ok(frame),
Async::NotReady => Err(io::Error::new(io::ErrorKind::Other,
"bytes remaining on stream")),
match try!(self.decode(buf)) {
Some(frame) => Ok(frame),
None => Err(io::Error::new(io::ErrorKind::Other,
"bytes remaining on stream")),
}
}
}
/// A trait for serializing frames into a byte buffer.
/// A trait for encoding frames into a byte buffer.
///
/// This trait is used as a building block of `EasyFramed` to define how frames are
/// serialized into bytes to get passed to the underlying byte stream. Each
/// frame written to `EasyFramed` will be serialized with this trait to an internal
/// encoded into bytes to get passed to the underlying byte stream. Each
/// frame written to `EasyFramed` will be encoded with this trait to an internal
/// buffer. That buffer is then written out when possible to the underlying I/O
/// stream.
pub trait Serialize {
/// The frame that's being serialized to a byte buffer.
pub trait Encode {
/// The frame that's being encoded to a byte buffer.
///
/// This type is the type of frame that's also being written to a `EasyFramed`.
type In;
/// Serializes a frame into the buffer provided.
/// Encodes a frame into the buffer provided.
///
/// This method will serialize `msg` into the byte buffer provided by `buf`.
/// This method will encode `msg` into the byte buffer provided by `buf`.
/// The `buf` provided is an internal buffer of the `EasyFramed` instance and
/// will be written out when possible.
fn serialize(&mut self, msg: Self::In, buf: &mut Vec<u8>);
fn encode(&mut self, msg: Self::In, buf: &mut Vec<u8>);
}
impl<T, P, S> EasyFramed<T, P, S>
impl<T, D, S> EasyFramed<T, D, S>
where T: Io,
P: Parse,
S: Serialize,
D: Decode,
S: Encode,
{
/// Creates a new instance of `EasyFramed` from the given component pieces.
///
/// This method will create a new instance of `EasyFramed` which implements
/// `FramedIo` for reading and writing frames from an underlying I/O stream.
/// The `upstream` argument here is the byte-based I/O stream that it will
/// be operating on. Data will be read from this stream and parsed with
/// `parse` into frames. Frames written to this instance will be serialized
/// by `serialize` and then written to `upstream`.
/// be operating on. Data will be read from this stream and decoded with
/// `decode` into frames. Frames written to this instance will be
/// encoded by `encode` and then written to `upstream`.
///
/// The `rd` and `wr` buffers provided are used for reading and writing
/// bytes and provide a small amount of control over how buffering happens.
pub fn new(upstream: T,
parse: P,
serialize: S) -> EasyFramed<T, P, S> {
decode: D,
encode: S) -> EasyFramed<T, D, S> {
trace!("creating new framed transport");
EasyFramed {
upstream: upstream,
parse: parse,
serialize: serialize,
decode: decode,
encode: encode,
is_readable: false,
eof: false,
rd: EasyBuf::new(),
@ -366,8 +365,8 @@ impl<T, P, S> EasyFramed<T, P, S>
impl<T, P, S> FramedIo for EasyFramed<T, P, S>
where T: Io,
P: Parse,
S: Serialize,
P: Decode,
S: Encode,
{
type In = S::In;
type Out = Option<P::Out>;
@ -383,20 +382,20 @@ impl<T, P, S> FramedIo for EasyFramed<T, P, S>
fn read(&mut self) -> Poll<Self::Out, io::Error> {
loop {
// If the read buffer has any pending data, then it could be
// possible that `parse` will return a new frame. We leave it to
// the parser to optimize detecting that more data is required.
// possible that `decode` will return a new frame. We leave it to
// the decoder to optimize detecting that more data is required.
if self.is_readable {
if self.eof {
if self.rd.len() == 0 {
return Ok(None.into())
} else {
let frame = try!(self.parse.done(&mut self.rd));
let frame = try!(self.decode.done(&mut self.rd));
return Ok(Some(frame).into())
}
}
trace!("attempting to parse a frame");
if let Async::Ready(frame) = try!(self.parse.parse(&mut self.rd)) {
trace!("frame parsed from buffer");
trace!("attempting to decode a frame");
if let Some(frame) = try!(self.decode.decode(&mut self.rd)) {
trace!("frame decoded from buffer");
return Ok(Some(frame).into());
}
self.is_readable = false;
@ -436,8 +435,8 @@ impl<T, P, S> FramedIo for EasyFramed<T, P, S>
"transport not currently writable"));
}
// Serialize the msg
self.serialize.serialize(msg, &mut self.wr);
// Encode the msg
self.encode.encode(msg, &mut self.wr);
// TODO: should provide some backpressure, such as when the buffer is
// too full this returns `NotReady` or something like that.
@ -463,4 +462,3 @@ impl<T, P, S> FramedIo for EasyFramed<T, P, S>
}
}
}

View File

@ -8,20 +8,20 @@ use std::net::Shutdown;
use futures::{Future, Async, Poll};
use futures::stream::Stream;
use tokio_core::io::{FramedIo, write_all, read};
use tokio_core::easy::{Parse, Serialize, EasyFramed, EasyBuf};
use tokio_core::easy::{Encode, Decode, EasyFramed, EasyBuf};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::Core;
pub struct LineParser;
pub struct LineSerialize;
pub struct LineDecoder;
pub struct LineEncoder;
impl Parse for LineParser {
impl Decode for LineDecoder {
type Out = EasyBuf;
fn parse(&mut self, buf: &mut EasyBuf) -> Poll<EasyBuf, io::Error> {
fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<EasyBuf>, io::Error> {
match buf.as_slice().iter().position(|&b| b == b'\n') {
Some(i) => Ok(buf.drain_to(i + 1).into()),
None => Ok(Async::NotReady),
Some(i) => Ok(Some(buf.drain_to(i + 1).into())),
None => Ok(None),
}
}
@ -31,10 +31,10 @@ impl Parse for LineParser {
}
}
impl Serialize for LineSerialize {
impl Encode for LineEncoder {
type In = EasyBuf;
fn serialize(&mut self, msg: EasyBuf, into: &mut Vec<u8>) {
fn encode(&mut self, msg: EasyBuf, into: &mut Vec<u8>) {
into.extend_from_slice(msg.as_slice());
}
}
@ -97,7 +97,7 @@ fn echo() {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
let addr = listener.local_addr().unwrap();
let srv = listener.incoming().for_each(move |(socket, _)| {
let framed = EasyFramed::new(socket, LineParser, LineSerialize);
let framed = EasyFramed::new(socket, LineDecoder, LineEncoder);
handle.spawn(EchoFramed { inner: framed, eof: false });
Ok(())
});