Import tokio-uds (#365)

This imports tokio-uds from the dedicated repo.
This commit is contained in:
Carl Lerche 2018-05-14 14:48:32 -07:00 committed by GitHub
parent e281e4f4cb
commit c8e710d39e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1174 additions and 0 deletions

View File

@ -31,6 +31,7 @@ members = [
"tokio-timer",
"tokio-tcp",
"tokio-udp",
"tokio-uds",
"futures2",
]

View File

@ -129,6 +129,9 @@ The crates included as part of Tokio are:
* [`tokio-udp`]: UDP bindings for use with `tokio-io` and `tokio-reactor`.
* [`tokio-uds`]: Unix Domain Socket bindings for use with `tokio-io` and
`tokio-reactor`.
[`tokio-executor`]: tokio-executor
[`tokio-fs`]: tokio-fs
[`tokio-io`]: tokio-io
@ -137,6 +140,7 @@ The crates included as part of Tokio are:
[`tokio-threadpool`]: tokio-threadpool
[`tokio-timer`]: tokio-timer
[`tokio-udp`]: tokio-udp
[`tokio-udp`]: tokio-uds
## License

31
tokio-uds/Cargo.toml Normal file
View File

@ -0,0 +1,31 @@
[package]
name = "tokio-uds"
# When releasing to crates.io:
# - Update html_root_url.
# - Update CHANGELOG.md.
# - Create "v0.2.x" git tag.
version = "0.2.0"
authors = ["Carl Lerche <me@carllerche.com>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://github.com/tokio-rs/tokio"
documentation = "https://docs.rs/tokio-uds"
description = """
Unix Domain sockets for Tokio
"""
categories = ["asynchronous"]
[dependencies]
bytes = "0.4"
futures = "0.1"
iovec = "0.1"
libc = "0.2"
log = "0.4"
mio = "0.6.14"
mio-uds = "0.6.5"
tokio-reactor = { version = "0.1.1", path = "../tokio-reactor" }
tokio-io = { version = "0.1.6", path = "../tokio-io" }
[dev-dependencies]
tokio = { version = "0.1.6", path = "../" }
tempdir = "0.3.7"

25
tokio-uds/LICENSE Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2018 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

15
tokio-uds/README.md Normal file
View File

@ -0,0 +1,15 @@
# tokio-uds
An implementation of Unix Domain Sockets for Tokio
[Documentation](https://docs.rs/tokio-uds)
## License
This project is licensed under the [MIT license](./LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tokio by you, shall be licensed as MIT, without any additional
terms or conditions.

208
tokio-uds/src/datagram.rs Normal file
View File

@ -0,0 +1,208 @@
use {SendDgram, RecvDgram};
use tokio_reactor::{Handle, PollEvented};
use futures::{Async, Poll};
use mio::Ready;
use mio_uds;
use std::fmt;
use std::io;
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
/// An I/O object representing a Unix datagram socket.
pub struct UnixDatagram {
io: PollEvented<mio_uds::UnixDatagram>,
}
impl UnixDatagram {
/// Creates a new `UnixDatagram` bound to the specified path.
pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
where
P: AsRef<Path>,
{
let socket = mio_uds::UnixDatagram::bind(path)?;
Ok(UnixDatagram::new(socket))
}
/// Creates an unnamed pair of connected sockets.
///
/// This function will create a pair of interconnected unix sockets for
/// communicating back and forth between one another. Each socket will be
/// associated with the event loop whose handle is also provided.
pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
let (a, b) = mio_uds::UnixDatagram::pair()?;
let a = UnixDatagram::new(a);
let b = UnixDatagram::new(b);
Ok((a, b))
}
/// Consumes a `UnixDatagram` in the standard library and returns a
/// nonblocking `UnixDatagram` from this crate.
///
/// The returned datagram will be associated with the given event loop
/// specified by `handle` and is ready to perform I/O.
pub fn from_std(datagram: net::UnixDatagram, handle: &Handle) -> io::Result<UnixDatagram> {
let socket = mio_uds::UnixDatagram::from_datagram(datagram)?;
let io = PollEvented::new_with_handle(socket, handle)?;
Ok(UnixDatagram { io })
}
fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram {
let io = PollEvented::new(socket);
UnixDatagram { io }
}
/// Creates a new `UnixDatagram` which is not bound to any address.
pub fn unbound() -> io::Result<UnixDatagram> {
let socket = mio_uds::UnixDatagram::unbound()?;
Ok(UnixDatagram::new(socket))
}
/// Connects the socket to the specified address.
///
/// The `send` method may be used to send data to the specified address.
/// `recv` and `recv_from` will only receive data from that address.
pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
self.io.get_ref().connect(path)
}
/// Test whether this socket is ready to be read or not.
pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
self.io.poll_read_ready(ready)
}
/// Test whether this socket is ready to be written to or not.
pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
self.io.poll_write_ready()
}
/// Returns the local address that this socket is bound to.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
}
/// Returns the address of this socket's peer.
///
/// The `connect` method will connect the socket to a peer.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().peer_addr()
}
/// Receives data from the socket.
///
/// On success, returns the number of bytes read and the address from
/// whence the data came.
pub fn poll_recv_from(&self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
if self.io.poll_read_ready(Ready::readable())?.is_not_ready() {
return Ok(Async::NotReady);
}
let r = self.io.get_ref().recv_from(buf);
if is_wouldblock(&r) {
self.io.clear_read_ready(Ready::readable())?;
}
r.map(Async::Ready)
}
/// Receives data from the socket.
///
/// On success, returns the number of bytes read.
pub fn poll_recv(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
if self.io.poll_read_ready(Ready::readable())?.is_not_ready() {
return Ok(Async::NotReady);
}
let r = self.io.get_ref().recv(buf);
if is_wouldblock(&r) {
self.io.clear_read_ready(Ready::readable())?;
}
r.map(Async::Ready)
}
/// Returns a future for receiving a datagram. See the documentation on RecvDgram for details.
pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T>
where
T: AsMut<[u8]>,
{
RecvDgram::new(self, buf)
}
/// Sends data on the socket to the specified address.
///
/// On success, returns the number of bytes written.
pub fn poll_send_to<P>(&self, buf: &[u8], path: P) -> Poll<usize, io::Error>
where
P: AsRef<Path>,
{
if self.io.poll_write_ready()?.is_not_ready() {
return Ok(Async::NotReady);
}
let r = self.io.get_ref().send_to(buf, path);
if is_wouldblock(&r) {
self.io.clear_write_ready()?;
}
r.map(Async::Ready)
}
/// Sends data on the socket to the socket's peer.
///
/// The peer address may be set by the `connect` method, and this method
/// will return an error if the socket has not already been connected.
///
/// On success, returns the number of bytes written.
pub fn poll_send(&self, buf: &[u8]) -> Poll<usize, io::Error> {
if self.io.poll_write_ready()?.is_not_ready() {
return Ok(Async::NotReady);
}
let r = self.io.get_ref().send(buf);
if is_wouldblock(&r) {
self.io.clear_write_ready()?;
}
r.map(Async::Ready)
}
/// Returns a future sending the data in buf to the socket at path.
pub fn send_dgram<T, P>(self, buf: T, path: P) -> SendDgram<T, P>
where
T: AsRef<[u8]>,
P: AsRef<Path>,
{
SendDgram::new(self, buf, path)
}
/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.io.get_ref().take_error()
}
/// Shut down the read, write, or both halves of this connection.
///
/// This function will cause all pending and future I/O calls on the
/// specified portions to immediately return with an appropriate value
/// (see the documentation of `Shutdown`).
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how)
}
}
impl fmt::Debug for UnixDatagram {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl AsRawFd for UnixDatagram {
fn as_raw_fd(&self) -> RawFd {
self.io.get_ref().as_raw_fd()
}
}
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
match *r {
Ok(_) => false,
Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
}
}

27
tokio-uds/src/incoming.rs Normal file
View File

@ -0,0 +1,27 @@
use {UnixListener, UnixStream};
use futures::{Stream, Poll};
use std::io;
/// Stream of listeners
#[derive(Debug)]
pub struct Incoming {
inner: UnixListener,
}
impl Incoming {
pub(crate) fn new(listener: UnixListener) -> Incoming {
Incoming { inner: listener }
}
}
impl Stream for Incoming {
type Item = UnixStream;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
Ok(Some(try_ready!(self.inner.poll_accept()).0).into())
}
}

34
tokio-uds/src/lib.rs Normal file
View File

@ -0,0 +1,34 @@
//! Unix Domain Sockets for Tokio.
//!
//! This crate provides APIs for using Unix Domain Sockets with Tokio.
#![cfg(unix)]
#![doc(html_root_url = "https://docs.rs/tokio-uds/0.2.0")]
#![deny(missing_docs, warnings, missing_debug_implementations)]
extern crate bytes;
#[macro_use]
extern crate futures;
extern crate iovec;
extern crate libc;
extern crate log;
extern crate mio;
extern crate mio_uds;
extern crate tokio_io;
extern crate tokio_reactor;
mod datagram;
mod incoming;
mod listener;
mod recv_dgram;
mod send_dgram;
mod stream;
mod ucred;
pub use datagram::UnixDatagram;
pub use incoming::Incoming;
pub use listener::UnixListener;
pub use recv_dgram::RecvDgram;
pub use send_dgram::SendDgram;
pub use stream::UnixStream;
pub use ucred::UCred;

146
tokio-uds/src/listener.rs Normal file
View File

@ -0,0 +1,146 @@
use {Incoming, UnixStream};
use tokio_reactor::{Handle, PollEvented};
use futures::{Async, Poll};
use mio::Ready;
use mio_uds;
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
/// A Unix socket which can accept connections from other unix sockets.
pub struct UnixListener {
io: PollEvented<mio_uds::UnixListener>,
}
impl UnixListener {
/// Creates a new `UnixListener` bound to the specified path.
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
{
let listener = mio_uds::UnixListener::bind(path)?;
let io = PollEvented::new(listener);
Ok(UnixListener { io })
}
/// Consumes a `UnixListener` in the standard library and returns a
/// nonblocking `UnixListener` from this crate.
///
/// The returned listener will be associated with the given event loop
/// specified by `handle` and is ready to perform I/O.
pub fn from_std(listener: net::UnixListener, handle: &Handle) -> io::Result<UnixListener> {
let listener = mio_uds::UnixListener::from_listener(listener)?;
let io = PollEvented::new_with_handle(listener, handle)?;
Ok(UnixListener { io })
}
/// Returns the local socket address of this listener.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
}
/// Test whether this socket is ready to be read or not.
pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
self.io.poll_read_ready(ready)
}
/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.io.get_ref().take_error()
}
/// Attempt to accept a connection and create a new connected `UnixStream`
/// if successful.
///
/// This function will attempt an accept operation, but will not block
/// waiting for it to complete. If the operation would block then a "would
/// block" error is returned. Additionally, if this method would block, it
/// registers the current task to receive a notification when it would
/// otherwise not block.
///
/// Note that typically for simple usage it's easier to treat incoming
/// connections as a `Stream` of `UnixStream`s with the `incoming` method
/// below.
///
/// # Panics
///
/// This function will panic if it is called outside the context of a
/// future's task. It's recommended to only call this from the
/// implementation of a `Future::poll`, if necessary.
pub fn poll_accept(&self) -> Poll<(UnixStream, SocketAddr), io::Error> {
let (io, addr) = try_ready!(self.poll_accept_std());
let io = mio_uds::UnixStream::from_stream(io)?;
Ok((UnixStream::new(io), addr).into())
}
/// Attempt to accept a connection and create a new connected `UnixStream`
/// if successful.
///
/// This function is the same as `poll_accept` above except that it returns a
/// `mio_uds::UnixStream` instead of a `tokio_udp::UnixStream`. This in turn
/// can then allow for the stream to be associated with a different reactor
/// than the one this `UnixListener` is associated with.
///
/// This function will attempt an accept operation, but will not block
/// waiting for it to complete. If the operation would block then a "would
/// block" error is returned. Additionally, if this method would block, it
/// registers the current task to receive a notification when it would
/// otherwise not block.
///
/// Note that typically for simple usage it's easier to treat incoming
/// connections as a `Stream` of `UnixStream`s with the `incoming` method
/// below.
///
/// # Panics
///
/// This function will panic if it is called outside the context of a
/// future's task. It's recommended to only call this from the
/// implementation of a `Future::poll`, if necessary.
pub fn poll_accept_std(&self) -> Poll<(net::UnixStream, SocketAddr), io::Error> {
loop {
try_ready!(self.io.poll_read_ready(Ready::readable()));
match self.io.get_ref().accept_std() {
Ok(None) => {
self.io.clear_read_ready(Ready::readable())?;
return Ok(Async::NotReady);
}
Ok(Some((sock, addr))) => {
return Ok(Async::Ready((sock, addr)));
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(Ready::readable())?;
return Ok(Async::NotReady);
}
Err(err) => return Err(err),
}
}
}
/// Consumes this listener, returning a stream of the sockets this listener
/// accepts.
///
/// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener.
pub fn incoming(self) -> Incoming {
Incoming::new(self)
}
}
impl fmt::Debug for UnixListener {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl AsRawFd for UnixListener {
fn as_raw_fd(&self) -> RawFd {
self.io.get_ref().as_raw_fd()
}
}

View File

@ -0,0 +1,82 @@
use UnixDatagram;
use futures::{Async, Future, Poll};
use std::io;
use std::mem;
/// A future for receiving datagrams from a Unix datagram socket.
///
/// An example that uses UDP sockets but is still applicable can be found at
/// https://gist.github.com/dermesser/e331094c2ab28fc7f6ba8a16183fe4d5.
#[derive(Debug)]
pub struct RecvDgram<T> {
st: State<T>,
}
/// A future similar to RecvDgram, but without allocating and returning the peer's address.
///
/// This can be used if the peer's address is of no interest, so the allocation overhead can be
/// avoided.
#[derive(Debug)]
enum State<T> {
Receiving {
sock: UnixDatagram,
buf: T,
},
Empty,
}
impl<T> RecvDgram<T>
where
T: AsMut<[u8]>
{
pub(crate) fn new(sock: UnixDatagram, buf: T) -> RecvDgram<T> {
RecvDgram {
st: State::Receiving {
sock,
buf,
},
}
}
}
impl<T> Future for RecvDgram<T>
where
T: AsMut<[u8]>,
{
/// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were
/// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the
/// datagram is truncated.
type Item = (UnixDatagram, T, usize, String);
/// This future yields io::Error if an error occurred.
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let received;
let peer;
if let State::Receiving {
ref mut sock,
ref mut buf,
} = self.st
{
let (n, p) = try_ready!(sock.poll_recv_from(buf.as_mut()));
received = n;
peer = p.as_pathname().map_or(String::new(), |p| {
p.to_str().map_or(String::new(), |s| s.to_string())
});
} else {
panic!()
}
if let State::Receiving { sock, buf } =
mem::replace(&mut self.st, State::Empty)
{
Ok(Async::Ready((sock, buf, received, peer)))
} else {
panic!()
}
}
}

View File

@ -0,0 +1,81 @@
use UnixDatagram;
use futures::{Async, Future, Poll};
use std::io;
use std::mem;
use std::path::Path;
/// A future for writing a buffer to a Unix datagram socket.
#[derive(Debug)]
pub struct SendDgram<T, P> {
st: State<T, P>,
}
#[derive(Debug)]
enum State<T, P> {
/// current state is Sending
Sending {
/// the underlying socket
sock: UnixDatagram,
/// the buffer to send
buf: T,
/// the destination
addr: P,
},
/// neutral state
Empty,
}
impl<T, P> SendDgram<T, P>
where
T: AsRef<[u8]>,
P: AsRef<Path>,
{
pub(crate) fn new(sock: UnixDatagram, buf: T, addr: P) -> SendDgram<T, P> {
SendDgram {
st: State::Sending {
sock,
buf,
addr,
}
}
}
}
impl<T, P> Future for SendDgram<T, P>
where
T: AsRef<[u8]>,
P: AsRef<Path>,
{
/// Returns the underlying socket and the buffer that was sent.
type Item = (UnixDatagram, T);
/// The error that is returned when sending failed.
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let State::Sending {
ref mut sock,
ref buf,
ref addr,
} = self.st
{
let n = try_ready!(sock.poll_send_to(buf.as_ref(), addr));
if n < buf.as_ref().len() {
return Err(io::Error::new(
io::ErrorKind::Other,
"Couldn't send whole buffer".to_string(),
));
}
} else {
panic!()
}
if let State::Sending { sock, buf, addr: _ } =
mem::replace(&mut self.st, State::Empty)
{
Ok(Async::Ready((sock, buf)))
} else {
panic!()
}
}
}

356
tokio-uds/src/stream.rs Normal file
View File

@ -0,0 +1,356 @@
use ucred::{self, UCred};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_reactor::{Handle, PollEvented};
use bytes::{Buf, BufMut};
use futures::{Async, Future, Poll};
use iovec::{self, IoVec};
use libc;
use mio::Ready;
use mio_uds;
use std::fmt;
use std::io::{self, Read, Write};
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
/// A structure representing a connected unix socket.
///
/// This socket can be connected directly with `UnixStream::connect` or accepted
/// from a listener with `UnixListener::incoming`. Additionally, a pair of
/// anonymous Unix sockets can be created with `UnixStream::pair`.
pub struct UnixStream {
io: PollEvented<mio_uds::UnixStream>,
}
/// Future returned by `UnixStream::connect` which will resolve to a
/// `UnixStream` when the stream is connected.
#[derive(Debug)]
pub struct ConnectFuture {
inner: State,
}
#[derive(Debug)]
enum State {
Waiting(UnixStream),
Error(io::Error),
Empty,
}
impl UnixStream {
/// Connects to the socket named by `path`.
///
/// This function will create a new unix socket and connect to the path
/// specified, associating the returned stream with the default event loop's
/// handle.
pub fn connect<P>(path: P) -> ConnectFuture
where
P: AsRef<Path>,
{
let res = mio_uds::UnixStream::connect(path)
.map(UnixStream::new);
let inner = match res {
Ok(stream) => State::Waiting(stream),
Err(e) => State::Error(e),
};
ConnectFuture { inner }
}
/// Consumes a `UnixStream` in the standard library and returns a
/// nonblocking `UnixStream` from this crate.
///
/// The returned stream will be associated with the given event loop
/// specified by `handle` and is ready to perform I/O.
pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
let stream = mio_uds::UnixStream::from_stream(stream)?;
let io = PollEvented::new_with_handle(stream, handle)?;
Ok(UnixStream { io })
}
/// Creates an unnamed pair of connected sockets.
///
/// This function will create a pair of interconnected unix sockets for
/// communicating back and forth between one another. Each socket will be
/// associated with the event loop whose handle is also provided.
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
let (a, b) = try!(mio_uds::UnixStream::pair());
let a = UnixStream::new(a);
let b = UnixStream::new(b);
Ok((a, b))
}
pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
let io = PollEvented::new(stream);
UnixStream { io }
}
/// Test whether this socket is ready to be read or not.
pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
self.io.poll_read_ready(ready)
}
/// Test whether this socket is ready to be written to or not.
pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
self.io.poll_write_ready()
}
/// Returns the socket address of the local half of this connection.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
}
/// Returns the socket address of the remote half of this connection.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().peer_addr()
}
/// Returns effective credentials of the process which called `connect` or `socketpair`.
pub fn peer_cred(&self) -> io::Result<UCred> {
ucred::get_peer_cred(self)
}
/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.io.get_ref().take_error()
}
/// Shuts down the read, write, or both halves of this connection.
///
/// This function will cause all pending and future I/O calls on the
/// specified portions to immediately return with an appropriate value
/// (see the documentation of `Shutdown`).
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how)
}
}
impl Read for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
}
}
impl Write for UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
impl AsyncRead for UnixStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&UnixStream>::read_buf(&mut &*self, buf)
}
}
impl AsyncWrite for UnixStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
<&UnixStream>::shutdown(&mut &*self)
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&UnixStream>::write_buf(&mut &*self, buf)
}
}
impl<'a> Read for &'a UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.io).read(buf)
}
}
impl<'a> Write for &'a UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&self.io).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&self.io).flush()
}
}
impl<'a> AsyncRead for &'a UnixStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
return Ok(Async::NotReady);
}
unsafe {
let r = read_ready(buf, self.as_raw_fd());
if r == -1 {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::WouldBlock {
self.io.clear_write_ready()?;
Ok(Async::NotReady)
} else {
Err(e)
}
} else {
let r = r as usize;
buf.advance_mut(r);
Ok(r.into())
}
}
}
}
impl<'a> AsyncWrite for &'a UnixStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
return Ok(Async::NotReady);
}
unsafe {
let r = write_ready(buf, self.as_raw_fd());
if r == -1 {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::WouldBlock {
self.io.clear_write_ready()?;
Ok(Async::NotReady)
} else {
Err(e)
}
} else {
let r = r as usize;
buf.advance(r);
Ok(r.into())
}
}
}
}
impl fmt::Debug for UnixStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl AsRawFd for UnixStream {
fn as_raw_fd(&self) -> RawFd {
self.io.get_ref().as_raw_fd()
}
}
impl Future for ConnectFuture {
type Item = UnixStream;
type Error = io::Error;
fn poll(&mut self) -> Poll<UnixStream, io::Error> {
use std::mem;
match self.inner {
State::Waiting(ref mut stream) => {
if let Async::NotReady = stream.io.poll_write_ready()? {
return Ok(Async::NotReady)
}
if let Some(e) = try!(stream.io.get_ref().take_error()) {
return Err(e)
}
}
State::Error(_) => {
let e = match mem::replace(&mut self.inner, State::Empty) {
State::Error(e) => e,
_ => unreachable!(),
};
return Err(e)
},
State::Empty => panic!("can't poll stream twice"),
}
match mem::replace(&mut self.inner, State::Empty) {
State::Waiting(stream) => Ok(Async::Ready(stream)),
_ => unreachable!(),
}
}
}
unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
// The `IoVec` type can't have a 0-length size, so we create a bunch
// of dummy versions on the stack with 1 length which we'll quickly
// overwrite.
let b1: &mut [u8] = &mut [0];
let b2: &mut [u8] = &mut [0];
let b3: &mut [u8] = &mut [0];
let b4: &mut [u8] = &mut [0];
let b5: &mut [u8] = &mut [0];
let b6: &mut [u8] = &mut [0];
let b7: &mut [u8] = &mut [0];
let b8: &mut [u8] = &mut [0];
let b9: &mut [u8] = &mut [0];
let b10: &mut [u8] = &mut [0];
let b11: &mut [u8] = &mut [0];
let b12: &mut [u8] = &mut [0];
let b13: &mut [u8] = &mut [0];
let b14: &mut [u8] = &mut [0];
let b15: &mut [u8] = &mut [0];
let b16: &mut [u8] = &mut [0];
let mut bufs: [&mut IoVec; 16] = [
b1.into(),
b2.into(),
b3.into(),
b4.into(),
b5.into(),
b6.into(),
b7.into(),
b8.into(),
b9.into(),
b10.into(),
b11.into(),
b12.into(),
b13.into(),
b14.into(),
b15.into(),
b16.into(),
];
let n = buf.bytes_vec_mut(&mut bufs);
read_ready_vecs(&mut bufs[..n], raw_fd)
}
unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
let iovecs = iovec::unix::as_os_slice_mut(bufs);
libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
}
unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
// The `IoVec` type can't have a zero-length size, so create a dummy
// version from a 1-length slice which we'll overwrite with the
// `bytes_vec` method.
static DUMMY: &[u8] = &[0];
let iovec = <&IoVec>::from(DUMMY);
let mut bufs = [
iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
iovec, iovec, iovec,
];
let n = buf.bytes_vec(&mut bufs);
write_ready_vecs(&bufs[..n], raw_fd)
}
unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
let iovecs = iovec::unix::as_os_slice(bufs);
libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
}

109
tokio-uds/src/ucred.rs Normal file
View File

@ -0,0 +1,109 @@
use libc::{gid_t, uid_t};
/// Credentials of a process
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct UCred {
/// UID (user ID) of the process
pub uid: uid_t,
/// GID (group ID) of the process
pub gid: gid_t,
}
#[cfg(any(target_os = "linux", target_os = "android"))]
pub use self::impl_linux::get_peer_cred;
#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "openbsd"))]
pub use self::impl_macos::get_peer_cred;
#[cfg(any(target_os = "linux", target_os = "android"))]
pub mod impl_linux {
use libc::{c_void, getsockopt, socklen_t, SOL_SOCKET, SO_PEERCRED};
use std::{io, mem};
use UnixStream;
use std::os::unix::io::AsRawFd;
use libc::ucred;
pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
unsafe {
let raw_fd = sock.as_raw_fd();
let mut ucred = ucred {
pid: 0,
uid: 0,
gid: 0,
};
let ucred_size = mem::size_of::<ucred>();
// These paranoid checks should be optimized-out
assert!(mem::size_of::<u32>() <= mem::size_of::<usize>());
assert!(ucred_size <= u32::max_value() as usize);
let mut ucred_size = ucred_size as socklen_t;
let ret = getsockopt(
raw_fd,
SOL_SOCKET,
SO_PEERCRED,
&mut ucred as *mut ucred as *mut c_void,
&mut ucred_size,
);
if ret == 0 && ucred_size as usize == mem::size_of::<ucred>() {
Ok(super::UCred {
uid: ucred.uid,
gid: ucred.gid,
})
} else {
Err(io::Error::last_os_error())
}
}
}
}
#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "openbsd"))]
pub mod impl_macos {
use libc::getpeereid;
use std::{io, mem};
use UnixStream;
use std::os::unix::io::AsRawFd;
pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
unsafe {
let raw_fd = sock.as_raw_fd();
let mut cred: super::UCred = mem::uninitialized();
let ret = getpeereid(raw_fd, &mut cred.uid, &mut cred.gid);
if ret == 0 {
Ok(cred)
} else {
Err(io::Error::last_os_error())
}
}
}
}
// Note that SO_PEERCRED is not supported on DragonFly (yet). So do not run tests.
#[cfg(not(target_os = "dragonfly"))]
#[cfg(test)]
mod test {
use UnixStream;
use libc::geteuid;
use libc::getegid;
#[test]
fn test_socket_pair() {
let (a, b) = UnixStream::pair().unwrap();
let cred_a = a.peer_cred().unwrap();
let cred_b = b.peer_cred().unwrap();
assert_eq!(cred_a, cred_b);
let uid = unsafe { geteuid() };
let gid = unsafe { getegid() };
assert_eq!(cred_a.uid, uid);
assert_eq!(cred_a.gid, gid);
}
}

55
tokio-uds/tests/stream.rs Normal file
View File

@ -0,0 +1,55 @@
#![cfg(unix)]
extern crate futures;
extern crate tokio;
extern crate tokio_uds;
extern crate tempdir;
use tokio_uds::*;
use tokio::io;
use tokio::runtime::current_thread::Runtime;
use futures::{Future, Stream};
use futures::sync::oneshot;
use tempdir::TempDir;
macro_rules! t {
($e:expr) => (match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
})
}
#[test]
fn echo() {
let dir = TempDir::new("tokio-uds-tests").unwrap();
let sock_path = dir.path().join("connect.sock");
let mut rt = Runtime::new().unwrap();
let server = t!(UnixListener::bind(&sock_path));
let (tx, rx) = oneshot::channel();
rt.spawn({
server.incoming()
.into_future()
.and_then(move |(sock, _)| {
tx.send(sock.unwrap()).unwrap();
Ok(())
})
.map_err(|e| panic!("err={:?}", e))
});
let client = rt.block_on(UnixStream::connect(&sock_path)).unwrap();
let server = rt.block_on(rx).unwrap();
// Write to the client
rt.block_on(io::write_all(client, b"hello")).unwrap();
// Read from the server
let (_, buf) = rt.block_on(io::read_to_end(server, vec![])).unwrap();
assert_eq!(buf, b"hello");
}