codec: move into tokio-util (#1675)

Related to #1318, Tokio APIs that are "less stable" are moved into a new
`tokio-util` crate. This crate will mirror `tokio` and provide
additional APIs that may require a greater rate of breaking changes.

As examples require `tokio-util`, they are moved into a separate
crate (`examples`). This has the added advantage of being able to avoid
example only dependencies in the `tokio` crate.
This commit is contained in:
Carl Lerche 2019-10-22 10:13:49 -07:00 committed by GitHub
parent b8cee1a60a
commit cfc15617a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 324 additions and 485 deletions

View File

@ -2,7 +2,6 @@
members = [
"tokio",
"tokio-codec",
"tokio-executor",
"tokio-io",
"tokio-macros",
@ -10,5 +9,9 @@ members = [
"tokio-sync",
"tokio-test",
"tokio-tls",
"tokio-util",
# Internal
"examples",
"build-tests",
]

View File

@ -94,7 +94,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
```
More examples can be found [here](tokio/examples). Note that the `master` branch
More examples can be found [here](examples). Note that the `master` branch
is currently being updated to use `async` / `await`. The examples are
not fully ported. Examples for stable Tokio can be found
[here](https://github.com/tokio-rs/tokio/tree/v0.1.x/tokio/examples).

View File

@ -26,7 +26,6 @@ jobs:
cross: true
crates:
tokio:
- codec
- fs
- io
- net
@ -61,7 +60,6 @@ jobs:
displayName: Test sub crates -
rust: beta
crates:
tokio-codec: []
tokio-executor:
- current-thread
- thread-pool
@ -71,6 +69,8 @@ jobs:
- async-traits
tokio-macros: []
tokio-test: []
tokio-util: []
examples: []
# Test compilation failure
- template: ci/azure-test-stable.yml

View File

@ -2,10 +2,10 @@
# repository.
[patch.crates-io]
tokio = { path = "tokio" }
tokio-codec = { path = "tokio-codec" }
tokio-executor = { path = "tokio-executor" }
tokio-io = { path = "tokio-io" }
tokio-macros = { path = "tokio-macros" }
tokio-net = { path = "tokio-net" }
tokio-sync = { path = "tokio-sync" }
tokio-tls = { path = "tokio-tls" }
tokio-util = { path = "tokio-util" }

52
examples/Cargo.toml Normal file
View File

@ -0,0 +1,52 @@
[package]
name = "examples"
version = "0.0.0"
publish = false
edition = "2018"
[dev-dependencies]
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" }
bytes = "0.4.12"
futures-preview = "=0.3.0-alpha.19"
[[example]]
name = "chat"
path = "chat.rs"
[[example]]
name = "connect"
path = "connect.rs"
[[example]]
name = "echo-udp"
path = "echo-udp.rs"
[[example]]
name = "echo"
path = "echo.rs"
[[example]]
name = "hello_world"
path = "hello_world.rs"
[[example]]
name = "print_each_packet"
path = "print_each_packet.rs"
[[example]]
name = "proxy"
path = "proxy.rs"
[[example]]
name = "tinydb"
path = "tinydb.rs"
[[example]]
name = "udp-client"
path = "udp-client.rs"
[[example]]
name = "udp-codec"
path = "udp-codec.rs"

View File

@ -26,17 +26,19 @@
#![warn(rust_2018_idioms)]
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
use futures::{Poll, SinkExt, Stream, StreamExt};
use std::{
collections::HashMap, env, error::Error, io, net::SocketAddr, pin::Pin, sync::Arc,
task::Context,
};
use tokio::{
self,
codec::{Framed, LinesCodec, LinesCodecError},
net::{TcpListener, TcpStream},
sync::{mpsc, Mutex},
};
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {

View File

@ -16,13 +16,14 @@
#![warn(rust_2018_idioms)]
use tokio::io;
use tokio::sync::{mpsc, oneshot};
use tokio_util::codec::{FramedRead, FramedWrite};
use futures::{SinkExt, Stream};
use std::{env, error::Error, net::SocketAddr};
use tokio::{
codec::{FramedRead, FramedWrite},
io,
sync::{mpsc, oneshot},
};
use std::env;
use std::error::Error;
use std::net::SocketAddr;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
@ -83,10 +84,8 @@ mod tcp {
use super::codec;
use futures::{future, Sink, SinkExt, Stream, StreamExt};
use std::{error::Error, io, net::SocketAddr};
use tokio::{
codec::{FramedRead, FramedWrite},
net::TcpStream,
};
use tokio::net::TcpStream;
use tokio_util::codec::{FramedRead, FramedWrite};
pub async fn connect(
addr: &SocketAddr,
@ -171,7 +170,7 @@ mod udp {
mod codec {
use bytes::{BufMut, BytesMut};
use std::io;
use tokio::codec::{Decoder, Encoder};
use tokio_util::codec::{Decoder, Encoder};
/// A simple `Codec` implementation that just ships bytes around.
///

View File

@ -54,10 +54,9 @@
#![warn(rust_2018_idioms)]
use tokio;
use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_util::codec::{BytesCodec, Decoder};
use std::env;

View File

@ -41,17 +41,15 @@
#![warn(rust_2018_idioms)]
use tokio::net::TcpListener;
use tokio_util::codec::{Framed, LinesCodec};
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::TcpListener;
use futures::{SinkExt, StreamExt};
/// The in-memory database shared amongst all clients.
///
/// This database will be shared via `Arc`, so to mutate the internal map we're

View File

@ -6,26 +6,23 @@
//! new message with a new destination. Overall, we then use this to construct a
//! "ping pong" pair where two sockets are sending messages back and forth.
#![cfg(feature = "rt-full")]
#![warn(rust_2018_idioms)]
use tokio::future::FutureExt as TokioFutureExt;
use tokio::io;
use tokio::net::UdpSocket;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use bytes::Bytes;
use futures::{FutureExt, SinkExt, StreamExt};
use std::env;
use std::error::Error;
use std::net::SocketAddr;
use std::time::Duration;
use bytes::Bytes;
use futures::{FutureExt, SinkExt, StreamExt};
use tokio::codec::BytesCodec;
use tokio::future::FutureExt as TokioFutureExt;
use tokio::io;
use tokio::net::{UdpFramed, UdpSocket};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let _ = env_logger::init();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string());
// Bind both our sockets and then figure out what ports we got.

View File

@ -1,35 +0,0 @@
# 0.2.0-alpha.6 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.2.0-alpha.5 (September 19, 2019)
- Track tokio release
# 0.2.0-alpha.4 (August 29, 2019)
- Track tokio release.
# 0.2.0-alpha.3 (August 28, 2019)
### Fix
- Infinite loop in `LinesCodec` (#1489).
# 0.2.0-alpha.2 (August 17, 2019)
### Changed
- Update `futures` dependency to 0.3.0-alpha.18.
# 0.2.0-alpha.1 (August 8, 2019)
### Changed
- Switch to `async`, `await`, and `std::future`.
# 0.1.1 (September 26, 2018)
* Allow setting max line length with `LinesCodec` (#632)
# 0.1.0 (June 13, 2018)
* Initial release (#353)

View File

@ -62,7 +62,6 @@ uds = [
log = ["tracing/log"]
[dependencies]
tokio-codec = { version = "=0.2.0-alpha.6", path = "../tokio-codec" }
tokio-executor = { version = "=0.2.0-alpha.6", features = ["blocking"], path = "../tokio-executor" }
tokio-io = { version = "=0.2.0-alpha.6", path = "../tokio-io" }
tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" }

View File

@ -55,10 +55,11 @@
//! We can also read input line by line.
//!
//! ```no_run
//! use tokio::io::{BufReader, AsyncBufReadExt};
//! use tokio::process::Command;
//!
//! use futures_util::stream::StreamExt;
//! use std::process::{Stdio};
//! use tokio::codec::{FramedRead, LinesCodec};
//! use tokio_net::process::Command;
//! use std::process::Stdio;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -77,7 +78,7 @@
//! let stdout = child.stdout().take()
//! .expect("child did not have a handle to stdout");
//!
//! let mut reader = FramedRead::new(stdout, LinesCodec::new());
//! let mut reader = BufReader::new(stdout).lines();
//!
//! // Ensure the child process is spawned in the runtime so it can
//! // make progress on its own while we await for any output.

View File

@ -7,9 +7,7 @@
//!
//! [`UdpSocket`]: struct.UdpSocket
mod frame;
mod socket;
pub mod split;
pub use self::frame::UdpFramed;
pub use self::socket::UdpSocket;

View File

@ -108,7 +108,7 @@ impl UdpSocket {
///
/// [`connect`]: #method.connect
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_send_priv(cx, buf)).await
poll_fn(|cx| self.poll_send(cx, buf)).await
}
// Poll IO functions that takes `&self` are provided for the split API.
@ -121,11 +121,8 @@ impl UdpSocket {
// While violating this requirement is "safe" from a Rust memory model point
// of view, it will result in unexpected behavior in the form of lost
// notifications and tasks hanging.
pub(crate) fn poll_send_priv(
&self,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
#[doc(hidden)]
pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send(buf) {
@ -150,14 +147,11 @@ impl UdpSocket {
///
/// [`connect`]: #method.connect
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
poll_fn(|cx| self.poll_recv(cx, buf)).await
}
pub(crate) fn poll_recv_priv(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
#[doc(hidden)]
pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().recv(buf) {
@ -178,7 +172,7 @@ impl UdpSocket {
let mut addrs = target.to_socket_addrs().await?;
match addrs.next() {
Some(target) => poll_fn(|cx| self.poll_send_to_priv(cx, buf, &target)).await,
Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await,
None => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"no addresses to send data to",
@ -186,7 +180,9 @@ impl UdpSocket {
}
}
pub(crate) fn poll_send_to_priv(
// TODO: Public or not?
#[doc(hidden)]
pub fn poll_send_to(
&self,
cx: &mut Context<'_>,
buf: &[u8],
@ -210,10 +206,11 @@ impl UdpSocket {
/// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await
poll_fn(|cx| self.poll_recv_from(cx, buf)).await
}
pub(crate) fn poll_recv_from_priv(
#[doc(hidden)]
pub fn poll_recv_from(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],

View File

@ -86,7 +86,7 @@ impl UdpSocketRecvHalf {
/// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await
poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await
}
/// Returns a future that receives a single datagram message on the socket from
@ -102,7 +102,7 @@ impl UdpSocketRecvHalf {
///
/// [`connect`]: super::UdpSocket::connect
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await
poll_fn(|cx| self.0.poll_recv(cx, buf)).await
}
}
@ -120,7 +120,7 @@ impl UdpSocketSendHalf {
/// The future will resolve to an error if the IP version of the socket does
/// not match that of `target`.
pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target)).await
poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await
}
/// Returns a future that sends data on the socket to the remote address to which it is connected.
@ -131,7 +131,7 @@ impl UdpSocketSendHalf {
///
/// [`connect`]: super::UdpSocket::connect
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await
poll_fn(|cx| self.0.poll_send(cx, buf)).await
}
}

View File

@ -1,175 +0,0 @@
use super::UnixDatagram;
use bytes::{BufMut, BytesMut};
use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream};
use std::io;
use std::os::unix::net::SocketAddr;
use std::path::Path;
use tokio_codec::{Decoder, Encoder};
/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// Unix datagram sockets work with datagrams, but higher-level code may wants to
/// batch these into meaningful chunks, called "frames". This method layers
/// framing on top of this socket by using the `Encoder` and `Decoder` traits to
/// handle encoding and decoding of messages frames. Note that the incoming and
/// outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and `Sink`;
/// grouping this into a single object is often useful for layering things which
/// require both read and write access to the underlying object.
///
/// If you want to work more directly with the streams and sink, consider
/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break
/// them into separate objects, allowing them to interact more easily.
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
pub struct UnixDatagramFramed<A, C> {
socket: UnixDatagram,
codec: C,
rd: BytesMut,
wr: BytesMut,
out_addr: Option<A>,
flushed: bool,
}
impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> {
type Item = (C::Item, SocketAddr);
type Error = C::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.rd.reserve(INITIAL_RD_CAPACITY);
let (_n, addr) = unsafe {
let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
self.rd.advance_mut(n);
(n, addr)
};
let span = trace_span!("decoding", from.addr = %addr, dgram.length = _n);
let _e = span.enter();
trace!("trying to decode a frame...");
let frame_res = self.codec.decode(&mut self.rd);
self.rd.clear();
let frame = frame_res?;
let result = frame.map(|frame| (frame, addr));
trace!("frame decoded from buffer");
Ok(Async::Ready(result))
}
}
impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> {
type SinkItem = (C::Item, A);
type SinkError = C::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
let span = trace_span!("sending", to.addr = %item.0, flushed = self.flushed);
let _e = span.enter();
trace!("sending frame...");
if !self.flushed {
match self.poll_complete()? {
Async::Ready(()) => {}
Async::NotReady => return Ok(AsyncSink::NotReady(item)),
}
}
let (frame, out_addr) = item;
self.codec.encode(frame, &mut self.wr)?;
self.out_addr = Some(out_addr);
self.flushed = false;
trace!(message = "frame encoded", frame.length = pin.wr.len());
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), C::Error> {
if self.flushed {
return Ok(Async::Ready(()));
}
let span = trace_span!("flushing", to.addr = %self.out_addr);
let _e = span.enter();
let n = {
let out_path = match self.out_addr {
Some(ref out_path) => out_path.as_ref(),
None => {
return Err(io::Error::new(
io::ErrorKind::Other,
"internal error: addr not available while data not flushed",
)
.into());
}
};
trace!(message = "flushing frame", frame.length = self.wr.len());
try_ready!(self.socket.poll_send_to(&self.wr, out_path))
};
let wrote_all = n == self.wr.len();
self.wr.clear();
self.flushed = true;
trace!(written.length = n, written.complete = wrote_all);
if wrote_all {
self.out_addr = None;
Ok(Async::Ready(()))
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"failed to write entire datagram to socket",
)
.into())
}
}
fn close(&mut self) -> Poll<(), C::Error> {
self.poll_complete()
}
}
const INITIAL_RD_CAPACITY: usize = 64 * 1024;
const INITIAL_WR_CAPACITY: usize = 8 * 1024;
impl<A, C> UnixDatagramFramed<A, C> {
/// Create a new `UnixDatagramFramed` backed by the given socket and codec.
///
/// See struct level documentation for more details.
pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> {
UnixDatagramFramed {
socket: socket,
codec: codec,
out_addr: None,
rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
flushed: true,
}
}
/// Returns a reference to the underlying I/O stream wrapped by `Framed`.
///
/// # Note
///
/// Care should be taken to not tamper with the underlying stream of data
/// coming in as it may corrupt the stream of frames otherwise being worked
/// with.
pub fn get_ref(&self) -> &UnixDatagram {
&self.socket
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Framed`.
///
/// # Note
///
/// Care should be taken to not tamper with the underlying stream of data
/// coming in as it may corrupt the stream of frames otherwise being worked
/// with.
pub fn get_mut(&mut self) -> &mut UnixDatagram {
&mut self.socket
}
}

View File

@ -3,16 +3,19 @@
//! This crate provides APIs for using Unix Domain Sockets with Tokio.
mod datagram;
// mod frame;
mod incoming;
mod listener;
pub mod split;
mod stream;
mod ucred;
pub use self::datagram::UnixDatagram;
mod incoming;
#[cfg(feature = "async-traits")]
pub use self::incoming::Incoming;
mod listener;
pub use self::listener::UnixListener;
pub mod split;
mod stream;
pub use self::stream::UnixStream;
mod ucred;
pub use self::ucred::UCred;

View File

@ -4,16 +4,15 @@
#[macro_use]
extern crate tracing;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio_net::process::{Child, Command};
use futures_util::future;
use futures_util::future::FutureExt;
use futures_util::stream::StreamExt;
use tokio::codec::{FramedRead, LinesCodec};
use tokio::io::AsyncWriteExt;
use tokio_net::process::{Child, Command};
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
mod support;
use support::*;
@ -51,7 +50,7 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
};
let read = async {
let mut reader = FramedRead::new(stdout, LinesCodec::new());
let mut reader = BufReader::new(stdout).lines();
let mut num_lines = 0;
// Try to read `n + 1` lines, ensuring the last one is empty

View File

@ -1,11 +1,6 @@
#![warn(rust_2018_idioms)]
use tokio_codec::{Decoder, Encoder};
use tokio_net::udp::{UdpFramed, UdpSocket};
use bytes::{BufMut, BytesMut};
use futures_util::{future::FutureExt, sink::SinkExt, stream::StreamExt, try_future::try_join};
use std::io;
use tokio_net::udp::UdpSocket;
#[tokio::test]
async fn send_recv() -> std::io::Result<()> {
@ -75,72 +70,3 @@ async fn reunite_error() -> std::io::Result<()> {
assert!(s.reunite(r1).is_err());
Ok(())
}
pub struct ByteCodec;
impl Decoder for ByteCodec {
type Item = Vec<u8>;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
let len = buf.len();
Ok(Some(buf.split_to(len).to_vec()))
}
}
impl Encoder for ByteCodec {
type Item = Vec<u8>;
type Error = io::Error;
fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
buf.reserve(data.len());
buf.put(data);
Ok(())
}
}
#[tokio::test]
async fn send_framed() -> std::io::Result<()> {
let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
let a_addr = a_soc.local_addr()?;
let b_addr = b_soc.local_addr()?;
// test sending & receiving bytes
{
let mut a = UdpFramed::new(a_soc, ByteCodec);
let mut b = UdpFramed::new(b_soc, ByteCodec);
let msg = b"4567".to_vec();
let send = a.send((msg.clone(), b_addr));
let recv = b.next().map(|e| e.unwrap());
let (_, received) = try_join(send, recv).await.unwrap();
let (data, addr) = received;
assert_eq!(msg, data);
assert_eq!(a_addr, addr);
a_soc = a.into_inner();
b_soc = b.into_inner();
}
// test sending & receiving an empty message
{
let mut a = UdpFramed::new(a_soc, ByteCodec);
let mut b = UdpFramed::new(b_soc, ByteCodec);
let msg = b"".to_vec();
let send = a.send((msg.clone(), b_addr));
let recv = b.next().map(|e| e.unwrap());
let (_, received) = try_join(send, recv).await.unwrap();
let (data, addr) = received;
assert_eq!(msg, data);
assert_eq!(a_addr, addr);
}
Ok(())
}

0
tokio-util/CHANGELOG.md Normal file
View File

View File

@ -1,5 +1,5 @@
[package]
name = "tokio-codec"
name = "tokio-util"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
@ -13,14 +13,15 @@ authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
documentation = "https://docs.rs/tokio-codec/0.2.0-alpha.6/tokio_codec"
documentation = "https://docs.rs/tokio-util/0.2.0-alpha.6/tokio_util"
description = """
Utilities for encoding and decoding frames.
Additional utilities for working with Tokio.
"""
categories = ["asynchronous"]
[dependencies]
tokio-io = { version = "=0.2.0-alpha.6", path = "../tokio-io" }
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
bytes = "0.4.7"
futures-core-preview = "=0.3.0-alpha.19"

View File

@ -1,4 +1,4 @@
# tokio-codec
# tokio-util
Utilities for encoding and decoding frames.

View File

@ -1,5 +1,6 @@
use crate::decoder::Decoder;
use crate::encoder::Encoder;
use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
use bytes::{BufMut, Bytes, BytesMut};
use std::io;

View File

@ -1,10 +1,10 @@
use bytes::BytesMut;
use std::io;
use crate::codec::encoder::Encoder;
use crate::codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
use super::encoder::Encoder;
use super::Framed;
use bytes::BytesMut;
use std::io;
/// Decoding of frames via buffers.
///
@ -75,7 +75,7 @@ pub trait Decoder {
/// # use std::io;
/// #
/// # use bytes::BytesMut;
/// # use tokio_codec::Decoder;
/// # use tokio_util::codec::Decoder;
/// #
/// # struct MyCodec;
/// #

View File

@ -1,9 +1,7 @@
#![allow(deprecated)]
use crate::decoder::Decoder;
use crate::encoder::Encoder;
use crate::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
use crate::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
use tokio_io::{AsyncBufRead, AsyncRead, AsyncWrite};

View File

@ -1,5 +1,5 @@
use super::framed::Fuse;
use super::Decoder;
use crate::codec::framed::Fuse;
use crate::codec::Decoder;
use tokio_io::AsyncRead;

View File

@ -1,8 +1,6 @@
#![allow(deprecated)]
use super::framed::Fuse;
use crate::decoder::Decoder;
use crate::encoder::Encoder;
use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
use crate::codec::framed::Fuse;
use tokio_io::{AsyncBufRead, AsyncRead, AsyncWrite};

View File

@ -16,7 +16,7 @@
//!
//! ```
//! use tokio_io::{AsyncRead, AsyncWrite};
//! use tokio_codec::{Framed, LengthDelimitedCodec};
//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
//!
//! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T)
//! -> Framed<T, LengthDelimitedCodec>
@ -39,8 +39,8 @@
//! Specifically, given the following:
//!
//! ```
//! use tokio::codec::{Framed, LengthDelimitedCodec};
//! use tokio::prelude::*;
//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
//!
//! use bytes::Bytes;
//!
@ -79,7 +79,7 @@
//!
//! ```
//! # use tokio_io::AsyncRead;
//! # use tokio_codec::LengthDelimitedCodec;
//! # use tokio_util::codec::LengthDelimitedCodec;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! LengthDelimitedCodec::builder()
//! .length_field_offset(0) // default value
@ -113,7 +113,7 @@
//!
//! ```
//! # use tokio_io::AsyncRead;
//! # use tokio_codec::LengthDelimitedCodec;
//! # use tokio_util::codec::LengthDelimitedCodec;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! LengthDelimitedCodec::builder()
//! .length_field_offset(0) // default value
@ -145,7 +145,7 @@
//!
//! ```
//! # use tokio_io::AsyncRead;
//! # use tokio_codec::LengthDelimitedCodec;
//! # use tokio_util::codec::LengthDelimitedCodec;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! LengthDelimitedCodec::builder()
//! .length_field_offset(0) // default value
@ -179,7 +179,7 @@
//!
//! ```
//! # use tokio_io::AsyncRead;
//! # use tokio_codec::LengthDelimitedCodec;
//! # use tokio_util::codec::LengthDelimitedCodec;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! LengthDelimitedCodec::builder()
//! .length_field_offset(0) // default value
@ -223,7 +223,7 @@
//!
//! ```
//! # use tokio_io::AsyncRead;
//! # use tokio_codec::LengthDelimitedCodec;
//! # use tokio_util::codec::LengthDelimitedCodec;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! LengthDelimitedCodec::builder()
//! .length_field_offset(1) // length of hdr1
@ -269,7 +269,7 @@
//!
//! ```
//! # use tokio_io::AsyncRead;
//! # use tokio_codec::LengthDelimitedCodec;
//! # use tokio_util::codec::LengthDelimitedCodec;
//! # fn bind_read<T: AsyncRead>(io: T) {
//! LengthDelimitedCodec::builder()
//! .length_field_offset(1) // length of hdr1
@ -314,7 +314,7 @@
//!
//! ```
//! # use tokio_io::AsyncWrite;
//! # use tokio_codec::LengthDelimitedCodec;
//! # use tokio_util::codec::LengthDelimitedCodec;
//! # fn write_frame<T: AsyncWrite>(io: T) {
//! # let _ =
//! LengthDelimitedCodec::builder()
@ -340,12 +340,14 @@
//! [`Encoder`]: ../trait.Encoder.html
//! [`BytesMut`]: https://docs.rs/bytes/0.4/bytes/struct.BytesMut.html
use crate::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
use std::error::Error as StdError;
use std::io::{self, Cursor};
use std::{cmp, fmt};
use tokio_io::{AsyncRead, AsyncWrite};
/// Configure length delimited `LengthDelimitedCodec`s.
///
@ -604,7 +606,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -648,7 +650,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -672,7 +674,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -696,7 +698,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -730,7 +732,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -754,7 +756,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -777,7 +779,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -798,7 +800,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -822,7 +824,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -841,7 +843,7 @@ impl Builder {
/// # Examples
///
/// ```
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
/// # pub fn main() {
/// LengthDelimitedCodec::builder()
/// .length_field_offset(0)
@ -864,7 +866,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncRead;
/// use tokio_codec::LengthDelimitedCodec;
/// use tokio_util::codec::LengthDelimitedCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// LengthDelimitedCodec::builder()
@ -889,7 +891,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::AsyncWrite;
/// # use tokio_codec::LengthDelimitedCodec;
/// # use tokio_util::codec::LengthDelimitedCodec;
/// # fn write_frame<T: AsyncWrite>(io: T) {
/// LengthDelimitedCodec::builder()
/// .length_field_length(2)
@ -910,7 +912,7 @@ impl Builder {
///
/// ```
/// # use tokio_io::{AsyncRead, AsyncWrite};
/// # use tokio_codec::LengthDelimitedCodec;
/// # use tokio_util::codec::LengthDelimitedCodec;
/// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
/// # let _ =
/// LengthDelimitedCodec::builder()

View File

@ -1,5 +1,6 @@
use crate::decoder::Decoder;
use crate::encoder::Encoder;
use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
use bytes::{BufMut, BytesMut};
use std::{cmp, fmt, io, str, usize};
@ -70,13 +71,13 @@ impl LinesCodec {
///
/// ```
/// use std::usize;
/// use tokio_codec::LinesCodec;
/// use tokio_util::codec::LinesCodec;
///
/// let codec = LinesCodec::new();
/// assert_eq!(codec.max_length(), usize::MAX);
/// ```
/// ```
/// use tokio_codec::LinesCodec;
/// use tokio_util::codec::LinesCodec;
///
/// let codec = LinesCodec::new_with_max_length(256);
/// assert_eq!(codec.max_length(), 256);

View File

@ -1,16 +1,3 @@
#![doc(html_root_url = "https://docs.rs/tokio-codec/0.2.0-alpha.6")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![deny(intra_doc_link_resolution_failure)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Utilities for encoding and decoding frames.
//!
//! Contains adapters to go from streams of bytes, [`AsyncRead`] and
@ -26,19 +13,25 @@
mod macros;
mod bytes_codec;
mod decoder;
mod encoder;
mod framed;
mod framed_read;
mod framed_write;
pub mod length_delimited;
mod lines_codec;
pub use self::bytes_codec::BytesCodec;
pub use crate::bytes_codec::BytesCodec;
pub use crate::decoder::Decoder;
pub use crate::encoder::Encoder;
pub use crate::framed::{Framed, FramedParts};
pub use crate::framed_read::FramedRead;
pub use crate::framed_write::FramedWrite;
pub use crate::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};
pub use crate::lines_codec::{LinesCodec, LinesCodecError};
mod decoder;
pub use self::decoder::Decoder;
mod encoder;
pub use self::encoder::Encoder;
mod framed;
pub use self::framed::{Framed, FramedParts};
mod framed_read;
pub use self::framed_read::FramedRead;
mod framed_write;
pub use self::framed_write::FramedWrite;
pub mod length_delimited;
pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};
mod lines_codec;
pub use self::lines_codec::{LinesCodec, LinesCodecError};

17
tokio-util/src/lib.rs Normal file
View File

@ -0,0 +1,17 @@
#![doc(html_root_url = "https://docs.rs/tokio-util/0.2.0-alpha.6")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![deny(intra_doc_link_resolution_failure)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Utilities for working with Tokio.
pub mod codec;
pub mod udp;

View File

@ -1,14 +1,14 @@
use super::UdpSocket;
use crate::codec::{Decoder, Encoder};
use tokio_codec::{Decoder, Encoder};
use tokio::net::UdpSocket;
use bytes::{BufMut, BytesMut};
use core::task::{Context, Poll};
use futures_core::{ready, Stream};
use futures_sink::Sink;
use std::io;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::pin::Pin;
use std::task::{Context, Poll};
/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
@ -47,22 +47,17 @@ impl<C: Decoder + Unpin> Stream for UdpFramed<C> {
let (_n, addr) = unsafe {
// Read into the buffer without having to initialize the memory.
let res = ready!(Pin::new(&mut pin.socket).poll_recv_from_priv(cx, pin.rd.bytes_mut()));
let res = ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, pin.rd.bytes_mut()));
let (n, addr) = res?;
pin.rd.advance_mut(n);
(n, addr)
};
let span = trace_span!("decoding", from.addr = %addr, dgram.length = _n);
let _e = span.enter();
trace!("trying to decode a frame...");
let frame_res = pin.codec.decode(&mut pin.rd);
pin.rd.clear();
let frame = frame_res?;
let result = frame.map(|frame| Ok((frame, addr))); // frame -> (frame, addr)
trace!("frame decoded from buffer");
Poll::Ready(result)
}
}
@ -84,16 +79,11 @@ impl<C: Encoder + Unpin> Sink<(C::Item, SocketAddr)> for UdpFramed<C> {
fn start_send(self: Pin<&mut Self>, item: (C::Item, SocketAddr)) -> Result<(), Self::Error> {
let (frame, out_addr) = item;
let span = trace_span!("sending", to.addr = %out_addr);
let _e = span.enter();
trace!("encoding frame...");
let pin = self.get_mut();
pin.codec.encode(frame, &mut pin.wr)?;
pin.out_addr = out_addr;
pin.flushed = false;
trace!(message = "frame encoded", frame.length = pin.wr.len());
Ok(())
}
@ -110,18 +100,12 @@ impl<C: Encoder + Unpin> Sink<(C::Item, SocketAddr)> for UdpFramed<C> {
..
} = *self;
let span = trace_span!("flushing", to.addr = %out_addr, frame.length = wr.len());
let _e = span.enter();
trace!("flushing frame...");
let n = ready!(socket.poll_send_to_priv(cx, &wr, &out_addr))?;
let n = ready!(socket.poll_send_to(cx, &wr, &out_addr))?;
let wrote_all = n == self.wr.len();
self.wr.clear();
self.flushed = true;
trace!(written.length = n, written.complete = wrote_all);
let res = if wrote_all {
Ok(())
} else {

View File

@ -0,0 +1,4 @@
//! UDP framing
mod frame;
pub use self::frame::UdpFramed;

View File

@ -1,7 +1,8 @@
#![warn(rust_2018_idioms)]
use tokio_util::codec::{BytesCodec, Decoder, Encoder, LinesCodec};
use bytes::{BufMut, Bytes, BytesMut};
use tokio_codec::{BytesCodec, Decoder, Encoder, LinesCodec};
#[test]
fn bytes_decoder() {

View File

@ -1,8 +1,8 @@
#![warn(rust_2018_idioms)]
use tokio::prelude::*;
use tokio_codec::{Decoder, Encoder, Framed, FramedParts};
use tokio_test::assert_ok;
use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};
use bytes::{Buf, BufMut, BytesMut, IntoBuf};
use std::io::{self, Read};

View File

@ -1,9 +1,9 @@
#![warn(rust_2018_idioms)]
use tokio::prelude::*;
use tokio_codec::{Decoder, FramedRead};
use tokio_test::assert_ready;
use tokio_test::task::MockTask;
use tokio_util::codec::{Decoder, FramedRead};
use bytes::{Buf, BytesMut, IntoBuf};
use std::collections::VecDeque;

View File

@ -1,9 +1,9 @@
#![warn(rust_2018_idioms)]
use tokio_codec::{Encoder, FramedWrite};
use tokio_io::AsyncWrite;
use tokio_test::assert_ready;
use tokio_test::task::MockTask;
use tokio_util::codec::{Encoder, FramedWrite};
use bytes::{BufMut, BytesMut};
use futures_sink::Sink;

View File

@ -1,12 +1,12 @@
#![warn(rust_2018_idioms)]
use tokio::codec::*;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::*;
use tokio_test::task::MockTask;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
use tokio_util::codec::*;
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::pin_mut;

79
tokio-util/tests/udp.rs Normal file
View File

@ -0,0 +1,79 @@
use tokio::net::UdpSocket;
use tokio_util::codec::{Decoder, Encoder};
use tokio_util::udp::UdpFramed;
use bytes::{BufMut, BytesMut};
use futures_util::future::FutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures_util::try_future::try_join;
use std::io;
#[tokio::test]
async fn send_framed() -> std::io::Result<()> {
let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
let a_addr = a_soc.local_addr()?;
let b_addr = b_soc.local_addr()?;
// test sending & receiving bytes
{
let mut a = UdpFramed::new(a_soc, ByteCodec);
let mut b = UdpFramed::new(b_soc, ByteCodec);
let msg = b"4567".to_vec();
let send = a.send((msg.clone(), b_addr));
let recv = b.next().map(|e| e.unwrap());
let (_, received) = try_join(send, recv).await.unwrap();
let (data, addr) = received;
assert_eq!(msg, data);
assert_eq!(a_addr, addr);
a_soc = a.into_inner();
b_soc = b.into_inner();
}
// test sending & receiving an empty message
{
let mut a = UdpFramed::new(a_soc, ByteCodec);
let mut b = UdpFramed::new(b_soc, ByteCodec);
let msg = b"".to_vec();
let send = a.send((msg.clone(), b_addr));
let recv = b.next().map(|e| e.unwrap());
let (_, received) = try_join(send, recv).await.unwrap();
let (data, addr) = received;
assert_eq!(msg, data);
assert_eq!(a_addr, addr);
}
Ok(())
}
pub struct ByteCodec;
impl Decoder for ByteCodec {
type Item = Vec<u8>;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
let len = buf.len();
Ok(Some(buf.split_to(len).to_vec()))
}
}
impl Encoder for ByteCodec {
type Item = Vec<u8>;
type Error = io::Error;
fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
buf.reserve(data.len());
buf.put(data);
Ok(())
}
}

View File

@ -25,7 +25,6 @@ keywords = ["io", "async", "non-blocking", "futures"]
[features]
default = [
"codec",
"fs",
"io",
"net",
@ -36,7 +35,6 @@ default = [
"timer",
]
codec = ["io", "tokio-codec", "bytes"]
fs = []
io = ["tokio-io"]
macros = ["tokio-macros"]
@ -71,12 +69,10 @@ futures-sink-preview = "=0.3.0-alpha.19"
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink"] }
# Everything else is optional...
bytes = { version = "0.4", optional = true }
crossbeam-utils = { version = "0.6.0", optional = true }
num_cpus = { version = "1.8.0", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }
tokio-codec = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-codec" }
tokio-io = { version = "=0.2.0-alpha.6", optional = true, features = ["util"], path = "../tokio-io" }
tokio-executor = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-executor" }
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }
@ -90,6 +86,7 @@ tokio-executor = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio
[dev-dependencies]
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }
tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" }
futures-preview = "=0.3.0-alpha.19"
futures-util-preview = "=0.3.0-alpha.19"

View File

@ -93,7 +93,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
```
More examples can be found [here](tokio/examples). Note that the `master` branch
More examples can be found [here](../examples). Note that the `master` branch
is currently being updated to use `async` / `await`. The examples are
not fully ported. Examples for stable Tokio can be found
[here](https://github.com/tokio-rs/tokio/tree/v0.1.x/tokio/examples).

View File

@ -52,10 +52,10 @@ pub mod udp {
//! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket.
//!
//! [`UdpSocket`]: struct.UdpSocket.html
pub use tokio_net::udp::{split, UdpFramed, UdpSocket};
pub use tokio_net::udp::{split, UdpSocket};
}
#[cfg(feature = "udp")]
pub use self::udp::{UdpFramed, UdpSocket};
pub use self::udp::UdpSocket;
#[cfg(all(unix, feature = "uds"))]
pub mod unix {