net: add TcpStream::ready and non-blocking ops (#3130)

Adds function to await for readiness on the TcpStream and non-blocking read/write functions.

`async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified
interest. There are also two shorthand functions, `readable()` and `writable()`.

Once the stream is in a ready state, the caller may perform non-blocking operations on it using
`try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready.

The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in
`AsyncFd` protect against a potential race between receiving the readiness notification and clearing
it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()`
and `try_write()` function handle clearing stream readiness as needed.

This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These
types will also be useful for fixing #3072 .

Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this
is left for later PRs.

Refs: #3130
This commit is contained in:
Carl Lerche 2020-11-12 20:07:43 -08:00 committed by GitHub
parent 685da8dadd
commit 02b1117dca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 576 additions and 54 deletions

View File

@ -1,3 +1,7 @@
#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
use crate::io::driver::Ready;
use std::fmt;
use std::ops;
@ -5,34 +9,84 @@ use std::ops;
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
#[derive(Clone, Copy)]
pub(crate) struct Interest(mio::Interest);
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Interest(mio::Interest);
impl Interest {
/// Interest in all readable events
pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE);
/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
pub const READABLE: Interest = Interest(mio::Interest::READABLE);
/// Interest in all writable events
pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
///
/// Writable interest includes write-closed events.
pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
/// Returns true if the value includes readable interest.
pub(crate) const fn is_readable(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// assert!(Interest::READABLE.is_readable());
/// assert!(!Interest::WRITABLE.is_readable());
///
/// let both = Interest::READABLE | Interest::WRITABLE;
/// assert!(both.is_readable());
/// ```
pub const fn is_readable(self) -> bool {
self.0.is_readable()
}
/// Returns true if the value includes writable interest.
pub(crate) const fn is_writable(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// assert!(!Interest::READABLE.is_writable());
/// assert!(Interest::WRITABLE.is_writable());
///
/// let both = Interest::READABLE | Interest::WRITABLE;
/// assert!(both.is_writable());
/// ```
pub const fn is_writable(self) -> bool {
self.0.is_writable()
}
/// Add together two `Interst` values.
pub(crate) const fn add(self, other: Interest) -> Interest {
///
/// This function works from a `const` context.
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
///
/// assert!(BOTH.is_readable());
/// assert!(BOTH.is_writable());
pub const fn add(self, other: Interest) -> Interest {
Interest(self.0.add(other.0))
}
// This function must be crate-private to avoid exposing a `mio` dependency.
pub(crate) const fn to_mio(self) -> mio::Interest {
self.0
}
pub(super) fn mask(self) -> Ready {
match self {
Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED,
Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
_ => Ready::EMPTY,
}
}
}
impl ops::BitOr for Interest {

View File

@ -1,10 +1,12 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
mod interest;
pub(crate) use interest::Interest;
#[allow(unreachable_pub)]
pub use interest::Interest;
mod ready;
use ready::Ready;
#[allow(unreachable_pub)]
pub use ready::Ready;
mod registration;
pub(crate) use registration::Registration;
@ -51,7 +53,7 @@ pub(crate) struct Handle {
pub(crate) struct ReadyEvent {
tick: u8,
ready: Ready,
pub(crate) ready: Ready,
}
pub(super) struct Inner {

View File

@ -1,3 +1,5 @@
#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
use std::fmt;
use std::ops;
@ -6,36 +8,33 @@ const WRITABLE: usize = 0b0_10;
const READ_CLOSED: usize = 0b0_0100;
const WRITE_CLOSED: usize = 0b0_1000;
/// A set of readiness event kinds.
/// Describes the readiness state of an I/O resources.
///
/// `Ready` is set of operation descriptors indicating which kind of an
/// operation is ready to be performed.
///
/// This struct only represents portable event kinds. Portable events are
/// events that can be raised on any platform while guaranteeing no false
/// positives.
/// `Ready` tracks which operation an I/O resource is ready to perform.
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
#[derive(Clone, Copy, PartialEq, PartialOrd)]
pub(crate) struct Ready(usize);
pub struct Ready(usize);
impl Ready {
/// Returns the empty `Ready` set.
pub(crate) const EMPTY: Ready = Ready(0);
pub const EMPTY: Ready = Ready(0);
/// Returns a `Ready` representing readable readiness.
pub(crate) const READABLE: Ready = Ready(READABLE);
pub const READABLE: Ready = Ready(READABLE);
/// Returns a `Ready` representing writable readiness.
pub(crate) const WRITABLE: Ready = Ready(WRITABLE);
pub const WRITABLE: Ready = Ready(WRITABLE);
/// Returns a `Ready` representing read closed readiness.
pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED);
pub const READ_CLOSED: Ready = Ready(READ_CLOSED);
/// Returns a `Ready` representing write closed readiness.
pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
/// Returns a `Ready` representing readiness for all operations.
pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
// Must remain crate-private to avoid adding a public dependency on Mio.
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;
@ -59,27 +58,78 @@ impl Ready {
}
/// Returns true if `Ready` is the empty set
pub(crate) fn is_empty(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(Ready::EMPTY.is_empty());
/// assert!(!Ready::READABLE.is_empty());
/// ```
pub fn is_empty(self) -> bool {
self == Ready::EMPTY
}
/// Returns true if the value includes readable readiness
pub(crate) fn is_readable(self) -> bool {
/// Returns `true` if the value includes `readable`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_readable());
/// assert!(Ready::READABLE.is_readable());
/// assert!(Ready::READ_CLOSED.is_readable());
/// assert!(!Ready::WRITABLE.is_readable());
/// ```
pub fn is_readable(self) -> bool {
self.contains(Ready::READABLE) || self.is_read_closed()
}
/// Returns true if the value includes writable readiness
pub(crate) fn is_writable(self) -> bool {
/// Returns `true` if the value includes writable `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_writable());
/// assert!(!Ready::READABLE.is_writable());
/// assert!(Ready::WRITABLE.is_writable());
/// assert!(Ready::WRITE_CLOSED.is_writable());
/// ```
pub fn is_writable(self) -> bool {
self.contains(Ready::WRITABLE) || self.is_write_closed()
}
/// Returns true if the value includes read closed readiness
pub(crate) fn is_read_closed(self) -> bool {
/// Returns `true` if the value includes read-closed `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_read_closed());
/// assert!(!Ready::READABLE.is_read_closed());
/// assert!(Ready::READ_CLOSED.is_read_closed());
/// ```
pub fn is_read_closed(self) -> bool {
self.contains(Ready::READ_CLOSED)
}
/// Returns true if the value includes write closed readiness
pub(crate) fn is_write_closed(self) -> bool {
/// Returns `true` if the value includes write-closed `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_write_closed());
/// assert!(!Ready::WRITABLE.is_write_closed());
/// assert!(Ready::WRITE_CLOSED.is_write_closed());
/// ```
pub fn is_write_closed(self) -> bool {
self.contains(Ready::WRITE_CLOSED)
}
@ -143,37 +193,37 @@ cfg_io_readiness! {
}
}
impl<T: Into<Ready>> ops::BitOr<T> for Ready {
impl ops::BitOr<Ready> for Ready {
type Output = Ready;
#[inline]
fn bitor(self, other: T) -> Ready {
Ready(self.0 | other.into().0)
fn bitor(self, other: Ready) -> Ready {
Ready(self.0 | other.0)
}
}
impl<T: Into<Ready>> ops::BitOrAssign<T> for Ready {
impl ops::BitOrAssign<Ready> for Ready {
#[inline]
fn bitor_assign(&mut self, other: T) {
self.0 |= other.into().0;
fn bitor_assign(&mut self, other: Ready) {
self.0 |= other.0;
}
}
impl<T: Into<Ready>> ops::BitAnd<T> for Ready {
impl ops::BitAnd<Ready> for Ready {
type Output = Ready;
#[inline]
fn bitand(self, other: T) -> Ready {
Ready(self.0 & other.into().0)
fn bitand(self, other: Ready) -> Ready {
Ready(self.0 & other.0)
}
}
impl<T: Into<Ready>> ops::Sub<T> for Ready {
impl ops::Sub<Ready> for Ready {
type Output = Ready;
#[inline]
fn sub(self, other: T) -> Ready {
Ready(self.0 & !other.into().0)
fn sub(self, other: Ready) -> Ready {
Ready(self.0 & !other.0)
}
}

View File

@ -182,6 +182,27 @@ impl Registration {
}
}
}
pub(crate) fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
let ev = self.shared.ready_event(interest);
// Don't attempt the operation if the resource is not ready.
if ev.ready.is_empty() {
return Err(io::ErrorKind::WouldBlock.into());
}
match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(ev);
Err(io::ErrorKind::WouldBlock.into())
}
res => res,
}
}
}
fn gone() -> io::Error {

View File

@ -1,4 +1,4 @@
use super::{Ready, ReadyEvent, Tick};
use super::{Interest, Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
@ -49,8 +49,6 @@ struct Waiters {
}
cfg_io_readiness! {
use crate::io::Interest;
#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
@ -280,6 +278,15 @@ impl ScheduledIo {
}
}
pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
let curr = self.readiness.load(Acquire);
ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
}
}
/// Poll version of checking readiness for a certain direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,

View File

@ -204,9 +204,12 @@ pub use self::read_buf::ReadBuf;
#[doc(no_inline)]
pub use std::io::{Error, ErrorKind, Result, SeekFrom};
cfg_io_driver! {
cfg_io_driver_impl! {
pub(crate) mod driver;
pub(crate) use driver::Interest;
cfg_net! {
pub use driver::{Interest, Ready};
}
mod poll_evented;

View File

@ -79,6 +79,19 @@ macro_rules! cfg_io_driver {
}
}
macro_rules! cfg_io_driver_impl {
( $( $item:item )* ) => {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "signal"),
))]
$item
)*
}
}
macro_rules! cfg_not_io_driver {
($($item:item)*) => {
$(

View File

@ -1,5 +1,5 @@
use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf};
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
@ -264,6 +264,266 @@ impl TcpStream {
}
}
/// Wait for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// # Examples
///
/// Concurrently read and write to the stream on the same task without
/// splitting.
///
/// ```no_run
/// use tokio::io::Interest;
/// use tokio::net::TcpStream;
/// use std::error::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// loop {
/// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
///
/// if ready.is_readable() {
/// // The buffer is **not** included in the async task and will only exist
/// // on the stack.
/// let mut data = [0; 1024];
/// let n = stream.try_read(&mut data[..]).unwrap();
///
/// println!("GOT {:?}", &data[..n]);
/// }
///
/// if ready.is_writable() {
/// // Write some data
/// stream.try_write(b"hello world").unwrap();
/// }
/// }
/// }
/// ```
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
let event = self.io.registration().readiness(interest).await?;
Ok(event.ready)
}
/// Wait for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` is usually
/// paired with `try_read()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// let mut msg = vec![0; 1024];
///
/// loop {
/// // Wait for the socket to be readable
/// stream.readable().await?;
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_read(&mut msg) {
/// Ok(n) => {
/// msg.truncate(n);
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// println!("GOT = {:?}", msg);
/// Ok(())
/// }
/// ```
pub async fn readable(&self) -> io::Result<()> {
self.ready(Interest::READABLE).await?;
Ok(())
}
/// Try to read data from the stream into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: TcpStream::readable()
/// [`ready()`]: TcpStream::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(n)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKinid::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// stream.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf = [0; 4096];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_read(&mut buf) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
use std::io::Read;
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` is usually
/// paired with `try_write()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be writable
/// stream.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_write(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub async fn writable(&self) -> io::Result<()> {
self.ready(Interest::WRITABLE).await?;
Ok(())
}
/// Try to write a buffer to the stream, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
/// only part of the buffer may be written.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` is usually
/// paired with `try_write()`.
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the stream is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be writable
/// stream.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_write(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
use std::io::Write;
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.

View File

@ -2,8 +2,8 @@
//! Signal driver
use crate::io::driver::Driver as IoDriver;
use crate::io::{Interest, PollEvented};
use crate::io::driver::{Driver as IoDriver, Interest};
use crate::io::PollEvented;
use crate::park::Park;
use crate::signal::registry::globals;

112
tokio/tests/tcp_stream.rs Normal file
View File

@ -0,0 +1,112 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use tokio::io::Interest;
use tokio::net::{TcpListener, TcpStream};
use tokio_test::task;
use tokio_test::{assert_pending, assert_ready_ok};
use std::io;
#[tokio::test]
async fn try_read_write() {
const DATA: &[u8] = b"this is some data to write to the socket";
// Create listener
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
// Create socket pair
let client = TcpStream::connect(listener.local_addr().unwrap())
.await
.unwrap();
let (server, _) = listener.accept().await.unwrap();
let mut written = DATA.to_vec();
// Track the server receiving data
let mut readable = task::spawn(server.readable());
assert_pending!(readable.poll());
// Write data.
client.writable().await.unwrap();
assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
// The task should be notified
while !readable.is_woken() {
tokio::task::yield_now().await;
}
// Fill the write buffer
loop {
// Still ready
let mut writable = task::spawn(client.writable());
assert_ready_ok!(writable.poll());
match client.try_write(DATA) {
Ok(n) => written.extend(&DATA[..n]),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
break;
}
Err(e) => panic!("error = {:?}", e),
}
}
{
// Write buffer full
let mut writable = task::spawn(client.writable());
assert_pending!(writable.poll());
// Drain the socket from the server end
let mut read = vec![0; written.len()];
let mut i = 0;
while i < read.len() {
server.readable().await.unwrap();
match server.try_read(&mut read[i..]) {
Ok(n) => i += n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("error = {:?}", e),
}
}
assert_eq!(read, written);
}
// Now, we listen for shutdown
drop(client);
loop {
let ready = server.ready(Interest::READABLE).await.unwrap();
if ready.is_read_closed() {
return;
} else {
tokio::task::yield_now().await;
}
}
}
#[test]
fn buffer_not_included_in_future() {
use std::mem;
const N: usize = 4096;
let fut = async {
let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
loop {
stream.readable().await.unwrap();
let mut buf = [0; N];
let n = stream.try_read(&mut buf[..]).unwrap();
if n == 0 {
break;
}
}
};
let n = mem::size_of_val(&fut);
assert!(n < 1000);
}