io: add fill_buf and consume (#3991)

This commit is contained in:
Alice Ryhl 2021-07-30 11:17:40 +02:00 committed by GitHub
parent f957f7f9a7
commit 3340ae6aa9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 139 additions and 2 deletions

View File

@ -1,3 +1,4 @@
use crate::io::util::fill_buf::{fill_buf, FillBuf};
use crate::io::util::lines::{lines, Lines};
use crate::io::util::read_line::{read_line, ReadLine};
use crate::io::util::read_until::{read_until, ReadUntil};
@ -243,6 +244,59 @@ cfg_io_util! {
split(self, byte)
}
/// Returns the contents of the internal buffer, filling it with more
/// data from the inner reader if it is empty.
///
/// This function is a lower-level call. It needs to be paired with the
/// [`consume`] method to function properly. When calling this method,
/// none of the contents will be "read" in the sense that later calling
/// `read` may return the same contents. As such, [`consume`] must be
/// called with the number of bytes that are consumed from this buffer
/// to ensure that the bytes are never returned twice.
///
/// An empty buffer returned indicates that the stream has reached EOF.
///
/// Equivalent to:
///
/// ```ignore
/// async fn fill_buf(&mut self) -> io::Result<&[u8]>;
/// ```
///
/// # Errors
///
/// This function will return an I/O error if the underlying reader was
/// read, but returned an error.
///
/// [`consume`]: crate::io::AsyncBufReadExt::consume
fn fill_buf(&mut self) -> FillBuf<'_, Self>
where
Self: Unpin,
{
fill_buf(self)
}
/// Tells this buffer that `amt` bytes have been consumed from the
/// buffer, so they should no longer be returned in calls to [`read`].
///
/// This function is a lower-level call. It needs to be paired with the
/// [`fill_buf`] method to function properly. This function does not
/// perform any I/O, it simply informs this object that some amount of
/// its buffer, returned from [`fill_buf`], has been consumed and should
/// no longer be returned. As such, this function may do odd things if
/// [`fill_buf`] isn't called before calling it.
///
/// The `amt` must be less than the number of bytes in the buffer
/// returned by [`fill_buf`].
///
/// [`read`]: crate::io::AsyncReadExt::read
/// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf
fn consume(&mut self, amt: usize)
where
Self: Unpin,
{
std::pin::Pin::new(self).consume(amt)
}
/// Returns a stream over the lines of this reader.
/// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
///

View File

@ -0,0 +1,52 @@
use crate::io::AsyncBufRead;
use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
/// Future for the [`fill_buf`](crate::io::AsyncBufReadExt::fill_buf) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FillBuf<'a, R: ?Sized> {
reader: Option<&'a mut R>,
#[pin]
_pin: PhantomPinned,
}
}
pub(crate) fn fill_buf<R>(reader: &mut R) -> FillBuf<'_, R>
where
R: AsyncBufRead + ?Sized + Unpin,
{
FillBuf {
reader: Some(reader),
_pin: PhantomPinned,
}
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> {
type Output = io::Result<&'a [u8]>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
// Due to a limitation in the borrow-checker, we cannot return the value
// directly on Ready. Once Rust starts using the polonius borrow checker,
// this can be simplified.
let reader = me.reader.take().expect("Polled after completion.");
match Pin::new(&mut *reader).poll_fill_buf(cx) {
Poll::Ready(_) => match Pin::new(reader).poll_fill_buf(cx) {
Poll::Ready(slice) => Poll::Ready(slice),
Poll::Pending => panic!("poll_fill_buf returned Pending while having data"),
},
Poll::Pending => {
*me.reader = Some(reader);
Poll::Pending
}
}
}
}

View File

@ -49,6 +49,7 @@ cfg_io_util! {
mod read_exact;
mod read_int;
mod read_line;
mod fill_buf;
mod read_to_end;
mod vec_with_initialized;

View File

@ -514,6 +514,7 @@ async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &
async_assert_fn!(
tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): Send & Sync & !Unpin
);
async_assert_fn!(tokio::io::AsyncBufReadExt::fill_buf(&mut BoxAsyncRead): Send & Sync & !Unpin);
async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::io::AsyncReadExt::read_buf(&mut BoxAsyncRead, &mut Vec<u8>): Send & Sync & !Unpin);
async_assert_fn!(

View File

@ -8,9 +8,11 @@ use std::cmp;
use std::io::{self, Cursor};
use std::pin::Pin;
use tokio::io::{
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, BufReader,
ReadBuf, SeekFrom,
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt,
BufReader, ReadBuf, SeekFrom,
};
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
macro_rules! run_fill_buf {
($reader:expr) => {{
@ -348,3 +350,30 @@ async fn maybe_pending_seek() {
Pin::new(&mut reader).consume(1);
assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
}
// This tests the AsyncBufReadExt::fill_buf wrapper.
#[tokio::test]
async fn test_fill_buf_wrapper() {
let (mut write, read) = tokio::io::duplex(16);
let mut read = BufReader::new(read);
write.write_all(b"hello world").await.unwrap();
assert_eq!(read.fill_buf().await.unwrap(), b"hello world");
read.consume(b"hello ".len());
assert_eq!(read.fill_buf().await.unwrap(), b"world");
assert_eq!(read.fill_buf().await.unwrap(), b"world");
read.consume(b"world".len());
let mut fill = spawn(read.fill_buf());
assert_pending!(fill.poll());
write.write_all(b"foo bar").await.unwrap();
assert_eq!(assert_ready!(fill.poll()).unwrap(), b"foo bar");
drop(fill);
drop(write);
assert_eq!(read.fill_buf().await.unwrap(), b"foo bar");
read.consume(b"foo bar".len());
assert_eq!(read.fill_buf().await.unwrap(), b"");
}