diff --git a/Cargo.toml b/Cargo.toml index f81e38ad6..e01ab9a3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ members = [ "tokio-timer", "tokio-tcp", "tokio-udp", + "tokio-uds", "futures2", ] diff --git a/README.md b/README.md index f29f79c4e..674eefdef 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,9 @@ The crates included as part of Tokio are: * [`tokio-udp`]: UDP bindings for use with `tokio-io` and `tokio-reactor`. +* [`tokio-uds`]: Unix Domain Socket bindings for use with `tokio-io` and + `tokio-reactor`. + [`tokio-executor`]: tokio-executor [`tokio-fs`]: tokio-fs [`tokio-io`]: tokio-io @@ -137,6 +140,7 @@ The crates included as part of Tokio are: [`tokio-threadpool`]: tokio-threadpool [`tokio-timer`]: tokio-timer [`tokio-udp`]: tokio-udp +[`tokio-udp`]: tokio-uds ## License diff --git a/tokio-uds/Cargo.toml b/tokio-uds/Cargo.toml new file mode 100644 index 000000000..3b7131f00 --- /dev/null +++ b/tokio-uds/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "tokio-uds" +# When releasing to crates.io: +# - Update html_root_url. +# - Update CHANGELOG.md. +# - Create "v0.2.x" git tag. +version = "0.2.0" +authors = ["Carl Lerche "] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +homepage = "https://github.com/tokio-rs/tokio" +documentation = "https://docs.rs/tokio-uds" +description = """ +Unix Domain sockets for Tokio +""" +categories = ["asynchronous"] + +[dependencies] +bytes = "0.4" +futures = "0.1" +iovec = "0.1" +libc = "0.2" +log = "0.4" +mio = "0.6.14" +mio-uds = "0.6.5" +tokio-reactor = { version = "0.1.1", path = "../tokio-reactor" } +tokio-io = { version = "0.1.6", path = "../tokio-io" } + +[dev-dependencies] +tokio = { version = "0.1.6", path = "../" } +tempdir = "0.3.7" diff --git a/tokio-uds/LICENSE b/tokio-uds/LICENSE new file mode 100644 index 000000000..38c1e27b8 --- /dev/null +++ b/tokio-uds/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/tokio-uds/README.md b/tokio-uds/README.md new file mode 100644 index 000000000..0c03efe97 --- /dev/null +++ b/tokio-uds/README.md @@ -0,0 +1,15 @@ +# tokio-uds + +An implementation of Unix Domain Sockets for Tokio + +[Documentation](https://docs.rs/tokio-uds) + +## License + +This project is licensed under the [MIT license](./LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/tokio-uds/src/datagram.rs b/tokio-uds/src/datagram.rs new file mode 100644 index 000000000..5e00a63ed --- /dev/null +++ b/tokio-uds/src/datagram.rs @@ -0,0 +1,208 @@ +use {SendDgram, RecvDgram}; + +use tokio_reactor::{Handle, PollEvented}; + +use futures::{Async, Poll}; +use mio::Ready; +use mio_uds; + +use std::fmt; +use std::io; +use std::net::Shutdown; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; + +/// An I/O object representing a Unix datagram socket. +pub struct UnixDatagram { + io: PollEvented, +} + +impl UnixDatagram { + /// Creates a new `UnixDatagram` bound to the specified path. + pub fn bind

(path: P) -> io::Result + where + P: AsRef, + { + let socket = mio_uds::UnixDatagram::bind(path)?; + Ok(UnixDatagram::new(socket)) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// This function will create a pair of interconnected unix sockets for + /// communicating back and forth between one another. Each socket will be + /// associated with the event loop whose handle is also provided. + pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { + let (a, b) = mio_uds::UnixDatagram::pair()?; + let a = UnixDatagram::new(a); + let b = UnixDatagram::new(b); + + Ok((a, b)) + } + + /// Consumes a `UnixDatagram` in the standard library and returns a + /// nonblocking `UnixDatagram` from this crate. + /// + /// The returned datagram will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + pub fn from_std(datagram: net::UnixDatagram, handle: &Handle) -> io::Result { + let socket = mio_uds::UnixDatagram::from_datagram(datagram)?; + let io = PollEvented::new_with_handle(socket, handle)?; + Ok(UnixDatagram { io }) + } + + fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { + let io = PollEvented::new(socket); + UnixDatagram { io } + } + + /// Creates a new `UnixDatagram` which is not bound to any address. + pub fn unbound() -> io::Result { + let socket = mio_uds::UnixDatagram::unbound()?; + Ok(UnixDatagram::new(socket)) + } + + /// Connects the socket to the specified address. + /// + /// The `send` method may be used to send data to the specified address. + /// `recv` and `recv_from` will only receive data from that address. + pub fn connect>(&self, path: P) -> io::Result<()> { + self.io.get_ref().connect(path) + } + + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll { + self.io.poll_read_ready(ready) + } + + /// Test whether this socket is ready to be written to or not. + pub fn poll_write_ready(&self) -> Poll { + self.io.poll_write_ready() + } + + /// Returns the local address that this socket is bound to. + pub fn local_addr(&self) -> io::Result { + self.io.get_ref().local_addr() + } + + /// Returns the address of this socket's peer. + /// + /// The `connect` method will connect the socket to a peer. + pub fn peer_addr(&self) -> io::Result { + self.io.get_ref().peer_addr() + } + + /// Receives data from the socket. + /// + /// On success, returns the number of bytes read and the address from + /// whence the data came. + pub fn poll_recv_from(&self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> { + if self.io.poll_read_ready(Ready::readable())?.is_not_ready() { + return Ok(Async::NotReady); + } + let r = self.io.get_ref().recv_from(buf); + if is_wouldblock(&r) { + self.io.clear_read_ready(Ready::readable())?; + } + r.map(Async::Ready) + } + + /// Receives data from the socket. + /// + /// On success, returns the number of bytes read. + pub fn poll_recv(&self, buf: &mut [u8]) -> Poll { + if self.io.poll_read_ready(Ready::readable())?.is_not_ready() { + return Ok(Async::NotReady); + } + let r = self.io.get_ref().recv(buf); + if is_wouldblock(&r) { + self.io.clear_read_ready(Ready::readable())?; + } + r.map(Async::Ready) + } + + /// Returns a future for receiving a datagram. See the documentation on RecvDgram for details. + pub fn recv_dgram(self, buf: T) -> RecvDgram + where + T: AsMut<[u8]>, + { + RecvDgram::new(self, buf) + } + + /// Sends data on the socket to the specified address. + /// + /// On success, returns the number of bytes written. + pub fn poll_send_to

(&self, buf: &[u8], path: P) -> Poll + where + P: AsRef, + { + if self.io.poll_write_ready()?.is_not_ready() { + return Ok(Async::NotReady); + } + let r = self.io.get_ref().send_to(buf, path); + if is_wouldblock(&r) { + self.io.clear_write_ready()?; + } + r.map(Async::Ready) + } + + /// Sends data on the socket to the socket's peer. + /// + /// The peer address may be set by the `connect` method, and this method + /// will return an error if the socket has not already been connected. + /// + /// On success, returns the number of bytes written. + pub fn poll_send(&self, buf: &[u8]) -> Poll { + if self.io.poll_write_ready()?.is_not_ready() { + return Ok(Async::NotReady); + } + let r = self.io.get_ref().send(buf); + if is_wouldblock(&r) { + self.io.clear_write_ready()?; + } + r.map(Async::Ready) + } + + /// Returns a future sending the data in buf to the socket at path. + pub fn send_dgram(self, buf: T, path: P) -> SendDgram + where + T: AsRef<[u8]>, + P: AsRef, + { + SendDgram::new(self, buf, path) + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.io.get_ref().take_error() + } + + /// Shut down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } +} + +impl fmt::Debug for UnixDatagram { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixDatagram { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} + +fn is_wouldblock(r: &io::Result) -> bool { + match *r { + Ok(_) => false, + Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, + } +} diff --git a/tokio-uds/src/incoming.rs b/tokio-uds/src/incoming.rs new file mode 100644 index 000000000..28d4d7683 --- /dev/null +++ b/tokio-uds/src/incoming.rs @@ -0,0 +1,27 @@ +use {UnixListener, UnixStream}; + +use futures::{Stream, Poll}; + +use std::io; + +/// Stream of listeners +#[derive(Debug)] +pub struct Incoming { + inner: UnixListener, +} + +impl Incoming { + pub(crate) fn new(listener: UnixListener) -> Incoming { + Incoming { inner: listener } + } +} + +impl Stream for Incoming { + type Item = UnixStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { + Ok(Some(try_ready!(self.inner.poll_accept()).0).into()) + } +} + diff --git a/tokio-uds/src/lib.rs b/tokio-uds/src/lib.rs new file mode 100644 index 000000000..cfafd58ba --- /dev/null +++ b/tokio-uds/src/lib.rs @@ -0,0 +1,34 @@ +//! Unix Domain Sockets for Tokio. +//! +//! This crate provides APIs for using Unix Domain Sockets with Tokio. + +#![cfg(unix)] +#![doc(html_root_url = "https://docs.rs/tokio-uds/0.2.0")] +#![deny(missing_docs, warnings, missing_debug_implementations)] + +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate iovec; +extern crate libc; +extern crate log; +extern crate mio; +extern crate mio_uds; +extern crate tokio_io; +extern crate tokio_reactor; + +mod datagram; +mod incoming; +mod listener; +mod recv_dgram; +mod send_dgram; +mod stream; +mod ucred; + +pub use datagram::UnixDatagram; +pub use incoming::Incoming; +pub use listener::UnixListener; +pub use recv_dgram::RecvDgram; +pub use send_dgram::SendDgram; +pub use stream::UnixStream; +pub use ucred::UCred; diff --git a/tokio-uds/src/listener.rs b/tokio-uds/src/listener.rs new file mode 100644 index 000000000..ec8731854 --- /dev/null +++ b/tokio-uds/src/listener.rs @@ -0,0 +1,146 @@ +use {Incoming, UnixStream}; + +use tokio_reactor::{Handle, PollEvented}; + +use futures::{Async, Poll}; +use mio::Ready; +use mio_uds; + +use std::fmt; +use std::io; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; + +/// A Unix socket which can accept connections from other unix sockets. +pub struct UnixListener { + io: PollEvented, +} + +impl UnixListener { + /// Creates a new `UnixListener` bound to the specified path. + pub fn bind

(path: P) -> io::Result + where + P: AsRef, + { + let listener = mio_uds::UnixListener::bind(path)?; + let io = PollEvented::new(listener); + Ok(UnixListener { io }) + } + + /// Consumes a `UnixListener` in the standard library and returns a + /// nonblocking `UnixListener` from this crate. + /// + /// The returned listener will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + pub fn from_std(listener: net::UnixListener, handle: &Handle) -> io::Result { + let listener = mio_uds::UnixListener::from_listener(listener)?; + let io = PollEvented::new_with_handle(listener, handle)?; + Ok(UnixListener { io }) + } + + /// Returns the local socket address of this listener. + pub fn local_addr(&self) -> io::Result { + self.io.get_ref().local_addr() + } + + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll { + self.io.poll_read_ready(ready) + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.io.get_ref().take_error() + } + + /// Attempt to accept a connection and create a new connected `UnixStream` + /// if successful. + /// + /// This function will attempt an accept operation, but will not block + /// waiting for it to complete. If the operation would block then a "would + /// block" error is returned. Additionally, if this method would block, it + /// registers the current task to receive a notification when it would + /// otherwise not block. + /// + /// Note that typically for simple usage it's easier to treat incoming + /// connections as a `Stream` of `UnixStream`s with the `incoming` method + /// below. + /// + /// # Panics + /// + /// This function will panic if it is called outside the context of a + /// future's task. It's recommended to only call this from the + /// implementation of a `Future::poll`, if necessary. + pub fn poll_accept(&self) -> Poll<(UnixStream, SocketAddr), io::Error> { + let (io, addr) = try_ready!(self.poll_accept_std()); + + let io = mio_uds::UnixStream::from_stream(io)?; + Ok((UnixStream::new(io), addr).into()) + } + + /// Attempt to accept a connection and create a new connected `UnixStream` + /// if successful. + /// + /// This function is the same as `poll_accept` above except that it returns a + /// `mio_uds::UnixStream` instead of a `tokio_udp::UnixStream`. This in turn + /// can then allow for the stream to be associated with a different reactor + /// than the one this `UnixListener` is associated with. + /// + /// This function will attempt an accept operation, but will not block + /// waiting for it to complete. If the operation would block then a "would + /// block" error is returned. Additionally, if this method would block, it + /// registers the current task to receive a notification when it would + /// otherwise not block. + /// + /// Note that typically for simple usage it's easier to treat incoming + /// connections as a `Stream` of `UnixStream`s with the `incoming` method + /// below. + /// + /// # Panics + /// + /// This function will panic if it is called outside the context of a + /// future's task. It's recommended to only call this from the + /// implementation of a `Future::poll`, if necessary. + pub fn poll_accept_std(&self) -> Poll<(net::UnixStream, SocketAddr), io::Error> { + loop { + try_ready!(self.io.poll_read_ready(Ready::readable())); + + match self.io.get_ref().accept_std() { + Ok(None) => { + self.io.clear_read_ready(Ready::readable())?; + return Ok(Async::NotReady); + } + Ok(Some((sock, addr))) => { + return Ok(Async::Ready((sock, addr))); + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(Ready::readable())?; + return Ok(Async::NotReady); + } + Err(err) => return Err(err), + } + } + } + + /// Consumes this listener, returning a stream of the sockets this listener + /// accepts. + /// + /// This method returns an implementation of the `Stream` trait which + /// resolves to the sockets the are accepted on this listener. + pub fn incoming(self) -> Incoming { + Incoming::new(self) + } +} + +impl fmt::Debug for UnixListener { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixListener { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} diff --git a/tokio-uds/src/recv_dgram.rs b/tokio-uds/src/recv_dgram.rs new file mode 100644 index 000000000..390202f38 --- /dev/null +++ b/tokio-uds/src/recv_dgram.rs @@ -0,0 +1,82 @@ +use UnixDatagram; + +use futures::{Async, Future, Poll}; + +use std::io; +use std::mem; + +/// A future for receiving datagrams from a Unix datagram socket. +/// +/// An example that uses UDP sockets but is still applicable can be found at +/// https://gist.github.com/dermesser/e331094c2ab28fc7f6ba8a16183fe4d5. +#[derive(Debug)] +pub struct RecvDgram { + st: State, +} + +/// A future similar to RecvDgram, but without allocating and returning the peer's address. +/// +/// This can be used if the peer's address is of no interest, so the allocation overhead can be +/// avoided. +#[derive(Debug)] +enum State { + Receiving { + sock: UnixDatagram, + buf: T, + }, + Empty, +} + +impl RecvDgram +where + T: AsMut<[u8]> +{ + pub(crate) fn new(sock: UnixDatagram, buf: T) -> RecvDgram { + RecvDgram { + st: State::Receiving { + sock, + buf, + }, + } + } +} + +impl Future for RecvDgram +where + T: AsMut<[u8]>, +{ + /// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were + /// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the + /// datagram is truncated. + type Item = (UnixDatagram, T, usize, String); + /// This future yields io::Error if an error occurred. + type Error = io::Error; + + fn poll(&mut self) -> Poll { + let received; + let peer; + + if let State::Receiving { + ref mut sock, + ref mut buf, + } = self.st + { + let (n, p) = try_ready!(sock.poll_recv_from(buf.as_mut())); + received = n; + + peer = p.as_pathname().map_or(String::new(), |p| { + p.to_str().map_or(String::new(), |s| s.to_string()) + }); + } else { + panic!() + } + + if let State::Receiving { sock, buf } = + mem::replace(&mut self.st, State::Empty) + { + Ok(Async::Ready((sock, buf, received, peer))) + } else { + panic!() + } + } +} diff --git a/tokio-uds/src/send_dgram.rs b/tokio-uds/src/send_dgram.rs new file mode 100644 index 000000000..59d438b76 --- /dev/null +++ b/tokio-uds/src/send_dgram.rs @@ -0,0 +1,81 @@ +use UnixDatagram; + +use futures::{Async, Future, Poll}; + +use std::io; +use std::mem; +use std::path::Path; + +/// A future for writing a buffer to a Unix datagram socket. +#[derive(Debug)] +pub struct SendDgram { + st: State, +} + +#[derive(Debug)] +enum State { + /// current state is Sending + Sending { + /// the underlying socket + sock: UnixDatagram, + /// the buffer to send + buf: T, + /// the destination + addr: P, + }, + /// neutral state + Empty, +} + +impl SendDgram +where + T: AsRef<[u8]>, + P: AsRef, +{ + pub(crate) fn new(sock: UnixDatagram, buf: T, addr: P) -> SendDgram { + SendDgram { + st: State::Sending { + sock, + buf, + addr, + } + } + } +} + +impl Future for SendDgram +where + T: AsRef<[u8]>, + P: AsRef, +{ + /// Returns the underlying socket and the buffer that was sent. + type Item = (UnixDatagram, T); + /// The error that is returned when sending failed. + type Error = io::Error; + + fn poll(&mut self) -> Poll { + if let State::Sending { + ref mut sock, + ref buf, + ref addr, + } = self.st + { + let n = try_ready!(sock.poll_send_to(buf.as_ref(), addr)); + if n < buf.as_ref().len() { + return Err(io::Error::new( + io::ErrorKind::Other, + "Couldn't send whole buffer".to_string(), + )); + } + } else { + panic!() + } + if let State::Sending { sock, buf, addr: _ } = + mem::replace(&mut self.st, State::Empty) + { + Ok(Async::Ready((sock, buf))) + } else { + panic!() + } + } +} diff --git a/tokio-uds/src/stream.rs b/tokio-uds/src/stream.rs new file mode 100644 index 000000000..2e0a68227 --- /dev/null +++ b/tokio-uds/src/stream.rs @@ -0,0 +1,356 @@ +use ucred::{self, UCred}; + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_reactor::{Handle, PollEvented}; + +use bytes::{Buf, BufMut}; +use futures::{Async, Future, Poll}; +use iovec::{self, IoVec}; +use libc; +use mio::Ready; +use mio_uds; + +use std::fmt; +use std::io::{self, Read, Write}; +use std::net::Shutdown; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; + +/// A structure representing a connected unix socket. +/// +/// This socket can be connected directly with `UnixStream::connect` or accepted +/// from a listener with `UnixListener::incoming`. Additionally, a pair of +/// anonymous Unix sockets can be created with `UnixStream::pair`. +pub struct UnixStream { + io: PollEvented, +} + +/// Future returned by `UnixStream::connect` which will resolve to a +/// `UnixStream` when the stream is connected. +#[derive(Debug)] +pub struct ConnectFuture { + inner: State, +} + +#[derive(Debug)] +enum State { + Waiting(UnixStream), + Error(io::Error), + Empty, +} + +impl UnixStream { + /// Connects to the socket named by `path`. + /// + /// This function will create a new unix socket and connect to the path + /// specified, associating the returned stream with the default event loop's + /// handle. + pub fn connect

(path: P) -> ConnectFuture + where + P: AsRef, + { + let res = mio_uds::UnixStream::connect(path) + .map(UnixStream::new); + + let inner = match res { + Ok(stream) => State::Waiting(stream), + Err(e) => State::Error(e), + }; + + ConnectFuture { inner } + } + + /// Consumes a `UnixStream` in the standard library and returns a + /// nonblocking `UnixStream` from this crate. + /// + /// The returned stream will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result { + let stream = mio_uds::UnixStream::from_stream(stream)?; + let io = PollEvented::new_with_handle(stream, handle)?; + + Ok(UnixStream { io }) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// This function will create a pair of interconnected unix sockets for + /// communicating back and forth between one another. Each socket will be + /// associated with the event loop whose handle is also provided. + pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + let (a, b) = try!(mio_uds::UnixStream::pair()); + let a = UnixStream::new(a); + let b = UnixStream::new(b); + + Ok((a, b)) + } + + pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream { + let io = PollEvented::new(stream); + UnixStream { io } + } + + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll { + self.io.poll_read_ready(ready) + } + + /// Test whether this socket is ready to be written to or not. + pub fn poll_write_ready(&self) -> Poll { + self.io.poll_write_ready() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result { + self.io.get_ref().local_addr() + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result { + self.io.get_ref().peer_addr() + } + + /// Returns effective credentials of the process which called `connect` or `socketpair`. + pub fn peer_cred(&self) -> io::Result { + ucred::get_peer_cred(self) + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.io.get_ref().take_error() + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } +} + +impl Read for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.io.read(buf) + } +} + +impl Write for UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.io.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.io.flush() + } +} + +impl AsyncRead for UnixStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + <&UnixStream>::read_buf(&mut &*self, buf) + } +} + +impl AsyncWrite for UnixStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + <&UnixStream>::shutdown(&mut &*self) + } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + <&UnixStream>::write_buf(&mut &*self, buf) + } +} + +impl<'a> Read for &'a UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (&self.io).read(buf) + } +} + +impl<'a> Write for &'a UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + (&self.io).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.io).flush() + } +} + +impl<'a> AsyncRead for &'a UnixStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + if let Async::NotReady = ::poll_read_ready(self, Ready::readable())? { + return Ok(Async::NotReady); + } + unsafe { + let r = read_ready(buf, self.as_raw_fd()); + if r == -1 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } else { + Err(e) + } + } else { + let r = r as usize; + buf.advance_mut(r); + Ok(r.into()) + } + } + } +} + +impl<'a> AsyncWrite for &'a UnixStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + if let Async::NotReady = ::poll_write_ready(self)? { + return Ok(Async::NotReady); + } + unsafe { + let r = write_ready(buf, self.as_raw_fd()); + if r == -1 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } else { + Err(e) + } + } else { + let r = r as usize; + buf.advance(r); + Ok(r.into()) + } + } + } +} + +impl fmt::Debug for UnixStream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} + +impl Future for ConnectFuture { + type Item = UnixStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + use std::mem; + + match self.inner { + State::Waiting(ref mut stream) => { + if let Async::NotReady = stream.io.poll_write_ready()? { + return Ok(Async::NotReady) + } + + if let Some(e) = try!(stream.io.get_ref().take_error()) { + return Err(e) + } + } + State::Error(_) => { + let e = match mem::replace(&mut self.inner, State::Empty) { + State::Error(e) => e, + _ => unreachable!(), + }; + + return Err(e) + }, + State::Empty => panic!("can't poll stream twice"), + } + + match mem::replace(&mut self.inner, State::Empty) { + State::Waiting(stream) => Ok(Async::Ready(stream)), + _ => unreachable!(), + } + } +} + +unsafe fn read_ready(buf: &mut B, raw_fd: RawFd) -> isize { + // The `IoVec` type can't have a 0-length size, so we create a bunch + // of dummy versions on the stack with 1 length which we'll quickly + // overwrite. + let b1: &mut [u8] = &mut [0]; + let b2: &mut [u8] = &mut [0]; + let b3: &mut [u8] = &mut [0]; + let b4: &mut [u8] = &mut [0]; + let b5: &mut [u8] = &mut [0]; + let b6: &mut [u8] = &mut [0]; + let b7: &mut [u8] = &mut [0]; + let b8: &mut [u8] = &mut [0]; + let b9: &mut [u8] = &mut [0]; + let b10: &mut [u8] = &mut [0]; + let b11: &mut [u8] = &mut [0]; + let b12: &mut [u8] = &mut [0]; + let b13: &mut [u8] = &mut [0]; + let b14: &mut [u8] = &mut [0]; + let b15: &mut [u8] = &mut [0]; + let b16: &mut [u8] = &mut [0]; + let mut bufs: [&mut IoVec; 16] = [ + b1.into(), + b2.into(), + b3.into(), + b4.into(), + b5.into(), + b6.into(), + b7.into(), + b8.into(), + b9.into(), + b10.into(), + b11.into(), + b12.into(), + b13.into(), + b14.into(), + b15.into(), + b16.into(), + ]; + + let n = buf.bytes_vec_mut(&mut bufs); + read_ready_vecs(&mut bufs[..n], raw_fd) +} + +unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize { + let iovecs = iovec::unix::as_os_slice_mut(bufs); + + libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) +} + +unsafe fn write_ready(buf: &mut B, raw_fd: RawFd) -> isize { + // The `IoVec` type can't have a zero-length size, so create a dummy + // version from a 1-length slice which we'll overwrite with the + // `bytes_vec` method. + static DUMMY: &[u8] = &[0]; + let iovec = <&IoVec>::from(DUMMY); + let mut bufs = [ + iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, + iovec, iovec, iovec, + ]; + + let n = buf.bytes_vec(&mut bufs); + write_ready_vecs(&bufs[..n], raw_fd) +} + +unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize { + let iovecs = iovec::unix::as_os_slice(bufs); + + libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) +} diff --git a/tokio-uds/src/ucred.rs b/tokio-uds/src/ucred.rs new file mode 100644 index 000000000..e74f4b343 --- /dev/null +++ b/tokio-uds/src/ucred.rs @@ -0,0 +1,109 @@ +use libc::{gid_t, uid_t}; + +/// Credentials of a process +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +pub struct UCred { + /// UID (user ID) of the process + pub uid: uid_t, + /// GID (group ID) of the process + pub gid: gid_t, +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub use self::impl_linux::get_peer_cred; + +#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "openbsd"))] +pub use self::impl_macos::get_peer_cred; + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub mod impl_linux { + use libc::{c_void, getsockopt, socklen_t, SOL_SOCKET, SO_PEERCRED}; + use std::{io, mem}; + use UnixStream; + use std::os::unix::io::AsRawFd; + + use libc::ucred; + + pub fn get_peer_cred(sock: &UnixStream) -> io::Result { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut ucred = ucred { + pid: 0, + uid: 0, + gid: 0, + }; + + let ucred_size = mem::size_of::(); + + // These paranoid checks should be optimized-out + assert!(mem::size_of::() <= mem::size_of::()); + assert!(ucred_size <= u32::max_value() as usize); + + let mut ucred_size = ucred_size as socklen_t; + + let ret = getsockopt( + raw_fd, + SOL_SOCKET, + SO_PEERCRED, + &mut ucred as *mut ucred as *mut c_void, + &mut ucred_size, + ); + if ret == 0 && ucred_size as usize == mem::size_of::() { + Ok(super::UCred { + uid: ucred.uid, + gid: ucred.gid, + }) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + +#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "openbsd"))] +pub mod impl_macos { + use libc::getpeereid; + use std::{io, mem}; + use UnixStream; + use std::os::unix::io::AsRawFd; + + pub fn get_peer_cred(sock: &UnixStream) -> io::Result { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut cred: super::UCred = mem::uninitialized(); + + let ret = getpeereid(raw_fd, &mut cred.uid, &mut cred.gid); + + if ret == 0 { + Ok(cred) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + +// Note that SO_PEERCRED is not supported on DragonFly (yet). So do not run tests. +#[cfg(not(target_os = "dragonfly"))] +#[cfg(test)] +mod test { + use UnixStream; + use libc::geteuid; + use libc::getegid; + + #[test] + fn test_socket_pair() { + let (a, b) = UnixStream::pair().unwrap(); + let cred_a = a.peer_cred().unwrap(); + let cred_b = b.peer_cred().unwrap(); + assert_eq!(cred_a, cred_b); + + let uid = unsafe { geteuid() }; + let gid = unsafe { getegid() }; + + assert_eq!(cred_a.uid, uid); + assert_eq!(cred_a.gid, gid); + } +} diff --git a/tokio-uds/tests/stream.rs b/tokio-uds/tests/stream.rs new file mode 100644 index 000000000..47ee679ef --- /dev/null +++ b/tokio-uds/tests/stream.rs @@ -0,0 +1,55 @@ +#![cfg(unix)] + +extern crate futures; +extern crate tokio; +extern crate tokio_uds; + +extern crate tempdir; + +use tokio_uds::*; + +use tokio::io; +use tokio::runtime::current_thread::Runtime; + +use futures::{Future, Stream}; +use futures::sync::oneshot; +use tempdir::TempDir; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn echo() { + let dir = TempDir::new("tokio-uds-tests").unwrap(); + let sock_path = dir.path().join("connect.sock"); + + let mut rt = Runtime::new().unwrap(); + + let server = t!(UnixListener::bind(&sock_path)); + let (tx, rx) = oneshot::channel(); + + rt.spawn({ + server.incoming() + .into_future() + .and_then(move |(sock, _)| { + tx.send(sock.unwrap()).unwrap(); + Ok(()) + }) + .map_err(|e| panic!("err={:?}", e)) + }); + + let client = rt.block_on(UnixStream::connect(&sock_path)).unwrap(); + let server = rt.block_on(rx).unwrap(); + + // Write to the client + rt.block_on(io::write_all(client, b"hello")).unwrap(); + + // Read from the server + let (_, buf) = rt.block_on(io::read_to_end(server, vec![])).unwrap(); + + assert_eq!(buf, b"hello"); +}