mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
uds: update to std-future (#1227)
This commit is contained in:
parent
88e775dcf0
commit
80915906d8
@ -19,5 +19,5 @@ members = [
|
|||||||
"tokio-tcp",
|
"tokio-tcp",
|
||||||
# "tokio-tls",
|
# "tokio-tls",
|
||||||
"tokio-udp",
|
"tokio-udp",
|
||||||
# "tokio-uds",
|
"tokio-uds",
|
||||||
]
|
]
|
||||||
|
@ -37,7 +37,8 @@ jobs:
|
|||||||
- incoming
|
- incoming
|
||||||
# - tokio-tls
|
# - tokio-tls
|
||||||
tokio-udp: []
|
tokio-udp: []
|
||||||
# - tokio-uds
|
tokio-uds:
|
||||||
|
- async-traits
|
||||||
|
|
||||||
# Test crates that are NOT platform specific
|
# Test crates that are NOT platform specific
|
||||||
- template: ci/azure-test-stable.yml
|
- template: ci/azure-test-stable.yml
|
||||||
|
@ -21,9 +21,12 @@ Unix Domain sockets for Tokio
|
|||||||
categories = ["asynchronous"]
|
categories = ["asynchronous"]
|
||||||
publish = false
|
publish = false
|
||||||
|
|
||||||
|
[features]
|
||||||
|
async-traits = ["futures-core-preview"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "0.4.8"
|
bytes = "0.4.8"
|
||||||
futures = "0.1.21"
|
futures-core-preview = { version = "0.3.0-alpha.17", optional = true }
|
||||||
iovec = "0.1.2"
|
iovec = "0.1.2"
|
||||||
libc = "0.2.42"
|
libc = "0.2.42"
|
||||||
log = "0.4.2"
|
log = "0.4.2"
|
||||||
@ -36,3 +39,4 @@ tokio-io = { version = "0.2.0", path = "../tokio-io" }
|
|||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "0.2.0", path = "../tokio" }
|
tokio = { version = "0.2.0", path = "../tokio" }
|
||||||
tempfile = "3"
|
tempfile = "3"
|
||||||
|
futures-preview = "0.3.0-alpha.17"
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
use crate::{RecvDgram, SendDgram};
|
use crate::{Recv, RecvFrom, Send, SendTo};
|
||||||
use futures::{try_ready, Async, Poll};
|
|
||||||
use mio::Ready;
|
use mio::Ready;
|
||||||
use mio_uds;
|
use mio_uds;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
@ -8,6 +7,8 @@ use std::net::Shutdown;
|
|||||||
use std::os::unix::io::{AsRawFd, RawFd};
|
use std::os::unix::io::{AsRawFd, RawFd};
|
||||||
use std::os::unix::net::{self, SocketAddr};
|
use std::os::unix::net::{self, SocketAddr};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
use tokio_reactor::{Handle, PollEvented};
|
use tokio_reactor::{Handle, PollEvented};
|
||||||
|
|
||||||
/// An I/O object representing a Unix datagram socket.
|
/// An I/O object representing a Unix datagram socket.
|
||||||
@ -68,14 +69,219 @@ impl UnixDatagram {
|
|||||||
self.io.get_ref().connect(path)
|
self.io.get_ref().connect(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a future that sends data on the socket to the remote address to which it is connected.
|
||||||
|
/// On success, the future will resolve to the number of bytes written.
|
||||||
|
///
|
||||||
|
/// The [`connect`] method will connect this socket to a remote address. The future
|
||||||
|
/// will resolve to an error if the socket is not connected.
|
||||||
|
///
|
||||||
|
/// [`connect`]: #method.connect
|
||||||
|
pub fn send<'a, 'b>(&'a mut self, buf: &'b [u8]) -> Send<'a, 'b> {
|
||||||
|
Send::new(self, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends data on the socket to the remote address to which it is connected.
|
||||||
|
///
|
||||||
|
/// The [`connect`] method will connect this socket to a remote address. This
|
||||||
|
/// method will fail if the socket is not connected.
|
||||||
|
///
|
||||||
|
/// [`connect`]: #method.connect
|
||||||
|
///
|
||||||
|
/// # Return
|
||||||
|
///
|
||||||
|
/// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
|
||||||
|
///
|
||||||
|
/// If the socket is not ready for writing, the method returns
|
||||||
|
/// `Poll::Pending` and arranges for the current task to receive a
|
||||||
|
/// notification when the socket becomes writable.
|
||||||
|
pub fn poll_send(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_send_priv(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Poll IO functions that takes `&self` are provided for the split API.
|
||||||
|
//
|
||||||
|
// They are not public because (taken from the doc of `PollEvented`):
|
||||||
|
//
|
||||||
|
// While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
|
||||||
|
// caller must ensure that there are at most two tasks that use a
|
||||||
|
// `PollEvented` instance concurrently. One for reading and one for writing.
|
||||||
|
// While violating this requirement is "safe" from a Rust memory model point
|
||||||
|
// of view, it will result in unexpected behavior in the form of lost
|
||||||
|
// notifications and tasks hanging.
|
||||||
|
pub(crate) fn poll_send_priv(
|
||||||
|
&self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
ready!(self.io.poll_write_ready(cx))?;
|
||||||
|
|
||||||
|
match self.io.get_ref().send(buf) {
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
self.io.clear_write_ready(cx)?;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
x => Poll::Ready(x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a future that receives a single datagram message on the socket from
|
||||||
|
/// the remote address to which it is connected. On success, the future will resolve
|
||||||
|
/// to the number of bytes read.
|
||||||
|
///
|
||||||
|
/// The function must be called with valid byte array `buf` of sufficient size to
|
||||||
|
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
|
||||||
|
/// excess bytes may be discarded.
|
||||||
|
///
|
||||||
|
/// The [`connect`] method will connect this socket to a remote address. The future
|
||||||
|
/// will fail if the socket is not connected.
|
||||||
|
///
|
||||||
|
/// [`connect`]: #method.connect
|
||||||
|
pub fn recv<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> Recv<'a, 'b> {
|
||||||
|
Recv::new(self, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receives a single datagram message on the socket from the remote address to
|
||||||
|
/// which it is connected. On success, returns the number of bytes read.
|
||||||
|
///
|
||||||
|
/// The function must be called with valid byte array `buf` of sufficient size to
|
||||||
|
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
|
||||||
|
/// excess bytes may be discarded.
|
||||||
|
///
|
||||||
|
/// The [`connect`] method will connect this socket to a remote address. This
|
||||||
|
/// method will fail if the socket is not connected.
|
||||||
|
///
|
||||||
|
/// [`connect`]: #method.connect
|
||||||
|
///
|
||||||
|
/// # Return
|
||||||
|
///
|
||||||
|
/// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
|
||||||
|
///
|
||||||
|
/// If no data is available for reading, the method returns
|
||||||
|
/// `Poll::Pending` and arranges for the current task to receive a
|
||||||
|
/// notification when the socket becomes receivable or is closed.
|
||||||
|
pub fn poll_recv(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_recv_priv(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn poll_recv_priv(
|
||||||
|
&self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
|
||||||
|
|
||||||
|
match self.io.get_ref().recv(buf) {
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
self.io.clear_read_ready(cx, mio::Ready::readable())?;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
x => Poll::Ready(x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a future that sends data on the socket to the given address.
|
||||||
|
/// On success, the future will resolve to the number of bytes written.
|
||||||
|
///
|
||||||
|
/// The future will resolve to an error if the IP version of the socket does
|
||||||
|
/// not match that of `target`.
|
||||||
|
pub fn send_to<'a, 'b, P>(&'a mut self, buf: &'b [u8], target: P) -> SendTo<'a, 'b, P>
|
||||||
|
where
|
||||||
|
P: AsRef<Path> + Unpin,
|
||||||
|
{
|
||||||
|
SendTo::new(self, buf, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends data on the socket to the given address. On success, returns the
|
||||||
|
/// number of bytes written.
|
||||||
|
///
|
||||||
|
/// This will return an error when the IP version of the local socket
|
||||||
|
/// does not match that of `target`.
|
||||||
|
///
|
||||||
|
/// # Return
|
||||||
|
///
|
||||||
|
/// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
|
||||||
|
///
|
||||||
|
/// If the socket is not ready for writing, the method returns
|
||||||
|
/// `Poll::Pending` and arranges for the current task to receive a
|
||||||
|
/// notification when the socket becomes writable.
|
||||||
|
pub fn poll_send_to<P: AsRef<Path>>(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
target: P,
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_send_to_priv(cx, buf, target.as_ref())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn poll_send_to_priv(
|
||||||
|
&self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
target: &Path,
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
ready!(self.io.poll_write_ready(cx))?;
|
||||||
|
|
||||||
|
match self.io.get_ref().send_to(buf, target) {
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
self.io.clear_write_ready(cx)?;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
x => Poll::Ready(x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a future that receives a single datagram on the socket. On success,
|
||||||
|
/// the future resolves to the number of bytes read and the origin.
|
||||||
|
///
|
||||||
|
/// The function must be called with valid byte array `buf` of sufficient size
|
||||||
|
/// to hold the message bytes. If a message is too long to fit in the supplied
|
||||||
|
/// buffer, excess bytes may be discarded.
|
||||||
|
pub fn recv_from<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> RecvFrom<'a, 'b> {
|
||||||
|
RecvFrom::new(self, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receives data from the socket. On success, returns the number of bytes
|
||||||
|
/// read and the address from whence the data came.
|
||||||
|
pub fn poll_recv_from(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<Result<(usize, SocketAddr), io::Error>> {
|
||||||
|
self.poll_recv_from_priv(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn poll_recv_from_priv(
|
||||||
|
&self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<Result<(usize, SocketAddr), io::Error>> {
|
||||||
|
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
|
||||||
|
|
||||||
|
match self.io.get_ref().recv_from(buf) {
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
self.io.clear_read_ready(cx, mio::Ready::readable())?;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
x => Poll::Ready(x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Test whether this socket is ready to be read or not.
|
/// Test whether this socket is ready to be read or not.
|
||||||
pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
|
pub fn poll_read_ready(&self, cx: &mut Context<'_>, ready: Ready) -> Poll<io::Result<Ready>> {
|
||||||
self.io.poll_read_ready(ready)
|
self.io.poll_read_ready(cx, ready)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test whether this socket is ready to be written to or not.
|
/// Test whether this socket is ready to be written to or not.
|
||||||
pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
|
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||||
self.io.poll_write_ready()
|
self.io.poll_write_ready(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the local address that this socket is bound to.
|
/// Returns the local address that this socket is bound to.
|
||||||
@ -90,94 +296,6 @@ impl UnixDatagram {
|
|||||||
self.io.get_ref().peer_addr()
|
self.io.get_ref().peer_addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receives data from the socket.
|
|
||||||
///
|
|
||||||
/// On success, returns the number of bytes read and the address from
|
|
||||||
/// whence the data came.
|
|
||||||
pub fn poll_recv_from(&self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
|
|
||||||
try_ready!(self.io.poll_read_ready(Ready::readable()));
|
|
||||||
|
|
||||||
match self.io.get_ref().recv_from(buf) {
|
|
||||||
Ok(ret) => Ok(ret.into()),
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
|
||||||
self.io.clear_read_ready(Ready::readable())?;
|
|
||||||
Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
Err(e) => Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Receives data from the socket.
|
|
||||||
///
|
|
||||||
/// On success, returns the number of bytes read.
|
|
||||||
pub fn poll_recv(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
|
|
||||||
try_ready!(self.io.poll_read_ready(Ready::readable()));
|
|
||||||
|
|
||||||
match self.io.get_ref().recv(buf) {
|
|
||||||
Ok(ret) => Ok(ret.into()),
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
|
||||||
self.io.clear_read_ready(Ready::readable())?;
|
|
||||||
Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
Err(e) => Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a future for receiving a datagram. See the documentation on RecvDgram for details.
|
|
||||||
pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T>
|
|
||||||
where
|
|
||||||
T: AsMut<[u8]>,
|
|
||||||
{
|
|
||||||
RecvDgram::new(self, buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sends data on the socket to the specified address.
|
|
||||||
///
|
|
||||||
/// On success, returns the number of bytes written.
|
|
||||||
pub fn poll_send_to<P>(&self, buf: &[u8], path: P) -> Poll<usize, io::Error>
|
|
||||||
where
|
|
||||||
P: AsRef<Path>,
|
|
||||||
{
|
|
||||||
try_ready!(self.io.poll_write_ready());
|
|
||||||
|
|
||||||
match self.io.get_ref().send_to(buf, path) {
|
|
||||||
Ok(ret) => Ok(ret.into()),
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
|
||||||
self.io.clear_write_ready()?;
|
|
||||||
Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
Err(e) => Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sends data on the socket to the socket's peer.
|
|
||||||
///
|
|
||||||
/// The peer address may be set by the `connect` method, and this method
|
|
||||||
/// will return an error if the socket has not already been connected.
|
|
||||||
///
|
|
||||||
/// On success, returns the number of bytes written.
|
|
||||||
pub fn poll_send(&self, buf: &[u8]) -> Poll<usize, io::Error> {
|
|
||||||
try_ready!(self.io.poll_write_ready());
|
|
||||||
|
|
||||||
match self.io.get_ref().send(buf) {
|
|
||||||
Ok(ret) => Ok(ret.into()),
|
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
|
||||||
self.io.clear_write_ready()?;
|
|
||||||
Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
Err(e) => Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a future sending the data in buf to the socket at path.
|
|
||||||
pub fn send_dgram<T, P>(self, buf: T, path: P) -> SendDgram<T, P>
|
|
||||||
where
|
|
||||||
T: AsRef<[u8]>,
|
|
||||||
P: AsRef<Path>,
|
|
||||||
{
|
|
||||||
SendDgram::new(self, buf, path)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the value of the `SO_ERROR` option.
|
/// Returns the value of the `SO_ERROR` option.
|
||||||
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
|
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
|
||||||
self.io.get_ref().take_error()
|
self.io.get_ref().take_error()
|
||||||
|
@ -1,9 +1,14 @@
|
|||||||
|
#![cfg(feature = "async-traits")]
|
||||||
|
|
||||||
use crate::{UnixListener, UnixStream};
|
use crate::{UnixListener, UnixStream};
|
||||||
use futures::{try_ready, Poll, Stream};
|
use futures_core::stream::Stream;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
/// Stream of listeners
|
/// Stream of listeners
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
#[must_use = "streams do nothing unless polled"]
|
||||||
pub struct Incoming {
|
pub struct Incoming {
|
||||||
inner: UnixListener,
|
inner: UnixListener,
|
||||||
}
|
}
|
||||||
@ -15,10 +20,10 @@ impl Incoming {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for Incoming {
|
impl Stream for Incoming {
|
||||||
type Item = UnixStream;
|
type Item = io::Result<UnixStream>;
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
Ok(Some(try_ready!(self.inner.poll_accept()).0).into())
|
let (socket, _) = ready!(Pin::new(&mut self.inner).poll_accept(cx))?;
|
||||||
|
Poll::Ready(Some(Ok(socket)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,20 +8,34 @@
|
|||||||
//!
|
//!
|
||||||
//! This crate provides APIs for using Unix Domain Sockets with Tokio.
|
//! This crate provides APIs for using Unix Domain Sockets with Tokio.
|
||||||
|
|
||||||
|
macro_rules! ready {
|
||||||
|
($e:expr) => {
|
||||||
|
match $e {
|
||||||
|
::std::task::Poll::Ready(t) => t,
|
||||||
|
::std::task::Poll::Pending => return ::std::task::Poll::Pending,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
mod datagram;
|
mod datagram;
|
||||||
mod frame;
|
// mod frame;
|
||||||
mod incoming;
|
mod incoming;
|
||||||
mod listener;
|
mod listener;
|
||||||
mod recv_dgram;
|
mod recv;
|
||||||
mod send_dgram;
|
mod recv_from;
|
||||||
|
mod send;
|
||||||
|
mod send_to;
|
||||||
mod stream;
|
mod stream;
|
||||||
mod ucred;
|
mod ucred;
|
||||||
|
|
||||||
pub use crate::datagram::UnixDatagram;
|
pub use crate::datagram::UnixDatagram;
|
||||||
pub use crate::frame::UnixDatagramFramed;
|
pub use crate::recv::Recv;
|
||||||
|
pub use crate::recv_from::RecvFrom;
|
||||||
|
pub use crate::send::Send;
|
||||||
|
pub use crate::send_to::SendTo;
|
||||||
|
// pub use crate::frame::UnixDatagramFramed;
|
||||||
|
#[cfg(feature = "async-traits")]
|
||||||
pub use crate::incoming::Incoming;
|
pub use crate::incoming::Incoming;
|
||||||
pub use crate::listener::UnixListener;
|
pub use crate::listener::{Accept, UnixListener};
|
||||||
pub use crate::recv_dgram::RecvDgram;
|
|
||||||
pub use crate::send_dgram::SendDgram;
|
|
||||||
pub use crate::stream::{ConnectFuture, UnixStream};
|
pub use crate::stream::{ConnectFuture, UnixStream};
|
||||||
pub use crate::ucred::UCred;
|
pub use crate::ucred::UCred;
|
||||||
|
@ -1,13 +1,15 @@
|
|||||||
use crate::{Incoming, UnixStream};
|
use crate::UnixStream;
|
||||||
use futures::{try_ready, Async, Poll};
|
|
||||||
use mio::Ready;
|
use mio::Ready;
|
||||||
use mio_uds;
|
use mio_uds;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::os::unix::io::{AsRawFd, RawFd};
|
use std::os::unix::io::{AsRawFd, RawFd};
|
||||||
use std::os::unix::net::{self, SocketAddr};
|
use std::os::unix::net::{self, SocketAddr};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
use tokio_reactor::{Handle, PollEvented};
|
use tokio_reactor::{Handle, PollEvented};
|
||||||
|
|
||||||
/// A Unix socket which can accept connections from other Unix sockets.
|
/// A Unix socket which can accept connections from other Unix sockets.
|
||||||
@ -43,8 +45,8 @@ impl UnixListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Test whether this socket is ready to be read or not.
|
/// Test whether this socket is ready to be read or not.
|
||||||
pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
|
pub fn poll_read_ready(&self, cx: &mut Context<'_>, ready: Ready) -> Poll<io::Result<Ready>> {
|
||||||
self.io.poll_read_ready(ready)
|
self.io.poll_read_ready(cx, ready)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the value of the `SO_ERROR` option.
|
/// Returns the value of the `SO_ERROR` option.
|
||||||
@ -52,6 +54,12 @@ impl UnixListener {
|
|||||||
self.io.get_ref().take_error()
|
self.io.get_ref().take_error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a future that attempts to accept a connection and creates a new
|
||||||
|
/// connected `UnixStream` if successful.
|
||||||
|
pub fn accept(&mut self) -> Accept<'_> {
|
||||||
|
Accept { listener: self }
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempt to accept a connection and create a new connected `UnixStream`
|
/// Attempt to accept a connection and create a new connected `UnixStream`
|
||||||
/// if successful.
|
/// if successful.
|
||||||
///
|
///
|
||||||
@ -70,11 +78,14 @@ impl UnixListener {
|
|||||||
/// This function will panic if it is called outside the context of a
|
/// This function will panic if it is called outside the context of a
|
||||||
/// future's task. It's recommended to only call this from the
|
/// future's task. It's recommended to only call this from the
|
||||||
/// implementation of a `Future::poll`, if necessary.
|
/// implementation of a `Future::poll`, if necessary.
|
||||||
pub fn poll_accept(&self) -> Poll<(UnixStream, SocketAddr), io::Error> {
|
pub fn poll_accept(
|
||||||
let (io, addr) = try_ready!(self.poll_accept_std());
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
|
||||||
|
let (io, addr) = ready!(self.poll_accept_std(cx))?;
|
||||||
|
|
||||||
let io = mio_uds::UnixStream::from_stream(io)?;
|
let io = mio_uds::UnixStream::from_stream(io)?;
|
||||||
Ok((UnixStream::new(io), addr).into())
|
Ok((UnixStream::new(io), addr)).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempt to accept a connection and create a new connected `UnixStream`
|
/// Attempt to accept a connection and create a new connected `UnixStream`
|
||||||
@ -100,24 +111,23 @@ impl UnixListener {
|
|||||||
/// This function will panic if it is called outside the context of a
|
/// This function will panic if it is called outside the context of a
|
||||||
/// future's task. It's recommended to only call this from the
|
/// future's task. It's recommended to only call this from the
|
||||||
/// implementation of a `Future::poll`, if necessary.
|
/// implementation of a `Future::poll`, if necessary.
|
||||||
pub fn poll_accept_std(&self) -> Poll<(net::UnixStream, SocketAddr), io::Error> {
|
pub fn poll_accept_std(
|
||||||
loop {
|
self: Pin<&mut Self>,
|
||||||
try_ready!(self.io.poll_read_ready(Ready::readable()));
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> {
|
||||||
|
ready!(self.io.poll_read_ready(cx, Ready::readable()))?;
|
||||||
|
|
||||||
match self.io.get_ref().accept_std() {
|
match self.io.get_ref().accept_std() {
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
self.io.clear_read_ready(Ready::readable())?;
|
self.io.clear_read_ready(cx, Ready::readable())?;
|
||||||
return Ok(Async::NotReady);
|
Poll::Pending
|
||||||
}
|
|
||||||
Ok(Some((sock, addr))) => {
|
|
||||||
return Ok(Async::Ready((sock, addr)));
|
|
||||||
}
|
|
||||||
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
|
|
||||||
self.io.clear_read_ready(Ready::readable())?;
|
|
||||||
return Ok(Async::NotReady);
|
|
||||||
}
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
}
|
}
|
||||||
|
Ok(Some((sock, addr))) => Ok((sock, addr)).into(),
|
||||||
|
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
self.io.clear_read_ready(cx, Ready::readable())?;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
Err(err) => Err(err).into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,8 +136,9 @@ impl UnixListener {
|
|||||||
///
|
///
|
||||||
/// This method returns an implementation of the `Stream` trait which
|
/// This method returns an implementation of the `Stream` trait which
|
||||||
/// resolves to the sockets the are accepted on this listener.
|
/// resolves to the sockets the are accepted on this listener.
|
||||||
pub fn incoming(self) -> Incoming {
|
#[cfg(feature = "async-traits")]
|
||||||
Incoming::new(self)
|
pub fn incoming(self) -> crate::Incoming {
|
||||||
|
crate::Incoming::new(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,3 +165,18 @@ impl AsRawFd for UnixListener {
|
|||||||
self.io.get_ref().as_raw_fd()
|
self.io.get_ref().as_raw_fd()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Future type returned by [`UnixListener::accept`].
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Accept<'a> {
|
||||||
|
listener: &'a mut UnixListener,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Future for Accept<'a> {
|
||||||
|
type Output = io::Result<(UnixStream, SocketAddr)>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
Pin::new(&mut *self.listener).poll_accept(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
30
tokio-uds/src/recv.rs
Normal file
30
tokio-uds/src/recv.rs
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
use crate::UnixDatagram;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::io;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
/// A future that receives a datagram from the connected address.
|
||||||
|
///
|
||||||
|
/// This `struct` is created by [`recv`](crate::UnixDatagram::recv).
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Recv<'a, 'b> {
|
||||||
|
socket: &'a mut UnixDatagram,
|
||||||
|
buf: &'b mut [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> Recv<'a, 'b> {
|
||||||
|
pub(crate) fn new(socket: &'a mut UnixDatagram, buf: &'b mut [u8]) -> Self {
|
||||||
|
Self { socket, buf }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> Future for Recv<'a, 'b> {
|
||||||
|
type Output = io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let Recv { socket, buf } = self.get_mut();
|
||||||
|
socket.poll_recv_priv(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
@ -1,72 +0,0 @@
|
|||||||
use crate::UnixDatagram;
|
|
||||||
use futures::{try_ready, Async, Future, Poll};
|
|
||||||
use std::io;
|
|
||||||
use std::mem;
|
|
||||||
|
|
||||||
/// A future for receiving datagrams from a Unix datagram socket.
|
|
||||||
///
|
|
||||||
/// An example that uses UDP sockets but is still applicable can be found at
|
|
||||||
/// https://gist.github.com/dermesser/e331094c2ab28fc7f6ba8a16183fe4d5.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct RecvDgram<T> {
|
|
||||||
st: State<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A future similar to RecvDgram, but without allocating and returning the peer's address.
|
|
||||||
///
|
|
||||||
/// This can be used if the peer's address is of no interest, so the allocation overhead can be
|
|
||||||
/// avoided.
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum State<T> {
|
|
||||||
Receiving { sock: UnixDatagram, buf: T },
|
|
||||||
Empty,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> RecvDgram<T>
|
|
||||||
where
|
|
||||||
T: AsMut<[u8]>,
|
|
||||||
{
|
|
||||||
pub(crate) fn new(sock: UnixDatagram, buf: T) -> RecvDgram<T> {
|
|
||||||
RecvDgram {
|
|
||||||
st: State::Receiving { sock, buf },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Future for RecvDgram<T>
|
|
||||||
where
|
|
||||||
T: AsMut<[u8]>,
|
|
||||||
{
|
|
||||||
/// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were
|
|
||||||
/// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the
|
|
||||||
/// datagram is truncated.
|
|
||||||
type Item = (UnixDatagram, T, usize, String);
|
|
||||||
/// This future yields io::Error if an error occurred.
|
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
let received;
|
|
||||||
let peer;
|
|
||||||
|
|
||||||
if let State::Receiving {
|
|
||||||
ref mut sock,
|
|
||||||
ref mut buf,
|
|
||||||
} = self.st
|
|
||||||
{
|
|
||||||
let (n, p) = try_ready!(sock.poll_recv_from(buf.as_mut()));
|
|
||||||
received = n;
|
|
||||||
|
|
||||||
peer = p.as_pathname().map_or(String::new(), |p| {
|
|
||||||
p.to_str().map_or(String::new(), |s| s.to_string())
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
panic!()
|
|
||||||
}
|
|
||||||
|
|
||||||
if let State::Receiving { sock, buf } = mem::replace(&mut self.st, State::Empty) {
|
|
||||||
Ok(Async::Ready((sock, buf, received, peer)))
|
|
||||||
} else {
|
|
||||||
panic!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
31
tokio-uds/src/recv_from.rs
Normal file
31
tokio-uds/src/recv_from.rs
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
use crate::UnixDatagram;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::io;
|
||||||
|
use std::os::unix::net::SocketAddr;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
/// A future that receives a datagram.
|
||||||
|
///
|
||||||
|
/// This `struct` is created by [`recv_from`](crate::UnixDatagram::recv_from).
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RecvFrom<'a, 'b> {
|
||||||
|
socket: &'a UnixDatagram,
|
||||||
|
buf: &'b mut [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> RecvFrom<'a, 'b> {
|
||||||
|
pub(crate) fn new(socket: &'a UnixDatagram, buf: &'b mut [u8]) -> Self {
|
||||||
|
Self { socket, buf }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> Future for RecvFrom<'a, 'b> {
|
||||||
|
type Output = io::Result<(usize, SocketAddr)>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let RecvFrom { socket, buf } = self.get_mut();
|
||||||
|
socket.poll_recv_from_priv(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
30
tokio-uds/src/send.rs
Normal file
30
tokio-uds/src/send.rs
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
use crate::UnixDatagram;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::io;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
/// A future that sends a datagram to the connected address.
|
||||||
|
///
|
||||||
|
/// This `struct` is created by [`send`](crate::UnixDatagram::send).
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Send<'a, 'b> {
|
||||||
|
socket: &'a UnixDatagram,
|
||||||
|
buf: &'b [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> Send<'a, 'b> {
|
||||||
|
pub(crate) fn new(socket: &'a UnixDatagram, buf: &'b [u8]) -> Self {
|
||||||
|
Self { socket, buf }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> Future for Send<'a, 'b> {
|
||||||
|
type Output = io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let Send { socket, buf } = self.get_mut();
|
||||||
|
socket.poll_send_priv(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
@ -1,73 +0,0 @@
|
|||||||
use crate::UnixDatagram;
|
|
||||||
use futures::{try_ready, Async, Future, Poll};
|
|
||||||
use std::io;
|
|
||||||
use std::mem;
|
|
||||||
use std::path::Path;
|
|
||||||
|
|
||||||
/// A future for writing a buffer to a Unix datagram socket.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct SendDgram<T, P> {
|
|
||||||
st: State<T, P>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum State<T, P> {
|
|
||||||
/// current state is Sending
|
|
||||||
Sending {
|
|
||||||
/// the underlying socket
|
|
||||||
sock: UnixDatagram,
|
|
||||||
/// the buffer to send
|
|
||||||
buf: T,
|
|
||||||
/// the destination
|
|
||||||
addr: P,
|
|
||||||
},
|
|
||||||
/// neutral state
|
|
||||||
Empty,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, P> SendDgram<T, P>
|
|
||||||
where
|
|
||||||
T: AsRef<[u8]>,
|
|
||||||
P: AsRef<Path>,
|
|
||||||
{
|
|
||||||
pub(crate) fn new(sock: UnixDatagram, buf: T, addr: P) -> SendDgram<T, P> {
|
|
||||||
SendDgram {
|
|
||||||
st: State::Sending { sock, buf, addr },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, P> Future for SendDgram<T, P>
|
|
||||||
where
|
|
||||||
T: AsRef<[u8]>,
|
|
||||||
P: AsRef<Path>,
|
|
||||||
{
|
|
||||||
/// Returns the underlying socket and the buffer that was sent.
|
|
||||||
type Item = (UnixDatagram, T);
|
|
||||||
/// The error that is returned when sending failed.
|
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
if let State::Sending {
|
|
||||||
ref mut sock,
|
|
||||||
ref buf,
|
|
||||||
ref addr,
|
|
||||||
} = self.st
|
|
||||||
{
|
|
||||||
let n = try_ready!(sock.poll_send_to(buf.as_ref(), addr));
|
|
||||||
if n < buf.as_ref().len() {
|
|
||||||
return Err(io::Error::new(
|
|
||||||
io::ErrorKind::Other,
|
|
||||||
"Couldn't send whole buffer".to_string(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
panic!()
|
|
||||||
}
|
|
||||||
if let State::Sending { sock, buf, addr: _ } = mem::replace(&mut self.st, State::Empty) {
|
|
||||||
Ok(Async::Ready((sock, buf)))
|
|
||||||
} else {
|
|
||||||
panic!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
43
tokio-uds/src/send_to.rs
Normal file
43
tokio-uds/src/send_to.rs
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
use crate::UnixDatagram;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::io;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
/// A future that sends a datagram to a given address.
|
||||||
|
///
|
||||||
|
/// This `struct` is created by [`send_to`](crate::UnixDatagram::send_to).
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SendTo<'a, 'b, P> {
|
||||||
|
socket: &'a UnixDatagram,
|
||||||
|
buf: &'b [u8],
|
||||||
|
target: P,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b, P> SendTo<'a, 'b, P> {
|
||||||
|
pub(crate) fn new(socket: &'a UnixDatagram, buf: &'b [u8], target: P) -> Self {
|
||||||
|
Self {
|
||||||
|
socket,
|
||||||
|
buf,
|
||||||
|
target,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b, P> Future for SendTo<'a, 'b, P>
|
||||||
|
where
|
||||||
|
P: AsRef<Path> + Unpin,
|
||||||
|
{
|
||||||
|
type Output = io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let SendTo {
|
||||||
|
socket,
|
||||||
|
buf,
|
||||||
|
target,
|
||||||
|
} = self.get_mut();
|
||||||
|
socket.poll_send_to_priv(cx, buf, target.as_ref())
|
||||||
|
}
|
||||||
|
}
|
@ -1,17 +1,18 @@
|
|||||||
use crate::ucred::{self, UCred};
|
use crate::ucred::{self, UCred};
|
||||||
use bytes::{Buf, BufMut};
|
use bytes::{Buf, BufMut};
|
||||||
use futures::{Async, Future, Poll};
|
use iovec::IoVec;
|
||||||
use iovec::{self, IoVec};
|
|
||||||
use libc;
|
|
||||||
use mio::Ready;
|
use mio::Ready;
|
||||||
use mio_uds;
|
use mio_uds;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::future::Future;
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
use std::net::Shutdown;
|
use std::net::Shutdown;
|
||||||
use std::os::unix::io::{AsRawFd, RawFd};
|
use std::os::unix::io::{AsRawFd, RawFd};
|
||||||
use std::os::unix::net::{self, SocketAddr};
|
use std::os::unix::net::{self, SocketAddr};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use tokio_reactor::{Handle, PollEvented};
|
use tokio_reactor::{Handle, PollEvented};
|
||||||
|
|
||||||
@ -27,15 +28,9 @@ pub struct UnixStream {
|
|||||||
/// Future returned by `UnixStream::connect` which will resolve to a
|
/// Future returned by `UnixStream::connect` which will resolve to a
|
||||||
/// `UnixStream` when the stream is connected.
|
/// `UnixStream` when the stream is connected.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
pub struct ConnectFuture {
|
pub struct ConnectFuture {
|
||||||
inner: State,
|
stream: Option<io::Result<UnixStream>>,
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum State {
|
|
||||||
Waiting(UnixStream),
|
|
||||||
Error(io::Error),
|
|
||||||
Empty,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UnixStream {
|
impl UnixStream {
|
||||||
@ -49,13 +44,7 @@ impl UnixStream {
|
|||||||
P: AsRef<Path>,
|
P: AsRef<Path>,
|
||||||
{
|
{
|
||||||
let res = mio_uds::UnixStream::connect(path).map(UnixStream::new);
|
let res = mio_uds::UnixStream::connect(path).map(UnixStream::new);
|
||||||
|
ConnectFuture { stream: Some(res) }
|
||||||
let inner = match res {
|
|
||||||
Ok(stream) => State::Waiting(stream),
|
|
||||||
Err(e) => State::Error(e),
|
|
||||||
};
|
|
||||||
|
|
||||||
ConnectFuture { inner }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Consumes a `UnixStream` in the standard library and returns a
|
/// Consumes a `UnixStream` in the standard library and returns a
|
||||||
@ -89,13 +78,13 @@ impl UnixStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Test whether this socket is ready to be read or not.
|
/// Test whether this socket is ready to be read or not.
|
||||||
pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
|
pub fn poll_read_ready(&self, cx: &mut Context<'_>, ready: Ready) -> Poll<io::Result<Ready>> {
|
||||||
self.io.poll_read_ready(ready)
|
self.io.poll_read_ready(cx, ready)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test whether this socket is ready to be written to or not.
|
/// Test whether this socket is ready to be written to or not.
|
||||||
pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
|
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||||
self.io.poll_write_ready()
|
self.io.poll_write_ready(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the socket address of the local half of this connection.
|
/// Returns the socket address of the local half of this connection.
|
||||||
@ -140,109 +129,189 @@ impl TryFrom<UnixStream> for mio_uds::UnixStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Read for UnixStream {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
||||||
self.io.read(buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Write for UnixStream {
|
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
||||||
self.io.write(buf)
|
|
||||||
}
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
|
||||||
self.io.flush()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for UnixStream {
|
impl AsyncRead for UnixStream {
|
||||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
fn poll_read(
|
||||||
<&UnixStream>::read_buf(&mut &*self, buf)
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_read_priv(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_read_buf<B: BufMut>(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut B,
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_read_buf_priv(cx, buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncWrite for UnixStream {
|
impl AsyncWrite for UnixStream {
|
||||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
fn poll_write(
|
||||||
<&UnixStream>::shutdown(&mut &*self)
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_write_priv(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
<&UnixStream>::write_buf(&mut &*self, buf)
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_buf<B: Buf>(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut B,
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_write_buf_priv(cx, buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Read for &'a UnixStream {
|
impl UnixStream {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
// == Poll IO functions that takes `&self` ==
|
||||||
(&self.io).read(buf)
|
//
|
||||||
}
|
// They are not public because (taken from the doc of `PollEvented`):
|
||||||
}
|
//
|
||||||
|
// While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
|
||||||
|
// caller must ensure that there are at most two tasks that use a
|
||||||
|
// `PollEvented` instance concurrently. One for reading and one for writing.
|
||||||
|
// While violating this requirement is "safe" from a Rust memory model point
|
||||||
|
// of view, it will result in unexpected behavior in the form of lost
|
||||||
|
// notifications and tasks hanging.
|
||||||
|
|
||||||
impl<'a> Write for &'a UnixStream {
|
pub(crate) fn poll_read_priv(
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
&self,
|
||||||
(&self.io).write(buf)
|
cx: &mut Context<'_>,
|
||||||
}
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
|
||||||
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
match self.io.get_ref().read(buf) {
|
||||||
(&self.io).flush()
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
}
|
self.io.clear_read_ready(cx, mio::Ready::readable())?;
|
||||||
}
|
Poll::Pending
|
||||||
|
|
||||||
impl<'a> AsyncRead for &'a UnixStream {
|
|
||||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
|
||||||
if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
|
|
||||||
return Ok(Async::NotReady);
|
|
||||||
}
|
|
||||||
unsafe {
|
|
||||||
let r = read_ready(buf, self.as_raw_fd());
|
|
||||||
if r == -1 {
|
|
||||||
let e = io::Error::last_os_error();
|
|
||||||
if e.kind() == io::ErrorKind::WouldBlock {
|
|
||||||
self.io.clear_read_ready(Ready::readable())?;
|
|
||||||
Ok(Async::NotReady)
|
|
||||||
} else {
|
|
||||||
Err(e)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let r = r as usize;
|
|
||||||
buf.advance_mut(r);
|
|
||||||
Ok(r.into())
|
|
||||||
}
|
}
|
||||||
|
x => Poll::Ready(x),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> AsyncWrite for &'a UnixStream {
|
pub(crate) fn poll_read_buf_priv<B: BufMut>(
|
||||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
&self,
|
||||||
Ok(().into())
|
cx: &mut Context<'_>,
|
||||||
}
|
buf: &mut B,
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
|
||||||
|
|
||||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
let r = unsafe {
|
||||||
if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
|
// The `IoVec` type can't have a 0-length size, so we create a bunch
|
||||||
return Ok(Async::NotReady);
|
// of dummy versions on the stack with 1 length which we'll quickly
|
||||||
}
|
// overwrite.
|
||||||
unsafe {
|
let b1: &mut [u8] = &mut [0];
|
||||||
let r = write_ready(buf, self.as_raw_fd());
|
let b2: &mut [u8] = &mut [0];
|
||||||
if r == -1 {
|
let b3: &mut [u8] = &mut [0];
|
||||||
let e = io::Error::last_os_error();
|
let b4: &mut [u8] = &mut [0];
|
||||||
if e.kind() == io::ErrorKind::WouldBlock {
|
let b5: &mut [u8] = &mut [0];
|
||||||
self.io.clear_write_ready()?;
|
let b6: &mut [u8] = &mut [0];
|
||||||
Ok(Async::NotReady)
|
let b7: &mut [u8] = &mut [0];
|
||||||
} else {
|
let b8: &mut [u8] = &mut [0];
|
||||||
Err(e)
|
let b9: &mut [u8] = &mut [0];
|
||||||
|
let b10: &mut [u8] = &mut [0];
|
||||||
|
let b11: &mut [u8] = &mut [0];
|
||||||
|
let b12: &mut [u8] = &mut [0];
|
||||||
|
let b13: &mut [u8] = &mut [0];
|
||||||
|
let b14: &mut [u8] = &mut [0];
|
||||||
|
let b15: &mut [u8] = &mut [0];
|
||||||
|
let b16: &mut [u8] = &mut [0];
|
||||||
|
let mut bufs: [&mut IoVec; 16] = [
|
||||||
|
b1.into(),
|
||||||
|
b2.into(),
|
||||||
|
b3.into(),
|
||||||
|
b4.into(),
|
||||||
|
b5.into(),
|
||||||
|
b6.into(),
|
||||||
|
b7.into(),
|
||||||
|
b8.into(),
|
||||||
|
b9.into(),
|
||||||
|
b10.into(),
|
||||||
|
b11.into(),
|
||||||
|
b12.into(),
|
||||||
|
b13.into(),
|
||||||
|
b14.into(),
|
||||||
|
b15.into(),
|
||||||
|
b16.into(),
|
||||||
|
];
|
||||||
|
let n = buf.bytes_vec_mut(&mut bufs);
|
||||||
|
self.io.get_ref().read_bufs(&mut bufs[..n])
|
||||||
|
};
|
||||||
|
|
||||||
|
match r {
|
||||||
|
Ok(n) => {
|
||||||
|
unsafe {
|
||||||
|
buf.advance_mut(n);
|
||||||
}
|
}
|
||||||
} else {
|
Poll::Ready(Ok(n))
|
||||||
let r = r as usize;
|
|
||||||
buf.advance(r);
|
|
||||||
Ok(r.into())
|
|
||||||
}
|
}
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
self.io.clear_read_ready(cx, mio::Ready::readable())?;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
Err(e) => Poll::Ready(Err(e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn poll_write_priv(
|
||||||
|
&self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
ready!(self.io.poll_write_ready(cx))?;
|
||||||
|
|
||||||
|
match self.io.get_ref().write(buf) {
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
self.io.clear_write_ready(cx)?;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
x => Poll::Ready(x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn poll_write_buf_priv<B: Buf>(
|
||||||
|
&self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut B,
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
ready!(self.io.poll_write_ready(cx))?;
|
||||||
|
|
||||||
|
let r = {
|
||||||
|
// The `IoVec` type can't have a zero-length size, so create a dummy
|
||||||
|
// version from a 1-length slice which we'll overwrite with the
|
||||||
|
// `bytes_vec` method.
|
||||||
|
static DUMMY: &[u8] = &[0];
|
||||||
|
let iovec = <&IoVec>::from(DUMMY);
|
||||||
|
let mut bufs = [iovec; 64];
|
||||||
|
let n = buf.bytes_vec(&mut bufs);
|
||||||
|
self.io.get_ref().write_bufs(&bufs[..n])
|
||||||
|
};
|
||||||
|
match r {
|
||||||
|
Ok(n) => {
|
||||||
|
buf.advance(n);
|
||||||
|
Poll::Ready(Ok(n))
|
||||||
|
}
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
self.io.clear_write_ready(cx)?;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
Err(e) => Poll::Ready(Err(e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -260,106 +329,27 @@ impl AsRawFd for UnixStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Future for ConnectFuture {
|
impl Future for ConnectFuture {
|
||||||
type Item = UnixStream;
|
type Output = io::Result<UnixStream>;
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<UnixStream, io::Error> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
use std::mem;
|
let stream = self
|
||||||
|
.stream
|
||||||
|
.take()
|
||||||
|
.expect("ConnectFuture polled after completion")?;
|
||||||
|
|
||||||
match self.inner {
|
match stream.io.poll_write_ready(cx) {
|
||||||
State::Waiting(ref mut stream) => {
|
Poll::Pending => {
|
||||||
if let Async::NotReady = stream.io.poll_write_ready()? {
|
self.stream = Some(Ok(stream));
|
||||||
return Ok(Async::NotReady);
|
return Poll::Pending;
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(e) = stream.io.get_ref().take_error()? {
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
State::Error(_) => {
|
Poll::Ready(Err(e)) => return Err(e).into(),
|
||||||
let e = match mem::replace(&mut self.inner, State::Empty) {
|
_ => (),
|
||||||
State::Error(e) => e,
|
|
||||||
_ => unreachable!(),
|
|
||||||
};
|
|
||||||
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
State::Empty => panic!("can't poll stream twice"),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
match mem::replace(&mut self.inner, State::Empty) {
|
if let Some(e) = stream.io.get_ref().take_error()? {
|
||||||
State::Waiting(stream) => Ok(Async::Ready(stream)),
|
return Err(e).into();
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(stream).into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
|
|
||||||
// The `IoVec` type can't have a 0-length size, so we create a bunch
|
|
||||||
// of dummy versions on the stack with 1 length which we'll quickly
|
|
||||||
// overwrite.
|
|
||||||
let b1: &mut [u8] = &mut [0];
|
|
||||||
let b2: &mut [u8] = &mut [0];
|
|
||||||
let b3: &mut [u8] = &mut [0];
|
|
||||||
let b4: &mut [u8] = &mut [0];
|
|
||||||
let b5: &mut [u8] = &mut [0];
|
|
||||||
let b6: &mut [u8] = &mut [0];
|
|
||||||
let b7: &mut [u8] = &mut [0];
|
|
||||||
let b8: &mut [u8] = &mut [0];
|
|
||||||
let b9: &mut [u8] = &mut [0];
|
|
||||||
let b10: &mut [u8] = &mut [0];
|
|
||||||
let b11: &mut [u8] = &mut [0];
|
|
||||||
let b12: &mut [u8] = &mut [0];
|
|
||||||
let b13: &mut [u8] = &mut [0];
|
|
||||||
let b14: &mut [u8] = &mut [0];
|
|
||||||
let b15: &mut [u8] = &mut [0];
|
|
||||||
let b16: &mut [u8] = &mut [0];
|
|
||||||
let mut bufs: [&mut IoVec; 16] = [
|
|
||||||
b1.into(),
|
|
||||||
b2.into(),
|
|
||||||
b3.into(),
|
|
||||||
b4.into(),
|
|
||||||
b5.into(),
|
|
||||||
b6.into(),
|
|
||||||
b7.into(),
|
|
||||||
b8.into(),
|
|
||||||
b9.into(),
|
|
||||||
b10.into(),
|
|
||||||
b11.into(),
|
|
||||||
b12.into(),
|
|
||||||
b13.into(),
|
|
||||||
b14.into(),
|
|
||||||
b15.into(),
|
|
||||||
b16.into(),
|
|
||||||
];
|
|
||||||
|
|
||||||
let n = buf.bytes_vec_mut(&mut bufs);
|
|
||||||
read_ready_vecs(&mut bufs[..n], raw_fd)
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
|
|
||||||
let iovecs = iovec::unix::as_os_slice_mut(bufs);
|
|
||||||
|
|
||||||
libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
|
|
||||||
// The `IoVec` type can't have a zero-length size, so create a dummy
|
|
||||||
// version from a 1-length slice which we'll overwrite with the
|
|
||||||
// `bytes_vec` method.
|
|
||||||
static DUMMY: &[u8] = &[0];
|
|
||||||
let iovec = <&IoVec>::from(DUMMY);
|
|
||||||
let mut bufs = [
|
|
||||||
iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
|
|
||||||
iovec, iovec, iovec,
|
|
||||||
];
|
|
||||||
|
|
||||||
let n = buf.bytes_vec(&mut bufs);
|
|
||||||
write_ready_vecs(&bufs[..n], raw_fd)
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
|
|
||||||
let iovecs = iovec::unix::as_os_slice(bufs);
|
|
||||||
|
|
||||||
libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
|
|
||||||
}
|
|
||||||
|
@ -1,78 +1,70 @@
|
|||||||
#![cfg(unix)]
|
#![cfg(unix)]
|
||||||
|
#![feature(async_await)]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use std::io;
|
||||||
use futures::{Future, Sink, Stream};
|
|
||||||
use std::str;
|
|
||||||
use tempfile;
|
use tempfile;
|
||||||
use tokio::io;
|
|
||||||
use tokio::runtime::current_thread::Runtime;
|
|
||||||
use tokio_codec::{Decoder, Encoder};
|
|
||||||
use tokio_uds::*;
|
use tokio_uds::*;
|
||||||
|
|
||||||
struct StringDatagramCodec;
|
// struct StringDatagramCodec;
|
||||||
|
|
||||||
/// A codec to decode datagrams from a unix domain socket as utf-8 text messages.
|
// /// A codec to decode datagrams from a unix domain socket as utf-8 text messages.
|
||||||
impl Encoder for StringDatagramCodec {
|
// impl Encoder for StringDatagramCodec {
|
||||||
type Item = String;
|
// type Item = String;
|
||||||
type Error = io::Error;
|
// type Error = io::Error;
|
||||||
|
|
||||||
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
// fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
dst.extend_from_slice(&item.into_bytes());
|
// dst.extend_from_slice(&item.into_bytes());
|
||||||
Ok(())
|
// Ok(())
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// /// A codec to decode datagrams from a unix domain socket as utf-8 text messages.
|
||||||
|
// impl Decoder for StringDatagramCodec {
|
||||||
|
// type Item = String;
|
||||||
|
// type Error = io::Error;
|
||||||
|
|
||||||
|
// fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
// let decoded = str::from_utf8(buf)
|
||||||
|
// .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
|
||||||
|
// .to_string();
|
||||||
|
|
||||||
|
// Ok(Some(decoded))
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
async fn echo_server(mut socket: UnixDatagram) -> io::Result<()> {
|
||||||
|
let mut recv_buf = vec![0u8; 1024];
|
||||||
|
loop {
|
||||||
|
let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?;
|
||||||
|
if let Some(path) = peer_addr.as_pathname() {
|
||||||
|
socket.send_to(&recv_buf[..len], path).await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A codec to decode datagrams from a unix domain socket as utf-8 text messages.
|
#[tokio::test]
|
||||||
impl Decoder for StringDatagramCodec {
|
async fn echo() -> io::Result<()> {
|
||||||
type Item = String;
|
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
||||||
let decoded = str::from_utf8(buf)
|
|
||||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
Ok(Some(decoded))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn framed_echo() {
|
|
||||||
let dir = tempfile::tempdir().unwrap();
|
let dir = tempfile::tempdir().unwrap();
|
||||||
let server_path = dir.path().join("server.sock");
|
let server_path = dir.path().join("server.sock");
|
||||||
let client_path = dir.path().join("client.sock");
|
let client_path = dir.path().join("client.sock");
|
||||||
|
|
||||||
let mut rt = Runtime::new().unwrap();
|
let server_socket = UnixDatagram::bind(server_path.clone())?;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = echo_server(server_socket).await {
|
||||||
|
eprintln!("Error in echo server: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
{
|
{
|
||||||
let socket = UnixDatagram::bind(&server_path).unwrap();
|
let mut socket = UnixDatagram::bind(&client_path).unwrap();
|
||||||
let server = UnixDatagramFramed::new(socket, StringDatagramCodec);
|
socket.connect(server_path)?;
|
||||||
|
socket.send(b"ECHO").await?;
|
||||||
let (sink, stream) = server.split();
|
let mut recv_buf = [0u8; 16];
|
||||||
|
let len = socket.recv(&mut recv_buf[..]).await?;
|
||||||
let echo_stream =
|
assert_eq!(&recv_buf[..len], b"ECHO");
|
||||||
stream.map(|(msg, addr)| (msg, addr.as_pathname().unwrap().to_path_buf()));
|
|
||||||
|
|
||||||
// spawn echo server
|
|
||||||
rt.spawn(
|
|
||||||
echo_stream
|
|
||||||
.forward(sink)
|
|
||||||
.map_err(|e| panic!("err={:?}", e))
|
|
||||||
.map(|_| ()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
Ok(())
|
||||||
let socket = UnixDatagram::bind(&client_path).unwrap();
|
|
||||||
let client = UnixDatagramFramed::new(socket, StringDatagramCodec);
|
|
||||||
|
|
||||||
let (sink, stream) = client.split();
|
|
||||||
|
|
||||||
rt.block_on(sink.send(("ECHO".to_string(), server_path)))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let response = rt.block_on(stream.take(1).collect()).unwrap();
|
|
||||||
assert_eq!(response[0].0, "ECHO");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,51 +1,32 @@
|
|||||||
#![cfg(unix)]
|
#![cfg(unix)]
|
||||||
|
#![feature(async_await)]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use futures::sync::oneshot;
|
use futures::future::try_join;
|
||||||
use futures::{Future, Stream};
|
|
||||||
use tempfile::Builder;
|
use tempfile::Builder;
|
||||||
use tokio::io;
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::runtime::current_thread::Runtime;
|
|
||||||
use tokio_uds::*;
|
use tokio_uds::*;
|
||||||
|
|
||||||
macro_rules! t {
|
#[tokio::test]
|
||||||
($e:expr) => {
|
async fn accept_read_write() -> std::io::Result<()> {
|
||||||
match $e {
|
|
||||||
Ok(e) => e,
|
|
||||||
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn echo() {
|
|
||||||
let dir = Builder::new().prefix("tokio-uds-tests").tempdir().unwrap();
|
let dir = Builder::new().prefix("tokio-uds-tests").tempdir().unwrap();
|
||||||
let sock_path = dir.path().join("connect.sock");
|
let sock_path = dir.path().join("connect.sock");
|
||||||
|
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut listener = UnixListener::bind(&sock_path)?;
|
||||||
|
|
||||||
let server = t!(UnixListener::bind(&sock_path));
|
let accept = listener.accept();
|
||||||
let (tx, rx) = oneshot::channel();
|
let connect = UnixStream::connect(&sock_path);
|
||||||
|
let ((mut server, _), mut client) = try_join(accept, connect).await?;
|
||||||
|
|
||||||
rt.spawn({
|
// Write to the client. TODO: Switch to write_all.
|
||||||
server
|
let write_len = client.write(b"hello").await?;
|
||||||
.incoming()
|
assert_eq!(write_len, 5);
|
||||||
.into_future()
|
drop(client);
|
||||||
.and_then(move |(sock, _)| {
|
// Read from the server. TODO: Switch to read_to_end.
|
||||||
tx.send(sock.unwrap()).unwrap();
|
let mut buf = [0u8; 5];
|
||||||
Ok(())
|
server.read_exact(&mut buf).await?;
|
||||||
})
|
assert_eq!(&buf, b"hello");
|
||||||
.map_err(|e| panic!("err={:?}", e))
|
let len = server.read(&mut buf).await?;
|
||||||
});
|
assert_eq!(len, 0);
|
||||||
|
Ok(())
|
||||||
let client = rt.block_on(UnixStream::connect(&sock_path)).unwrap();
|
|
||||||
let server = rt.block_on(rx).unwrap();
|
|
||||||
|
|
||||||
// Write to the client
|
|
||||||
rt.block_on(io::write_all(client, b"hello")).unwrap();
|
|
||||||
|
|
||||||
// Read from the server
|
|
||||||
let (_, buf) = rt.block_on(io::read_to_end(server, vec![])).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(buf, b"hello");
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user