Update tokio-udp to use std-future (#1199)

This commit is contained in:
Yin Guanhao 2019-06-27 02:41:36 +08:00 committed by Lucio Franco
parent 0784dc2767
commit 6316aa1d0b
16 changed files with 347 additions and 605 deletions

View File

@ -20,6 +20,6 @@ members = [
# "tokio-tls",
# "tokio-trace",
# "tokio-trace/tokio-trace-core",
# "tokio-udp",
"tokio-udp",
# "tokio-uds",
]

View File

@ -33,7 +33,7 @@ jobs:
# - tokio-signal
# - tokio-tcp
# - tokio-tls
# - tokio-udp
- tokio-udp
# - tokio-uds
# Test crates that are NOT platform specific

View File

@ -79,7 +79,7 @@ pub fn test(_attr: TokenStream, item: TokenStream) -> TokenStream {
#(#attrs)*
fn #name() #ret {
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
rt.block_on_async(async { #body })
rt.block_on(async { #body })
}
};

View File

@ -22,13 +22,13 @@ categories = ["asynchronous"]
publish = false
[dependencies]
tokio-codec = { version = "0.2.0", path = "../tokio-codec" }
tokio-io = { version = "0.2.0", path = "../tokio-io" }
# tokio-codec = { version = "0.2.0", path = "../tokio-codec" }
# tokio-io = { version = "0.2.0", path = "../tokio-io" }
tokio-reactor = { version = "0.2.0", path = "../tokio-reactor" }
bytes = "0.4"
# bytes = "0.4"
mio = "0.6.14"
log = "0.4"
futures = "0.1.19"
[dev-dependencies]
env_logger = { version = "0.5", default-features = false }
tokio = { version = "0.2.0", path = "../tokio", default-features = false, features = ["rt-full"] }

View File

@ -10,23 +10,28 @@
//!
//! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket.
//! Reading and writing to it can be done using futures, which return the
//! [`RecvDgram`] and [`SendDgram`] structs respectively.
//!
//! For convenience it's also possible to convert raw datagrams into higher-level
//! frames.
//!
//! [`UdpSocket`]: struct.UdpSocket.html
//! [`RecvDgram`]: struct.RecvDgram.html
//! [`SendDgram`]: struct.SendDgram.html
//! [`UdpFramed`]: struct.UdpFramed.html
//! [`framed`]: struct.UdpSocket.html#method.framed
//! [`Recv`], [`Send`], [`RecvFrom`] and [`SendTo`] structs respectively.
mod frame;
mod recv_dgram;
mod send_dgram;
macro_rules! ready {
($e:expr) => {
match $e {
::std::task::Poll::Ready(t) => t,
::std::task::Poll::Pending => return ::std::task::Poll::Pending,
}
};
}
// mod frame;
mod recv;
mod recv_from;
mod send;
mod send_to;
mod socket;
pub use self::frame::UdpFramed;
pub use self::recv_dgram::RecvDgram;
pub use self::send_dgram::SendDgram;
// pub use self::frame::UdpFramed;
pub use self::recv::Recv;
pub use self::recv_from::RecvFrom;
pub use self::send::Send;
pub use self::send_to::SendTo;
pub use self::socket::UdpSocket;

30
tokio-udp/src/recv.rs Normal file
View File

@ -0,0 +1,30 @@
use super::UdpSocket;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A future that receives a datagram from the connected address.
///
/// This `struct` is created by [`recv`](super::UdpSocket::recv).
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Recv<'a, 'b> {
socket: &'a mut UdpSocket,
buf: &'b mut [u8],
}
impl<'a, 'b> Recv<'a, 'b> {
pub(super) fn new(socket: &'a mut UdpSocket, buf: &'b mut [u8]) -> Self {
Self { socket, buf }
}
}
impl<'a, 'b> Future for Recv<'a, 'b> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Recv { socket, buf } = self.get_mut();
Pin::new(&mut **socket).poll_recv(cx, buf)
}
}

View File

@ -1,103 +0,0 @@
use super::socket::UdpSocket;
use futures::{try_ready, Async, Future, Poll};
use std::io;
use std::net::SocketAddr;
/// A future used to receive a datagram from a UDP socket.
///
/// This is created by the `UdpSocket::recv_dgram` method.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct RecvDgram<T> {
/// None means future was completed
state: Option<RecvDgramInner<T>>,
}
/// A struct is used to represent the full info of RecvDgram.
#[derive(Debug)]
struct RecvDgramInner<T> {
/// Rx socket
socket: UdpSocket,
/// The received data will be put in the buffer
buffer: T,
}
/// Components of a `RecvDgram` future, returned from `into_parts`.
#[derive(Debug)]
pub struct Parts<T> {
/// The socket
pub socket: UdpSocket,
/// The buffer
pub buffer: T,
_priv: (),
}
impl<T> RecvDgram<T> {
/// Create a new future to receive UDP Datagram
pub(crate) fn new(socket: UdpSocket, buffer: T) -> RecvDgram<T> {
let inner = RecvDgramInner {
socket: socket,
buffer: buffer,
};
RecvDgram { state: Some(inner) }
}
/// Consume the `RecvDgram`, returning the socket and buffer.
///
/// # Panics
///
/// If called after the future has completed.
///
/// # Examples
///
/// ```
/// use tokio_udp::UdpSocket;
///
/// let socket = UdpSocket::bind(&([127, 0, 0, 1], 0).into()).unwrap();
/// let mut buffer = vec![0; 4096];
///
/// let future = socket.recv_dgram(buffer);
///
/// // ... polling `future` ... giving up (e.g. after timeout)
///
/// let parts = future.into_parts();
///
/// let socket = parts.socket; // extract the socket
/// let buffer = parts.buffer; // extract the buffer
/// ```
pub fn into_parts(mut self) -> Parts<T> {
let state = self
.state
.take()
.expect("into_parts called after completion");
Parts {
socket: state.socket,
buffer: state.buffer,
_priv: (),
}
}
}
impl<T> Future for RecvDgram<T>
where
T: AsMut<[u8]>,
{
type Item = (UdpSocket, T, usize, SocketAddr);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
let (n, addr) = {
let ref mut inner = self
.state
.as_mut()
.expect("RecvDgram polled after completion");
try_ready!(inner.socket.poll_recv_from(inner.buffer.as_mut()))
};
let inner = self.state.take().unwrap();
Ok(Async::Ready((inner.socket, inner.buffer, n, addr)))
}
}

View File

@ -0,0 +1,31 @@
use super::UdpSocket;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A future that receives a datagram.
///
/// This `struct` is created by [`recv_from`](super::UdpSocket::recv_from).
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct RecvFrom<'a, 'b> {
socket: &'a mut UdpSocket,
buf: &'b mut [u8],
}
impl<'a, 'b> RecvFrom<'a, 'b> {
pub(super) fn new(socket: &'a mut UdpSocket, buf: &'b mut [u8]) -> Self {
Self { socket, buf }
}
}
impl<'a, 'b> Future for RecvFrom<'a, 'b> {
type Output = io::Result<(usize, SocketAddr)>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let RecvFrom { socket, buf } = self.get_mut();
Pin::new(&mut **socket).poll_recv_from(cx, buf)
}
}

30
tokio-udp/src/send.rs Normal file
View File

@ -0,0 +1,30 @@
use super::UdpSocket;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A future that sends a datagram to the connected address.
///
/// This `struct` is created by [`send`](super::UdpSocket::send).
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Send<'a, 'b> {
socket: &'a mut UdpSocket,
buf: &'b [u8],
}
impl<'a, 'b> Send<'a, 'b> {
pub(super) fn new(socket: &'a mut UdpSocket, buf: &'b [u8]) -> Self {
Self { socket, buf }
}
}
impl<'a, 'b> Future for Send<'a, 'b> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Send { socket, buf } = self.get_mut();
Pin::new(&mut **socket).poll_send(cx, buf)
}
}

View File

@ -1,70 +0,0 @@
use super::socket::UdpSocket;
use futures::{try_ready, Async, Future, Poll};
use std::io;
use std::net::SocketAddr;
/// A future used to write the entire contents of some data to a UDP socket.
///
/// This is created by the `UdpSocket::send_dgram` method.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct SendDgram<T> {
/// None means future was completed
state: Option<SendDgramInner<T>>,
}
/// A struct is used to represent the full info of SendDgram.
#[derive(Debug)]
struct SendDgramInner<T> {
/// Tx socket
socket: UdpSocket,
/// The whole buffer will be sent
buffer: T,
/// Destination addr
addr: SocketAddr,
}
impl<T> SendDgram<T> {
/// Create a new future to send UDP Datagram
pub(crate) fn new(socket: UdpSocket, buffer: T, addr: SocketAddr) -> SendDgram<T> {
let inner = SendDgramInner {
socket: socket,
buffer: buffer,
addr: addr,
};
SendDgram { state: Some(inner) }
}
}
fn incomplete_write(reason: &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, reason)
}
impl<T> Future for SendDgram<T>
where
T: AsRef<[u8]>,
{
type Item = (UdpSocket, T);
type Error = io::Error;
fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> {
{
let ref mut inner = self
.state
.as_mut()
.expect("SendDgram polled after completion");
let n = try_ready!(inner
.socket
.poll_send_to(inner.buffer.as_ref(), &inner.addr));
if n != inner.buffer.as_ref().len() {
return Err(incomplete_write(
"failed to send entire message \
in datagram",
));
}
}
let inner = self.state.take().unwrap();
Ok(Async::Ready((inner.socket, inner.buffer)))
}
}

40
tokio-udp/src/send_to.rs Normal file
View File

@ -0,0 +1,40 @@
use super::UdpSocket;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A future that sends a datagram to a given address.
///
/// This `struct` is created by [`send_to`](super::UdpSocket::send_to).
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct SendTo<'a, 'b> {
socket: &'a mut UdpSocket,
buf: &'b [u8],
target: &'b SocketAddr,
}
impl<'a, 'b> SendTo<'a, 'b> {
pub(super) fn new(socket: &'a mut UdpSocket, buf: &'b [u8], target: &'b SocketAddr) -> Self {
Self {
socket,
buf,
target,
}
}
}
impl<'a, 'b> Future for SendTo<'a, 'b> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let SendTo {
socket,
buf,
target,
} = self.get_mut();
Pin::new(&mut **socket).poll_send_to(cx, buf, target)
}
}

View File

@ -1,10 +1,11 @@
use super::{RecvDgram, SendDgram};
use futures::{try_ready, Async, Poll};
use super::{Recv, RecvFrom, Send, SendTo};
use mio;
use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_reactor::{Handle, PollEvented};
/// An I/O object representing a UDP socket.
@ -53,13 +54,15 @@ impl UdpSocket {
self.io.get_ref().connect(*addr)
}
#[deprecated(since = "0.1.2", note = "use poll_send instead")]
#[doc(hidden)]
pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.poll_send(buf)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
/// Returns a future that sends data on the socket to the remote address to which it is connected.
/// On success, the future will resolve to the number of bytes written.
///
/// The [`connect`] method will connect this socket to a remote address. The future
/// will resolve to an error if the socket is not connected.
///
/// [`connect`]: #method.connect
pub fn send<'a, 'b>(&'a mut self, buf: &'b [u8]) -> Send<'a, 'b> {
Send::new(self, buf)
}
/// Sends data on the socket to the remote address to which it is connected.
@ -71,35 +74,41 @@ impl UdpSocket {
///
/// # Return
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
/// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
///
/// If the socket is not ready for writing, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// `Poll::Pending` and arranges for the current task to receive a
/// notification when the socket becomes writable.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_send(&mut self, buf: &[u8]) -> Poll<usize, io::Error> {
try_ready!(self.io.poll_write_ready());
pub fn poll_send(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send(buf) {
Ok(n) => Ok(n.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready()?;
Ok(Async::NotReady)
self.io.clear_write_ready(cx)?;
Poll::Pending
}
Err(e) => Err(e),
x => Poll::Ready(x),
}
}
#[deprecated(since = "0.1.2", note = "use poll_recv instead")]
#[doc(hidden)]
pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.poll_recv(buf)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
/// Returns a future that receives a single datagram message on the socket from
/// the remote address to which it is connected. On success, the future will resolve
/// to the number of bytes read.
///
/// The function must be called with valid byte array `buf` of sufficient size to
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
/// excess bytes may be discarded.
///
/// The [`connect`] method will connect this socket to a remote address. The future
/// will fail if the socket is not connected.
///
/// [`connect`]: #method.connect
pub fn recv<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> Recv<'a, 'b> {
Recv::new(self, buf)
}
/// Receives a single datagram message on the socket from the remote address to
@ -116,35 +125,34 @@ impl UdpSocket {
///
/// # Return
///
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
/// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
///
/// If no data is available for reading, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// `Poll::Pending` and arranges for the current task to receive a
/// notification when the socket becomes receivable or is closed.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_recv(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
pub fn poll_recv(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().recv(buf) {
Ok(n) => Ok(n.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(mio::Ready::readable())?;
Ok(Async::NotReady)
self.io.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
Err(e) => Err(e),
x => Poll::Ready(x),
}
}
#[deprecated(since = "0.1.2", note = "use poll_send_to instead")]
#[doc(hidden)]
pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
match self.poll_send_to(buf, target)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
/// Returns a future that sends data on the socket to the given address.
/// On success, the future will resolve to the number of bytes written.
///
/// The future will resolve to an error if the IP version of the socket does
/// not match that of `target`.
pub fn send_to<'a, 'b>(&'a mut self, buf: &'b [u8], target: &'b SocketAddr) -> SendTo<'a, 'b> {
SendTo::new(self, buf, target)
}
/// Sends data on the socket to the given address. On success, returns the
@ -155,133 +163,90 @@ impl UdpSocket {
///
/// # Return
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
/// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
///
/// If the socket is not ready for writing, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// `Poll::Pending` and arranges for the current task to receive a
/// notification when the socket becomes writable.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_send_to(&mut self, buf: &[u8], target: &SocketAddr) -> Poll<usize, io::Error> {
try_ready!(self.io.poll_write_ready());
pub fn poll_send_to(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
target: &SocketAddr,
) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send_to(buf, target) {
Ok(n) => Ok(n.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready()?;
Ok(Async::NotReady)
self.io.clear_write_ready(cx)?;
Poll::Pending
}
Err(e) => Err(e),
x => Poll::Ready(x),
}
}
/// Creates a future that will write the entire contents of the buffer
/// `buf` provided as a datagram to this socket.
/// Returns a future that receives a single datagram on the socket. On success,
/// the future resolves to the number of bytes read and the origin.
///
/// The returned future will return after data has been written to the
/// outbound socket. The future will resolve to the stream as well as the
/// buffer (for reuse if needed).
///
/// Any error which happens during writing will cause both the stream and
/// the buffer to get destroyed. Note that failure to write the entire
/// buffer is considered an error for the purposes of sending a datagram.
///
/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which
/// should be broadly applicable to accepting data which can be converted
/// to a slice.
pub fn send_dgram<T>(self, buf: T, addr: &SocketAddr) -> SendDgram<T>
where
T: AsRef<[u8]>,
{
SendDgram::new(self, buf, *addr)
}
#[deprecated(since = "0.1.2", note = "use poll_recv_from instead")]
#[doc(hidden)]
pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
match self.poll_recv_from(buf)? {
Async::Ready(ret) => Ok(ret),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
/// The function must be called with valid byte array `buf` of sufficient size
/// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded.
pub fn recv_from<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> RecvFrom<'a, 'b> {
RecvFrom::new(self, buf)
}
/// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
pub fn poll_recv_from(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<(usize, SocketAddr), io::Error>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().recv_from(buf) {
Ok(n) => Ok(n.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(mio::Ready::readable())?;
Ok(Async::NotReady)
self.io.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
Err(e) => Err(e),
x => Poll::Ready(x),
}
}
/// Creates a future that receive a datagram to be written to the buffer
/// provided.
///
/// The returned future will return after a datagram has been received on
/// this socket. The future will resolve to the socket, the buffer, the
/// amount of data read, and the address the data was received from.
///
/// An error during reading will cause the socket and buffer to get
/// destroyed.
///
/// The `buf` parameter here only requires the `AsMut<[u8]>` trait, which
/// should be broadly applicable to accepting data which can be converted
/// to a slice.
pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T>
where
T: AsMut<[u8]>,
{
RecvDgram::new(self, buf)
}
/// Check the UDP socket's read readiness state.
///
/// The mask argument allows specifying what readiness to notify on. This
/// can be any value, including platform specific readiness, **except**
/// `writable`.
///
/// If the socket is not ready for receiving then `Async::NotReady` is
/// If the socket is not ready for receiving then `Poll::Pending` is
/// returned and the current task is notified once a new event is received.
///
/// The socket will remain in a read-ready state until calls to `poll_recv`
/// return `NotReady`.
/// return `Poll::Pending`.
///
/// # Panics
///
/// This function panics if:
///
/// * `ready` includes writable.
/// * called from outside of a task context.
pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
self.io.poll_read_ready(mask)
pub fn poll_read_ready(
&self,
cx: &mut Context<'_>,
mask: mio::Ready,
) -> Poll<Result<mio::Ready, io::Error>> {
self.io.poll_read_ready(cx, mask)
}
/// Check the UDP socket's write readiness state.
///
/// If the socket is not ready for sending then `Async::NotReady` is
/// If the socket is not ready for sending then `Poll::Pending` is
/// returned and the current task is notified once a new event is received.
///
/// The I/O resource will remain in a write-ready state until calls to
/// `poll_send` return `NotReady`.
///
/// # Panics
///
/// This function panics if called from outside of a task context.
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
self.io.poll_write_ready()
/// `poll_send` return `Poll::Pending`.
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Result<mio::Ready, io::Error>> {
self.io.poll_write_ready(cx)
}
/// Gets the value of the `SO_BROADCAST` option for this socket.

View File

@ -1,288 +1,105 @@
#![feature(async_await)]
#![deny(warnings, rust_2018_idioms)]
use bytes::{BufMut, BytesMut};
use env_logger;
use futures::{Future, Poll, Sink, Stream};
use std::io;
use std::net::SocketAddr;
use tokio_codec::{Decoder, Encoder};
use tokio_io::try_nb;
use tokio_udp::{UdpFramed, UdpSocket};
use tokio_udp::UdpSocket;
macro_rules! t {
($e:expr) => {
match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
}
};
#[tokio::test]
async fn send_recv() -> std::io::Result<()> {
let mut sender = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let mut receiver = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
sender.connect(&receiver.local_addr()?)?;
receiver.connect(&sender.local_addr()?)?;
let message = b"hello!";
sender.send(message).await?;
let mut recv_buf = [0u8; 32];
let len = receiver.recv(&mut recv_buf[..]).await?;
assert_eq!(&recv_buf[..len], message);
Ok(())
}
fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) {
let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into()));
let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into()));
let a_addr = t!(a.local_addr());
let b_addr = t!(b.local_addr());
#[tokio::test]
async fn send_to_recv_from() -> std::io::Result<()> {
let mut sender = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let mut receiver = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
{
let send = SendMessage::new(a, send.clone(), b_addr, b"1234");
let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234");
let (sendt, received) = t!(send.join(recv).wait());
a = sendt;
b = received;
}
let message = b"hello!";
let receiver_addr = receiver.local_addr()?;
sender.send_to(message, &receiver_addr).await?;
{
let send = SendMessage::new(a, send, b_addr, b"");
let recv = RecvMessage::new(b, recv, a_addr, b"");
t!(send.join(recv).wait());
}
let mut recv_buf = [0u8; 32];
let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
assert_eq!(&recv_buf[..len], message);
assert_eq!(addr, sender.local_addr()?);
Ok(())
}
#[test]
fn send_to_and_recv_from() {
send_messages(SendTo {}, RecvFrom {});
}
// pub struct ByteCodec;
#[test]
fn send_and_recv() {
send_messages(Send {}, Recv {});
}
// impl Decoder for ByteCodec {
// type Item = Vec<u8>;
// type Error = io::Error;
trait SendFn {
fn send(&self, _: &mut UdpSocket, _: &[u8], _: &SocketAddr) -> Result<usize, io::Error>;
}
// fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
// let len = buf.len();
// Ok(Some(buf.split_to(len).to_vec()))
// }
// }
#[derive(Debug, Clone)]
struct SendTo {}
// impl Encoder for ByteCodec {
// type Item = Vec<u8>;
// type Error = io::Error;
impl SendFn for SendTo {
fn send(
&self,
socket: &mut UdpSocket,
buf: &[u8],
addr: &SocketAddr,
) -> Result<usize, io::Error> {
#[allow(deprecated)]
socket.send_to(buf, addr)
}
}
// fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
// buf.reserve(data.len());
// buf.put(data);
// Ok(())
// }
// }
#[derive(Debug, Clone)]
struct Send {}
// #[test]
// fn send_framed() {
// drop(env_logger::try_init());
impl SendFn for Send {
fn send(
&self,
socket: &mut UdpSocket,
buf: &[u8],
addr: &SocketAddr,
) -> Result<usize, io::Error> {
socket.connect(addr).expect("could not connect");
#[allow(deprecated)]
socket.send(buf)
}
}
// let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
// let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
// let a_addr = t!(a_soc.local_addr());
// let b_addr = t!(b_soc.local_addr());
struct SendMessage<S> {
socket: Option<UdpSocket>,
send: S,
addr: SocketAddr,
data: &'static [u8],
}
// {
// let a = UdpFramed::new(a_soc, ByteCodec);
// let b = UdpFramed::new(b_soc, ByteCodec);
impl<S: SendFn> SendMessage<S> {
fn new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S> {
SendMessage {
socket: Some(socket),
send: send,
addr: addr,
data: data,
}
}
}
// let msg = b"4567".to_vec();
impl<S: SendFn> Future for SendMessage<S> {
type Item = UdpSocket;
type Error = io::Error;
// let send = a.send((msg.clone(), b_addr));
// let recv = b.into_future().map_err(|e| e.0);
// let (sendt, received) = t!(send.join(recv).wait());
fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
let n = try_nb!(self
.send
.send(self.socket.as_mut().unwrap(), &self.data[..], &self.addr));
// let (data, addr) = received.0.unwrap();
// assert_eq!(msg, data);
// assert_eq!(a_addr, addr);
assert_eq!(n, self.data.len());
// a_soc = sendt.into_inner();
// b_soc = received.1.into_inner();
// }
Ok(self.socket.take().unwrap().into())
}
}
// {
// let a = UdpFramed::new(a_soc, ByteCodec);
// let b = UdpFramed::new(b_soc, ByteCodec);
trait RecvFn {
fn recv(&self, _: &mut UdpSocket, _: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error>;
}
// let msg = b"".to_vec();
#[derive(Debug, Clone)]
struct RecvFrom {}
// let send = a.send((msg.clone(), b_addr));
// let recv = b.into_future().map_err(|e| e.0);
// let received = t!(send.join(recv).wait()).1;
impl RecvFn for RecvFrom {
fn recv(
&self,
socket: &mut UdpSocket,
buf: &mut [u8],
expected_addr: &SocketAddr,
) -> Result<usize, io::Error> {
#[allow(deprecated)]
socket.recv_from(buf).map(|(s, addr)| {
assert_eq!(addr, *expected_addr);
s
})
}
}
#[derive(Debug, Clone)]
struct Recv {}
impl RecvFn for Recv {
fn recv(
&self,
socket: &mut UdpSocket,
buf: &mut [u8],
_: &SocketAddr,
) -> Result<usize, io::Error> {
#[allow(deprecated)]
socket.recv(buf)
}
}
struct RecvMessage<R> {
socket: Option<UdpSocket>,
recv: R,
expected_addr: SocketAddr,
expected_data: &'static [u8],
}
impl<R: RecvFn> RecvMessage<R> {
fn new(
socket: UdpSocket,
recv: R,
expected_addr: SocketAddr,
expected_data: &'static [u8],
) -> RecvMessage<R> {
RecvMessage {
socket: Some(socket),
recv: recv,
expected_addr: expected_addr,
expected_data: expected_data,
}
}
}
impl<R: RecvFn> Future for RecvMessage<R> {
type Item = UdpSocket;
type Error = io::Error;
fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
let mut buf = vec![0u8; 10 + self.expected_data.len() * 10];
let n = try_nb!(self.recv.recv(
&mut self.socket.as_mut().unwrap(),
&mut buf[..],
&self.expected_addr
));
assert_eq!(n, self.expected_data.len());
assert_eq!(&buf[..self.expected_data.len()], &self.expected_data[..]);
Ok(self.socket.take().unwrap().into())
}
}
#[test]
fn send_dgrams() {
let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
let mut buf = [0u8; 50];
let b_addr = t!(b.local_addr());
{
let send = a.send_dgram(&b"4321"[..], &b_addr);
let recv = b.recv_dgram(&mut buf[..]);
let (sendt, received) = t!(send.join(recv).wait());
assert_eq!(received.2, 4);
assert_eq!(&received.1[..4], b"4321");
a = sendt.0;
b = received.0;
}
{
let send = a.send_dgram(&b""[..], &b_addr);
let recv = b.recv_dgram(&mut buf[..]);
let received = t!(send.join(recv).wait()).1;
assert_eq!(received.2, 0);
}
}
pub struct ByteCodec;
impl Decoder for ByteCodec {
type Item = Vec<u8>;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
let len = buf.len();
Ok(Some(buf.split_to(len).to_vec()))
}
}
impl Encoder for ByteCodec {
type Item = Vec<u8>;
type Error = io::Error;
fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
buf.reserve(data.len());
buf.put(data);
Ok(())
}
}
#[test]
fn send_framed() {
drop(env_logger::try_init());
let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
let a_addr = t!(a_soc.local_addr());
let b_addr = t!(b_soc.local_addr());
{
let a = UdpFramed::new(a_soc, ByteCodec);
let b = UdpFramed::new(b_soc, ByteCodec);
let msg = b"4567".to_vec();
let send = a.send((msg.clone(), b_addr));
let recv = b.into_future().map_err(|e| e.0);
let (sendt, received) = t!(send.join(recv).wait());
let (data, addr) = received.0.unwrap();
assert_eq!(msg, data);
assert_eq!(a_addr, addr);
a_soc = sendt.into_inner();
b_soc = received.1.into_inner();
}
{
let a = UdpFramed::new(a_soc, ByteCodec);
let b = UdpFramed::new(b_soc, ByteCodec);
let msg = b"".to_vec();
let send = a.send((msg.clone(), b_addr));
let recv = b.into_future().map_err(|e| e.0);
let received = t!(send.join(recv).wait()).1;
let (data, addr) = received.0.unwrap();
assert_eq!(msg, data);
assert_eq!(a_addr, addr);
}
}
// let (data, addr) = received.0.unwrap();
// assert_eq!(msg, data);
// assert_eq!(a_addr, addr);
// }
// }

View File

@ -34,7 +34,7 @@ default = [
"sync",
"tcp",
# "timer",
# "udp",
"udp",
# "uds",
]
@ -55,7 +55,7 @@ rt-full = [
sync = ["tokio-sync"]
tcp = ["tokio-tcp"]
#timer = ["tokio-timer"]
#udp = ["tokio-udp"]
udp = ["tokio-udp"]
#uds = ["tokio-uds"]
[dependencies]
@ -75,7 +75,7 @@ tokio-reactor = { version = "0.2.0", optional = true, path = "../tokio-reactor"
tokio-sync = { version = "0.2.0", optional = true, path = "../tokio-sync" }
#tokio-threadpool = { version = "0.2.0", optional = true, path = "../tokio-threadpool" }
tokio-tcp = { version = "0.2.0", optional = true, path = "../tokio-tcp" }
#tokio-udp = { version = "0.2.0", optional = true, path = "../tokio-udp" }
tokio-udp = { version = "0.2.0", optional = true, path = "../tokio-udp" }
#tokio-timer = { version = "0.3.0", optional = true, path = "../tokio-timer" }
#tokio-trace-core = { version = "0.2", optional = true }

View File

@ -102,4 +102,5 @@ if_runtime! {
#[cfg(not(test))] // Work around for rust-lang/rust#62127
pub use tokio_macros::main;
pub use tokio_macros::test;
}

View File

@ -6,7 +6,7 @@
//! # Organization
//!
//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP
//! * [`UdpSocket`] and [`UdpFramed`] provide functionality for communication over UDP
//! * [`UdpSocket`] provides functionality for communication over UDP
//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a
//! Unix Domain Stream Socket **(available on Unix only)**
//! * [`UnixDatagram`] and [`UnixDatagramFramed`] provide functionality for communication
@ -16,7 +16,6 @@
//! [`TcpListener`]: struct.TcpListener.html
//! [`TcpStream`]: struct.TcpStream.html
//! [`UdpSocket`]: struct.UdpSocket.html
//! [`UdpFramed`]: struct.UdpFramed.html
//! [`UnixListener`]: struct.UnixListener.html
//! [`UnixStream`]: struct.UnixStream.html
//! [`UnixDatagram`]: struct.UnixDatagram.html
@ -52,20 +51,17 @@ pub mod udp {
//!
//! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket.
//! Reading and writing to it can be done using futures, which return the
//! [`RecvDgram`] and [`SendDgram`] structs respectively.
//!
//! For convenience it's also possible to convert raw datagrams into higher-level
//! frames.
//! [`Recv`], [`Send`], [`RecvFrom`], [`SendTo`] structs respectively.
//!
//! [`UdpSocket`]: struct.UdpSocket.html
//! [`RecvDgram`]: struct.RecvDgram.html
//! [`SendDgram`]: struct.SendDgram.html
//! [`UdpFramed`]: struct.UdpFramed.html
//! [`framed`]: struct.UdpSocket.html#method.framed
pub use tokio_udp::{RecvDgram, SendDgram, UdpFramed, UdpSocket};
//! [`Recv`]: struct.Recv.html
//! [`Send`]: struct.Send.html
//! [`RecvFrom`]: struct.RecvFrom.html
//! [`SendTo`]: struct.SendTo.html
pub use tokio_udp::{UdpSocket, Recv, Send, RecvFrom, SendTo};
}
#[cfg(feature = "udp")]
pub use self::udp::{UdpFramed, UdpSocket};
pub use self::udp::UdpSocket;
#[cfg(all(unix, feature = "uds"))]
pub mod unix {