tokio: add AsyncBufReadExt::read_until

This commit is contained in:
Taiki Endo 2019-07-15 15:21:09 +09:00 committed by Sean McArthur
parent 5774a9cd64
commit 0cfa120ba8
5 changed files with 162 additions and 2 deletions

View File

@ -40,7 +40,7 @@ default = [
codec = ["io", "tokio-codec"]
fs = ["tokio-fs"]
io = ["bytes", "tokio-io"]
io = ["bytes", "tokio-io", "memchr"]
reactor = ["io", "tokio-reactor"]
rt-full = [
"num_cpus",
@ -80,6 +80,7 @@ tokio-tcp = { version = "0.2.0", optional = true, path = "../tokio-tcp" }
tokio-udp = { version = "0.2.0", optional = true, path = "../tokio-udp" }
tokio-timer = { version = "0.3.0", optional = true, path = "../tokio-timer" }
tracing-core = { version = "0.1", optional = true }
memchr = { version = "2.2", optional = true }
# Needed for async/await preview support
#tokio-futures = { version = "0.2.0", optional = true, path = "../tokio-futures" }

View File

@ -1,6 +1,34 @@
use crate::io::read_until::{read_until, ReadUntil};
use tokio_io::AsyncBufRead;
/// An extension trait which adds utility methods to `AsyncBufRead` types.
pub trait AsyncBufReadExt: AsyncBufRead {}
pub trait AsyncBufReadExt: AsyncBufRead {
/// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until the delimiter `byte` or EOF is reached.
/// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
///
/// This function will read bytes from the underlying stream until the
/// delimiter or EOF is found. Once found, all bytes up to, and including,
/// the delimiter (if found) will be appended to `buf`.
///
/// The returned future will resolve to the number of bytes read once the read
/// operation is completed.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded.
///
/// # Examples
///
/// ```
/// unimplemented!();
/// ```
fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
where
Self: Unpin,
{
read_until(self, byte, buf)
}
}
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}

View File

@ -43,6 +43,7 @@ mod copy;
mod read;
mod read_exact;
mod read_to_end;
mod read_until;
mod write;
mod write_all;

View File

@ -0,0 +1,74 @@
use std::future::Future;
use std::io;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::AsyncBufRead;
/// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadUntil<'a, R: ?Sized + Unpin> {
reader: &'a mut R,
byte: u8,
buf: &'a mut Vec<u8>,
read: usize,
}
impl<R: ?Sized + Unpin> Unpin for ReadUntil<'_, R> {}
pub(crate) fn read_until<'a, R>(
reader: &'a mut R,
byte: u8,
buf: &'a mut Vec<u8>,
) -> ReadUntil<'a, R>
where
R: AsyncBufRead + ?Sized + Unpin,
{
ReadUntil {
reader,
byte,
buf,
read: 0,
}
}
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
byte: u8,
buf: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
loop {
let (done, used) = {
let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
if let Some(i) = memchr::memchr(byte, available) {
buf.extend_from_slice(&available[..=i]);
(true, i + 1)
} else {
buf.extend_from_slice(available);
(false, available.len())
}
};
reader.as_mut().consume(used);
*read += used;
if done || used == 0 {
return Poll::Ready(Ok(mem::replace(read, 0)));
}
}
}
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
byte,
buf,
read,
} = &mut *self;
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
}
}

View File

@ -0,0 +1,56 @@
#![deny(warnings, rust_2018_idioms)]
#![feature(async_await)]
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead};
use tokio_test::assert_ok;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
#[tokio::test]
async fn read_until() {
struct Rd {
val: &'static [u8],
}
impl AsyncRead for Rd {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut [u8],
) -> Poll<io::Result<usize>> {
unimplemented!()
}
}
impl AsyncBufRead for Rd {
fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<&'a [u8]>> {
Poll::Ready(Ok(self.val))
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.val = &self.val[amt..];
}
}
let mut buf = vec![];
let mut rd = Rd {
val: b"hello world",
};
let n = assert_ok!(rd.read_until(b' ', &mut buf).await);
assert_eq!(n, 6);
assert_eq!(buf, b"hello ");
buf.clear();
let n = assert_ok!(rd.read_until(b' ', &mut buf).await);
assert_eq!(n, 5);
assert_eq!(buf, b"world");
buf.clear();
let n = assert_ok!(rd.read_until(b' ', &mut buf).await);
assert_eq!(n, 0);
assert_eq!(buf, []);
}