mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
codec: change Encoder to take &Item (#1746)
Co-authored-by: Markus Westerlind <marwes91@gmail.com>
This commit is contained in:
parent
1eb6131321
commit
9d4d076189
@ -6,7 +6,7 @@ edition = "2018"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "0.2.0", path = "../tokio", features = ["full"] }
|
||||
tokio-util = { version = "0.2.0", path = "../tokio-util", features = ["full"] }
|
||||
tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] }
|
||||
bytes = "0.5"
|
||||
futures = "0.3.0"
|
||||
http = "0.2"
|
||||
|
@ -193,9 +193,7 @@ async fn process(
|
||||
let mut lines = Framed::new(stream, LinesCodec::new());
|
||||
|
||||
// Send a prompt to the client to enter their username.
|
||||
lines
|
||||
.send(String::from("Please enter your username:"))
|
||||
.await?;
|
||||
lines.send("Please enter your username:").await?;
|
||||
|
||||
// Read the first line from the `LineCodec` stream to get the username.
|
||||
let username = match lines.next().await {
|
||||
@ -232,7 +230,7 @@ async fn process(
|
||||
// A message was received from a peer. Send it to the
|
||||
// current user.
|
||||
Ok(Message::Received(msg)) => {
|
||||
peer.lines.send(msg).await?;
|
||||
peer.lines.send(&msg).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
println!(
|
||||
|
@ -130,7 +130,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
let response = response.serialize();
|
||||
|
||||
if let Err(e) = lines.send(response).await {
|
||||
if let Err(e) = lines.send(response.as_str()).await {
|
||||
println!("error on sending response; error = {:?}", e);
|
||||
}
|
||||
}
|
||||
|
@ -96,8 +96,7 @@ struct Http;
|
||||
|
||||
/// Implementation of encoding an HTTP response into a `BytesMut`, basically
|
||||
/// just writing out an HTTP/1.1 response.
|
||||
impl Encoder for Http {
|
||||
type Item = Response<String>;
|
||||
impl Encoder<Response<String>> for Http {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> {
|
||||
|
@ -30,7 +30,7 @@ tokio = { version = "0.2.0", path = "../tokio" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "0.2.0", path = "../tokio", features = ["macros", "stream", "rt-core", "io-util", "net"] }
|
||||
tokio-util = { version = "0.2.0", path = "../tokio-util", features = ["full"] }
|
||||
tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] }
|
||||
|
||||
cfg-if = "0.1"
|
||||
env_logger = { version = "0.6", default-features = false }
|
||||
|
@ -1,3 +1,14 @@
|
||||
# 0.3.0 (February 28, 2020)
|
||||
|
||||
Breaking changes:
|
||||
|
||||
- Change codec::Encoder trait to take a generic Item parameter (#1746), which allows
|
||||
codec writers to pass references into `Framed` and `FramedWrite` types.
|
||||
|
||||
Other additions:
|
||||
|
||||
- Add futures-io/tokio::io compatibility layer (#2117)
|
||||
|
||||
# 0.2.0 (November 26, 2019)
|
||||
|
||||
- Initial release
|
||||
|
@ -7,7 +7,7 @@ name = "tokio-util"
|
||||
# - Cargo.toml
|
||||
# - Update CHANGELOG.md.
|
||||
# - Create "v0.2.x" git tag.
|
||||
version = "0.2.0"
|
||||
version = "0.3.0"
|
||||
edition = "2018"
|
||||
authors = ["Tokio Contributors <team@tokio.rs>"]
|
||||
license = "MIT"
|
||||
@ -38,7 +38,7 @@ futures-core = "0.3.0"
|
||||
futures-sink = "0.3.0"
|
||||
futures-io = { version = "0.3.0", optional = true }
|
||||
log = "0.4"
|
||||
pin-project-lite = "0.1.1"
|
||||
pin-project-lite = "0.1.4"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "0.2.0", path = "../tokio", features = ["full"] }
|
||||
|
@ -65,8 +65,7 @@ impl Decoder for BytesCodec {
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder for BytesCodec {
|
||||
type Item = Bytes;
|
||||
impl Encoder<Bytes> for BytesCodec {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
|
||||
|
@ -1,4 +1,3 @@
|
||||
use crate::codec::encoder::Encoder;
|
||||
use crate::codec::Framed;
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@ -159,7 +158,7 @@ pub trait Decoder {
|
||||
/// [`Framed`]: crate::codec::Framed
|
||||
fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self>
|
||||
where
|
||||
Self: Encoder + Sized,
|
||||
Self: Sized,
|
||||
{
|
||||
Framed::new(io, self)
|
||||
}
|
||||
|
@ -5,10 +5,7 @@ use std::io;
|
||||
/// [`FramedWrite`].
|
||||
///
|
||||
/// [`FramedWrite`]: crate::codec::FramedWrite
|
||||
pub trait Encoder {
|
||||
/// The type of items consumed by the `Encoder`
|
||||
type Item;
|
||||
|
||||
pub trait Encoder<Item> {
|
||||
/// The type of encoding errors.
|
||||
///
|
||||
/// [`FramedWrite`] requires `Encoder`s errors to implement `From<io::Error>`
|
||||
@ -24,5 +21,5 @@ pub trait Encoder {
|
||||
/// will be written out when possible.
|
||||
///
|
||||
/// [`FramedWrite`]: crate::codec::FramedWrite
|
||||
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
|
||||
fn encode(&mut self, item: Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
@ -67,7 +67,6 @@ impl<T, U> ProjectFuse for Fuse<T, U> {
|
||||
impl<T, U> Framed<T, U>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
U: Decoder + Encoder,
|
||||
{
|
||||
/// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
|
||||
/// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
|
||||
@ -262,7 +261,7 @@ where
|
||||
impl<T, I, U> Sink<I> for Framed<T, U>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
U: Encoder<Item = I>,
|
||||
U: Encoder<I>,
|
||||
U::Error: From<io::Error>,
|
||||
{
|
||||
type Error = U::Error;
|
||||
@ -380,11 +379,10 @@ impl<T, U: Decoder> Decoder for Fuse<T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U: Encoder> Encoder for Fuse<T, U> {
|
||||
type Item = U::Item;
|
||||
impl<T, I, U: Encoder<I>> Encoder<I> for Fuse<T, U> {
|
||||
type Error = U::Error;
|
||||
|
||||
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
self.codec.encode(item, dst)
|
||||
}
|
||||
}
|
||||
@ -414,8 +412,11 @@ pub struct FramedParts<T, U> {
|
||||
}
|
||||
|
||||
impl<T, U> FramedParts<T, U> {
|
||||
/// Create a new, default, `FramedParts`.
|
||||
pub fn new(io: T, codec: U) -> FramedParts<T, U> {
|
||||
/// Create a new, default, `FramedParts`
|
||||
pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
|
||||
where
|
||||
U: Encoder<I>,
|
||||
{
|
||||
FramedParts {
|
||||
io,
|
||||
codec,
|
||||
|
@ -42,7 +42,6 @@ const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
|
||||
impl<T, E> FramedWrite<T, E>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
E: Encoder,
|
||||
{
|
||||
/// Creates a new `FramedWrite` with the given `encoder`.
|
||||
pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
|
||||
@ -100,7 +99,7 @@ impl<T, E> FramedWrite<T, E> {
|
||||
impl<T, I, E> Sink<I> for FramedWrite<T, E>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
E: Encoder<Item = I>,
|
||||
E: Encoder<I>,
|
||||
E::Error: From<io::Error>,
|
||||
{
|
||||
type Error = E::Error;
|
||||
@ -191,9 +190,9 @@ impl<T> FramedWrite2<T> {
|
||||
impl<I, T> Sink<I> for FramedWrite2<T>
|
||||
where
|
||||
T: ProjectFuse + AsyncWrite,
|
||||
T::Codec: Encoder<Item = I>,
|
||||
T::Codec: Encoder<I>,
|
||||
{
|
||||
type Error = <T::Codec as Encoder>::Error;
|
||||
type Error = <T::Codec as Encoder<I>>::Error;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
// If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's
|
||||
|
@ -546,12 +546,11 @@ impl Decoder for LengthDelimitedCodec {
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder for LengthDelimitedCodec {
|
||||
type Item = Bytes;
|
||||
impl Encoder<Bytes> for LengthDelimitedCodec {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
|
||||
let n = (&data).remaining();
|
||||
let n = data.len();
|
||||
|
||||
if n > self.builder.max_frame_len {
|
||||
return Err(io::Error::new(
|
||||
|
@ -182,11 +182,14 @@ impl Decoder for LinesCodec {
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder for LinesCodec {
|
||||
type Item = String;
|
||||
impl<T> Encoder<T> for LinesCodec
|
||||
where
|
||||
T: AsRef<str>,
|
||||
{
|
||||
type Error = LinesCodecError;
|
||||
|
||||
fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), LinesCodecError> {
|
||||
fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> {
|
||||
let line = line.as_ref();
|
||||
buf.reserve(line.len() + 1);
|
||||
buf.put(line.as_bytes());
|
||||
buf.put_u8(b'\n');
|
||||
|
@ -70,7 +70,7 @@ impl<C: Decoder + Unpin> Stream for UdpFramed<C> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: Encoder + Unpin> Sink<(C::Item, SocketAddr)> for UdpFramed<C> {
|
||||
impl<I, C: Encoder<I> + Unpin> Sink<(I, SocketAddr)> for UdpFramed<C> {
|
||||
type Error = C::Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
@ -84,7 +84,7 @@ impl<C: Encoder + Unpin> Sink<(C::Item, SocketAddr)> for UdpFramed<C> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: (C::Item, SocketAddr)) -> Result<(), Self::Error> {
|
||||
fn start_send(self: Pin<&mut Self>, item: (I, SocketAddr)) -> Result<(), Self::Error> {
|
||||
let (frame, out_addr) = item;
|
||||
|
||||
let pin = self.get_mut();
|
||||
|
@ -209,9 +209,9 @@ fn lines_encoder() {
|
||||
let mut codec = LinesCodec::new();
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
codec.encode(String::from("line 1"), &mut buf).unwrap();
|
||||
codec.encode("line 1", &mut buf).unwrap();
|
||||
assert_eq!("line 1\n", buf);
|
||||
|
||||
codec.encode(String::from("line 2"), &mut buf).unwrap();
|
||||
codec.encode("line 2", &mut buf).unwrap();
|
||||
assert_eq!("line 1\nline 2\n", buf);
|
||||
}
|
||||
|
@ -28,8 +28,7 @@ impl Decoder for U32Codec {
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder for U32Codec {
|
||||
type Item = u32;
|
||||
impl Encoder<u32> for U32Codec {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
|
||||
|
@ -28,8 +28,7 @@ macro_rules! pin {
|
||||
|
||||
struct U32Encoder;
|
||||
|
||||
impl Encoder for U32Encoder {
|
||||
type Item = u32;
|
||||
impl Encoder<u32> for U32Encoder {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
|
||||
|
@ -8,6 +8,7 @@ use futures::future::FutureExt;
|
||||
use futures::sink::SinkExt;
|
||||
use std::io;
|
||||
|
||||
#[cfg_attr(any(target_os = "macos", target_os = "ios"), allow(unused_assignments))]
|
||||
#[tokio::test]
|
||||
async fn send_framed() -> std::io::Result<()> {
|
||||
let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
|
||||
@ -21,33 +22,34 @@ async fn send_framed() -> std::io::Result<()> {
|
||||
let mut a = UdpFramed::new(a_soc, ByteCodec);
|
||||
let mut b = UdpFramed::new(b_soc, ByteCodec);
|
||||
|
||||
let msg = b"4567".to_vec();
|
||||
let msg = b"4567";
|
||||
|
||||
let send = a.send((msg.clone(), b_addr));
|
||||
let send = a.send((msg, b_addr));
|
||||
let recv = b.next().map(|e| e.unwrap());
|
||||
let (_, received) = try_join(send, recv).await.unwrap();
|
||||
|
||||
let (data, addr) = received;
|
||||
assert_eq!(msg, data);
|
||||
assert_eq!(msg, &*data);
|
||||
assert_eq!(a_addr, addr);
|
||||
|
||||
a_soc = a.into_inner();
|
||||
b_soc = b.into_inner();
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
|
||||
// test sending & receiving an empty message
|
||||
{
|
||||
let mut a = UdpFramed::new(a_soc, ByteCodec);
|
||||
let mut b = UdpFramed::new(b_soc, ByteCodec);
|
||||
|
||||
let msg = b"".to_vec();
|
||||
let msg = b"";
|
||||
|
||||
let send = a.send((msg.clone(), b_addr));
|
||||
let send = a.send((msg, b_addr));
|
||||
let recv = b.next().map(|e| e.unwrap());
|
||||
let (_, received) = try_join(send, recv).await.unwrap();
|
||||
|
||||
let (data, addr) = received;
|
||||
assert_eq!(msg, data);
|
||||
assert_eq!(msg, &*data);
|
||||
assert_eq!(a_addr, addr);
|
||||
}
|
||||
|
||||
@ -66,13 +68,12 @@ impl Decoder for ByteCodec {
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder for ByteCodec {
|
||||
type Item = Vec<u8>;
|
||||
impl Encoder<&[u8]> for ByteCodec {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
|
||||
fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
|
||||
buf.reserve(data.len());
|
||||
buf.put_slice(&data);
|
||||
buf.put_slice(data);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user