io: remove poll_{read,write}_buf from traits (#2882)

These functions have object safety issues. It also has been decided to
avoid vectored operations on the I/O traits. A later PR will bring back
vectored operations on specific types that support them.

Refs: #2879, #2716
This commit is contained in:
Carl Lerche 2020-09-24 17:26:03 -07:00 committed by GitHub
parent 760ae89401
commit 4186b0aa38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 319 additions and 514 deletions

View File

@ -22,7 +22,6 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use tokio::time::{self, Delay, Duration, Instant};
use bytes::Buf;
use futures_core::ready;
use std::collections::VecDeque;
use std::future::Future;
@ -439,16 +438,6 @@ impl AsyncWrite for Mock {
}
}
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
let n = ready!(self.poll_write(cx, buf.bytes()))?;
buf.advance(n);
Poll::Ready(Ok(n))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

View File

@ -118,6 +118,8 @@ where
type Item = Result<U::Item, U::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::util::poll_read_buf;
let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
loop {
@ -148,7 +150,7 @@ where
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
state.buffer.reserve(1);
let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? {
let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};

View File

@ -70,6 +70,8 @@ impl<R: AsyncRead> ReaderStream<R> {
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::util::poll_read_buf;
let mut this = self.as_mut().project();
let reader = match this.reader.as_pin_mut() {
@ -81,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
this.buf.reserve(CAPACITY);
}
match reader.poll_read_buf(cx, &mut this.buf) {
match poll_read_buf(cx, reader, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);

View File

@ -1,4 +1,4 @@
use bytes::{Buf, BufMut};
use bytes::Buf;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::io;
@ -119,29 +119,6 @@ where
self.consume(len);
Poll::Ready(Ok(()))
}
fn poll_read_buf<BM: BufMut>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut BM,
) -> Poll<io::Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}
let inner_buf = match self.as_mut().poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => buf,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
};
let len = std::cmp::min(inner_buf.len(), buf.remaining_mut());
buf.put_slice(&inner_buf[..len]);
self.consume(len);
Poll::Ready(Ok(len))
}
}
impl<S, B, E> AsyncBufRead for StreamReader<S, B>

View File

@ -52,3 +52,37 @@ pub mod context;
pub mod sync;
pub mod either;
#[cfg(any(feature = "io", feature = "codec"))]
mod util {
use tokio::io::{AsyncRead, ReadBuf};
use bytes::BufMut;
use futures_core::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
pub(crate) fn poll_read_buf<T: AsyncRead>(
cx: &mut Context<'_>,
io: Pin<&mut T>,
buf: &mut impl BufMut,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}
let orig = buf.bytes_mut().as_ptr() as *const u8;
let mut b = ReadBuf::uninit(buf.bytes_mut());
ready!(io.poll_read(cx, &mut b))?;
let n = b.filled().len();
// Safety: we can assume `n` bytes were read, since they are in`filled`.
assert_eq!(orig, b.filled().as_ptr());
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
}

View File

@ -82,7 +82,7 @@ signal = [
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["iovec", "lazy_static", "mio"]
tcp = ["lazy_static", "mio"]
time = ["slab"]
udp = ["lazy_static", "mio"]
uds = ["lazy_static", "libc", "mio", "mio-uds"]
@ -99,7 +99,6 @@ futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.20", optional = true }
iovec = { version = "0.1.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`

View File

@ -0,0 +1,276 @@
# Refactor I/O driver
Describes changes to the I/O driver for the Tokio 0.3 release.
## Goals
* Support `async fn` on I/O types with `&self`.
* Refine the `Registration` API.
### Non-goals
* Implement `AsyncRead` / `AsyncWrite` for `&TcpStream` or other reference type.
## Overview
Currently, I/O types require `&mut self` for `async` functions. The reason for
this is the task's waker is stored in the I/O resource's internal state
(`ScheduledIo`) instead of in the future returned by the `async` function.
Because of this limitation, I/O types limit the number of wakers to one per
direction (a direction is either read-related events or write-related events).
Moving the waker from the internal I/O resource's state to the operation's
future enables multiple wakers to be registered per operation. The "intrusive
wake list" strategy used by `Notify` applies to this case, though there are some
concerns unique to the I/O driver.
## Reworking the `Registration` type
While `Registration` is made private (per #2728), it remains in Tokio as an
implementation detail backing I/O resources such as `TcpStream`. The API of
`Registration` is updated to support waiting for an arbitrary interest set with
`&self`. This supports concurrent waiters with a different readiness interest.
```rust
struct Registration { ... }
// TODO: naming
struct ReadyEvent {
tick: u32,
ready: mio::Ready,
}
impl Registration {
/// `interest` must be a super set of **all** interest sets specified in
/// the other methods. This is the interest set passed to `mio`.
pub fn new<T>(io: &T, interest: mio::Ready) -> io::Result<Registration>
where T: mio::Evented;
/// Awaits for any readiness event included in `interest`. Returns a
/// `ReadyEvent` representing the received readiness event.
async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent>;
/// Clears resource level readiness represented by the specified `ReadyEvent`
async fn clear_readiness(&self, ready_event: ReadyEvent);
```
A new registration is created for a `T: mio::Evented` and a `interest`. This
creates a `ScheduledIo` entry with the I/O driver and registers the resource
with `mio`.
Because Tokio uses **edge-triggered** notifications, the I/O driver only
receives readiness from the OS once the ready state **changes**. The I/O driver
must track each resource's known readiness state. This helps prevent syscalls
when the process knows the syscall should return with `EWOULDBLOCK`.
A call to `readiness()` checks if the currently known resource readiness
overlaps with `interest`. If it does, then the `readiness()` immediately
returns. If it does not, then the task waits until the I/O driver receives a
readiness event.
The pseudocode to perform a TCP read is as follows.
```rust
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
loop {
// Await readiness
let event = self.readiness(interest).await?;
match self.mio_socket.read(buf) {
Ok(v) => return Ok(v),
Err(ref e) if e.kind() == WouldBlock => {
self.clear_readiness(event);
}
Err(e) => return Err(e),
}
}
}
```
## Reworking the `ScheduledIo` type
The `ScheduledIo` type is switched to use an intrusive waker linked list. Each
entry in the linked list includes the `interest` set passed to `readiness()`.
```rust
#[derive(Debug)]
pub(crate) struct ScheduledIo {
/// Resource's known state packed with other state that must be
/// atomically updated.
readiness: AtomicUsize,
/// Tracks tasks waiting on the resource
waiters: Mutex<Waiters>,
}
#[derive(Debug)]
struct Waiters {
// List of intrusive waiters.
list: LinkedList<Waiter>,
/// Waiter used by `AsyncRead` implementations.
reader: Option<Waker>,
/// Waiter used by `AsyncWrite` implementations.
writer: Option<Waker>,
}
// This struct is contained by the **future** returned by `readiness()`.
#[derive(Debug)]
struct Waiter {
/// Intrusive linked-list pointers
pointers: linked_list::Pointers<Waiter>,
/// Waker for task waiting on I/O resource
waiter: Option<Waker>,
/// Readiness events being waited on. This is
/// the value passed to `readiness()`
interest: mio::Ready,
/// Should not be `Unpin`.
_p: PhantomPinned,
}
```
When an I/O event is received from `mio`, the associated resources' readiness is
updated and the waiter list is iterated. All waiters with `interest` that
overlap the received readiness event are notified. Any waiter with an `interest`
that does not overlap the readiness event remains in the list.
## Cancel interest on drop
The future returned by `readiness()` uses an intrusive linked list to store the
waker with `ScheduledIo`. Because `readiness()` can be called concurrently, many
wakers may be stored simultaneously in the list. If the `readiness()` future is
dropped early, it is essential that the waker is removed from the list. This
prevents leaking memory.
## Race condition
Consider how many tasks may concurrently attempt I/O operations. This, combined
with how Tokio uses edge-triggered events, can result in a race condition. Let's
revisit the TCP read function:
```rust
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
loop {
// Await readiness
let event = self.readiness(interest).await?;
match self.mio_socket.read(buf) {
Ok(v) => return Ok(v),
Err(ref e) if e.kind() == WouldBlock => {
self.clear_readiness(event);
}
Err(e) => return Err(e),
}
}
}
```
If care is not taken, if between `mio_socket.read(buf)` returning and
`clear_readiness(event)` is called, a readiness event arrives, the `read()`
function could deadlock. This happens because the readiness event is received,
`clear_readiness()` unsets the readiness event, and on the next iteration,
`readiness().await` will block forever as a new readiness event is not received.
The current I/O driver handles this condition by always registering the task's
waker before performing the operation. This is not ideal as it will result in
unnecessary task notification.
Instead, we will use a strategy to prevent clearing readiness if an "unseen"
readiness event has been received. The I/O driver will maintain a "tick" value.
Every time the `mio` `poll()` function is called, the tick is incremented. Each
readiness event has an associated tick. When the I/O driver sets the resource's
readiness, the driver's tick is packed into the atomic `usize`.
The `ScheduledIo` readiness `AtomicUsize` is structured as:
```
| reserved | generation | driver tick | readinesss |
|----------+------------+--------------+------------|
| 1 bit | 7 bits + 8 bits + 16 bits |
```
The `reserved` and `generation` components exist today.
The `readiness()` function returns a `ReadyEvent` value. This value includes the
`tick` component read with the resource's readiness value. When
`clear_readiness()` is called, the `ReadyEvent` is provided. Readiness is only
cleared if the current `tick` matches the `tick` included in the `ReadyEvent`.
If the tick values do not match, the call to `readiness()` on the next iteration
will not block and the new `tick` is included in the new `ReadyToken.`
TODO
## Implementing `AsyncRead` / `AsyncWrite`
The `AsyncRead` and `AsyncWrite` traits use a "poll" based API. This means that
it is not possible to use an intrusive linked list to track the waker.
Additionally, there is no future associated with the operation which means it is
not possible to cancel interest in the readiness events.
To implement `AsyncRead` and `AsyncWrite`, `ScheduledIo` includes dedicated
waker values for the read direction and the write direction. These values are
used to store the waker. Specific `interest` is not tracked for `AsyncRead` and
`AsyncWrite` implementations. It is assumed that only events of interest are:
* Read ready
* Read closed
* Write ready
* Write closed
Note that "read closed" and "write closed" are only available with Mio 0.7. With
Mio 0.6, things were a bit messy.
It is only possible to implement `AsyncRead` and `AsyncWrite` for resource types
themselves and not for `&Resource`. Implementing the traits for `&Resource`
would permit concurrent operations to the resource. Because only a single waker
is stored per direction, any concurrent usage would result in deadlocks. An
alterate implementation would call for a `Vec<Waker>` but this would result in
memory leaks.
## Enabling reads and writes for `&TcpStream`
Instead of implementing `AsyncRead` and `AsyncWrite` for `&TcpStream`, a new
function is added to `TcpStream`.
```rust
impl TcpStream {
/// Naming TBD
fn by_ref(&self) -> TcpStreamRef<'_>;
}
struct TcpStreamRef<'a> {
stream: &'a TcpStream,
// `Waiter` is the node in the intrusive waiter linked-list
read_waiter: Waiter,
write_waiter: Waiter,
}
```
Now, `AsyncRead` and `AsyncWrite` can be implemented on `TcpStreamRef<'a>`. When
the `TcpStreamRef` is dropped, all associated waker resources are cleaned up.
### Removing all the `split()` functions
With `TcpStream::by_ref()`, `TcpStream::split()` is no longer needed. Instead,
it is possible to do something as follows.
```rust
let rd = my_stream.by_ref();
let wr = my_stream.by_ref();
select! {
// use `rd` and `wr` in separate branches.
}
```
It is also possible to sotre a `TcpStream` in an `Arc`.
```rust
let arc_stream = Arc::new(my_tcp_stream);
let n = arc_stream.by_ref().read(buf).await?;
```

View File

@ -1,5 +1,4 @@
use super::ReadBuf;
use bytes::BufMut;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
@ -54,36 +53,6 @@ pub trait AsyncRead {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>>;
/// Pulls some bytes from this source into the specified `BufMut`, returning
/// how many bytes were read.
///
/// The `buf` provided will have bytes read into it and the internal cursor
/// will be advanced if any bytes were read. Note that this method typically
/// will not reallocate the buffer provided.
fn poll_read_buf<B: BufMut>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}
let mut b = ReadBuf::uninit(buf.bytes_mut());
ready!(self.poll_read(cx, &mut b))?;
let n = b.filled().len();
// Safety: we can assume `n` bytes were read, since they are in`filled`.
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
}
macro_rules! deref_async_read {

View File

@ -1,4 +1,3 @@
use bytes::Buf;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
@ -128,27 +127,6 @@ pub trait AsyncWrite {
/// This function will panic if not called within the context of a future's
/// task.
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
/// Writes a `Buf` into this value, returning how many bytes were written.
///
/// Note that this method will advance the `buf` provided automatically by
/// the number of bytes written.
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<Result<usize, io::Error>>
where
Self: Sized,
{
if !buf.has_remaining() {
return Poll::Ready(Ok(0));
}
let n = ready!(self.poll_write(cx, buf.bytes()))?;
buf.advance(n);
Poll::Ready(Ok(n))
}
}
macro_rules! deref_async_write {

View File

@ -6,7 +6,6 @@
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use bytes::{Buf, BufMut};
use std::cell::UnsafeCell;
use std::fmt;
use std::io;
@ -107,15 +106,6 @@ impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_read(cx, buf)
}
fn poll_read_buf<B: BufMut>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_read_buf(cx, buf)
}
}
impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
@ -137,15 +127,6 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_shutdown(cx)
}
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<Result<usize, io::Error>> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_write_buf(cx, buf)
}
}
impl<T> Inner<T> {

View File

@ -1,6 +1,5 @@
use crate::io::util::chain::{chain, Chain};
use crate::io::util::read::{read, Read};
use crate::io::util::read_buf::{read_buf, ReadBuf};
use crate::io::util::read_exact::{read_exact, ReadExact};
use crate::io::util::read_int::{
ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8,
@ -13,8 +12,6 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString};
use crate::io::util::take::{take, Take};
use crate::io::AsyncRead;
use bytes::BufMut;
cfg_io_util! {
/// Defines numeric reader
macro_rules! read_impl {
@ -166,71 +163,6 @@ cfg_io_util! {
read(self, buf)
}
/// Pulls some bytes from this source into the specified buffer,
/// advancing the buffer's internal cursor.
///
/// Equivalent to:
///
/// ```ignore
/// async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> io::Result<usize>;
/// ```
///
/// Usually, only a single `read` syscall is issued, even if there is
/// more space in the supplied buffer.
///
/// This function does not provide any guarantees about whether it
/// completes immediately or asynchronously
///
/// # Return
///
/// On a successful read, the number of read bytes is returned. If the
/// supplied buffer is not empty and the function returns `Ok(0)` then
/// the source as reached an "end-of-file" event.
///
/// # Errors
///
/// If this function encounters any form of I/O or other error, an error
/// variant will be returned. If an error is returned then it must be
/// guaranteed that no bytes were read.
///
/// # Examples
///
/// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]:
///
/// [`File`]: crate::fs::File
/// [`BytesMut`]: bytes::BytesMut
/// [`BufMut`]: bytes::BufMut
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::{self, AsyncReadExt};
///
/// use bytes::BytesMut;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut f = File::open("foo.txt").await?;
/// let mut buffer = BytesMut::with_capacity(10);
///
/// assert!(buffer.is_empty());
///
/// // read up to 10 bytes, note that the return value is not needed
/// // to access the data that was read as `buffer`'s internal
/// // cursor is updated.
/// f.read_buf(&mut buffer).await?;
///
/// println!("The bytes: {:?}", &buffer[..]);
/// Ok(())
/// }
/// ```
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where
Self: Sized + Unpin,
B: BufMut,
{
read_buf(self, buf)
}
/// Reads the exact number of bytes required to fill `buf`.
///
/// Equivalent to:

View File

@ -2,7 +2,6 @@ use crate::io::util::flush::{flush, Flush};
use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
use crate::io::util::write_buf::{write_buf, WriteBuf};
use crate::io::util::write_int::{
WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le,
WriteI8,
@ -13,8 +12,6 @@ use crate::io::util::write_int::{
};
use crate::io::AsyncWrite;
use bytes::Buf;
cfg_io_util! {
/// Defines numeric writer
macro_rules! write_impl {
@ -119,79 +116,6 @@ cfg_io_util! {
write(self, src)
}
/// Writes a buffer into this writer, advancing the buffer's internal
/// cursor.
///
/// Equivalent to:
///
/// ```ignore
/// async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> io::Result<usize>;
/// ```
///
/// This function will attempt to write the entire contents of `buf`, but
/// the entire write may not succeed, or the write may also generate an
/// error. After the operation completes, the buffer's
/// internal cursor is advanced by the number of bytes written. A
/// subsequent call to `write_buf` using the **same** `buf` value will
/// resume from the point that the first call to `write_buf` completed.
/// A call to `write` represents *at most one* attempt to write to any
/// wrapped object.
///
/// # Return
///
/// If the return value is `Ok(n)` then it must be guaranteed that `n <=
/// buf.len()`. A return value of `0` typically means that the
/// underlying object is no longer able to accept bytes and will likely
/// not be able to in the future as well, or that the buffer provided is
/// empty.
///
/// # Errors
///
/// Each call to `write` may generate an I/O error indicating that the
/// operation could not be completed. If an error is returned then no bytes
/// in the buffer were written to this writer.
///
/// It is **not** considered an error if the entire buffer could not be
/// written to this writer.
///
/// # Examples
///
/// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]:
///
/// [`File`]: crate::fs::File
/// [`Buf`]: bytes::Buf
///
/// ```no_run
/// use tokio::io::{self, AsyncWriteExt};
/// use tokio::fs::File;
///
/// use bytes::Buf;
/// use std::io::Cursor;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// let mut buffer = Cursor::new(b"data to write");
///
/// // Loop until the entire contents of the buffer are written to
/// // the file.
/// while buffer.has_remaining() {
/// // Writes some prefix of the byte string, not necessarily
/// // all of it.
/// file.write_buf(&mut buffer).await?;
/// }
///
/// Ok(())
/// }
/// ```
fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B>
where
Self: Sized + Unpin,
B: Buf,
{
write_buf(self, src)
}
/// Attempts to write an entire buffer into this writer.
///
/// Equivalent to:

View File

@ -1,7 +1,6 @@
use crate::io::util::DEFAULT_BUF_SIZE;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
use bytes::Buf;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
@ -151,14 +150,6 @@ impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
self.get_pin_mut().poll_write(cx, buf)
}
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
self.get_pin_mut().poll_write_buf(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}

View File

@ -39,7 +39,6 @@ cfg_io_util! {
pub use mem::{duplex, DuplexStream};
mod read;
mod read_buf;
mod read_exact;
mod read_int;
mod read_line;
@ -68,7 +67,6 @@ cfg_io_util! {
mod write;
mod write_all;
mod write_buf;
mod write_int;

View File

@ -1,38 +0,0 @@
use crate::io::AsyncRead;
use bytes::BufMut;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
pub(crate) fn read_buf<'a, R, B>(reader: &'a mut R, buf: &'a mut B) -> ReadBuf<'a, R, B>
where
R: AsyncRead + Unpin,
B: BufMut,
{
ReadBuf { reader, buf }
}
cfg_io_util! {
/// Future returned by [`read_buf`](crate::io::AsyncReadExt::read_buf).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadBuf<'a, R, B> {
reader: &'a mut R,
buf: &'a mut B,
}
}
impl<R, B> Future for ReadBuf<'_, R, B>
where
R: AsyncRead + Unpin,
B: BufMut,
{
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let me = &mut *self;
Pin::new(&mut *me.reader).poll_read_buf(cx, me.buf)
}
}

View File

@ -1,40 +0,0 @@
use crate::io::AsyncWrite;
use bytes::Buf;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
cfg_io_util! {
/// A future to write some of the buffer to an `AsyncWrite`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteBuf<'a, W, B> {
writer: &'a mut W,
buf: &'a mut B,
}
}
/// Tries to write some bytes from the given `buf` to the writer in an
/// asynchronous manner, returning a future.
pub(crate) fn write_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteBuf<'a, W, B>
where
W: AsyncWrite + Unpin,
B: Buf,
{
WriteBuf { writer, buf }
}
impl<W, B> Future for WriteBuf<'_, W, B>
where
W: AsyncWrite + Unpin,
B: Buf,
{
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let me = &mut *self;
Pin::new(&mut *me.writer).poll_write_buf(cx, me.buf)
}
}

View File

@ -12,7 +12,6 @@ use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::net::TcpStream;
use bytes::Buf;
use std::io;
use std::net::Shutdown;
use std::pin::Pin;
@ -148,14 +147,6 @@ impl AsyncWrite for WriteHalf<'_> {
self.0.poll_write_priv(cx, buf)
}
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
self.0.poll_write_buf_priv(cx, buf)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// tcp flush is a no-op

View File

@ -12,7 +12,6 @@ use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::net::TcpStream;
use bytes::Buf;
use std::error::Error;
use std::net::Shutdown;
use std::pin::Pin;
@ -230,14 +229,6 @@ impl AsyncWrite for OwnedWriteHalf {
self.inner.poll_write_priv(cx, buf)
}
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
self.inner.poll_write_buf_priv(cx, buf)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// tcp flush is a no-op

View File

@ -4,8 +4,6 @@ use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::ToSocketAddrs;
use bytes::Buf;
use iovec::IoVec;
use std::convert::TryFrom;
use std::fmt;
use std::io::{self, Read, Write};
@ -745,44 +743,6 @@ impl TcpStream {
}
}
}
pub(super) fn poll_write_buf_priv<B: Buf>(
&self,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
use std::io::IoSlice;
loop {
let ev = ready!(self.io.poll_write_ready(cx))?;
// The `IoVec` (v0.1.x) type can't have a zero-length size, so create
// a dummy version from a 1-length slice which we'll overwrite with
// the `bytes_vectored` method.
static S: &[u8] = &[0];
const MAX_BUFS: usize = 64;
let mut slices: [IoSlice<'_>; MAX_BUFS] = [IoSlice::new(S); 64];
let cnt = buf.bytes_vectored(&mut slices);
let iovec = <&IoVec>::from(S);
let mut vecs = [iovec; MAX_BUFS];
for i in 0..cnt {
vecs[i] = (*slices[i]).into();
}
match self.io.get_ref().write_bufs(&vecs[..cnt]) {
Ok(n) => {
buf.advance(n);
return Poll::Ready(Ok(n));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
}
impl TryFrom<TcpStream> for mio::net::TcpStream {
@ -827,14 +787,6 @@ impl AsyncWrite for TcpStream {
self.poll_write_priv(cx, buf)
}
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
self.poll_write_buf_priv(cx, buf)
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// tcp flush is a no-op

View File

@ -1,113 +1,10 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use tokio::io::{AsyncRead, ReadBuf};
use tokio_test::task;
use tokio_test::{assert_ready_err, assert_ready_ok};
use bytes::BytesMut;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;
#[test]
fn assert_obj_safe() {
fn _assert<T>() {}
_assert::<Box<dyn AsyncRead>>();
}
#[test]
fn read_buf_success() {
struct Rd;
impl AsyncRead for Rd {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
buf.append(b"hello world");
Poll::Ready(Ok(()))
}
}
let mut buf = BytesMut::with_capacity(65);
task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(11, n);
assert_eq!(buf[..], b"hello world"[..]);
});
}
#[test]
fn read_buf_error() {
struct Rd;
impl AsyncRead for Rd {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let err = io::ErrorKind::Other.into();
Poll::Ready(Err(err))
}
}
let mut buf = BytesMut::with_capacity(65);
task::spawn(Rd).enter(|cx, rd| {
let err = assert_ready_err!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(err.kind(), io::ErrorKind::Other);
});
}
#[test]
fn read_buf_no_capacity() {
struct Rd;
impl AsyncRead for Rd {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
unimplemented!();
}
}
let mut buf = [0u8; 0];
task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut &mut buf[..]));
assert_eq!(0, n);
});
}
#[test]
fn read_buf_uninitialized_ok() {
struct Rd;
impl AsyncRead for Rd {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
assert_eq!(buf.remaining(), 64);
assert_eq!(buf.filled().len(), 0);
assert_eq!(buf.initialized().len(), 0);
Poll::Ready(Ok(()))
}
}
// Can't create BytesMut w/ zero capacity, so fill it up
let mut buf = BytesMut::with_capacity(64);
task::spawn(Rd).enter(|cx, rd| {
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
assert_eq!(0, n);
});
}