diff --git a/Cargo.toml b/Cargo.toml index 25e84bd1d..40f86dccb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,5 +19,5 @@ members = [ "tokio-tcp", # "tokio-tls", "tokio-udp", - # "tokio-uds", + "tokio-uds", ] diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 35517b287..3387b0bf7 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -37,7 +37,8 @@ jobs: - incoming # - tokio-tls tokio-udp: [] -# - tokio-uds + tokio-uds: + - async-traits # Test crates that are NOT platform specific - template: ci/azure-test-stable.yml diff --git a/tokio-uds/Cargo.toml b/tokio-uds/Cargo.toml index 4241e9546..2d941ec40 100644 --- a/tokio-uds/Cargo.toml +++ b/tokio-uds/Cargo.toml @@ -21,9 +21,12 @@ Unix Domain sockets for Tokio categories = ["asynchronous"] publish = false +[features] +async-traits = ["futures-core-preview"] + [dependencies] bytes = "0.4.8" -futures = "0.1.21" +futures-core-preview = { version = "0.3.0-alpha.17", optional = true } iovec = "0.1.2" libc = "0.2.42" log = "0.4.2" @@ -36,3 +39,4 @@ tokio-io = { version = "0.2.0", path = "../tokio-io" } [dev-dependencies] tokio = { version = "0.2.0", path = "../tokio" } tempfile = "3" +futures-preview = "0.3.0-alpha.17" diff --git a/tokio-uds/src/datagram.rs b/tokio-uds/src/datagram.rs index 040f9c641..87d1d49b9 100644 --- a/tokio-uds/src/datagram.rs +++ b/tokio-uds/src/datagram.rs @@ -1,5 +1,4 @@ -use crate::{RecvDgram, SendDgram}; -use futures::{try_ready, Async, Poll}; +use crate::{Recv, RecvFrom, Send, SendTo}; use mio::Ready; use mio_uds; use std::fmt; @@ -8,6 +7,8 @@ use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; use tokio_reactor::{Handle, PollEvented}; /// An I/O object representing a Unix datagram socket. @@ -68,14 +69,219 @@ impl UnixDatagram { self.io.get_ref().connect(path) } + /// Returns a future that sends data on the socket to the remote address to which it is connected. + /// On success, the future will resolve to the number of bytes written. + /// + /// The [`connect`] method will connect this socket to a remote address. The future + /// will resolve to an error if the socket is not connected. + /// + /// [`connect`]: #method.connect + pub fn send<'a, 'b>(&'a mut self, buf: &'b [u8]) -> Send<'a, 'b> { + Send::new(self, buf) + } + + /// Sends data on the socket to the remote address to which it is connected. + /// + /// The [`connect`] method will connect this socket to a remote address. This + /// method will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + /// + /// # Return + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. + /// + /// If the socket is not ready for writing, the method returns + /// `Poll::Pending` and arranges for the current task to receive a + /// notification when the socket becomes writable. + pub fn poll_send( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.poll_send_priv(cx, buf) + } + + // Poll IO functions that takes `&self` are provided for the split API. + // + // They are not public because (taken from the doc of `PollEvented`): + // + // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the + // caller must ensure that there are at most two tasks that use a + // `PollEvented` instance concurrently. One for reading and one for writing. + // While violating this requirement is "safe" from a Rust memory model point + // of view, it will result in unexpected behavior in the form of lost + // notifications and tasks hanging. + pub(crate) fn poll_send_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().send(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Returns a future that receives a single datagram message on the socket from + /// the remote address to which it is connected. On success, the future will resolve + /// to the number of bytes read. + /// + /// The function must be called with valid byte array `buf` of sufficient size to + /// hold the message bytes. If a message is too long to fit in the supplied buffer, + /// excess bytes may be discarded. + /// + /// The [`connect`] method will connect this socket to a remote address. The future + /// will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + pub fn recv<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> Recv<'a, 'b> { + Recv::new(self, buf) + } + + /// Receives a single datagram message on the socket from the remote address to + /// which it is connected. On success, returns the number of bytes read. + /// + /// The function must be called with valid byte array `buf` of sufficient size to + /// hold the message bytes. If a message is too long to fit in the supplied buffer, + /// excess bytes may be discarded. + /// + /// The [`connect`] method will connect this socket to a remote address. This + /// method will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + /// + /// # Return + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Poll::Pending` and arranges for the current task to receive a + /// notification when the socket becomes receivable or is closed. + pub fn poll_recv( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.poll_recv_priv(cx, buf) + } + + pub(crate) fn poll_recv_priv( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().recv(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Returns a future that sends data on the socket to the given address. + /// On success, the future will resolve to the number of bytes written. + /// + /// The future will resolve to an error if the IP version of the socket does + /// not match that of `target`. + pub fn send_to<'a, 'b, P>(&'a mut self, buf: &'b [u8], target: P) -> SendTo<'a, 'b, P> + where + P: AsRef + Unpin, + { + SendTo::new(self, buf, target) + } + + /// Sends data on the socket to the given address. On success, returns the + /// number of bytes written. + /// + /// This will return an error when the IP version of the local socket + /// does not match that of `target`. + /// + /// # Return + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. + /// + /// If the socket is not ready for writing, the method returns + /// `Poll::Pending` and arranges for the current task to receive a + /// notification when the socket becomes writable. + pub fn poll_send_to>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + target: P, + ) -> Poll> { + self.poll_send_to_priv(cx, buf, target.as_ref()) + } + + pub(crate) fn poll_send_to_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + target: &Path, + ) -> Poll> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().send_to(buf, target) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Returns a future that receives a single datagram on the socket. On success, + /// the future resolves to the number of bytes read and the origin. + /// + /// The function must be called with valid byte array `buf` of sufficient size + /// to hold the message bytes. If a message is too long to fit in the supplied + /// buffer, excess bytes may be discarded. + pub fn recv_from<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> RecvFrom<'a, 'b> { + RecvFrom::new(self, buf) + } + + /// Receives data from the socket. On success, returns the number of bytes + /// read and the address from whence the data came. + pub fn poll_recv_from( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.poll_recv_from_priv(cx, buf) + } + + pub(crate) fn poll_recv_from_priv( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().recv_from(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + /// Test whether this socket is ready to be read or not. - pub fn poll_read_ready(&self, ready: Ready) -> Poll { - self.io.poll_read_ready(ready) + pub fn poll_read_ready(&self, cx: &mut Context<'_>, ready: Ready) -> Poll> { + self.io.poll_read_ready(cx, ready) } /// Test whether this socket is ready to be written to or not. - pub fn poll_write_ready(&self) -> Poll { - self.io.poll_write_ready() + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.poll_write_ready(cx) } /// Returns the local address that this socket is bound to. @@ -90,94 +296,6 @@ impl UnixDatagram { self.io.get_ref().peer_addr() } - /// Receives data from the socket. - /// - /// On success, returns the number of bytes read and the address from - /// whence the data came. - pub fn poll_recv_from(&self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> { - try_ready!(self.io.poll_read_ready(Ready::readable())); - - match self.io.get_ref().recv_from(buf) { - Ok(ret) => Ok(ret.into()), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(Ready::readable())?; - Ok(Async::NotReady) - } - Err(e) => Err(e), - } - } - - /// Receives data from the socket. - /// - /// On success, returns the number of bytes read. - pub fn poll_recv(&self, buf: &mut [u8]) -> Poll { - try_ready!(self.io.poll_read_ready(Ready::readable())); - - match self.io.get_ref().recv(buf) { - Ok(ret) => Ok(ret.into()), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(Ready::readable())?; - Ok(Async::NotReady) - } - Err(e) => Err(e), - } - } - - /// Returns a future for receiving a datagram. See the documentation on RecvDgram for details. - pub fn recv_dgram(self, buf: T) -> RecvDgram - where - T: AsMut<[u8]>, - { - RecvDgram::new(self, buf) - } - - /// Sends data on the socket to the specified address. - /// - /// On success, returns the number of bytes written. - pub fn poll_send_to

(&self, buf: &[u8], path: P) -> Poll - where - P: AsRef, - { - try_ready!(self.io.poll_write_ready()); - - match self.io.get_ref().send_to(buf, path) { - Ok(ret) => Ok(ret.into()), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready()?; - Ok(Async::NotReady) - } - Err(e) => Err(e), - } - } - - /// Sends data on the socket to the socket's peer. - /// - /// The peer address may be set by the `connect` method, and this method - /// will return an error if the socket has not already been connected. - /// - /// On success, returns the number of bytes written. - pub fn poll_send(&self, buf: &[u8]) -> Poll { - try_ready!(self.io.poll_write_ready()); - - match self.io.get_ref().send(buf) { - Ok(ret) => Ok(ret.into()), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready()?; - Ok(Async::NotReady) - } - Err(e) => Err(e), - } - } - - /// Returns a future sending the data in buf to the socket at path. - pub fn send_dgram(self, buf: T, path: P) -> SendDgram - where - T: AsRef<[u8]>, - P: AsRef, - { - SendDgram::new(self, buf, path) - } - /// Returns the value of the `SO_ERROR` option. pub fn take_error(&self) -> io::Result> { self.io.get_ref().take_error() diff --git a/tokio-uds/src/incoming.rs b/tokio-uds/src/incoming.rs index c904a926d..1a16302af 100644 --- a/tokio-uds/src/incoming.rs +++ b/tokio-uds/src/incoming.rs @@ -1,9 +1,14 @@ +#![cfg(feature = "async-traits")] + use crate::{UnixListener, UnixStream}; -use futures::{try_ready, Poll, Stream}; +use futures_core::stream::Stream; use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; /// Stream of listeners #[derive(Debug)] +#[must_use = "streams do nothing unless polled"] pub struct Incoming { inner: UnixListener, } @@ -15,10 +20,10 @@ impl Incoming { } impl Stream for Incoming { - type Item = UnixStream; - type Error = io::Error; + type Item = io::Result; - fn poll(&mut self) -> Poll, io::Error> { - Ok(Some(try_ready!(self.inner.poll_accept()).0).into()) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (socket, _) = ready!(Pin::new(&mut self.inner).poll_accept(cx))?; + Poll::Ready(Some(Ok(socket))) } } diff --git a/tokio-uds/src/lib.rs b/tokio-uds/src/lib.rs index 55a35e690..fef2a5c56 100644 --- a/tokio-uds/src/lib.rs +++ b/tokio-uds/src/lib.rs @@ -8,20 +8,34 @@ //! //! This crate provides APIs for using Unix Domain Sockets with Tokio. +macro_rules! ready { + ($e:expr) => { + match $e { + ::std::task::Poll::Ready(t) => t, + ::std::task::Poll::Pending => return ::std::task::Poll::Pending, + } + }; +} + mod datagram; -mod frame; +// mod frame; mod incoming; mod listener; -mod recv_dgram; -mod send_dgram; +mod recv; +mod recv_from; +mod send; +mod send_to; mod stream; mod ucred; pub use crate::datagram::UnixDatagram; -pub use crate::frame::UnixDatagramFramed; +pub use crate::recv::Recv; +pub use crate::recv_from::RecvFrom; +pub use crate::send::Send; +pub use crate::send_to::SendTo; +// pub use crate::frame::UnixDatagramFramed; +#[cfg(feature = "async-traits")] pub use crate::incoming::Incoming; -pub use crate::listener::UnixListener; -pub use crate::recv_dgram::RecvDgram; -pub use crate::send_dgram::SendDgram; +pub use crate::listener::{Accept, UnixListener}; pub use crate::stream::{ConnectFuture, UnixStream}; pub use crate::ucred::UCred; diff --git a/tokio-uds/src/listener.rs b/tokio-uds/src/listener.rs index cdfeacea0..e875344a1 100644 --- a/tokio-uds/src/listener.rs +++ b/tokio-uds/src/listener.rs @@ -1,13 +1,15 @@ -use crate::{Incoming, UnixStream}; -use futures::{try_ready, Async, Poll}; +use crate::UnixStream; use mio::Ready; use mio_uds; use std::convert::TryFrom; use std::fmt; +use std::future::Future; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; use tokio_reactor::{Handle, PollEvented}; /// A Unix socket which can accept connections from other Unix sockets. @@ -43,8 +45,8 @@ impl UnixListener { } /// Test whether this socket is ready to be read or not. - pub fn poll_read_ready(&self, ready: Ready) -> Poll { - self.io.poll_read_ready(ready) + pub fn poll_read_ready(&self, cx: &mut Context<'_>, ready: Ready) -> Poll> { + self.io.poll_read_ready(cx, ready) } /// Returns the value of the `SO_ERROR` option. @@ -52,6 +54,12 @@ impl UnixListener { self.io.get_ref().take_error() } + /// Returns a future that attempts to accept a connection and creates a new + /// connected `UnixStream` if successful. + pub fn accept(&mut self) -> Accept<'_> { + Accept { listener: self } + } + /// Attempt to accept a connection and create a new connected `UnixStream` /// if successful. /// @@ -70,11 +78,14 @@ impl UnixListener { /// This function will panic if it is called outside the context of a /// future's task. It's recommended to only call this from the /// implementation of a `Future::poll`, if necessary. - pub fn poll_accept(&self) -> Poll<(UnixStream, SocketAddr), io::Error> { - let (io, addr) = try_ready!(self.poll_accept_std()); + pub fn poll_accept( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let (io, addr) = ready!(self.poll_accept_std(cx))?; let io = mio_uds::UnixStream::from_stream(io)?; - Ok((UnixStream::new(io), addr).into()) + Ok((UnixStream::new(io), addr)).into() } /// Attempt to accept a connection and create a new connected `UnixStream` @@ -100,24 +111,23 @@ impl UnixListener { /// This function will panic if it is called outside the context of a /// future's task. It's recommended to only call this from the /// implementation of a `Future::poll`, if necessary. - pub fn poll_accept_std(&self) -> Poll<(net::UnixStream, SocketAddr), io::Error> { - loop { - try_ready!(self.io.poll_read_ready(Ready::readable())); + pub fn poll_accept_std( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, Ready::readable()))?; - match self.io.get_ref().accept_std() { - Ok(None) => { - self.io.clear_read_ready(Ready::readable())?; - return Ok(Async::NotReady); - } - Ok(Some((sock, addr))) => { - return Ok(Async::Ready((sock, addr))); - } - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(Ready::readable())?; - return Ok(Async::NotReady); - } - Err(err) => return Err(err), + match self.io.get_ref().accept_std() { + Ok(None) => { + self.io.clear_read_ready(cx, Ready::readable())?; + Poll::Pending } + Ok(Some((sock, addr))) => Ok((sock, addr)).into(), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, Ready::readable())?; + Poll::Pending + } + Err(err) => Err(err).into(), } } @@ -126,8 +136,9 @@ impl UnixListener { /// /// This method returns an implementation of the `Stream` trait which /// resolves to the sockets the are accepted on this listener. - pub fn incoming(self) -> Incoming { - Incoming::new(self) + #[cfg(feature = "async-traits")] + pub fn incoming(self) -> crate::Incoming { + crate::Incoming::new(self) } } @@ -154,3 +165,18 @@ impl AsRawFd for UnixListener { self.io.get_ref().as_raw_fd() } } + +/// Future type returned by [`UnixListener::accept`]. +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Accept<'a> { + listener: &'a mut UnixListener, +} + +impl<'a> Future for Accept<'a> { + type Output = io::Result<(UnixStream, SocketAddr)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut *self.listener).poll_accept(cx) + } +} diff --git a/tokio-uds/src/recv.rs b/tokio-uds/src/recv.rs new file mode 100644 index 000000000..41e7b1381 --- /dev/null +++ b/tokio-uds/src/recv.rs @@ -0,0 +1,30 @@ +use crate::UnixDatagram; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that receives a datagram from the connected address. +/// +/// This `struct` is created by [`recv`](crate::UnixDatagram::recv). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Recv<'a, 'b> { + socket: &'a mut UnixDatagram, + buf: &'b mut [u8], +} + +impl<'a, 'b> Recv<'a, 'b> { + pub(crate) fn new(socket: &'a mut UnixDatagram, buf: &'b mut [u8]) -> Self { + Self { socket, buf } + } +} + +impl<'a, 'b> Future for Recv<'a, 'b> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Recv { socket, buf } = self.get_mut(); + socket.poll_recv_priv(cx, buf) + } +} diff --git a/tokio-uds/src/recv_dgram.rs b/tokio-uds/src/recv_dgram.rs deleted file mode 100644 index 0e03f26ab..000000000 --- a/tokio-uds/src/recv_dgram.rs +++ /dev/null @@ -1,72 +0,0 @@ -use crate::UnixDatagram; -use futures::{try_ready, Async, Future, Poll}; -use std::io; -use std::mem; - -/// A future for receiving datagrams from a Unix datagram socket. -/// -/// An example that uses UDP sockets but is still applicable can be found at -/// https://gist.github.com/dermesser/e331094c2ab28fc7f6ba8a16183fe4d5. -#[derive(Debug)] -pub struct RecvDgram { - st: State, -} - -/// A future similar to RecvDgram, but without allocating and returning the peer's address. -/// -/// This can be used if the peer's address is of no interest, so the allocation overhead can be -/// avoided. -#[derive(Debug)] -enum State { - Receiving { sock: UnixDatagram, buf: T }, - Empty, -} - -impl RecvDgram -where - T: AsMut<[u8]>, -{ - pub(crate) fn new(sock: UnixDatagram, buf: T) -> RecvDgram { - RecvDgram { - st: State::Receiving { sock, buf }, - } - } -} - -impl Future for RecvDgram -where - T: AsMut<[u8]>, -{ - /// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were - /// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the - /// datagram is truncated. - type Item = (UnixDatagram, T, usize, String); - /// This future yields io::Error if an error occurred. - type Error = io::Error; - - fn poll(&mut self) -> Poll { - let received; - let peer; - - if let State::Receiving { - ref mut sock, - ref mut buf, - } = self.st - { - let (n, p) = try_ready!(sock.poll_recv_from(buf.as_mut())); - received = n; - - peer = p.as_pathname().map_or(String::new(), |p| { - p.to_str().map_or(String::new(), |s| s.to_string()) - }); - } else { - panic!() - } - - if let State::Receiving { sock, buf } = mem::replace(&mut self.st, State::Empty) { - Ok(Async::Ready((sock, buf, received, peer))) - } else { - panic!() - } - } -} diff --git a/tokio-uds/src/recv_from.rs b/tokio-uds/src/recv_from.rs new file mode 100644 index 000000000..873ae5963 --- /dev/null +++ b/tokio-uds/src/recv_from.rs @@ -0,0 +1,31 @@ +use crate::UnixDatagram; +use std::future::Future; +use std::io; +use std::os::unix::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that receives a datagram. +/// +/// This `struct` is created by [`recv_from`](crate::UnixDatagram::recv_from). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct RecvFrom<'a, 'b> { + socket: &'a UnixDatagram, + buf: &'b mut [u8], +} + +impl<'a, 'b> RecvFrom<'a, 'b> { + pub(crate) fn new(socket: &'a UnixDatagram, buf: &'b mut [u8]) -> Self { + Self { socket, buf } + } +} + +impl<'a, 'b> Future for RecvFrom<'a, 'b> { + type Output = io::Result<(usize, SocketAddr)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let RecvFrom { socket, buf } = self.get_mut(); + socket.poll_recv_from_priv(cx, buf) + } +} diff --git a/tokio-uds/src/send.rs b/tokio-uds/src/send.rs new file mode 100644 index 000000000..e03f785af --- /dev/null +++ b/tokio-uds/src/send.rs @@ -0,0 +1,30 @@ +use crate::UnixDatagram; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that sends a datagram to the connected address. +/// +/// This `struct` is created by [`send`](crate::UnixDatagram::send). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Send<'a, 'b> { + socket: &'a UnixDatagram, + buf: &'b [u8], +} + +impl<'a, 'b> Send<'a, 'b> { + pub(crate) fn new(socket: &'a UnixDatagram, buf: &'b [u8]) -> Self { + Self { socket, buf } + } +} + +impl<'a, 'b> Future for Send<'a, 'b> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Send { socket, buf } = self.get_mut(); + socket.poll_send_priv(cx, buf) + } +} diff --git a/tokio-uds/src/send_dgram.rs b/tokio-uds/src/send_dgram.rs deleted file mode 100644 index 69004cf5c..000000000 --- a/tokio-uds/src/send_dgram.rs +++ /dev/null @@ -1,73 +0,0 @@ -use crate::UnixDatagram; -use futures::{try_ready, Async, Future, Poll}; -use std::io; -use std::mem; -use std::path::Path; - -/// A future for writing a buffer to a Unix datagram socket. -#[derive(Debug)] -pub struct SendDgram { - st: State, -} - -#[derive(Debug)] -enum State { - /// current state is Sending - Sending { - /// the underlying socket - sock: UnixDatagram, - /// the buffer to send - buf: T, - /// the destination - addr: P, - }, - /// neutral state - Empty, -} - -impl SendDgram -where - T: AsRef<[u8]>, - P: AsRef, -{ - pub(crate) fn new(sock: UnixDatagram, buf: T, addr: P) -> SendDgram { - SendDgram { - st: State::Sending { sock, buf, addr }, - } - } -} - -impl Future for SendDgram -where - T: AsRef<[u8]>, - P: AsRef, -{ - /// Returns the underlying socket and the buffer that was sent. - type Item = (UnixDatagram, T); - /// The error that is returned when sending failed. - type Error = io::Error; - - fn poll(&mut self) -> Poll { - if let State::Sending { - ref mut sock, - ref buf, - ref addr, - } = self.st - { - let n = try_ready!(sock.poll_send_to(buf.as_ref(), addr)); - if n < buf.as_ref().len() { - return Err(io::Error::new( - io::ErrorKind::Other, - "Couldn't send whole buffer".to_string(), - )); - } - } else { - panic!() - } - if let State::Sending { sock, buf, addr: _ } = mem::replace(&mut self.st, State::Empty) { - Ok(Async::Ready((sock, buf))) - } else { - panic!() - } - } -} diff --git a/tokio-uds/src/send_to.rs b/tokio-uds/src/send_to.rs new file mode 100644 index 000000000..014b74ff6 --- /dev/null +++ b/tokio-uds/src/send_to.rs @@ -0,0 +1,43 @@ +use crate::UnixDatagram; +use std::future::Future; +use std::io; +use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A future that sends a datagram to a given address. +/// +/// This `struct` is created by [`send_to`](crate::UnixDatagram::send_to). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct SendTo<'a, 'b, P> { + socket: &'a UnixDatagram, + buf: &'b [u8], + target: P, +} + +impl<'a, 'b, P> SendTo<'a, 'b, P> { + pub(crate) fn new(socket: &'a UnixDatagram, buf: &'b [u8], target: P) -> Self { + Self { + socket, + buf, + target, + } + } +} + +impl<'a, 'b, P> Future for SendTo<'a, 'b, P> +where + P: AsRef + Unpin, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let SendTo { + socket, + buf, + target, + } = self.get_mut(); + socket.poll_send_to_priv(cx, buf, target.as_ref()) + } +} diff --git a/tokio-uds/src/stream.rs b/tokio-uds/src/stream.rs index 3521e1a66..dfaa4fd98 100644 --- a/tokio-uds/src/stream.rs +++ b/tokio-uds/src/stream.rs @@ -1,17 +1,18 @@ use crate::ucred::{self, UCred}; use bytes::{Buf, BufMut}; -use futures::{Async, Future, Poll}; -use iovec::{self, IoVec}; -use libc; +use iovec::IoVec; use mio::Ready; use mio_uds; use std::convert::TryFrom; use std::fmt; +use std::future::Future; use std::io::{self, Read, Write}; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_reactor::{Handle, PollEvented}; @@ -27,15 +28,9 @@ pub struct UnixStream { /// Future returned by `UnixStream::connect` which will resolve to a /// `UnixStream` when the stream is connected. #[derive(Debug)] +#[must_use = "futures do nothing unless polled"] pub struct ConnectFuture { - inner: State, -} - -#[derive(Debug)] -enum State { - Waiting(UnixStream), - Error(io::Error), - Empty, + stream: Option>, } impl UnixStream { @@ -49,13 +44,7 @@ impl UnixStream { P: AsRef, { let res = mio_uds::UnixStream::connect(path).map(UnixStream::new); - - let inner = match res { - Ok(stream) => State::Waiting(stream), - Err(e) => State::Error(e), - }; - - ConnectFuture { inner } + ConnectFuture { stream: Some(res) } } /// Consumes a `UnixStream` in the standard library and returns a @@ -89,13 +78,13 @@ impl UnixStream { } /// Test whether this socket is ready to be read or not. - pub fn poll_read_ready(&self, ready: Ready) -> Poll { - self.io.poll_read_ready(ready) + pub fn poll_read_ready(&self, cx: &mut Context<'_>, ready: Ready) -> Poll> { + self.io.poll_read_ready(cx, ready) } /// Test whether this socket is ready to be written to or not. - pub fn poll_write_ready(&self) -> Poll { - self.io.poll_write_ready() + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.poll_write_ready(cx) } /// Returns the socket address of the local half of this connection. @@ -140,109 +129,189 @@ impl TryFrom for mio_uds::UnixStream { } } -impl Read for UnixStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.io.read(buf) - } -} - -impl Write for UnixStream { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.io.write(buf) - } - fn flush(&mut self) -> io::Result<()> { - self.io.flush() - } -} - impl AsyncRead for UnixStream { unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { false } - fn read_buf(&mut self, buf: &mut B) -> Poll { - <&UnixStream>::read_buf(&mut &*self, buf) + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.poll_read_priv(cx, buf) + } + + fn poll_read_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll> { + self.poll_read_buf_priv(cx, buf) } } impl AsyncWrite for UnixStream { - fn shutdown(&mut self) -> Poll<(), io::Error> { - <&UnixStream>::shutdown(&mut &*self) + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.poll_write_priv(cx, buf) } - fn write_buf(&mut self, buf: &mut B) -> Poll { - <&UnixStream>::write_buf(&mut &*self, buf) + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_write_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll> { + self.poll_write_buf_priv(cx, buf) } } -impl<'a> Read for &'a UnixStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - (&self.io).read(buf) - } -} +impl UnixStream { + // == Poll IO functions that takes `&self` == + // + // They are not public because (taken from the doc of `PollEvented`): + // + // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the + // caller must ensure that there are at most two tasks that use a + // `PollEvented` instance concurrently. One for reading and one for writing. + // While violating this requirement is "safe" from a Rust memory model point + // of view, it will result in unexpected behavior in the form of lost + // notifications and tasks hanging. -impl<'a> Write for &'a UnixStream { - fn write(&mut self, buf: &[u8]) -> io::Result { - (&self.io).write(buf) - } + pub(crate) fn poll_read_priv( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - fn flush(&mut self) -> io::Result<()> { - (&self.io).flush() - } -} - -impl<'a> AsyncRead for &'a UnixStream { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { - false - } - - fn read_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = ::poll_read_ready(self, Ready::readable())? { - return Ok(Async::NotReady); - } - unsafe { - let r = read_ready(buf, self.as_raw_fd()); - if r == -1 { - let e = io::Error::last_os_error(); - if e.kind() == io::ErrorKind::WouldBlock { - self.io.clear_read_ready(Ready::readable())?; - Ok(Async::NotReady) - } else { - Err(e) - } - } else { - let r = r as usize; - buf.advance_mut(r); - Ok(r.into()) + match self.io.get_ref().read(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending } + x => Poll::Ready(x), } } -} -impl<'a> AsyncWrite for &'a UnixStream { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(().into()) - } + pub(crate) fn poll_read_buf_priv( + &self, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - fn write_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = ::poll_write_ready(self)? { - return Ok(Async::NotReady); - } - unsafe { - let r = write_ready(buf, self.as_raw_fd()); - if r == -1 { - let e = io::Error::last_os_error(); - if e.kind() == io::ErrorKind::WouldBlock { - self.io.clear_write_ready()?; - Ok(Async::NotReady) - } else { - Err(e) + let r = unsafe { + // The `IoVec` type can't have a 0-length size, so we create a bunch + // of dummy versions on the stack with 1 length which we'll quickly + // overwrite. + let b1: &mut [u8] = &mut [0]; + let b2: &mut [u8] = &mut [0]; + let b3: &mut [u8] = &mut [0]; + let b4: &mut [u8] = &mut [0]; + let b5: &mut [u8] = &mut [0]; + let b6: &mut [u8] = &mut [0]; + let b7: &mut [u8] = &mut [0]; + let b8: &mut [u8] = &mut [0]; + let b9: &mut [u8] = &mut [0]; + let b10: &mut [u8] = &mut [0]; + let b11: &mut [u8] = &mut [0]; + let b12: &mut [u8] = &mut [0]; + let b13: &mut [u8] = &mut [0]; + let b14: &mut [u8] = &mut [0]; + let b15: &mut [u8] = &mut [0]; + let b16: &mut [u8] = &mut [0]; + let mut bufs: [&mut IoVec; 16] = [ + b1.into(), + b2.into(), + b3.into(), + b4.into(), + b5.into(), + b6.into(), + b7.into(), + b8.into(), + b9.into(), + b10.into(), + b11.into(), + b12.into(), + b13.into(), + b14.into(), + b15.into(), + b16.into(), + ]; + let n = buf.bytes_vec_mut(&mut bufs); + self.io.get_ref().read_bufs(&mut bufs[..n]) + }; + + match r { + Ok(n) => { + unsafe { + buf.advance_mut(n); } - } else { - let r = r as usize; - buf.advance(r); - Ok(r.into()) + Poll::Ready(Ok(n)) } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), + } + } + + pub(crate) fn poll_write_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + pub(crate) fn poll_write_buf_priv( + &self, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll> { + ready!(self.io.poll_write_ready(cx))?; + + let r = { + // The `IoVec` type can't have a zero-length size, so create a dummy + // version from a 1-length slice which we'll overwrite with the + // `bytes_vec` method. + static DUMMY: &[u8] = &[0]; + let iovec = <&IoVec>::from(DUMMY); + let mut bufs = [iovec; 64]; + let n = buf.bytes_vec(&mut bufs); + self.io.get_ref().write_bufs(&bufs[..n]) + }; + match r { + Ok(n) => { + buf.advance(n); + Poll::Ready(Ok(n)) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), } } } @@ -260,106 +329,27 @@ impl AsRawFd for UnixStream { } impl Future for ConnectFuture { - type Item = UnixStream; - type Error = io::Error; + type Output = io::Result; - fn poll(&mut self) -> Poll { - use std::mem; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let stream = self + .stream + .take() + .expect("ConnectFuture polled after completion")?; - match self.inner { - State::Waiting(ref mut stream) => { - if let Async::NotReady = stream.io.poll_write_ready()? { - return Ok(Async::NotReady); - } - - if let Some(e) = stream.io.get_ref().take_error()? { - return Err(e); - } + match stream.io.poll_write_ready(cx) { + Poll::Pending => { + self.stream = Some(Ok(stream)); + return Poll::Pending; } - State::Error(_) => { - let e = match mem::replace(&mut self.inner, State::Empty) { - State::Error(e) => e, - _ => unreachable!(), - }; - - return Err(e); - } - State::Empty => panic!("can't poll stream twice"), + Poll::Ready(Err(e)) => return Err(e).into(), + _ => (), } - match mem::replace(&mut self.inner, State::Empty) { - State::Waiting(stream) => Ok(Async::Ready(stream)), - _ => unreachable!(), + if let Some(e) = stream.io.get_ref().take_error()? { + return Err(e).into(); } + + Ok(stream).into() } } - -unsafe fn read_ready(buf: &mut B, raw_fd: RawFd) -> isize { - // The `IoVec` type can't have a 0-length size, so we create a bunch - // of dummy versions on the stack with 1 length which we'll quickly - // overwrite. - let b1: &mut [u8] = &mut [0]; - let b2: &mut [u8] = &mut [0]; - let b3: &mut [u8] = &mut [0]; - let b4: &mut [u8] = &mut [0]; - let b5: &mut [u8] = &mut [0]; - let b6: &mut [u8] = &mut [0]; - let b7: &mut [u8] = &mut [0]; - let b8: &mut [u8] = &mut [0]; - let b9: &mut [u8] = &mut [0]; - let b10: &mut [u8] = &mut [0]; - let b11: &mut [u8] = &mut [0]; - let b12: &mut [u8] = &mut [0]; - let b13: &mut [u8] = &mut [0]; - let b14: &mut [u8] = &mut [0]; - let b15: &mut [u8] = &mut [0]; - let b16: &mut [u8] = &mut [0]; - let mut bufs: [&mut IoVec; 16] = [ - b1.into(), - b2.into(), - b3.into(), - b4.into(), - b5.into(), - b6.into(), - b7.into(), - b8.into(), - b9.into(), - b10.into(), - b11.into(), - b12.into(), - b13.into(), - b14.into(), - b15.into(), - b16.into(), - ]; - - let n = buf.bytes_vec_mut(&mut bufs); - read_ready_vecs(&mut bufs[..n], raw_fd) -} - -unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize { - let iovecs = iovec::unix::as_os_slice_mut(bufs); - - libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) -} - -unsafe fn write_ready(buf: &mut B, raw_fd: RawFd) -> isize { - // The `IoVec` type can't have a zero-length size, so create a dummy - // version from a 1-length slice which we'll overwrite with the - // `bytes_vec` method. - static DUMMY: &[u8] = &[0]; - let iovec = <&IoVec>::from(DUMMY); - let mut bufs = [ - iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, - iovec, iovec, iovec, - ]; - - let n = buf.bytes_vec(&mut bufs); - write_ready_vecs(&bufs[..n], raw_fd) -} - -unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize { - let iovecs = iovec::unix::as_os_slice(bufs); - - libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) -} diff --git a/tokio-uds/tests/datagram.rs b/tokio-uds/tests/datagram.rs index 9ae465107..f5d8c5bd2 100644 --- a/tokio-uds/tests/datagram.rs +++ b/tokio-uds/tests/datagram.rs @@ -1,78 +1,70 @@ #![cfg(unix)] +#![feature(async_await)] #![deny(warnings, rust_2018_idioms)] -use bytes::BytesMut; -use futures::{Future, Sink, Stream}; -use std::str; +use std::io; use tempfile; -use tokio::io; -use tokio::runtime::current_thread::Runtime; -use tokio_codec::{Decoder, Encoder}; use tokio_uds::*; -struct StringDatagramCodec; +// struct StringDatagramCodec; -/// A codec to decode datagrams from a unix domain socket as utf-8 text messages. -impl Encoder for StringDatagramCodec { - type Item = String; - type Error = io::Error; +// /// A codec to decode datagrams from a unix domain socket as utf-8 text messages. +// impl Encoder for StringDatagramCodec { +// type Item = String; +// type Error = io::Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.extend_from_slice(&item.into_bytes()); - Ok(()) +// fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { +// dst.extend_from_slice(&item.into_bytes()); +// Ok(()) +// } +// } + +// /// A codec to decode datagrams from a unix domain socket as utf-8 text messages. +// impl Decoder for StringDatagramCodec { +// type Item = String; +// type Error = io::Error; + +// fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { +// let decoded = str::from_utf8(buf) +// .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? +// .to_string(); + +// Ok(Some(decoded)) +// } +// } + +async fn echo_server(mut socket: UnixDatagram) -> io::Result<()> { + let mut recv_buf = vec![0u8; 1024]; + loop { + let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?; + if let Some(path) = peer_addr.as_pathname() { + socket.send_to(&recv_buf[..len], path).await?; + } } } -/// A codec to decode datagrams from a unix domain socket as utf-8 text messages. -impl Decoder for StringDatagramCodec { - type Item = String; - type Error = io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { - let decoded = str::from_utf8(buf) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? - .to_string(); - - Ok(Some(decoded)) - } -} - -#[test] -fn framed_echo() { +#[tokio::test] +async fn echo() -> io::Result<()> { let dir = tempfile::tempdir().unwrap(); let server_path = dir.path().join("server.sock"); let client_path = dir.path().join("client.sock"); - let mut rt = Runtime::new().unwrap(); + let server_socket = UnixDatagram::bind(server_path.clone())?; + + tokio::spawn(async move { + if let Err(e) = echo_server(server_socket).await { + eprintln!("Error in echo server: {}", e); + } + }); { - let socket = UnixDatagram::bind(&server_path).unwrap(); - let server = UnixDatagramFramed::new(socket, StringDatagramCodec); - - let (sink, stream) = server.split(); - - let echo_stream = - stream.map(|(msg, addr)| (msg, addr.as_pathname().unwrap().to_path_buf())); - - // spawn echo server - rt.spawn( - echo_stream - .forward(sink) - .map_err(|e| panic!("err={:?}", e)) - .map(|_| ()), - ); + let mut socket = UnixDatagram::bind(&client_path).unwrap(); + socket.connect(server_path)?; + socket.send(b"ECHO").await?; + let mut recv_buf = [0u8; 16]; + let len = socket.recv(&mut recv_buf[..]).await?; + assert_eq!(&recv_buf[..len], b"ECHO"); } - { - let socket = UnixDatagram::bind(&client_path).unwrap(); - let client = UnixDatagramFramed::new(socket, StringDatagramCodec); - - let (sink, stream) = client.split(); - - rt.block_on(sink.send(("ECHO".to_string(), server_path))) - .unwrap(); - - let response = rt.block_on(stream.take(1).collect()).unwrap(); - assert_eq!(response[0].0, "ECHO"); - } + Ok(()) } diff --git a/tokio-uds/tests/stream.rs b/tokio-uds/tests/stream.rs index 216ae80a2..a05a7e960 100644 --- a/tokio-uds/tests/stream.rs +++ b/tokio-uds/tests/stream.rs @@ -1,51 +1,32 @@ #![cfg(unix)] +#![feature(async_await)] #![deny(warnings, rust_2018_idioms)] -use futures::sync::oneshot; -use futures::{Future, Stream}; +use futures::future::try_join; use tempfile::Builder; -use tokio::io; -use tokio::runtime::current_thread::Runtime; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_uds::*; -macro_rules! t { - ($e:expr) => { - match $e { - Ok(e) => e, - Err(e) => panic!("{} failed with {:?}", stringify!($e), e), - } - }; -} - -#[test] -fn echo() { +#[tokio::test] +async fn accept_read_write() -> std::io::Result<()> { let dir = Builder::new().prefix("tokio-uds-tests").tempdir().unwrap(); let sock_path = dir.path().join("connect.sock"); - let mut rt = Runtime::new().unwrap(); + let mut listener = UnixListener::bind(&sock_path)?; - let server = t!(UnixListener::bind(&sock_path)); - let (tx, rx) = oneshot::channel(); + let accept = listener.accept(); + let connect = UnixStream::connect(&sock_path); + let ((mut server, _), mut client) = try_join(accept, connect).await?; - rt.spawn({ - server - .incoming() - .into_future() - .and_then(move |(sock, _)| { - tx.send(sock.unwrap()).unwrap(); - Ok(()) - }) - .map_err(|e| panic!("err={:?}", e)) - }); - - let client = rt.block_on(UnixStream::connect(&sock_path)).unwrap(); - let server = rt.block_on(rx).unwrap(); - - // Write to the client - rt.block_on(io::write_all(client, b"hello")).unwrap(); - - // Read from the server - let (_, buf) = rt.block_on(io::read_to_end(server, vec![])).unwrap(); - - assert_eq!(buf, b"hello"); + // Write to the client. TODO: Switch to write_all. + let write_len = client.write(b"hello").await?; + assert_eq!(write_len, 5); + drop(client); + // Read from the server. TODO: Switch to read_to_end. + let mut buf = [0u8; 5]; + server.read_exact(&mut buf).await?; + assert_eq!(&buf, b"hello"); + let len = server.read(&mut buf).await?; + assert_eq!(len, 0); + Ok(()) }