util: add links to tokio-util + example to BytesCodec (#2207)

This commit is contained in:
Alice Ryhl 2020-02-01 23:04:58 +01:00 committed by GitHub
parent 64e75ad1b0
commit 79e4514283
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 127 additions and 44 deletions

View File

@ -4,7 +4,43 @@ use crate::codec::encoder::Encoder;
use bytes::{BufMut, Bytes, BytesMut};
use std::io;
/// A simple `Codec` implementation that just ships bytes around.
/// A simple [`Decoder`] and [`Encoder`] implementation that just ships bytes around.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
///
/// # Example
///
/// Turn an [`AsyncRead`] into a stream of `Result<`[`BytesMut`]`, `[`Error`]`>`.
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`BytesMut`]: bytes::BytesMut
/// [`Error`]: std::io::Error
///
/// ```
/// # mod hidden {
/// # #[allow(unused_imports)]
/// use tokio::fs::File;
/// # }
/// use tokio::io::AsyncRead;
/// use tokio_util::codec::{FramedRead, BytesCodec};
///
/// # enum File {}
/// # impl File {
/// # async fn open(_name: &str) -> Result<impl AsyncRead, std::io::Error> {
/// # use std::io::Cursor;
/// # Ok(Cursor::new(vec![0, 1, 2, 3, 4, 5]))
/// # }
/// # }
/// #
/// # #[tokio::main(core_threads = 1)]
/// # async fn main() -> Result<(), std::io::Error> {
/// let my_async_read = File::open("filename.txt").await?;
/// let my_stream_of_bytes = FramedRead::new(my_async_read, BytesCodec::new());
/// # Ok(())
/// # }
/// ```
///
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
pub struct BytesCodec(());

View File

@ -8,14 +8,17 @@ use std::io;
/// Decoding of frames via buffers.
///
/// This trait is used when constructing an instance of `Framed` or
/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has
/// This trait is used when constructing an instance of [`Framed`] or
/// [`FramedRead`]. An implementation of `Decoder` takes a byte stream that has
/// already been buffered in `src` and decodes the data into a stream of
/// `Self::Item` frames.
///
/// Implementations are able to track state on `self`, which enables
/// implementing stateful streaming parsers. In many cases, though, this type
/// will simply be a unit struct (e.g. `struct HttpDecoder`).
///
/// [`Framed`]: crate::codec::Framed
/// [`FramedRead`]: crate::codec::FramedRead
pub trait Decoder {
/// The type of decoded frames.
type Item;
@ -27,17 +30,19 @@ pub trait Decoder {
/// useful to report the failure as an `Item`.
///
/// `From<io::Error>` is required in the interest of making `Error` suitable
/// for returning directly from a `FramedRead`, and to enable the default
/// for returning directly from a [`FramedRead`], and to enable the default
/// implementation of `decode_eof` to yield an `io::Error` when the decoder
/// fails to consume all available data.
///
/// Note that implementors of this trait can simply indicate `type Error =
/// io::Error` to use I/O errors as this type.
///
/// [`FramedRead`]: crate::codec::FramedRead
type Error: From<io::Error>;
/// Attempts to decode a frame from the provided buffer of bytes.
///
/// This method is called by `FramedRead` whenever bytes are ready to be
/// This method is called by [`FramedRead`] whenever bytes are ready to be
/// parsed. The provided buffer of bytes is what's been read so far, and
/// this instance of `Decode` can determine whether an entire frame is in
/// the buffer and is ready to be returned.
@ -49,7 +54,7 @@ pub trait Decoder {
/// most circumstances.
///
/// If the bytes look valid, but a frame isn't fully available yet, then
/// `Ok(None)` is returned. This indicates to the `Framed` instance that
/// `Ok(None)` is returned. This indicates to the [`Framed`] instance that
/// it needs to read some more bytes before calling this method again.
///
/// Note that the bytes provided may be empty. If a previous call to
@ -58,9 +63,12 @@ pub trait Decoder {
/// be read.
///
/// Finally, if the bytes in the buffer are malformed then an error is
/// returned indicating why. This informs `Framed` that the stream is now
/// returned indicating why. This informs [`Framed`] that the stream is now
/// corrupt and should be terminated.
///
/// [`Framed`]: crate::codec::Framed
/// [`FramedRead`]: crate::codec::FramedRead
///
/// # Buffer management
///
/// Before returning from the function, implementations should ensure that
@ -128,7 +136,7 @@ pub trait Decoder {
}
}
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
@ -143,8 +151,12 @@ pub trait Decoder {
/// underlying object.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// calling `split` on the [`Framed`] returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
///
/// [`Stream`]: tokio::stream::Stream
/// [`Sink`]: futures_sink::Sink
/// [`Framed`]: crate::codec::Framed
fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self>
where
Self: Encoder + Sized,

View File

@ -2,21 +2,27 @@ use bytes::BytesMut;
use std::io;
/// Trait of helper objects to write out messages as bytes, for use with
/// `FramedWrite`.
/// [`FramedWrite`].
///
/// [`FramedWrite`]: crate::codec::FramedWrite
pub trait Encoder {
/// The type of items consumed by the `Encoder`
type Item;
/// The type of encoding errors.
///
/// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>`
/// [`FramedWrite`] requires `Encoder`s errors to implement `From<io::Error>`
/// in the interest letting it return `Error`s directly.
///
/// [`FramedWrite`]: crate::codec::FramedWrite
type Error: From<io::Error>;
/// Encodes a frame into the buffer provided.
///
/// This method will encode `item` into the byte buffer provided by `dst`.
/// The `dst` provided is an internal buffer of the `Framed` instance and
/// The `dst` provided is an internal buffer of the [`FramedWrite`] instance and
/// will be written out when possible.
///
/// [`FramedWrite`]: crate::codec::FramedWrite
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
}

View File

@ -18,10 +18,16 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
/// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
/// by using the `new` function seen below.
///
/// [`Stream`]: tokio::stream::Stream
/// [`Sink`]: futures_sink::Sink
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Decoder::framed`]: crate::codec::Decoder::framed()
pub struct Framed<T, U> {
#[pin]
inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
@ -63,23 +69,29 @@ where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
/// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
/// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// method layers framing on top of an I/O object, by using the codec
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// This function returns a *single* object that is both [`Stream`] and
/// [`Sink`]; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// calling [`split`] on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
///
/// [`Stream`]: tokio::stream::Stream
/// [`Sink`]: futures_sink::Sink
/// [`Decode`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
pub fn new(inner: T, codec: U) -> Framed<T, U> {
Framed {
inner: framed_read2(framed_write2(Fuse { io: inner, codec })),
@ -88,8 +100,8 @@ where
}
impl<T, U> Framed<T, U> {
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
/// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
/// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
@ -97,17 +109,24 @@ impl<T, U> Framed<T, U> {
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// This function returns a *single* object that is both [`Stream`] and
/// [`Sink`]; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// This objects takes a stream and a readbuffer and a writebuffer. These field
/// can be obtained from an existing `Framed` with the `into_parts` method.
/// can be obtained from an existing `Framed` with the [`into_parts`] method.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `Framed` returned by this method, which will
/// calling [`split`] on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
///
/// [`Stream`]: tokio::stream::Stream
/// [`Sink`]: futures_sink::Sink
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
/// [`into_parts`]: crate::codec::Framed::into_parts()
/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
inner: framed_read2_with_buffer(
@ -124,7 +143,7 @@ impl<T, U> Framed<T, U> {
}
/// Returns a reference to the underlying I/O stream wrapped by
/// `Frame`.
/// `Framed`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
@ -134,7 +153,7 @@ impl<T, U> Framed<T, U> {
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Frame`.
/// `Framed`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
@ -144,7 +163,7 @@ impl<T, U> Framed<T, U> {
}
/// Returns a reference to the underlying codec wrapped by
/// `Frame`.
/// `Framed`.
///
/// Note that care should be taken to not tamper with the underlying codec
/// as it may corrupt the stream of frames otherwise being worked with.
@ -153,7 +172,7 @@ impl<T, U> Framed<T, U> {
}
/// Returns a mutable reference to the underlying codec wrapped by
/// `Frame`.
/// `Framed`.
///
/// Note that care should be taken to not tamper with the underlying codec
/// as it may corrupt the stream of frames otherwise being worked with.
@ -166,7 +185,7 @@ impl<T, U> Framed<T, U> {
self.inner.buffer()
}
/// Consumes the `Frame`, returning its underlying I/O stream.
/// Consumes the `Framed`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
@ -175,7 +194,7 @@ impl<T, U> Framed<T, U> {
self.inner.into_inner().into_inner().io
}
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// Consumes the `Framed`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
@ -338,8 +357,10 @@ impl<T, U: Encoder> Encoder for Fuse<T, U> {
}
/// `FramedParts` contains an export of the data of a Framed transport.
/// It can be used to construct a new `Framed` with a different codec.
/// It can be used to construct a new [`Framed`] with a different codec.
/// It contains all current buffers and the inner transport.
///
/// [`Framed`]: crate::codec::Framed
#[derive(Debug)]
pub struct FramedParts<T, U> {
/// The inner transport used to read bytes to and write bytes to
@ -360,7 +381,7 @@ pub struct FramedParts<T, U> {
}
impl<T, U> FramedParts<T, U> {
/// Create a new, default, `FramedParts`
/// Create a new, default, `FramedParts`.
pub fn new(io: T, codec: U) -> FramedParts<T, U> {
FramedParts {
io,

View File

@ -12,7 +12,10 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// A `Stream` of messages decoded from an `AsyncRead`.
/// A [`Stream`] of messages decoded from an [`AsyncRead`].
///
/// [`Stream`]: tokio::stream::Stream
/// [`AsyncRead`]: tokio::io::AsyncRead
pub struct FramedRead<T, D> {
#[pin]
inner: FramedRead2<Fuse<T, D>>,

View File

@ -19,7 +19,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// A `Sink` of frames encoded to an `AsyncWrite`.
/// A [`Sink`] of frames encoded to an `AsyncWrite`.
///
/// [`Sink`]: futures_sink::Sink
pub struct FramedWrite<T, E> {
#[pin]
inner: FramedWrite2<Fuse<T, E>>,

View File

@ -4,7 +4,10 @@ use crate::codec::encoder::Encoder;
use bytes::{Buf, BufMut, BytesMut};
use std::{cmp, fmt, io, str, usize};
/// A simple `Codec` implementation that splits up data into lines.
/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into lines.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LinesCodec {
// Stored index of the next index to examine for a `\n` character.
@ -33,7 +36,7 @@ impl LinesCodec {
/// of a buffered line. See the documentation for [`new_with_max_length`]
/// for information on why this could be a potential security risk.
///
/// [`new_with_max_length`]: #method.new_with_max_length
/// [`new_with_max_length`]: crate::codec::LinesCodec::new_with_max_length()
pub fn new() -> LinesCodec {
LinesCodec {
next_index: 0,
@ -45,7 +48,7 @@ impl LinesCodec {
/// Returns a `LinesCodec` with a maximum line length limit.
///
/// If this is set, calls to `LinesCodec::decode` will return a
/// [`LengthError`] when a line exceeds the length limit. Subsequent calls
/// [`LinesCodecError`] when a line exceeds the length limit. Subsequent calls
/// will discard up to `limit` bytes from that line until a newline
/// character is reached, returning `None` until the line over the limit
/// has been fully discarded. After that point, calls to `decode` will
@ -59,7 +62,7 @@ impl LinesCodec {
/// exploit this unbounded buffer by sending an unbounded amount of input
/// without any `\n` characters, causing unbounded memory consumption.
///
/// [`LengthError`]: ../struct.LengthError
/// [`LinesCodecError`]: crate::codec::LinesCodecError
pub fn new_with_max_length(max_length: usize) -> Self {
LinesCodec {
max_length,

View File

@ -4,10 +4,10 @@
//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
//! Framed streams are also known as transports.
//!
//! [`AsyncRead`]: https://docs.rs/tokio/*/tokio/io/trait.AsyncRead.html
//! [`AsyncWrite`]: https://docs.rs/tokio/*/tokio/io/trait.AsyncWrite.html
//! [`Stream`]: https://docs.rs/tokio/*/tokio/stream/trait.Stream.html
//! [`Sink`]: https://docs.rs/futures-sink/*/futures_sink/trait.Sink.html
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite
//! [`Stream`]: tokio::stream::Stream
//! [`Sink`]: futures_sink::Sink
mod bytes_codec;
pub use self::bytes_codec::BytesCodec;