Switch TCP/UDP fns to poll_ -> Poll<...> style (#175)

Tokio is moving away from using `WouldBlock`, instead favoring
`Async::NotReady`.

This patch updates the TCP and UDP types, deprecating any function that
returns `WouldBlock` and adding a poll_ prefixed equivalent.
This commit is contained in:
Carl Lerche 2018-03-04 10:46:54 -08:00 committed by GitHub
parent 7db7719419
commit 9f7a98af3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 195 additions and 95 deletions

View File

@ -10,9 +10,9 @@
//! //!
//! Each line you type in to the `nc` terminal should be echo'd back to you! //! Each line you type in to the `nc` terminal should be echo'd back to you!
#[macro_use]
extern crate futures; extern crate futures;
extern crate tokio; extern crate tokio;
#[macro_use]
extern crate tokio_io; extern crate tokio_io;
use std::{env, io}; use std::{env, io};
@ -37,14 +37,14 @@ impl Future for Server {
// If so then we try to send it back to the original source, waiting // If so then we try to send it back to the original source, waiting
// until it's writable and we're able to do so. // until it's writable and we're able to do so.
if let Some((size, peer)) = self.to_send { if let Some((size, peer)) = self.to_send {
let amt = try_nb!(self.socket.send_to(&self.buf[..size], &peer)); let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
println!("Echoed {}/{} bytes to {}", amt, size, peer); println!("Echoed {}/{} bytes to {}", amt, size, peer);
self.to_send = None; self.to_send = None;
} }
// If we're here then `to_send` is `None`, so we take a look for the // If we're here then `to_send` is `None`, so we take a look for the
// next message we're going to echo back. // next message we're going to echo back.
self.to_send = Some(try_nb!(self.socket.recv_from(&mut self.buf))); self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
} }
} }
} }

View File

@ -71,7 +71,6 @@ extern crate futures;
extern crate iovec; extern crate iovec;
extern crate mio; extern crate mio;
extern crate slab; extern crate slab;
#[macro_use]
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_executor; extern crate tokio_executor;
extern crate tokio_reactor; extern crate tokio_reactor;

View File

@ -39,32 +39,50 @@ impl TcpListener {
Ok(TcpListener::new(l)) Ok(TcpListener::new(l))
} }
#[deprecated(since = "0.1.2", note = "use poll_accept instead")]
#[doc(hidden)]
pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
match self.poll_accept()? {
Async::Ready(ret) => Ok(ret),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
}
/// Attempt to accept a connection and create a new connected `TcpStream` if /// Attempt to accept a connection and create a new connected `TcpStream` if
/// successful. /// successful.
/// ///
/// This function will attempt an accept operation, but will not block
/// waiting for it to complete. If the operation would block then a "would
/// block" error is returned. Additionally, if this method would block, it
/// registers the current task to receive a notification when it would
/// otherwise not block.
///
/// Note that typically for simple usage it's easier to treat incoming /// Note that typically for simple usage it's easier to treat incoming
/// connections as a `Stream` of `TcpStream`s with the `incoming` method /// connections as a `Stream` of `TcpStream`s with the `incoming` method
/// below. /// below.
/// ///
/// # Return
///
/// On success, returns `Ok(Async::Ready((socket, addr)))`.
///
/// If the listener is not ready to accept, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// notification when the listener becomes ready to accept.
///
/// # Panics /// # Panics
/// ///
/// This function will panic if it is called outside the context of a /// This function will panic if called from outside of a task context.
/// future's task. It's recommended to only call this from the pub fn poll_accept(&mut self) -> Poll<(TcpStream, SocketAddr), io::Error> {
/// implementation of a `Future::poll`, if necessary. let (io, addr) = try_ready!(self.poll_accept_std());
pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
let (io, addr) = self.accept_std()?;
let io = mio::net::TcpStream::from_stream(io)?; let io = mio::net::TcpStream::from_stream(io)?;
let io = PollEvented2::new(io); let io = PollEvented2::new(io);
let io = TcpStream { io }; let io = TcpStream { io };
Ok((io, addr)) Ok((io, addr).into())
}
#[deprecated(since = "0.1.2", note = "use poll_accept_std instead")]
#[doc(hidden)]
pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
match self.poll_accept_std()? {
Async::Ready(ret) => Ok(ret),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
} }
/// Attempt to accept a connection and create a new connected `TcpStream` if /// Attempt to accept a connection and create a new connected `TcpStream` if
@ -75,23 +93,27 @@ impl TcpListener {
/// can then allow for the TCP stream to be assoiated with a different /// can then allow for the TCP stream to be assoiated with a different
/// reactor than the one this `TcpListener` is associated with. /// reactor than the one this `TcpListener` is associated with.
/// ///
/// # Return
///
/// On success, returns `Ok(Async::Ready((socket, addr)))`.
///
/// If the listener is not ready to accept, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// notification when the listener becomes ready to accept.
///
/// # Panics /// # Panics
/// ///
/// This function will panic for the same reasons as `accept`, notably if /// This function will panic if called from outside of a task context.
/// called outside the context of a future. pub fn poll_accept_std(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> {
pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> { try_ready!(self.io.poll_read_ready());
if let Async::NotReady = self.io.poll_read_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
match self.io.get_ref().accept_std() { match self.io.get_ref().accept_std() {
Ok(pair) => Ok(pair), Ok(pair) => Ok(pair.into()),
Err(e) => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_read()?; self.io.need_read()?;
Ok(Async::NotReady)
} }
Err(e) Err(e) => Err(e),
}
} }
} }
@ -181,7 +203,7 @@ impl Stream for Incoming {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
let (socket, _) = try_nb!(self.inner.accept()); let (socket, _) = try_ready!(self.inner.poll_accept());
Ok(Async::Ready(Some(socket))) Ok(Async::Ready(Some(socket)))
} }
} }
@ -298,22 +320,41 @@ impl TcpStream {
self.io.get_ref().peer_addr() self.io.get_ref().peer_addr()
} }
#[deprecated(since = "0.1.2", note = "use poll_peek instead")]
#[doc(hidden)]
pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.poll_peek(buf)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
}
/// Receives data on the socket from the remote address to which it is /// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success, /// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked. /// returns the number of bytes peeked.
/// ///
/// Successive calls return the same data. This is accomplished by passing /// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying recv system call. /// `MSG_PEEK` as a flag to the underlying recv system call.
pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { ///
if let Async::NotReady = self.io.poll_read_ready()? { /// # Return
return Err(io::ErrorKind::WouldBlock.into()) ///
} /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
///
/// If no data is available for reading, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// notification when the socket becomes readable or is closed.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
try_ready!(self.io.poll_read_ready());
match self.io.get_ref().peek(buf) { match self.io.get_ref().peek(buf) {
Ok(v) => Ok(v), Ok(ret) => Ok(ret.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read()?; self.io.need_read()?;
Err(io::ErrorKind::WouldBlock.into()) Ok(Async::NotReady)
} }
Err(e) => Err(e), Err(e) => Err(e),
} }

View File

@ -44,7 +44,7 @@ impl<C: Decoder> Stream for UdpFramed<C> {
let (n, addr) = unsafe { let (n, addr) = unsafe {
// Read into the buffer without having to initialize the memory. // Read into the buffer without having to initialize the memory.
let (n, addr) = try_nb!(self.socket.recv_from(self.rd.bytes_mut())); let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
self.rd.advance_mut(n); self.rd.advance_mut(n);
(n, addr) (n, addr)
}; };
@ -87,7 +87,7 @@ impl<C: Encoder> Sink for UdpFramed<C> {
} }
trace!("flushing frame; length={}", self.wr.len()); trace!("flushing frame; length={}", self.wr.len());
let n = try_nb!(self.socket.send_to(&self.wr, &self.out_addr)); let n = try_ready!(self.socket.poll_send_to(&self.wr, &self.out_addr));
trace!("written {}", n); trace!("written {}", n);
let wrote_all = n == self.wr.len(); let wrote_all = n == self.wr.len();

View File

@ -56,75 +56,127 @@ impl UdpSocket {
self.io.get_ref().connect(*addr) self.io.get_ref().connect(*addr)
} }
/// Sends data on the socket to the address previously bound via connect(). #[deprecated(since = "0.1.2", note = "use poll_send instead")]
/// On success, returns the number of bytes written. #[doc(hidden)]
pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.poll_send(buf)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
}
/// Sends data on the socket to the remote address to which it is connected.
///
/// The [`connect`] method will connect this socket to a remote address. This
/// method will fail if the socket is not connected.
///
/// [`connect`]: #method.connect
///
/// # Return
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
///
/// If the socket is not ready for writing, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// notification when the socket becomes writable.
/// ///
/// # Panics /// # Panics
/// ///
/// This function will panic if called outside the context of a future's /// This function will panic if called from outside of a task context.
/// task. pub fn poll_send(&mut self, buf: &[u8]) -> Poll<usize, io::Error> {
pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> { try_ready!(self.io.poll_write_ready());
if let Async::NotReady = self.io.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
match self.io.get_ref().send(buf) { match self.io.get_ref().send(buf) {
Ok(n) => Ok(n), Ok(n) => Ok(n.into()),
Err(e) => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_write()?; self.io.need_write()?;
Ok(Async::NotReady)
} }
Err(e) Err(e) => Err(e),
}
} }
} }
/// Receives data from the socket previously bound with connect(). #[deprecated(since = "0.1.2", note = "use poll_recv instead")]
/// On success, returns the number of bytes read. #[doc(hidden)]
pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.poll_recv(buf)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
}
/// Receives a single datagram message on the socket from the remote address to
/// which it is connected. On success, returns the number of bytes read.
///
/// The function must be called with valid byte array `buf` of sufficient size to
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
/// excess bytes may be discarded.
///
/// The [`connect`] method will connect this socket to a remote address. This
/// method will fail if the socket is not connected.
///
/// [`connect`]: #method.connect
///
/// # Return
///
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
///
/// If no data is available for reading, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// notification when the socket becomes receivable or is closed.
/// ///
/// # Panics /// # Panics
/// ///
/// This function will panic if called outside the context of a future's /// This function will panic if called from outside of a task context.
/// task. pub fn poll_recv(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { try_ready!(self.io.poll_read_ready());
if let Async::NotReady = self.io.poll_read_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
match self.io.get_ref().recv(buf) { match self.io.get_ref().recv(buf) {
Ok(n) => Ok(n), Ok(n) => Ok(n.into()),
Err(e) => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_read()?; self.io.need_read()?;
Ok(Async::NotReady)
} }
Err(e) Err(e) => Err(e),
} }
} }
#[deprecated(since = "0.1.2", note = "use poll_send_to instead")]
#[doc(hidden)]
pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
match self.poll_send_to(buf, target)? {
Async::Ready(n) => Ok(n),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
} }
/// Sends data on the socket to the given address. On success, returns the /// Sends data on the socket to the given address. On success, returns the
/// number of bytes written. /// number of bytes written.
/// ///
/// Address type can be any implementer of `ToSocketAddrs` trait. See its /// This will return an error when the IP version of the local socket
/// documentation for concrete examples. /// does not match that of `target`.
///
/// # Return
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
///
/// If the socket is not ready for writing, the method returns
/// `Ok(Async::NotReady)` and arranges for the current task to receive a
/// notification when the socket becomes writable.
/// ///
/// # Panics /// # Panics
/// ///
/// This function will panic if called outside the context of a future's /// This function will panic if called from outside of a task context.
/// task. pub fn poll_send_to(&mut self, buf: &[u8], target: &SocketAddr) -> Poll<usize, io::Error> {
pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { try_ready!(self.io.poll_write_ready());
if let Async::NotReady = self.io.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
match self.io.get_ref().send_to(buf, target) { match self.io.get_ref().send_to(buf, target) {
Ok(n) => Ok(n), Ok(n) => Ok(n.into()),
Err(e) => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_write()?; self.io.need_write()?;
Ok(Async::NotReady)
} }
Err(e) Err(e) => Err(e),
}
} }
} }
@ -148,6 +200,15 @@ impl UdpSocket {
SendDgram::new(self, buf, *addr) SendDgram::new(self, buf, *addr)
} }
#[deprecated(since = "0.1.2", note = "use poll_recv_from instead")]
#[doc(hidden)]
pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
match self.poll_recv_from(buf)? {
Async::Ready(ret) => Ok(ret),
Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
}
/// Receives data from the socket. On success, returns the number of bytes /// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came. /// read and the address from whence the data came.
/// ///
@ -155,19 +216,16 @@ impl UdpSocket {
/// ///
/// This function will panic if called outside the context of a future's /// This function will panic if called outside the context of a future's
/// task. /// task.
pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
if let Async::NotReady = self.io.poll_read_ready()? { try_ready!(self.io.poll_read_ready());
return Err(io::ErrorKind::WouldBlock.into())
}
match self.io.get_ref().recv_from(buf) { match self.io.get_ref().recv_from(buf) {
Ok(n) => Ok(n), Ok(n) => Ok(n.into()),
Err(e) => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_read()?; self.io.need_read()?;
Ok(Async::NotReady)
} }
Err(e) Err(e) => Err(e),
}
} }
} }
@ -384,7 +442,7 @@ impl<T> Future for SendDgram<T>
{ {
let ref mut inner = let ref mut inner =
self.state.as_mut().expect("SendDgram polled after completion"); self.state.as_mut().expect("SendDgram polled after completion");
let n = try_nb!(inner.socket.send_to(inner.buffer.as_ref(), &inner.addr)); let n = try_ready!(inner.socket.poll_send_to(inner.buffer.as_ref(), &inner.addr));
if n != inner.buffer.as_ref().len() { if n != inner.buffer.as_ref().len() {
return Err(incomplete_write("failed to send entire message \ return Err(incomplete_write("failed to send entire message \
in datagram")) in datagram"))
@ -436,7 +494,7 @@ impl<T> Future for RecvDgram<T>
let ref mut inner = let ref mut inner =
self.state.as_mut().expect("RecvDgram polled after completion"); self.state.as_mut().expect("RecvDgram polled after completion");
try_nb!(inner.socket.recv_from(inner.buffer.as_mut())) try_ready!(inner.socket.poll_recv_from(inner.buffer.as_mut()))
}; };
let inner = self.state.take().unwrap(); let inner = self.state.take().unwrap();

View File

@ -1,3 +1,5 @@
#![allow(deprecated)]
extern crate futures; extern crate futures;
extern crate tokio; extern crate tokio;
#[macro_use] #[macro_use]