mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
parent
d562e58871
commit
954f2b7304
@ -91,6 +91,7 @@ where
|
|||||||
let me = self.project();
|
let me = self.project();
|
||||||
|
|
||||||
let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
|
let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
|
||||||
|
debug_assert_eq!(*me.read, 0);
|
||||||
|
|
||||||
if n == 0 && me.buf.is_empty() {
|
if n == 0 && me.buf.is_empty() {
|
||||||
return Poll::Ready(Ok(None));
|
return Poll::Ready(Ok(None));
|
||||||
|
@ -5,7 +5,6 @@ use std::future::Future;
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::str;
|
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
cfg_io_util! {
|
cfg_io_util! {
|
||||||
@ -14,45 +13,72 @@ cfg_io_util! {
|
|||||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||||
pub struct ReadLine<'a, R: ?Sized> {
|
pub struct ReadLine<'a, R: ?Sized> {
|
||||||
reader: &'a mut R,
|
reader: &'a mut R,
|
||||||
buf: &'a mut String,
|
/// This is the buffer we were provided. It will be replaced with an empty string
|
||||||
bytes: Vec<u8>,
|
/// while reading to postpone utf-8 handling until after reading.
|
||||||
|
output: &'a mut String,
|
||||||
|
/// The actual allocation of the string is moved into a vector instead.
|
||||||
|
buf: Vec<u8>,
|
||||||
|
/// The number of bytes appended to buf. This can be less than buf.len() if
|
||||||
|
/// the buffer was not empty when the operation was started.
|
||||||
read: usize,
|
read: usize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn read_line<'a, R>(reader: &'a mut R, buf: &'a mut String) -> ReadLine<'a, R>
|
pub(crate) fn read_line<'a, R>(reader: &'a mut R, string: &'a mut String) -> ReadLine<'a, R>
|
||||||
where
|
where
|
||||||
R: AsyncBufRead + ?Sized + Unpin,
|
R: AsyncBufRead + ?Sized + Unpin,
|
||||||
{
|
{
|
||||||
ReadLine {
|
ReadLine {
|
||||||
reader,
|
reader,
|
||||||
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
|
buf: mem::replace(string, String::new()).into_bytes(),
|
||||||
buf,
|
output: string,
|
||||||
read: 0,
|
read: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn put_back_original_data(output: &mut String, mut vector: Vec<u8>, num_bytes_read: usize) {
|
||||||
|
let original_len = vector.len() - num_bytes_read;
|
||||||
|
vector.truncate(original_len);
|
||||||
|
*output = String::from_utf8(vector).expect("The original data must be valid utf-8.");
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
|
pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
|
||||||
reader: Pin<&mut R>,
|
reader: Pin<&mut R>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut String,
|
output: &mut String,
|
||||||
bytes: &mut Vec<u8>,
|
buf: &mut Vec<u8>,
|
||||||
read: &mut usize,
|
read: &mut usize,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<usize>> {
|
||||||
let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
|
let io_res = ready!(read_until_internal(reader, cx, b'\n', buf, read));
|
||||||
if str::from_utf8(&bytes).is_err() {
|
let utf8_res = String::from_utf8(mem::replace(buf, Vec::new()));
|
||||||
Poll::Ready(ret.and_then(|_| {
|
|
||||||
Err(io::Error::new(
|
// At this point both buf and output are empty. The allocation is in utf8_res.
|
||||||
|
|
||||||
|
debug_assert!(buf.is_empty());
|
||||||
|
match (io_res, utf8_res) {
|
||||||
|
(Ok(num_bytes), Ok(string)) => {
|
||||||
|
debug_assert_eq!(*read, 0);
|
||||||
|
*output = string;
|
||||||
|
Poll::Ready(Ok(num_bytes))
|
||||||
|
}
|
||||||
|
(Err(io_err), Ok(string)) => {
|
||||||
|
*output = string;
|
||||||
|
Poll::Ready(Err(io_err))
|
||||||
|
}
|
||||||
|
(Ok(num_bytes), Err(utf8_err)) => {
|
||||||
|
debug_assert_eq!(*read, 0);
|
||||||
|
put_back_original_data(output, utf8_err.into_bytes(), num_bytes);
|
||||||
|
|
||||||
|
Poll::Ready(Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidData,
|
io::ErrorKind::InvalidData,
|
||||||
"stream did not contain valid UTF-8",
|
"stream did not contain valid UTF-8",
|
||||||
))
|
)))
|
||||||
}))
|
}
|
||||||
} else {
|
(Err(io_err), Err(utf8_err)) => {
|
||||||
debug_assert!(buf.is_empty());
|
put_back_original_data(output, utf8_err.into_bytes(), *read);
|
||||||
debug_assert_eq!(*read, 0);
|
|
||||||
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
|
Poll::Ready(Err(io_err))
|
||||||
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
|
}
|
||||||
Poll::Ready(ret)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,11 +88,12 @@ impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
|
|||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let Self {
|
let Self {
|
||||||
reader,
|
reader,
|
||||||
|
output,
|
||||||
buf,
|
buf,
|
||||||
bytes,
|
|
||||||
read,
|
read,
|
||||||
} = &mut *self;
|
} = &mut *self;
|
||||||
read_line_internal(Pin::new(reader), cx, buf, bytes, read)
|
|
||||||
|
read_line_internal(Pin::new(reader), cx, output, buf, read)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,19 +8,22 @@ use std::task::{Context, Poll};
|
|||||||
|
|
||||||
cfg_io_util! {
|
cfg_io_util! {
|
||||||
/// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
|
/// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
|
||||||
|
/// The delimeter is included in the resulting vector.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||||
pub struct ReadUntil<'a, R: ?Sized> {
|
pub struct ReadUntil<'a, R: ?Sized> {
|
||||||
reader: &'a mut R,
|
reader: &'a mut R,
|
||||||
byte: u8,
|
delimeter: u8,
|
||||||
buf: &'a mut Vec<u8>,
|
buf: &'a mut Vec<u8>,
|
||||||
|
/// The number of bytes appended to buf. This can be less than buf.len() if
|
||||||
|
/// the buffer was not empty when the operation was started.
|
||||||
read: usize,
|
read: usize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn read_until<'a, R>(
|
pub(crate) fn read_until<'a, R>(
|
||||||
reader: &'a mut R,
|
reader: &'a mut R,
|
||||||
byte: u8,
|
delimeter: u8,
|
||||||
buf: &'a mut Vec<u8>,
|
buf: &'a mut Vec<u8>,
|
||||||
) -> ReadUntil<'a, R>
|
) -> ReadUntil<'a, R>
|
||||||
where
|
where
|
||||||
@ -28,7 +31,7 @@ where
|
|||||||
{
|
{
|
||||||
ReadUntil {
|
ReadUntil {
|
||||||
reader,
|
reader,
|
||||||
byte,
|
delimeter,
|
||||||
buf,
|
buf,
|
||||||
read: 0,
|
read: 0,
|
||||||
}
|
}
|
||||||
@ -37,14 +40,14 @@ where
|
|||||||
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
|
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
|
||||||
mut reader: Pin<&mut R>,
|
mut reader: Pin<&mut R>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
byte: u8,
|
delimeter: u8,
|
||||||
buf: &mut Vec<u8>,
|
buf: &mut Vec<u8>,
|
||||||
read: &mut usize,
|
read: &mut usize,
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<usize>> {
|
||||||
loop {
|
loop {
|
||||||
let (done, used) = {
|
let (done, used) = {
|
||||||
let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
|
let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
|
||||||
if let Some(i) = memchr::memchr(byte, available) {
|
if let Some(i) = memchr::memchr(delimeter, available) {
|
||||||
buf.extend_from_slice(&available[..=i]);
|
buf.extend_from_slice(&available[..=i]);
|
||||||
(true, i + 1)
|
(true, i + 1)
|
||||||
} else {
|
} else {
|
||||||
@ -66,11 +69,11 @@ impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
|
|||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let Self {
|
let Self {
|
||||||
reader,
|
reader,
|
||||||
byte,
|
delimeter,
|
||||||
buf,
|
buf,
|
||||||
read,
|
read,
|
||||||
} = &mut *self;
|
} = &mut *self;
|
||||||
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
|
read_until_internal(Pin::new(reader), cx, *delimeter, buf, read)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,6 +75,8 @@ where
|
|||||||
let n = ready!(read_until_internal(
|
let n = ready!(read_until_internal(
|
||||||
me.reader, cx, *me.delim, me.buf, me.read,
|
me.reader, cx, *me.delim, me.buf, me.read,
|
||||||
))?;
|
))?;
|
||||||
|
// read_until_internal resets me.read to zero once it finds the delimeter
|
||||||
|
debug_assert_eq!(*me.read, 0);
|
||||||
|
|
||||||
if n == 0 && me.buf.is_empty() {
|
if n == 0 && me.buf.is_empty() {
|
||||||
return Poll::Ready(Ok(None));
|
return Poll::Ready(Ok(None));
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
#![warn(rust_2018_idioms)]
|
#![warn(rust_2018_idioms)]
|
||||||
#![cfg(feature = "full")]
|
#![cfg(feature = "full")]
|
||||||
|
|
||||||
use tokio::io::AsyncBufReadExt;
|
use std::io::ErrorKind;
|
||||||
use tokio_test::assert_ok;
|
use tokio::io::{AsyncBufReadExt, BufReader, Error};
|
||||||
|
use tokio_test::{assert_ok, io::Builder};
|
||||||
|
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
@ -27,3 +28,80 @@ async fn read_line() {
|
|||||||
assert_eq!(n, 0);
|
assert_eq!(n, 0);
|
||||||
assert_eq!(buf, "");
|
assert_eq!(buf, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn read_line_not_all_ready() {
|
||||||
|
let mock = Builder::new()
|
||||||
|
.read(b"Hello Wor")
|
||||||
|
.read(b"ld\nFizzBuz")
|
||||||
|
.read(b"z\n1\n2")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let mut read = BufReader::new(mock);
|
||||||
|
|
||||||
|
let mut line = "We say ".to_string();
|
||||||
|
let bytes = read.read_line(&mut line).await.unwrap();
|
||||||
|
assert_eq!(bytes, "Hello World\n".len());
|
||||||
|
assert_eq!(line.as_str(), "We say Hello World\n");
|
||||||
|
|
||||||
|
line = "I solve ".to_string();
|
||||||
|
let bytes = read.read_line(&mut line).await.unwrap();
|
||||||
|
assert_eq!(bytes, "FizzBuzz\n".len());
|
||||||
|
assert_eq!(line.as_str(), "I solve FizzBuzz\n");
|
||||||
|
|
||||||
|
line.clear();
|
||||||
|
let bytes = read.read_line(&mut line).await.unwrap();
|
||||||
|
assert_eq!(bytes, 2);
|
||||||
|
assert_eq!(line.as_str(), "1\n");
|
||||||
|
|
||||||
|
line.clear();
|
||||||
|
let bytes = read.read_line(&mut line).await.unwrap();
|
||||||
|
assert_eq!(bytes, 1);
|
||||||
|
assert_eq!(line.as_str(), "2");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn read_line_invalid_utf8() {
|
||||||
|
let mock = Builder::new().read(b"Hello Wor\xffld.\n").build();
|
||||||
|
|
||||||
|
let mut read = BufReader::new(mock);
|
||||||
|
|
||||||
|
let mut line = "Foo".to_string();
|
||||||
|
let err = read.read_line(&mut line).await.expect_err("Should fail");
|
||||||
|
assert_eq!(err.kind(), ErrorKind::InvalidData);
|
||||||
|
assert_eq!(err.to_string(), "stream did not contain valid UTF-8");
|
||||||
|
assert_eq!(line.as_str(), "Foo");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn read_line_fail() {
|
||||||
|
let mock = Builder::new()
|
||||||
|
.read(b"Hello Wor")
|
||||||
|
.read_error(Error::new(ErrorKind::Other, "The world has no end"))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let mut read = BufReader::new(mock);
|
||||||
|
|
||||||
|
let mut line = "Foo".to_string();
|
||||||
|
let err = read.read_line(&mut line).await.expect_err("Should fail");
|
||||||
|
assert_eq!(err.kind(), ErrorKind::Other);
|
||||||
|
assert_eq!(err.to_string(), "The world has no end");
|
||||||
|
assert_eq!(line.as_str(), "FooHello Wor");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn read_line_fail_and_utf8_fail() {
|
||||||
|
let mock = Builder::new()
|
||||||
|
.read(b"Hello Wor")
|
||||||
|
.read(b"\xff\xff\xff")
|
||||||
|
.read_error(Error::new(ErrorKind::Other, "The world has no end"))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let mut read = BufReader::new(mock);
|
||||||
|
|
||||||
|
let mut line = "Foo".to_string();
|
||||||
|
let err = read.read_line(&mut line).await.expect_err("Should fail");
|
||||||
|
assert_eq!(err.kind(), ErrorKind::Other);
|
||||||
|
assert_eq!(err.to_string(), "The world has no end");
|
||||||
|
assert_eq!(line.as_str(), "Foo");
|
||||||
|
}
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
#![warn(rust_2018_idioms)]
|
#![warn(rust_2018_idioms)]
|
||||||
#![cfg(feature = "full")]
|
#![cfg(feature = "full")]
|
||||||
|
|
||||||
use tokio::io::AsyncBufReadExt;
|
use std::io::ErrorKind;
|
||||||
use tokio_test::assert_ok;
|
use tokio::io::{AsyncBufReadExt, BufReader, Error};
|
||||||
|
use tokio_test::{assert_ok, io::Builder};
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn read_until() {
|
async fn read_until() {
|
||||||
@ -21,3 +22,53 @@ async fn read_until() {
|
|||||||
assert_eq!(n, 0);
|
assert_eq!(n, 0);
|
||||||
assert_eq!(buf, []);
|
assert_eq!(buf, []);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn read_until_not_all_ready() {
|
||||||
|
let mock = Builder::new()
|
||||||
|
.read(b"Hello Wor")
|
||||||
|
.read(b"ld#Fizz\xffBuz")
|
||||||
|
.read(b"z#1#2")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let mut read = BufReader::new(mock);
|
||||||
|
|
||||||
|
let mut chunk = b"We say ".to_vec();
|
||||||
|
let bytes = read.read_until(b'#', &mut chunk).await.unwrap();
|
||||||
|
assert_eq!(bytes, b"Hello World#".len());
|
||||||
|
assert_eq!(chunk, b"We say Hello World#");
|
||||||
|
|
||||||
|
chunk = b"I solve ".to_vec();
|
||||||
|
let bytes = read.read_until(b'#', &mut chunk).await.unwrap();
|
||||||
|
assert_eq!(bytes, b"Fizz\xffBuzz\n".len());
|
||||||
|
assert_eq!(chunk, b"I solve Fizz\xffBuzz#");
|
||||||
|
|
||||||
|
chunk.clear();
|
||||||
|
let bytes = read.read_until(b'#', &mut chunk).await.unwrap();
|
||||||
|
assert_eq!(bytes, 2);
|
||||||
|
assert_eq!(chunk, b"1#");
|
||||||
|
|
||||||
|
chunk.clear();
|
||||||
|
let bytes = read.read_until(b'#', &mut chunk).await.unwrap();
|
||||||
|
assert_eq!(bytes, 1);
|
||||||
|
assert_eq!(chunk, b"2");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn read_until_fail() {
|
||||||
|
let mock = Builder::new()
|
||||||
|
.read(b"Hello \xffWor")
|
||||||
|
.read_error(Error::new(ErrorKind::Other, "The world has no end"))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let mut read = BufReader::new(mock);
|
||||||
|
|
||||||
|
let mut chunk = b"Foo".to_vec();
|
||||||
|
let err = read
|
||||||
|
.read_until(b'#', &mut chunk)
|
||||||
|
.await
|
||||||
|
.expect_err("Should fail");
|
||||||
|
assert_eq!(err.kind(), ErrorKind::Other);
|
||||||
|
assert_eq!(err.to_string(), "The world has no end");
|
||||||
|
assert_eq!(chunk, b"FooHello \xffWor");
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user