From e87df0557da4e8f70f3939c77e84acddc25652ba Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 2 Dec 2019 13:09:31 -0800 Subject: [PATCH] io: add async fns for reading / writing bufs (#1881) Adds `read_buf` and `write_buf` which work with `T: BufMut` and `T: Buf` respectively. This adds an easy API for using the buffer traits provided by `bytes. --- tokio/src/io/util/async_read_ext.rs | 68 +++++++++++++++++++++++ tokio/src/io/util/async_write_ext.rs | 82 +++++++++++++++++++++++++++- tokio/src/io/util/mod.rs | 2 + tokio/src/io/util/read_buf.rs | 41 ++++++++++++++ tokio/src/io/util/write.rs | 11 ---- tokio/src/io/util/write_buf.rs | 43 +++++++++++++++ 6 files changed, 234 insertions(+), 13 deletions(-) create mode 100644 tokio/src/io/util/read_buf.rs create mode 100644 tokio/src/io/util/write_buf.rs diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index 1c30b145a..04979336f 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -1,5 +1,6 @@ use crate::io::util::chain::{chain, Chain}; use crate::io::util::read::{read, Read}; +use crate::io::util::read_buf::{read_buf, ReadBuf}; use crate::io::util::read_exact::{read_exact, ReadExact}; use crate::io::util::read_int::{ReadU8, ReadU16, ReadU32, ReadU64, ReadU128}; use crate::io::util::read_int::{ReadI8, ReadI16, ReadI32, ReadI64, ReadI128}; @@ -8,6 +9,8 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString}; use crate::io::util::take::{take, Take}; use crate::io::AsyncRead; +use bytes::BufMut; + cfg_io_util! { /// Define numeric reader macro_rules! read_impl { @@ -159,6 +162,71 @@ cfg_io_util! { read(self, buf) } + /// Pull some bytes from this source into the specified buffer, + /// advancing the buffer's internal cursor. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn read_buf(&mut self, buf: &mut B) -> io::Result; + /// ``` + /// + /// Usually, only a single `read` syscall is issued, even if there is + /// more space in the supplied buffer. + /// + /// This function does not provide any guarantees about whether it + /// completes immediately or asynchronously + /// + /// # Return + /// + /// On a successful read, the number of read bytes is returned. If the + /// supplied buffer is not empty and the function returns `Ok(0)` then + /// the source as reached an "end-of-file" event. + /// + /// # Errors + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. If an error is returned then it must be + /// guaranteed that no bytes were read. + /// + /// # Examples + /// + /// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]: + /// + /// [`File`]: crate::fs::File + /// [`BytesMut`]: bytes::BytesMut + /// [`BufMut`]: bytes::BufMut + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::{self, AsyncReadExt}; + /// + /// use bytes::BytesMut; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut f = File::open("foo.txt").await?; + /// let mut buffer = BytesMut::with_capacity(10); + /// + /// assert!(buffer.is_empty()); + /// + /// // read up to 10 bytes, note that the return value is not needed + /// // to access the data that was read as `buffer`'s internal + /// // cursor is updated. + /// f.read_buf(&mut buffer).await?; + /// + /// println!("The bytes: {:?}", &buffer[..]); + /// Ok(()) + /// } + /// ``` + fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> + where + Self: Sized, + B: BufMut, + { + read_buf(self, buf) + } + /// Read the exact number of bytes required to fill `buf`. /// /// Equivalent to: diff --git a/tokio/src/io/util/async_write_ext.rs b/tokio/src/io/util/async_write_ext.rs index 346da7ab2..13d8b7456 100644 --- a/tokio/src/io/util/async_write_ext.rs +++ b/tokio/src/io/util/async_write_ext.rs @@ -2,10 +2,13 @@ use crate::io::util::flush::{flush, Flush}; use crate::io::util::shutdown::{shutdown, Shutdown}; use crate::io::util::write::{write, Write}; use crate::io::util::write_all::{write_all, WriteAll}; +use crate::io::util::write_buf::{write_buf, WriteBuf}; use crate::io::util::write_int::{WriteU8, WriteU16, WriteU32, WriteU64, WriteU128}; use crate::io::util::write_int::{WriteI8, WriteI16, WriteI32, WriteI64, WriteI128}; use crate::io::AsyncWrite; +use bytes::Buf; + cfg_io_util! { /// Define numeric writer macro_rules! write_impl { @@ -71,6 +74,8 @@ cfg_io_util! { /// error. A call to `write` represents *at most one* attempt to write to /// any wrapped object. /// + /// # Return + /// /// If the return value is `Ok(n)` then it must be guaranteed that `n <= /// buf.len()`. A return value of `0` typically means that the /// underlying object is no longer able to accept bytes and will likely @@ -94,10 +99,10 @@ cfg_io_util! { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut buffer = File::create("foo.txt").await?; + /// let mut file = File::create("foo.txt").await?; /// /// // Writes some prefix of the byte string, not necessarily all of it. - /// buffer.write(b"some bytes").await?; + /// file.write(b"some bytes").await?; /// Ok(()) /// } /// ``` @@ -108,6 +113,79 @@ cfg_io_util! { write(self, src) } + /// Write a buffer into this writer, advancing the buffer's internal + /// cursor. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn write_buf(&mut self, buf: &mut B) -> io::Result; + /// ``` + /// + /// This function will attempt to write the entire contents of `buf`, but + /// the entire write may not succeed, or the write may also generate an + /// error. After the operation completes, the buffer's + /// internal cursor is advanced by the number of bytes written. A + /// subsequent call to `write_buf` using the **same** `buf` value will + /// resume from the point that the first call to `write_buf` completed. + /// A call to `write` represents *at most one* attempt to write to any + /// wrapped object. + /// + /// # Return + /// + /// If the return value is `Ok(n)` then it must be guaranteed that `n <= + /// buf.len()`. A return value of `0` typically means that the + /// underlying object is no longer able to accept bytes and will likely + /// not be able to in the future as well, or that the buffer provided is + /// empty. + /// + /// # Errors + /// + /// Each call to `write` may generate an I/O error indicating that the + /// operation could not be completed. If an error is returned then no bytes + /// in the buffer were written to this writer. + /// + /// It is **not** considered an error if the entire buffer could not be + /// written to this writer. + /// + /// # Examples + /// + /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]: + /// + /// [`File`]: crate::fs::File + /// [`Buf`]: bytes::Buf + /// + /// ```no_run + /// use tokio::io::{self, AsyncWriteExt}; + /// use tokio::fs::File; + /// + /// use bytes::Buf; + /// use std::io::Cursor; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// let mut buffer = Cursor::new(b"data to write"); + /// + /// // Loop until the entire contents of the buffer are written to + /// // the file. + /// while buffer.has_remaining() { + /// // Writes some prefix of the byte string, not necessarily + /// // all of it. + /// file.write_buf(&mut buffer).await?; + /// } + /// + /// Ok(()) + /// } + /// ``` + fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B> + where + Self: Sized, + B: Buf, + { + write_buf(self, src) + } + /// Attempts to write an entire buffer into this writer. /// /// Equivalent to: diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 076686bb8..c06c070d6 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -33,6 +33,7 @@ cfg_io_util! { pub use lines::Lines; mod read; + mod read_buf; mod read_exact; mod read_int; mod read_line; @@ -61,6 +62,7 @@ cfg_io_util! { mod write; mod write_all; + mod write_buf; mod write_int; diff --git a/tokio/src/io/util/read_buf.rs b/tokio/src/io/util/read_buf.rs new file mode 100644 index 000000000..550499b93 --- /dev/null +++ b/tokio/src/io/util/read_buf.rs @@ -0,0 +1,41 @@ +use crate::io::AsyncRead; + +use bytes::BufMut; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub(crate) fn read_buf<'a, R, B>(reader: &'a mut R, buf: &'a mut B) -> ReadBuf<'a, R, B> +where + R: AsyncRead, + B: BufMut, +{ + ReadBuf { reader, buf } +} + +cfg_io_util! { + /// Future returned by [`read_buf`](AsyncReadExt::read_buf). + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct ReadBuf<'a, R, B> { + reader: &'a mut R, + buf: &'a mut B, + } +} + +impl Future for ReadBuf<'_, R, B> +where + R: AsyncRead, + B: BufMut, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // safety: no data is moved from self + unsafe { + let me = self.get_unchecked_mut(); + Pin::new_unchecked(&mut *me.reader).poll_read_buf(cx, &mut me.buf) + } + } +} diff --git a/tokio/src/io/util/write.rs b/tokio/src/io/util/write.rs index 758f8dbcc..433a421d3 100644 --- a/tokio/src/io/util/write.rs +++ b/tokio/src/io/util/write.rs @@ -35,14 +35,3 @@ where Pin::new(&mut *me.writer).poll_write(cx, me.buf) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_unpin() { - use std::marker::PhantomPinned; - crate::is_unpin::>(); - } -} diff --git a/tokio/src/io/util/write_buf.rs b/tokio/src/io/util/write_buf.rs new file mode 100644 index 000000000..e49282fe0 --- /dev/null +++ b/tokio/src/io/util/write_buf.rs @@ -0,0 +1,43 @@ +use crate::io::AsyncWrite; + +use bytes::Buf; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +cfg_io_util! { + /// A future to write some of the buffer to an `AsyncWrite`. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct WriteBuf<'a, W, B> { + writer: &'a mut W, + buf: &'a mut B, + } +} + +/// Tries to write some bytes from the given `buf` to the writer in an +/// asynchronous manner, returning a future. +pub(crate) fn write_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteBuf<'a, W, B> +where + W: AsyncWrite, + B: Buf, +{ + WriteBuf { writer, buf } +} + +impl Future for WriteBuf<'_, W, B> +where + W: AsyncWrite, + B: Buf, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // safety: no data is moved from self + unsafe { + let me = self.get_unchecked_mut(); + Pin::new_unchecked(&mut *me.writer).poll_write_buf(cx, &mut me.buf) + } + } +}