io: add AsyncBufRead trait

This commit is contained in:
Taiki Endo 2019-07-15 15:17:20 +09:00 committed by Sean McArthur
parent da49ede41e
commit 5774a9cd64
4 changed files with 105 additions and 1 deletions

View File

@ -0,0 +1,94 @@
use crate::AsyncRead;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Read bytes asynchronously.
///
/// This trait inherits from `std::io::BufRead` and indicates that an I/O object is
/// **non-blocking**. All non-blocking I/O objects must return an error when
/// bytes are unavailable instead of blocking the current thread.
pub trait AsyncBufRead: AsyncRead {
/// Attempt to return the contents of the internal buffer, filling it with more data
/// from the inner reader if it is empty.
///
/// On success, returns `Poll::Ready(Ok(buf))`.
///
/// If no data is available for reading, the method returns
/// `Poll::Pending` and arranges for the current task (via
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// This function is a lower-level call. It needs to be paired with the
/// [`consume`] method to function properly. When calling this
/// method, none of the contents will be "read" in the sense that later
/// calling [`poll_read`] may return the same contents. As such, [`consume`] must
/// be called with the number of bytes that are consumed from this buffer to
/// ensure that the bytes are never returned twice.
///
/// An empty buffer returned indicates that the stream has reached EOF.
///
/// [`poll_read`]: AsyncRead::poll_read
/// [`consume`]: AsyncBufRead::consume
fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&'a [u8]>>;
/// Tells this buffer that `amt` bytes have been consumed from the buffer,
/// so they should no longer be returned in calls to [`poll_read`].
///
/// This function is a lower-level call. It needs to be paired with the
/// [`poll_fill_buf`] method to function properly. This function does
/// not perform any I/O, it simply informs this object that some amount of
/// its buffer, returned from [`poll_fill_buf`], has been consumed and should
/// no longer be returned. As such, this function may do odd things if
/// [`poll_fill_buf`] isn't called before calling it.
///
/// The `amt` must be `<=` the number of bytes in the buffer returned by
/// [`poll_fill_buf`].
///
/// [`poll_read`]: AsyncRead::poll_read
/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
fn consume(self: Pin<&mut Self>, amt: usize);
}
macro_rules! deref_async_buf_read {
() => {
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
-> Poll<io::Result<&'a [u8]>>
{
Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
Pin::new(&mut **self).consume(amt)
}
}
}
impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
deref_async_buf_read!();
}
impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
deref_async_buf_read!();
}
impl<P> AsyncBufRead for Pin<P>
where
P: DerefMut + Unpin,
P::Target: AsyncBufRead,
{
fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&'a [u8]>> {
self.get_mut().as_mut().poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.get_mut().as_mut().consume(amt)
}
}

View File

@ -12,8 +12,10 @@
//! [found online]: https://tokio.rs/docs/
//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
mod async_buf_read;
mod async_read;
mod async_write;
pub use self::async_buf_read::AsyncBufRead;
pub use self::async_read::AsyncRead;
pub use self::async_write::AsyncWrite;

View File

@ -0,0 +1,6 @@
use tokio_io::AsyncBufRead;
/// An extension trait which adds utility methods to `AsyncBufRead` types.
pub trait AsyncBufReadExt: AsyncBufRead {}
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}

View File

@ -36,6 +36,7 @@
//! [`ErrorKind`]: enum.ErrorKind.html
//! [`Result`]: type.Result.html
mod async_buf_read_ext;
mod async_read_ext;
mod async_write_ext;
mod copy;
@ -45,13 +46,14 @@ mod read_to_end;
mod write;
mod write_all;
pub use self::async_buf_read_ext::AsyncBufReadExt;
pub use self::async_read_ext::AsyncReadExt;
pub use self::async_write_ext::AsyncWriteExt;
// standard input, output, and error
#[cfg(feature = "fs")]
pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout};
pub use tokio_io::{AsyncRead, AsyncWrite};
pub use tokio_io::{AsyncBufRead, AsyncRead, AsyncWrite};
// Re-export io::Error so that users don't have to deal
// with conflicts when `use`ing `tokio::io` and `std::io`.