io: add async fns for reading / writing bufs (#1881)

Adds `read_buf` and `write_buf` which work with `T: BufMut` and `T: Buf`
respectively. This adds an easy API for using the buffer traits provided
by `bytes.
This commit is contained in:
Carl Lerche 2019-12-02 13:09:31 -08:00 committed by GitHub
parent a8a4a9f0fc
commit e87df0557d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 234 additions and 13 deletions

View File

@ -1,5 +1,6 @@
use crate::io::util::chain::{chain, Chain};
use crate::io::util::read::{read, Read};
use crate::io::util::read_buf::{read_buf, ReadBuf};
use crate::io::util::read_exact::{read_exact, ReadExact};
use crate::io::util::read_int::{ReadU8, ReadU16, ReadU32, ReadU64, ReadU128};
use crate::io::util::read_int::{ReadI8, ReadI16, ReadI32, ReadI64, ReadI128};
@ -8,6 +9,8 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString};
use crate::io::util::take::{take, Take};
use crate::io::AsyncRead;
use bytes::BufMut;
cfg_io_util! {
/// Define numeric reader
macro_rules! read_impl {
@ -159,6 +162,71 @@ cfg_io_util! {
read(self, buf)
}
/// Pull some bytes from this source into the specified buffer,
/// advancing the buffer's internal cursor.
///
/// Equivalent to:
///
/// ```ignore
/// async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> io::Result<usize>;
/// ```
///
/// Usually, only a single `read` syscall is issued, even if there is
/// more space in the supplied buffer.
///
/// This function does not provide any guarantees about whether it
/// completes immediately or asynchronously
///
/// # Return
///
/// On a successful read, the number of read bytes is returned. If the
/// supplied buffer is not empty and the function returns `Ok(0)` then
/// the source as reached an "end-of-file" event.
///
/// # Errors
///
/// If this function encounters any form of I/O or other error, an error
/// variant will be returned. If an error is returned then it must be
/// guaranteed that no bytes were read.
///
/// # Examples
///
/// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]:
///
/// [`File`]: crate::fs::File
/// [`BytesMut`]: bytes::BytesMut
/// [`BufMut`]: bytes::BufMut
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::io::{self, AsyncReadExt};
///
/// use bytes::BytesMut;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut f = File::open("foo.txt").await?;
/// let mut buffer = BytesMut::with_capacity(10);
///
/// assert!(buffer.is_empty());
///
/// // read up to 10 bytes, note that the return value is not needed
/// // to access the data that was read as `buffer`'s internal
/// // cursor is updated.
/// f.read_buf(&mut buffer).await?;
///
/// println!("The bytes: {:?}", &buffer[..]);
/// Ok(())
/// }
/// ```
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where
Self: Sized,
B: BufMut,
{
read_buf(self, buf)
}
/// Read the exact number of bytes required to fill `buf`.
///
/// Equivalent to:

View File

@ -2,10 +2,13 @@ use crate::io::util::flush::{flush, Flush};
use crate::io::util::shutdown::{shutdown, Shutdown};
use crate::io::util::write::{write, Write};
use crate::io::util::write_all::{write_all, WriteAll};
use crate::io::util::write_buf::{write_buf, WriteBuf};
use crate::io::util::write_int::{WriteU8, WriteU16, WriteU32, WriteU64, WriteU128};
use crate::io::util::write_int::{WriteI8, WriteI16, WriteI32, WriteI64, WriteI128};
use crate::io::AsyncWrite;
use bytes::Buf;
cfg_io_util! {
/// Define numeric writer
macro_rules! write_impl {
@ -71,6 +74,8 @@ cfg_io_util! {
/// error. A call to `write` represents *at most one* attempt to write to
/// any wrapped object.
///
/// # Return
///
/// If the return value is `Ok(n)` then it must be guaranteed that `n <=
/// buf.len()`. A return value of `0` typically means that the
/// underlying object is no longer able to accept bytes and will likely
@ -94,10 +99,10 @@ cfg_io_util! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut buffer = File::create("foo.txt").await?;
/// let mut file = File::create("foo.txt").await?;
///
/// // Writes some prefix of the byte string, not necessarily all of it.
/// buffer.write(b"some bytes").await?;
/// file.write(b"some bytes").await?;
/// Ok(())
/// }
/// ```
@ -108,6 +113,79 @@ cfg_io_util! {
write(self, src)
}
/// Write a buffer into this writer, advancing the buffer's internal
/// cursor.
///
/// Equivalent to:
///
/// ```ignore
/// async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> io::Result<usize>;
/// ```
///
/// This function will attempt to write the entire contents of `buf`, but
/// the entire write may not succeed, or the write may also generate an
/// error. After the operation completes, the buffer's
/// internal cursor is advanced by the number of bytes written. A
/// subsequent call to `write_buf` using the **same** `buf` value will
/// resume from the point that the first call to `write_buf` completed.
/// A call to `write` represents *at most one* attempt to write to any
/// wrapped object.
///
/// # Return
///
/// If the return value is `Ok(n)` then it must be guaranteed that `n <=
/// buf.len()`. A return value of `0` typically means that the
/// underlying object is no longer able to accept bytes and will likely
/// not be able to in the future as well, or that the buffer provided is
/// empty.
///
/// # Errors
///
/// Each call to `write` may generate an I/O error indicating that the
/// operation could not be completed. If an error is returned then no bytes
/// in the buffer were written to this writer.
///
/// It is **not** considered an error if the entire buffer could not be
/// written to this writer.
///
/// # Examples
///
/// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]:
///
/// [`File`]: crate::fs::File
/// [`Buf`]: bytes::Buf
///
/// ```no_run
/// use tokio::io::{self, AsyncWriteExt};
/// use tokio::fs::File;
///
/// use bytes::Buf;
/// use std::io::Cursor;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// let mut buffer = Cursor::new(b"data to write");
///
/// // Loop until the entire contents of the buffer are written to
/// // the file.
/// while buffer.has_remaining() {
/// // Writes some prefix of the byte string, not necessarily
/// // all of it.
/// file.write_buf(&mut buffer).await?;
/// }
///
/// Ok(())
/// }
/// ```
fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B>
where
Self: Sized,
B: Buf,
{
write_buf(self, src)
}
/// Attempts to write an entire buffer into this writer.
///
/// Equivalent to:

View File

@ -33,6 +33,7 @@ cfg_io_util! {
pub use lines::Lines;
mod read;
mod read_buf;
mod read_exact;
mod read_int;
mod read_line;
@ -61,6 +62,7 @@ cfg_io_util! {
mod write;
mod write_all;
mod write_buf;
mod write_int;

View File

@ -0,0 +1,41 @@
use crate::io::AsyncRead;
use bytes::BufMut;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
pub(crate) fn read_buf<'a, R, B>(reader: &'a mut R, buf: &'a mut B) -> ReadBuf<'a, R, B>
where
R: AsyncRead,
B: BufMut,
{
ReadBuf { reader, buf }
}
cfg_io_util! {
/// Future returned by [`read_buf`](AsyncReadExt::read_buf).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadBuf<'a, R, B> {
reader: &'a mut R,
buf: &'a mut B,
}
}
impl<R, B> Future for ReadBuf<'_, R, B>
where
R: AsyncRead,
B: BufMut,
{
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
// safety: no data is moved from self
unsafe {
let me = self.get_unchecked_mut();
Pin::new_unchecked(&mut *me.reader).poll_read_buf(cx, &mut me.buf)
}
}
}

View File

@ -35,14 +35,3 @@ where
Pin::new(&mut *me.writer).poll_write(cx, me.buf)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn assert_unpin() {
use std::marker::PhantomPinned;
crate::is_unpin::<Write<'_, PhantomPinned>>();
}
}

View File

@ -0,0 +1,43 @@
use crate::io::AsyncWrite;
use bytes::Buf;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
cfg_io_util! {
/// A future to write some of the buffer to an `AsyncWrite`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteBuf<'a, W, B> {
writer: &'a mut W,
buf: &'a mut B,
}
}
/// Tries to write some bytes from the given `buf` to the writer in an
/// asynchronous manner, returning a future.
pub(crate) fn write_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteBuf<'a, W, B>
where
W: AsyncWrite,
B: Buf,
{
WriteBuf { writer, buf }
}
impl<W, B> Future for WriteBuf<'_, W, B>
where
W: AsyncWrite,
B: Buf,
{
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
// safety: no data is moved from self
unsafe {
let me = self.get_unchecked_mut();
Pin::new_unchecked(&mut *me.writer).poll_write_buf(cx, &mut me.buf)
}
}
}