Enable buffering both reads and writes (#1558)

`BufWriter` and `BufReader` did not previously forward the "opposite" trait (`AsyncRead` for `BufWriter` and `AsyncWrite` for `BufReader`). This meant that there was no way to have both directions buffered at once. This patch fixes that, and introduces a convenience type + constructor for this double-wrapped construct.
This commit is contained in:
Jon Gjengset 2019-09-19 14:17:15 -04:00 committed by GitHub
parent 613fde2637
commit 9d5af20bcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 121 additions and 3 deletions

View File

@ -28,6 +28,7 @@ log = "0.4"
futures-core-preview = "=0.3.0-alpha.18"
memchr = { version = "2.2", optional = true }
pin-utils = { version = "=0.1.0-alpha.4", optional = true }
pin-project = "=0.4.0-alpha.11"
[dev-dependencies]
tokio = { version = "0.2.0-alpha.4", path = "../tokio" }

View File

@ -1,5 +1,5 @@
use super::DEFAULT_BUF_SIZE;
use crate::{AsyncBufRead, AsyncRead};
use crate::{AsyncBufRead, AsyncRead, AsyncWrite};
use futures_core::ready;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::io::{self, Read};
@ -152,6 +152,24 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
}
}
impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.get_pin_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_pin_mut().poll_shutdown(cx)
}
}
impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufReader")

View File

@ -0,0 +1,71 @@
use crate::io::{BufReader, BufWriter};
use crate::{AsyncBufRead, AsyncRead, AsyncWrite};
use pin_project::pin_project;
use std::io::{self};
use std::{
pin::Pin,
task::{Context, Poll},
};
/// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output.
///
/// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`]
/// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall
/// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`]
/// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps
/// one in the other so that both directions are buffered. See their documentation for details.
#[pin_project]
#[derive(Debug)]
pub struct BufStream<RW: AsyncRead + AsyncWrite>(#[pin] BufReader<BufWriter<RW>>);
impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
/// Wrap a type in both [`BufWriter`] and [`BufReader`].
///
/// See the documentation for those types and [`BufStream`] for details.
pub fn new(stream: RW) -> BufStream<RW> {
BufStream(BufReader::new(BufWriter::new(stream)))
}
}
impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().0.poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().0.poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().0.poll_shutdown(cx)
}
}
impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.project().0.poll_read(cx, buf)
}
// we can't skip unconditionally because of the large buffer case in read.
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.0.prepare_uninitialized_buffer(buf)
}
}
impl<RW: AsyncBufRead + AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project_into().0.poll_fill_buf(cx)
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.project().0.consume(amt)
}
}

View File

@ -1,5 +1,5 @@
use super::DEFAULT_BUF_SIZE;
use crate::AsyncWrite;
use crate::{AsyncBufRead, AsyncRead, AsyncWrite};
use futures_core::ready;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::fmt;
@ -145,6 +145,31 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
}
}
impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.get_pin_mut().poll_read(cx, buf)
}
// we can't skip unconditionally because of the large buffer case in read.
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.get_ref().prepare_uninitialized_buffer(buf)
}
}
impl<W: AsyncWrite + AsyncBufRead> AsyncBufRead for BufWriter<W> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.get_pin_mut().poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.get_pin_mut().consume(amt)
}
}
impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufWriter")

View File

@ -2,6 +2,7 @@ mod async_buf_read_ext;
mod async_read_ext;
mod async_write_ext;
mod buf_reader;
mod buf_stream;
mod buf_writer;
mod chain;
mod copy;
@ -27,6 +28,8 @@ pub use self::async_write_ext::AsyncWriteExt;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_reader::BufReader;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_stream::BufStream;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buf_writer::BufWriter;
// used by `BufReader` and `BufWriter`

View File

@ -34,7 +34,7 @@ pub use self::async_read::AsyncRead;
pub use self::async_write::AsyncWrite;
#[cfg(feature = "util")]
pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream, BufWriter};
// Re-export `Buf` and `BufMut` since they are part of the API
pub use bytes::{Buf, BufMut};