mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
net: add UdpSocket readiness and non-blocking ops (#3138)
Adds `ready()`, `readable()`, and `writable()` async methods for waiting for socket readiness. Adds `try_send`, `try_send_to`, `try_recv`, and `try_recv_from` for performing non-blocking operations on the socket. This is the UDP equivalent of #3130.
This commit is contained in:
parent
d0ebb41547
commit
0ea2307650
@ -385,7 +385,7 @@ impl TcpStream {
|
||||
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
|
||||
/// number of bytes read. `Ok(n)` indicates the stream's read half is closed
|
||||
/// and will no longer yield data. If the stream is not ready to read data
|
||||
/// `Err(io::ErrorKinid::WouldBlock)` is returned.
|
||||
/// `Err(io::ErrorKind::WouldBlock)` is returned.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
@ -495,8 +495,7 @@ impl TcpStream {
|
||||
/// The function will attempt to write the entire contents of `buf`, but
|
||||
/// only part of the buffer may be written.
|
||||
///
|
||||
/// This function is equivalent to `ready(Interest::WRITABLE)` is usually
|
||||
/// paired with `try_write()`.
|
||||
/// This function is usually paired with `writable()`.
|
||||
///
|
||||
/// # Return
|
||||
///
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::io::{Interest, PollEvented, ReadBuf};
|
||||
use crate::io::{Interest, PollEvented, ReadBuf, Ready};
|
||||
use crate::net::{to_socket_addrs, ToSocketAddrs};
|
||||
|
||||
use std::convert::TryFrom;
|
||||
@ -262,13 +262,149 @@ impl UdpSocket {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Returns a future that sends data on the socket to the remote address to which it is connected.
|
||||
/// On success, the future will resolve to the number of bytes written.
|
||||
/// Wait for any of the requested ready states.
|
||||
///
|
||||
/// The [`connect`] method will connect this socket to a remote address. The future
|
||||
/// will resolve to an error if the socket is not connected.
|
||||
/// This function is usually paired with `try_recv()` or `try_send()`. It
|
||||
/// can be used to concurrently recv / send to the same socket on a single
|
||||
/// task without splitting the socket.
|
||||
///
|
||||
/// The function may complete without the socket being ready. This is a
|
||||
/// false-positive and attempting an operation will return with
|
||||
/// `io::ErrorKind::WouldBlock`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Concurrently receive from and send to the socket on the same task
|
||||
/// without splitting.
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::io::{self, Interest};
|
||||
/// use tokio::net::UdpSocket;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
/// socket.connect("127.0.0.1:8081").await?;
|
||||
///
|
||||
/// loop {
|
||||
/// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
|
||||
///
|
||||
/// if ready.is_readable() {
|
||||
/// // The buffer is **not** included in the async task and will only exist
|
||||
/// // on the stack.
|
||||
/// let mut data = [0; 1024];
|
||||
/// match socket.try_recv(&mut data[..]) {
|
||||
/// Ok(n) => {
|
||||
/// println!("received {:?}", &data[..n]);
|
||||
/// }
|
||||
/// // False-positive, continue
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
|
||||
/// Err(e) => {
|
||||
/// return Err(e);
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// if ready.is_writable() {
|
||||
/// // Write some data
|
||||
/// match socket.try_send(b"hello world") {
|
||||
/// Ok(n) => {
|
||||
/// println!("sent {} bytes", n);
|
||||
/// }
|
||||
/// // False-positive, continue
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
|
||||
/// Err(e) => {
|
||||
/// return Err(e);
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
|
||||
let event = self.io.registration().readiness(interest).await?;
|
||||
Ok(event.ready)
|
||||
}
|
||||
|
||||
/// Wait for the socket to become writable.
|
||||
///
|
||||
/// This function is equivalent to `ready(Interest::WRITABLE)` and is
|
||||
/// usually paired with `try_send()` or `try_send_to()`.
|
||||
///
|
||||
/// The function may complete without the socket being writable. This is a
|
||||
/// false-positive and attempting a `try_send()` will return with
|
||||
/// `io::ErrorKind::WouldBlock`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// use std::io;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// // Bind socket
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
/// socket.connect("127.0.0.1:8081").await?;
|
||||
///
|
||||
/// loop {
|
||||
/// // Wait for the socket to be writable
|
||||
/// socket.writable().await?;
|
||||
///
|
||||
/// // Try to send data, this may still fail with `WouldBlock`
|
||||
/// // if the readiness event is a false positive.
|
||||
/// match socket.try_send(b"hello world") {
|
||||
/// Ok(n) => {
|
||||
/// break;
|
||||
/// }
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
/// continue;
|
||||
/// }
|
||||
/// Err(e) => {
|
||||
/// return Err(e);
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn writable(&self) -> io::Result<()> {
|
||||
self.ready(Interest::WRITABLE).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends data on the socket to the remote address that the socket is
|
||||
/// connected to.
|
||||
///
|
||||
/// The [`connect`] method will connect this socket to a remote address.
|
||||
/// This method will fail if the socket is not connected.
|
||||
///
|
||||
/// [`connect`]: method@Self::connect
|
||||
///
|
||||
/// # Return
|
||||
///
|
||||
/// On success, the number of bytes sent is returned, otherwise, the
|
||||
/// encountered error is returned.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::io;
|
||||
/// use tokio::net::UdpSocket;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// // Bind socket
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
/// socket.connect("127.0.0.1:8081").await?;
|
||||
///
|
||||
/// // Send a message
|
||||
/// socket.send(b"hello world").await?;
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.io
|
||||
.registration()
|
||||
@ -276,15 +412,15 @@ impl UdpSocket {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Attempts to send data on the socket to the remote address to which it was previously
|
||||
/// `connect`ed.
|
||||
/// Attempts to send data on the socket to the remote address to which it
|
||||
/// was previously `connect`ed.
|
||||
///
|
||||
/// The [`connect`] method will connect this socket to a remote address. The future
|
||||
/// will resolve to an error if the socket is not connected.
|
||||
/// The [`connect`] method will connect this socket to a remote address.
|
||||
/// This method will fail if the socket is not connected.
|
||||
///
|
||||
/// Note that on multiple calls to a `poll_*` method in the send direction, only the
|
||||
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
|
||||
/// receive a wakeup.
|
||||
/// Note that on multiple calls to a `poll_*` method in the send direction,
|
||||
/// only the `Waker` from the `Context` passed to the most recent call will
|
||||
/// be scheduled to receive a wakeup.
|
||||
///
|
||||
/// # Return value
|
||||
///
|
||||
@ -308,29 +444,140 @@ impl UdpSocket {
|
||||
/// Try to send data on the socket to the remote address to which it is
|
||||
/// connected.
|
||||
///
|
||||
/// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is
|
||||
/// returned. This function is usually paired with `writable()`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// If successfull, the number of bytes sent is returned. Users
|
||||
/// should ensure that when the remote cannot receive, the
|
||||
/// [`ErrorKind::WouldBlock`] is properly handled.
|
||||
/// If successful, `Ok(n)` is returned, where `n` is the number of bytes
|
||||
/// sent. If the socket is not ready to send data,
|
||||
/// `Err(ErrorKind::WouldBlock)` is returned.
|
||||
///
|
||||
/// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// use std::io;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// // Bind a UDP socket
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
///
|
||||
/// // Connect to a peer
|
||||
/// socket.connect("127.0.0.1:8081").await?;
|
||||
///
|
||||
/// loop {
|
||||
/// // Wait for the socket to be writable
|
||||
/// socket.writable().await?;
|
||||
///
|
||||
/// // Try to send data, this may still fail with `WouldBlock`
|
||||
/// // if the readiness event is a false positive.
|
||||
/// match socket.try_send(b"hello world") {
|
||||
/// Ok(n) => {
|
||||
/// break;
|
||||
/// }
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
/// continue;
|
||||
/// }
|
||||
/// Err(e) => {
|
||||
/// return Err(e);
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.io.send(buf)
|
||||
self.io
|
||||
.registration()
|
||||
.try_io(Interest::WRITABLE, || self.io.send(buf))
|
||||
}
|
||||
|
||||
/// Returns a future that receives a single datagram message on the socket from
|
||||
/// the remote address to which it is connected. On success, the future will resolve
|
||||
/// to the number of bytes read.
|
||||
/// Wait for the socket to become readable.
|
||||
///
|
||||
/// The function must be called with valid byte array `buf` of sufficient size to
|
||||
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
|
||||
/// excess bytes may be discarded.
|
||||
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
|
||||
/// paired with `try_recv()`.
|
||||
///
|
||||
/// The [`connect`] method will connect this socket to a remote address. The future
|
||||
/// will fail if the socket is not connected.
|
||||
/// The function may complete without the socket being readable. This is a
|
||||
/// false-positive and attempting a `try_recv()` will return with
|
||||
/// `io::ErrorKind::WouldBlock`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// use std::io;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// // Connect to a peer
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
/// socket.connect("127.0.0.1:8081").await?;
|
||||
///
|
||||
/// loop {
|
||||
/// // Wait for the socket to be readable
|
||||
/// socket.readable().await?;
|
||||
///
|
||||
/// // The buffer is **not** included in the async task and will
|
||||
/// // only exist on the stack.
|
||||
/// let mut buf = [0; 1024];
|
||||
///
|
||||
/// // Try to recv data, this may still fail with `WouldBlock`
|
||||
/// // if the readiness event is a false positive.
|
||||
/// match socket.try_recv(&mut buf) {
|
||||
/// Ok(n) => {
|
||||
/// println!("GOT {:?}", &buf[..n]);
|
||||
/// break;
|
||||
/// }
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
/// continue;
|
||||
/// }
|
||||
/// Err(e) => {
|
||||
/// return Err(e);
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn readable(&self) -> io::Result<()> {
|
||||
self.ready(Interest::READABLE).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Receives a single datagram message on the socket from the remote address
|
||||
/// to which it is connected. On success, returns the number of bytes read.
|
||||
///
|
||||
/// The function must be called with valid byte array `buf` of sufficient
|
||||
/// size to hold the message bytes. If a message is too long to fit in the
|
||||
/// supplied buffer, excess bytes may be discarded.
|
||||
///
|
||||
/// The [`connect`] method will connect this socket to a remote address.
|
||||
/// This method will fail if the socket is not connected.
|
||||
///
|
||||
/// [`connect`]: method@Self::connect
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// use std::io;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// // Bind socket
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
/// socket.connect("127.0.0.1:8081").await?;
|
||||
///
|
||||
/// let mut buf = vec![0; 10];
|
||||
/// let n = socket.recv(&mut buf).await?;
|
||||
///
|
||||
/// println!("received {} bytes {:?}", n, &buf[..n]);
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.io
|
||||
.registration()
|
||||
@ -379,26 +626,91 @@ impl UdpSocket {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
/// Returns a future that sends data on the socket to the given address.
|
||||
/// On success, the future will resolve to the number of bytes written.
|
||||
/// Try to receive a single datagram message on the socket from the remote
|
||||
/// address to which it is connected. On success, returns the number of
|
||||
/// bytes read.
|
||||
///
|
||||
/// The future will resolve to an error if the IP version of the socket does
|
||||
/// not match that of `target`.
|
||||
/// The function must be called with valid byte array buf of sufficient size
|
||||
/// to hold the message bytes. If a message is too long to fit in the
|
||||
/// supplied buffer, excess bytes may be discarded.
|
||||
///
|
||||
/// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
|
||||
/// returned. This function is usually paired with `readable()`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// use std::io;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// // Connect to a peer
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
/// socket.connect("127.0.0.1:8081").await?;
|
||||
///
|
||||
/// loop {
|
||||
/// // Wait for the socket to be readable
|
||||
/// socket.readable().await?;
|
||||
///
|
||||
/// // The buffer is **not** included in the async task and will
|
||||
/// // only exist on the stack.
|
||||
/// let mut buf = [0; 1024];
|
||||
///
|
||||
/// // Try to recv data, this may still fail with `WouldBlock`
|
||||
/// // if the readiness event is a false positive.
|
||||
/// match socket.try_recv(&mut buf) {
|
||||
/// Ok(n) => {
|
||||
/// println!("GOT {:?}", &buf[..n]);
|
||||
/// break;
|
||||
/// }
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
/// continue;
|
||||
/// }
|
||||
/// Err(e) => {
|
||||
/// return Err(e);
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.io
|
||||
.registration()
|
||||
.try_io(Interest::READABLE, || self.io.recv(buf))
|
||||
}
|
||||
|
||||
/// Sends data on the socket to the given address. On success, returns the
|
||||
/// number of bytes written.
|
||||
///
|
||||
/// Address type can be any implementor of [`ToSocketAddrs`] trait. See its
|
||||
/// documentation for concrete examples.
|
||||
///
|
||||
/// It is possible for `addr` to yield multiple addresses, but `send_to`
|
||||
/// will only send data to the first address yielded by `addr`.
|
||||
///
|
||||
/// This will return an error when the IP version of the local socket does
|
||||
/// not match that returned from [`ToSocketAddrs`].
|
||||
///
|
||||
/// [`ToSocketAddrs`]: crate::net::ToSocketAddrs
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// # use std::{io, net::SocketAddr};
|
||||
/// use std::io;
|
||||
///
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() -> io::Result<()> {
|
||||
/// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
|
||||
/// let buf = b"hello world";
|
||||
/// let remote_addr = "127.0.0.1:58000".parse::<SocketAddr>().unwrap();
|
||||
/// let _len = sock.send_to(&buf[..], remote_addr).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
/// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?;
|
||||
///
|
||||
/// println!("Sent {} bytes", len);
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> {
|
||||
let mut addrs = to_socket_addrs(target).await?;
|
||||
@ -440,8 +752,10 @@ impl UdpSocket {
|
||||
.poll_write_io(cx, || self.io.send_to(buf, *target))
|
||||
}
|
||||
|
||||
/// Try to send data on the socket to the given address, but if the send is blocked
|
||||
/// this will return right away.
|
||||
/// Try to send data on the socket to the given address, but if the send is
|
||||
/// blocked this will return right away.
|
||||
///
|
||||
/// This function is usually paired with `writable()`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -451,25 +765,44 @@ impl UdpSocket {
|
||||
/// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur
|
||||
/// if the IP version of the socket does not match that of `target`.
|
||||
///
|
||||
/// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// # use std::{io, net::SocketAddr};
|
||||
/// use std::error::Error;
|
||||
/// use std::io;
|
||||
///
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() -> io::Result<()> {
|
||||
/// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
|
||||
/// let buf = b"hello world";
|
||||
/// let remote_addr = "127.0.0.1:58000".parse::<SocketAddr>().unwrap();
|
||||
/// let _len = sock.try_send_to(&buf[..], remote_addr)?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> Result<(), Box<dyn Error>> {
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
///
|
||||
/// let dst = "127.0.0.1:8081".parse()?;
|
||||
///
|
||||
/// loop {
|
||||
/// socket.writable().await?;
|
||||
///
|
||||
/// match socket.try_send_to(&b"hello world"[..], dst) {
|
||||
/// Ok(sent) => {
|
||||
/// println!("sent {} bytes", sent);
|
||||
/// break;
|
||||
/// }
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
/// // Writable false positive.
|
||||
/// continue;
|
||||
/// }
|
||||
/// Err(e) => return Err(e.into()),
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
|
||||
pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
|
||||
self.io.send_to(buf, target)
|
||||
self.io
|
||||
.registration()
|
||||
.try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
|
||||
}
|
||||
|
||||
async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
|
||||
@ -479,27 +812,30 @@ impl UdpSocket {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns a future that receives a single datagram on the socket. On success,
|
||||
/// the future resolves to the number of bytes read and the origin.
|
||||
/// Receives a single datagram message on the socket. On success, returns
|
||||
/// the number of bytes read and the origin.
|
||||
///
|
||||
/// The function must be called with valid byte array `buf` of sufficient size
|
||||
/// to hold the message bytes. If a message is too long to fit in the supplied
|
||||
/// buffer, excess bytes may be discarded.
|
||||
/// The function must be called with valid byte array `buf` of sufficient
|
||||
/// size to hold the message bytes. If a message is too long to fit in the
|
||||
/// supplied buffer, excess bytes may be discarded.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// # use std::{io, net::SocketAddr};
|
||||
/// use std::io;
|
||||
///
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() -> io::Result<()> {
|
||||
/// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
|
||||
/// let mut buf = [0u8; 32];
|
||||
/// let (len, addr) = sock.recv_from(&mut buf).await?;
|
||||
/// println!("received {:?} bytes from {:?}", len, addr);
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
///
|
||||
/// let mut buf = vec![0u8; 32];
|
||||
/// let (len, addr) = socket.recv_from(&mut buf).await?;
|
||||
///
|
||||
/// println!("received {:?} bytes from {:?}", len, addr);
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
|
||||
self.io
|
||||
@ -547,6 +883,61 @@ impl UdpSocket {
|
||||
Poll::Ready(Ok(addr))
|
||||
}
|
||||
|
||||
/// Try to receive a single datagram message on the socket. On success,
|
||||
/// returns the number of bytes read and the origin.
|
||||
///
|
||||
/// The function must be called with valid byte array buf of sufficient size
|
||||
/// to hold the message bytes. If a message is too long to fit in the
|
||||
/// supplied buffer, excess bytes may be discarded.
|
||||
///
|
||||
/// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
|
||||
/// returned. This function is usually paired with `readable()`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// use std::io;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// // Connect to a peer
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
/// socket.connect("127.0.0.1:8081").await?;
|
||||
///
|
||||
/// loop {
|
||||
/// // Wait for the socket to be readable
|
||||
/// socket.readable().await?;
|
||||
///
|
||||
/// // The buffer is **not** included in the async task and will
|
||||
/// // only exist on the stack.
|
||||
/// let mut buf = [0; 1024];
|
||||
///
|
||||
/// // Try to recv data, this may still fail with `WouldBlock`
|
||||
/// // if the readiness event is a false positive.
|
||||
/// match socket.try_recv(&mut buf) {
|
||||
/// Ok(n) => {
|
||||
/// println!("GOT {:?}", &buf[..n]);
|
||||
/// break;
|
||||
/// }
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
/// continue;
|
||||
/// }
|
||||
/// Err(e) => {
|
||||
/// return Err(e);
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
|
||||
self.io
|
||||
.registration()
|
||||
.try_io(Interest::READABLE, || self.io.recv_from(buf))
|
||||
}
|
||||
|
||||
/// Receives data from the socket, without removing it from the input queue.
|
||||
/// On success, returns the number of bytes read and the address from whence
|
||||
/// the data came.
|
||||
@ -563,16 +954,19 @@ impl UdpSocket {
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// # use std::{io, net::SocketAddr};
|
||||
/// use std::io;
|
||||
///
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() -> io::Result<()> {
|
||||
/// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
|
||||
/// let mut buf = [0u8; 32];
|
||||
/// let (len, addr) = sock.peek_from(&mut buf).await?;
|
||||
/// println!("peeked {:?} bytes from {:?}", len, addr);
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
|
||||
///
|
||||
/// let mut buf = vec![0u8; 32];
|
||||
/// let (len, addr) = socket.peek_from(&mut buf).await?;
|
||||
///
|
||||
/// println!("peeked {:?} bytes from {:?}", len, addr);
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
|
||||
self.io
|
||||
@ -795,20 +1189,20 @@ impl UdpSocket {
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// # use std::error::Error;
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() -> Result<(), Box<dyn Error>> {
|
||||
/// use tokio::net::UdpSocket;
|
||||
/// use std::io;
|
||||
///
|
||||
/// // Create a socket
|
||||
/// let socket = UdpSocket::bind("0.0.0.0:8080").await?;
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> io::Result<()> {
|
||||
/// // Create a socket
|
||||
/// let socket = UdpSocket::bind("0.0.0.0:8080").await?;
|
||||
///
|
||||
/// if let Ok(Some(err)) = socket.take_error() {
|
||||
/// println!("Got error: {:?}", err);
|
||||
/// if let Ok(Some(err)) = socket.take_error() {
|
||||
/// println!("Got error: {:?}", err);
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
///
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
|
||||
self.io.take_error()
|
||||
|
@ -2,6 +2,7 @@
|
||||
#![cfg(feature = "full")]
|
||||
|
||||
use futures::future::poll_fn;
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use tokio::{io::ReadBuf, net::UdpSocket};
|
||||
|
||||
@ -238,6 +239,8 @@ async fn try_send_spawn() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
sender.writable().await.unwrap();
|
||||
|
||||
let sent = &sender
|
||||
.try_send_to(MSG, receiver.local_addr().unwrap())
|
||||
.unwrap();
|
||||
@ -263,3 +266,90 @@ async fn try_send_spawn() {
|
||||
|
||||
assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn try_send_recv() {
|
||||
// Create listener
|
||||
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
|
||||
|
||||
// Create socket pair
|
||||
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
|
||||
|
||||
// Connect the two
|
||||
client.connect(server.local_addr().unwrap()).await.unwrap();
|
||||
server.connect(client.local_addr().unwrap()).await.unwrap();
|
||||
|
||||
for _ in 0..5 {
|
||||
loop {
|
||||
client.writable().await.unwrap();
|
||||
|
||||
match client.try_send(b"hello world") {
|
||||
Ok(n) => {
|
||||
assert_eq!(n, 11);
|
||||
break;
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||
Err(e) => panic!("{:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
server.readable().await.unwrap();
|
||||
|
||||
let mut buf = [0; 512];
|
||||
|
||||
match server.try_recv(&mut buf) {
|
||||
Ok(n) => {
|
||||
assert_eq!(n, 11);
|
||||
assert_eq!(&buf[0..11], &b"hello world"[..]);
|
||||
break;
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||
Err(e) => panic!("{:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn try_send_to_recv_from() {
|
||||
// Create listener
|
||||
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
|
||||
let saddr = server.local_addr().unwrap();
|
||||
|
||||
// Create socket pair
|
||||
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
|
||||
let caddr = client.local_addr().unwrap();
|
||||
|
||||
for _ in 0..5 {
|
||||
loop {
|
||||
client.writable().await.unwrap();
|
||||
|
||||
match client.try_send_to(b"hello world", saddr) {
|
||||
Ok(n) => {
|
||||
assert_eq!(n, 11);
|
||||
break;
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||
Err(e) => panic!("{:?}", e),
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
server.readable().await.unwrap();
|
||||
|
||||
let mut buf = [0; 512];
|
||||
|
||||
match server.try_recv_from(&mut buf) {
|
||||
Ok((n, addr)) => {
|
||||
assert_eq!(n, 11);
|
||||
assert_eq!(addr, caddr);
|
||||
assert_eq!(&buf[0..11], &b"hello world"[..]);
|
||||
break;
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||
Err(e) => panic!("{:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user