diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 423ffe021..a25f526f1 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dev-dependencies] tokio = { version = "0.2.0", path = "../tokio", features = ["full"] } -tokio-util = { version = "0.2.0", path = "../tokio-util", features = ["full"] } +tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] } bytes = "0.5" futures = "0.3.0" http = "0.2" diff --git a/examples/chat.rs b/examples/chat.rs index fac43cbf0..b3fb727a2 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -193,9 +193,7 @@ async fn process( let mut lines = Framed::new(stream, LinesCodec::new()); // Send a prompt to the client to enter their username. - lines - .send(String::from("Please enter your username:")) - .await?; + lines.send("Please enter your username:").await?; // Read the first line from the `LineCodec` stream to get the username. let username = match lines.next().await { @@ -232,7 +230,7 @@ async fn process( // A message was received from a peer. Send it to the // current user. Ok(Message::Received(msg)) => { - peer.lines.send(msg).await?; + peer.lines.send(&msg).await?; } Err(e) => { println!( diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 7c71dedf6..c1af2541f 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -130,7 +130,7 @@ async fn main() -> Result<(), Box> { let response = response.serialize(); - if let Err(e) = lines.send(response).await { + if let Err(e) = lines.send(response.as_str()).await { println!("error on sending response; error = {:?}", e); } } diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 9ac2806e9..732da0d64 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -96,8 +96,7 @@ struct Http; /// Implementation of encoding an HTTP response into a `BytesMut`, basically /// just writing out an HTTP/1.1 response. -impl Encoder for Http { - type Item = Response; +impl Encoder> for Http { type Error = io::Error; fn encode(&mut self, item: Response, dst: &mut BytesMut) -> io::Result<()> { diff --git a/tokio-tls/Cargo.toml b/tokio-tls/Cargo.toml index d5a5c6def..a98779269 100644 --- a/tokio-tls/Cargo.toml +++ b/tokio-tls/Cargo.toml @@ -30,7 +30,7 @@ tokio = { version = "0.2.0", path = "../tokio" } [dev-dependencies] tokio = { version = "0.2.0", path = "../tokio", features = ["macros", "stream", "rt-core", "io-util", "net"] } -tokio-util = { version = "0.2.0", path = "../tokio-util", features = ["full"] } +tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] } cfg-if = "0.1" env_logger = { version = "0.6", default-features = false } diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 48022e343..aaabd106d 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,14 @@ +# 0.3.0 (February 28, 2020) + +Breaking changes: + +- Change codec::Encoder trait to take a generic Item parameter (#1746), which allows + codec writers to pass references into `Framed` and `FramedWrite` types. + +Other additions: + +- Add futures-io/tokio::io compatibility layer (#2117) + # 0.2.0 (November 26, 2019) - Initial release diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index e029de2f6..9a537ba15 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -7,7 +7,7 @@ name = "tokio-util" # - Cargo.toml # - Update CHANGELOG.md. # - Create "v0.2.x" git tag. -version = "0.2.0" +version = "0.3.0" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" @@ -38,7 +38,7 @@ futures-core = "0.3.0" futures-sink = "0.3.0" futures-io = { version = "0.3.0", optional = true } log = "0.4" -pin-project-lite = "0.1.1" +pin-project-lite = "0.1.4" [dev-dependencies] tokio = { version = "0.2.0", path = "../tokio", features = ["full"] } diff --git a/tokio-util/src/codec/bytes_codec.rs b/tokio-util/src/codec/bytes_codec.rs index 46b31aa32..a5e73749e 100644 --- a/tokio-util/src/codec/bytes_codec.rs +++ b/tokio-util/src/codec/bytes_codec.rs @@ -65,8 +65,7 @@ impl Decoder for BytesCodec { } } -impl Encoder for BytesCodec { - type Item = Bytes; +impl Encoder for BytesCodec { type Error = io::Error; fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { diff --git a/tokio-util/src/codec/decoder.rs b/tokio-util/src/codec/decoder.rs index 5f53ebc31..84d27fbf1 100644 --- a/tokio-util/src/codec/decoder.rs +++ b/tokio-util/src/codec/decoder.rs @@ -1,4 +1,3 @@ -use crate::codec::encoder::Encoder; use crate::codec::Framed; use tokio::io::{AsyncRead, AsyncWrite}; @@ -159,7 +158,7 @@ pub trait Decoder { /// [`Framed`]: crate::codec::Framed fn framed(self, io: T) -> Framed where - Self: Encoder + Sized, + Self: Sized, { Framed::new(io, self) } diff --git a/tokio-util/src/codec/encoder.rs b/tokio-util/src/codec/encoder.rs index e33e66849..770a10fa9 100644 --- a/tokio-util/src/codec/encoder.rs +++ b/tokio-util/src/codec/encoder.rs @@ -5,10 +5,7 @@ use std::io; /// [`FramedWrite`]. /// /// [`FramedWrite`]: crate::codec::FramedWrite -pub trait Encoder { - /// The type of items consumed by the `Encoder` - type Item; - +pub trait Encoder { /// The type of encoding errors. /// /// [`FramedWrite`] requires `Encoder`s errors to implement `From` @@ -24,5 +21,5 @@ pub trait Encoder { /// will be written out when possible. /// /// [`FramedWrite`]: crate::codec::FramedWrite - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>; + fn encode(&mut self, item: Item, dst: &mut BytesMut) -> Result<(), Self::Error>; } diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index e42ed28ca..d2e7659ed 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -67,7 +67,6 @@ impl ProjectFuse for Fuse { impl Framed where T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, { /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. @@ -262,7 +261,7 @@ where impl Sink for Framed where T: AsyncWrite, - U: Encoder, + U: Encoder, U::Error: From, { type Error = U::Error; @@ -380,11 +379,10 @@ impl Decoder for Fuse { } } -impl Encoder for Fuse { - type Item = U::Item; +impl> Encoder for Fuse { type Error = U::Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> { self.codec.encode(item, dst) } } @@ -414,8 +412,11 @@ pub struct FramedParts { } impl FramedParts { - /// Create a new, default, `FramedParts`. - pub fn new(io: T, codec: U) -> FramedParts { + /// Create a new, default, `FramedParts` + pub fn new(io: T, codec: U) -> FramedParts + where + U: Encoder, + { FramedParts { io, codec, diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index 80aa21c60..c0049b2d0 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -42,7 +42,6 @@ const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; impl FramedWrite where T: AsyncWrite, - E: Encoder, { /// Creates a new `FramedWrite` with the given `encoder`. pub fn new(inner: T, encoder: E) -> FramedWrite { @@ -100,7 +99,7 @@ impl FramedWrite { impl Sink for FramedWrite where T: AsyncWrite, - E: Encoder, + E: Encoder, E::Error: From, { type Error = E::Error; @@ -191,9 +190,9 @@ impl FramedWrite2 { impl Sink for FramedWrite2 where T: ProjectFuse + AsyncWrite, - T::Codec: Encoder, + T::Codec: Encoder, { - type Error = ::Error; + type Error = >::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's diff --git a/tokio-util/src/codec/length_delimited.rs b/tokio-util/src/codec/length_delimited.rs index 04061c0ac..5e98d4a70 100644 --- a/tokio-util/src/codec/length_delimited.rs +++ b/tokio-util/src/codec/length_delimited.rs @@ -546,12 +546,11 @@ impl Decoder for LengthDelimitedCodec { } } -impl Encoder for LengthDelimitedCodec { - type Item = Bytes; +impl Encoder for LengthDelimitedCodec { type Error = io::Error; fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { - let n = (&data).remaining(); + let n = data.len(); if n > self.builder.max_frame_len { return Err(io::Error::new( diff --git a/tokio-util/src/codec/lines_codec.rs b/tokio-util/src/codec/lines_codec.rs index e048b287c..e1816d8c4 100644 --- a/tokio-util/src/codec/lines_codec.rs +++ b/tokio-util/src/codec/lines_codec.rs @@ -182,11 +182,14 @@ impl Decoder for LinesCodec { } } -impl Encoder for LinesCodec { - type Item = String; +impl Encoder for LinesCodec +where + T: AsRef, +{ type Error = LinesCodecError; - fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), LinesCodecError> { + fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> { + let line = line.as_ref(); buf.reserve(line.len() + 1); buf.put(line.as_bytes()); buf.put_u8(b'\n'); diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs index c16e12f88..5b098bd49 100644 --- a/tokio-util/src/udp/frame.rs +++ b/tokio-util/src/udp/frame.rs @@ -70,7 +70,7 @@ impl Stream for UdpFramed { } } -impl Sink<(C::Item, SocketAddr)> for UdpFramed { +impl + Unpin> Sink<(I, SocketAddr)> for UdpFramed { type Error = C::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -84,7 +84,7 @@ impl Sink<(C::Item, SocketAddr)> for UdpFramed { Poll::Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: (C::Item, SocketAddr)) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: (I, SocketAddr)) -> Result<(), Self::Error> { let (frame, out_addr) = item; let pin = self.get_mut(); diff --git a/tokio-util/tests/codecs.rs b/tokio-util/tests/codecs.rs index d12128665..a22effbeb 100644 --- a/tokio-util/tests/codecs.rs +++ b/tokio-util/tests/codecs.rs @@ -209,9 +209,9 @@ fn lines_encoder() { let mut codec = LinesCodec::new(); let mut buf = BytesMut::new(); - codec.encode(String::from("line 1"), &mut buf).unwrap(); + codec.encode("line 1", &mut buf).unwrap(); assert_eq!("line 1\n", buf); - codec.encode(String::from("line 2"), &mut buf).unwrap(); + codec.encode("line 2", &mut buf).unwrap(); assert_eq!("line 1\nline 2\n", buf); } diff --git a/tokio-util/tests/framed.rs b/tokio-util/tests/framed.rs index cb82f8df8..d7ee3ef51 100644 --- a/tokio-util/tests/framed.rs +++ b/tokio-util/tests/framed.rs @@ -28,8 +28,7 @@ impl Decoder for U32Codec { } } -impl Encoder for U32Codec { - type Item = u32; +impl Encoder for U32Codec { type Error = io::Error; fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { diff --git a/tokio-util/tests/framed_write.rs b/tokio-util/tests/framed_write.rs index b2970c39d..9ac6c1d11 100644 --- a/tokio-util/tests/framed_write.rs +++ b/tokio-util/tests/framed_write.rs @@ -28,8 +28,7 @@ macro_rules! pin { struct U32Encoder; -impl Encoder for U32Encoder { - type Item = u32; +impl Encoder for U32Encoder { type Error = io::Error; fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index 51c65c630..0ba057428 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -8,6 +8,7 @@ use futures::future::FutureExt; use futures::sink::SinkExt; use std::io; +#[cfg_attr(any(target_os = "macos", target_os = "ios"), allow(unused_assignments))] #[tokio::test] async fn send_framed() -> std::io::Result<()> { let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?; @@ -21,33 +22,34 @@ async fn send_framed() -> std::io::Result<()> { let mut a = UdpFramed::new(a_soc, ByteCodec); let mut b = UdpFramed::new(b_soc, ByteCodec); - let msg = b"4567".to_vec(); + let msg = b"4567"; - let send = a.send((msg.clone(), b_addr)); + let send = a.send((msg, b_addr)); let recv = b.next().map(|e| e.unwrap()); let (_, received) = try_join(send, recv).await.unwrap(); let (data, addr) = received; - assert_eq!(msg, data); + assert_eq!(msg, &*data); assert_eq!(a_addr, addr); a_soc = a.into_inner(); b_soc = b.into_inner(); } + #[cfg(not(any(target_os = "macos", target_os = "ios")))] // test sending & receiving an empty message { let mut a = UdpFramed::new(a_soc, ByteCodec); let mut b = UdpFramed::new(b_soc, ByteCodec); - let msg = b"".to_vec(); + let msg = b""; - let send = a.send((msg.clone(), b_addr)); + let send = a.send((msg, b_addr)); let recv = b.next().map(|e| e.unwrap()); let (_, received) = try_join(send, recv).await.unwrap(); let (data, addr) = received; - assert_eq!(msg, data); + assert_eq!(msg, &*data); assert_eq!(a_addr, addr); } @@ -66,13 +68,12 @@ impl Decoder for ByteCodec { } } -impl Encoder for ByteCodec { - type Item = Vec; +impl Encoder<&[u8]> for ByteCodec { type Error = io::Error; - fn encode(&mut self, data: Vec, buf: &mut BytesMut) -> Result<(), io::Error> { + fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> { buf.reserve(data.len()); - buf.put_slice(&data); + buf.put_slice(data); Ok(()) } }