mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
chore: update bytes
dependency to git master (#1796)
Tokio will track changes to bytes until 0.5 is released.
This commit is contained in:
parent
3e643c7b81
commit
5cd665afd7
@ -8,7 +8,7 @@ edition = "2018"
|
||||
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
|
||||
tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" }
|
||||
|
||||
bytes = "0.4.12"
|
||||
bytes = { git = "https://github.com/tokio-rs/bytes" }
|
||||
futures = "0.3.0"
|
||||
|
||||
[[example]]
|
||||
|
@ -22,7 +22,7 @@ categories = ["asynchronous", "testing"]
|
||||
[dependencies]
|
||||
tokio = { version = "=0.2.0-alpha.6", path = "../tokio", features = ["test-util"] }
|
||||
|
||||
bytes = "0.4"
|
||||
bytes = { git = "https://github.com/tokio-rs/bytes" }
|
||||
futures-core = "0.3.0"
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -35,6 +35,7 @@ use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::marker::Unpin;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::ptr::null_mut;
|
||||
use std::task::{Context, Poll};
|
||||
@ -182,7 +183,7 @@ impl<S> AsyncRead for TlsStream<S>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
|
||||
// Note that this does not forward to `S` because the buffer is
|
||||
// unconditionally filled in by OpenSSL, not the actual object `S`.
|
||||
// We're decrypting bytes from `S` into the buffer above!
|
||||
|
@ -22,7 +22,7 @@ categories = ["asynchronous"]
|
||||
[dependencies]
|
||||
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
|
||||
|
||||
bytes = "0.4.7"
|
||||
bytes = { git = "https://github.com/tokio-rs/bytes" }
|
||||
futures-core = "0.3.0"
|
||||
futures-sink = "0.3.0"
|
||||
log = "0.4"
|
||||
|
@ -11,6 +11,7 @@ use futures_sink::Sink;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::fmt;
|
||||
use std::io::{self, BufRead, Read, Write};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@ -261,7 +262,7 @@ impl<T: BufRead, U> BufRead for Fuse<T, U> {
|
||||
}
|
||||
|
||||
impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
self.io.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ use log::trace;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::fmt;
|
||||
use std::io::{self, BufRead, Read};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@ -284,7 +285,7 @@ impl<T: BufRead> BufRead for FramedWrite2<T> {
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> AsyncRead for FramedWrite2<T> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
|
@ -345,7 +345,7 @@ use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use std::error::Error as StdError;
|
||||
use std::io::{self, Cursor};
|
||||
use std::{cmp, fmt};
|
||||
@ -457,7 +457,7 @@ impl LengthDelimitedCodec {
|
||||
|
||||
// match endianess
|
||||
let n = if self.builder.length_field_is_big_endian {
|
||||
src.get_uint_be(field_len)
|
||||
src.get_uint(field_len)
|
||||
} else {
|
||||
src.get_uint_le(field_len)
|
||||
};
|
||||
@ -551,7 +551,7 @@ impl Encoder for LengthDelimitedCodec {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
|
||||
let n = (&data).into_buf().remaining();
|
||||
let n = (&data).remaining();
|
||||
|
||||
if n > self.builder.max_frame_len {
|
||||
return Err(io::Error::new(
|
||||
@ -579,7 +579,7 @@ impl Encoder for LengthDelimitedCodec {
|
||||
dst.reserve(self.builder.length_field_len + n);
|
||||
|
||||
if self.builder.length_field_is_big_endian {
|
||||
dst.put_uint_be(n as u64, self.builder.length_field_len);
|
||||
dst.put_uint(n as u64, self.builder.length_field_len);
|
||||
} else {
|
||||
dst.put_uint_le(n as u64, self.builder.length_field_len);
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
use crate::codec::decoder::Decoder;
|
||||
use crate::codec::encoder::Encoder;
|
||||
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use std::{cmp, fmt, io, str, usize};
|
||||
|
||||
/// A simple `Codec` implementation that splits up data into lines.
|
||||
@ -168,7 +168,7 @@ impl Decoder for LinesCodec {
|
||||
if buf.is_empty() || buf == &b"\r"[..] {
|
||||
None
|
||||
} else {
|
||||
let line = buf.take();
|
||||
let line = buf.split_to(buf.len());
|
||||
let line = without_carriage_return(&line);
|
||||
let line = utf8(line)?;
|
||||
self.next_index = 0;
|
||||
@ -185,7 +185,7 @@ impl Encoder for LinesCodec {
|
||||
|
||||
fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), LinesCodecError> {
|
||||
buf.reserve(line.len() + 1);
|
||||
buf.put(line);
|
||||
buf.put(line.as_bytes());
|
||||
buf.put_u8(b'\n');
|
||||
Ok(())
|
||||
}
|
||||
|
@ -47,7 +47,14 @@ impl<C: Decoder + Unpin> Stream for UdpFramed<C> {
|
||||
|
||||
let (_n, addr) = unsafe {
|
||||
// Read into the buffer without having to initialize the memory.
|
||||
let res = ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, pin.rd.bytes_mut()));
|
||||
//
|
||||
// safety: we know tokio::net::UdpSocket never reads from the memory
|
||||
// during a recv
|
||||
let res = {
|
||||
let bytes = &mut *(pin.rd.bytes_mut() as *mut _ as *mut [u8]);
|
||||
ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, bytes))
|
||||
};
|
||||
|
||||
let (n, addr) = res?;
|
||||
pin.rd.advance_mut(n);
|
||||
(n, addr)
|
||||
|
@ -45,14 +45,14 @@ fn lines_decoder() {
|
||||
let mut codec = LinesCodec::new();
|
||||
let buf = &mut BytesMut::new();
|
||||
buf.reserve(200);
|
||||
buf.put("line 1\nline 2\r\nline 3\n\r\n\r");
|
||||
buf.put_slice(b"line 1\nline 2\r\nline 3\n\r\n\r");
|
||||
assert_eq!("line 1", codec.decode(buf).unwrap().unwrap());
|
||||
assert_eq!("line 2", codec.decode(buf).unwrap().unwrap());
|
||||
assert_eq!("line 3", codec.decode(buf).unwrap().unwrap());
|
||||
assert_eq!("", codec.decode(buf).unwrap().unwrap());
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
assert_eq!(None, codec.decode_eof(buf).unwrap());
|
||||
buf.put("k");
|
||||
buf.put_slice(b"k");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap());
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
@ -67,7 +67,7 @@ fn lines_decoder_max_length() {
|
||||
let buf = &mut BytesMut::new();
|
||||
|
||||
buf.reserve(200);
|
||||
buf.put("line 1 is too long\nline 2\nline 3\r\nline 4\n\r\n\r");
|
||||
buf.put_slice(b"line 1 is too long\nline 2\nline 3\r\nline 4\n\r\n\r");
|
||||
|
||||
assert!(codec.decode(buf).is_err());
|
||||
|
||||
@ -102,7 +102,7 @@ fn lines_decoder_max_length() {
|
||||
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
assert_eq!(None, codec.decode_eof(buf).unwrap());
|
||||
buf.put("k");
|
||||
buf.put_slice(b"k");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
|
||||
let line = codec.decode_eof(buf).unwrap().unwrap();
|
||||
@ -119,7 +119,7 @@ fn lines_decoder_max_length() {
|
||||
|
||||
// Line that's one character too long. This could cause an out of bounds
|
||||
// error if we peek at the next characters using slice indexing.
|
||||
buf.put("aaabbbc");
|
||||
buf.put_slice(b"aaabbbc");
|
||||
assert!(codec.decode(buf).is_err());
|
||||
}
|
||||
|
||||
@ -131,16 +131,16 @@ fn lines_decoder_max_length_underrun() {
|
||||
let buf = &mut BytesMut::new();
|
||||
|
||||
buf.reserve(200);
|
||||
buf.put("line ");
|
||||
buf.put_slice(b"line ");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
buf.put("too l");
|
||||
buf.put_slice(b"too l");
|
||||
assert!(codec.decode(buf).is_err());
|
||||
buf.put("ong\n");
|
||||
buf.put_slice(b"ong\n");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
|
||||
buf.put("line 2");
|
||||
buf.put_slice(b"line 2");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
buf.put("\n");
|
||||
buf.put_slice(b"\n");
|
||||
assert_eq!("line 2", codec.decode(buf).unwrap().unwrap());
|
||||
}
|
||||
|
||||
@ -152,11 +152,11 @@ fn lines_decoder_max_length_bursts() {
|
||||
let buf = &mut BytesMut::new();
|
||||
|
||||
buf.reserve(200);
|
||||
buf.put("line ");
|
||||
buf.put_slice(b"line ");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
buf.put("too l");
|
||||
buf.put_slice(b"too l");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
buf.put("ong\n");
|
||||
buf.put_slice(b"ong\n");
|
||||
assert!(codec.decode(buf).is_err());
|
||||
}
|
||||
|
||||
@ -168,9 +168,9 @@ fn lines_decoder_max_length_big_burst() {
|
||||
let buf = &mut BytesMut::new();
|
||||
|
||||
buf.reserve(200);
|
||||
buf.put("line ");
|
||||
buf.put_slice(b"line ");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
buf.put("too long!\n");
|
||||
buf.put_slice(b"too long!\n");
|
||||
assert!(codec.decode(buf).is_err());
|
||||
}
|
||||
|
||||
@ -182,10 +182,10 @@ fn lines_decoder_max_length_newline_between_decodes() {
|
||||
let buf = &mut BytesMut::new();
|
||||
|
||||
buf.reserve(200);
|
||||
buf.put("hello");
|
||||
buf.put_slice(b"hello");
|
||||
assert_eq!(None, codec.decode(buf).unwrap());
|
||||
|
||||
buf.put("\nworld");
|
||||
buf.put_slice(b"\nworld");
|
||||
assert_eq!("hello", codec.decode(buf).unwrap().unwrap());
|
||||
}
|
||||
|
||||
@ -198,9 +198,9 @@ fn lines_decoder_discard_repeat() {
|
||||
let buf = &mut BytesMut::new();
|
||||
|
||||
buf.reserve(200);
|
||||
buf.put("aa");
|
||||
buf.put_slice(b"aa");
|
||||
assert!(codec.decode(buf).is_err());
|
||||
buf.put("a");
|
||||
buf.put_slice(b"a");
|
||||
assert!(codec.decode(buf).is_err());
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@ use tokio::prelude::*;
|
||||
use tokio_test::assert_ok;
|
||||
use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};
|
||||
|
||||
use bytes::{Buf, BufMut, BytesMut, IntoBuf};
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use futures::StreamExt;
|
||||
use std::io::{self, Read};
|
||||
use std::pin::Pin;
|
||||
@ -24,7 +24,7 @@ impl Decoder for U32Codec {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let n = buf.split_to(4).into_buf().get_u32_be();
|
||||
let n = buf.split_to(4).get_u32();
|
||||
Ok(Some(n))
|
||||
}
|
||||
}
|
||||
@ -36,7 +36,7 @@ impl Encoder for U32Codec {
|
||||
fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
|
||||
// Reserve space
|
||||
dst.reserve(4);
|
||||
dst.put_u32_be(item);
|
||||
dst.put_u32(item);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -66,7 +66,7 @@ impl AsyncRead for DontReadIntoThis {
|
||||
#[tokio::test]
|
||||
async fn can_read_from_existing_buf() {
|
||||
let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
|
||||
parts.read_buf = vec![0, 0, 0, 42].into();
|
||||
parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]);
|
||||
|
||||
let mut framed = Framed::from_parts(parts);
|
||||
let num = assert_ok!(framed.next().await.unwrap());
|
||||
@ -77,7 +77,7 @@ async fn can_read_from_existing_buf() {
|
||||
#[test]
|
||||
fn external_buf_grows_to_init() {
|
||||
let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
|
||||
parts.read_buf = vec![0, 0, 0, 42].into();
|
||||
parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]);
|
||||
|
||||
let framed = Framed::from_parts(parts);
|
||||
let FramedParts { read_buf, .. } = framed.into_parts();
|
||||
@ -88,7 +88,7 @@ fn external_buf_grows_to_init() {
|
||||
#[test]
|
||||
fn external_buf_does_not_shrink() {
|
||||
let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
|
||||
parts.read_buf = vec![0; INITIAL_CAPACITY * 2].into();
|
||||
parts.read_buf = BytesMut::from(&vec![0; INITIAL_CAPACITY * 2][..]);
|
||||
|
||||
let framed = Framed::from_parts(parts);
|
||||
let FramedParts { read_buf, .. } = framed.into_parts();
|
||||
|
@ -5,7 +5,7 @@ use tokio_test::assert_ready;
|
||||
use tokio_test::task;
|
||||
use tokio_util::codec::{Decoder, FramedRead};
|
||||
|
||||
use bytes::{Buf, BytesMut, IntoBuf};
|
||||
use bytes::{Buf, BytesMut};
|
||||
use futures::Stream;
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
@ -45,7 +45,7 @@ impl Decoder for U32Decoder {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let n = buf.split_to(4).into_buf().get_u32_be();
|
||||
let n = buf.split_to(4).get_u32();
|
||||
Ok(Some(n))
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ impl Encoder for U32Encoder {
|
||||
fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
|
||||
// Reserve space
|
||||
dst.reserve(4);
|
||||
dst.put_u32_be(item);
|
||||
dst.put_u32(item);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -78,7 +78,7 @@ fn write_hits_backpressure() {
|
||||
|
||||
for i in 0..=ITER {
|
||||
let mut b = BytesMut::with_capacity(4);
|
||||
b.put_u32_be(i as u32);
|
||||
b.put_u32(i as u32);
|
||||
|
||||
// Append to the end
|
||||
match mock.calls.back_mut().unwrap() {
|
||||
|
@ -73,7 +73,7 @@ impl Encoder for ByteCodec {
|
||||
|
||||
fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
|
||||
buf.reserve(data.len());
|
||||
buf.put(data);
|
||||
buf.put_slice(&data);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -91,8 +91,7 @@ uds = ["io-driver", "mio-uds", "libc"]
|
||||
[dependencies]
|
||||
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }
|
||||
|
||||
bytes = "0.4"
|
||||
iovec = "0.1"
|
||||
bytes = { git = "https://github.com/tokio-rs/bytes" }
|
||||
pin-project-lite = "0.1.1"
|
||||
|
||||
# Everything else is optional...
|
||||
|
@ -1,5 +1,6 @@
|
||||
use bytes::BufMut;
|
||||
use std::io;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::ops::DerefMut;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
@ -63,9 +64,9 @@ pub trait AsyncRead {
|
||||
///
|
||||
/// [`io::Read`]: std::io::Read
|
||||
/// [`poll_read_buf`]: #method.poll_read_buf
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
for x in buf {
|
||||
*x = 0;
|
||||
*x.as_mut_ptr() = 0;
|
||||
}
|
||||
|
||||
true
|
||||
@ -109,6 +110,9 @@ pub trait AsyncRead {
|
||||
|
||||
self.prepare_uninitialized_buffer(b);
|
||||
|
||||
// Convert to `&mut [u8]`
|
||||
let b = &mut *(b as *mut [MaybeUninit<u8>] as *mut [u8]);
|
||||
|
||||
ready!(self.poll_read(cx, b))?
|
||||
};
|
||||
|
||||
@ -120,7 +124,7 @@ pub trait AsyncRead {
|
||||
|
||||
macro_rules! deref_async_read {
|
||||
() => {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
(**self).prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
@ -145,7 +149,7 @@ where
|
||||
P: DerefMut + Unpin,
|
||||
P::Target: AsyncRead,
|
||||
{
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
(**self).prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
@ -159,7 +163,7 @@ where
|
||||
}
|
||||
|
||||
impl AsyncRead for &[u8] {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
@ -173,7 +177,7 @@ impl AsyncRead for &[u8] {
|
||||
}
|
||||
|
||||
impl<T: AsRef<[u8]> + Unpin> AsyncRead for io::Cursor<T> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
use std::io::{self, Read};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{cmp, fmt};
|
||||
@ -45,7 +46,12 @@ impl<R: AsyncRead> BufReader<R> {
|
||||
unsafe {
|
||||
let mut buffer = Vec::with_capacity(capacity);
|
||||
buffer.set_len(capacity);
|
||||
inner.prepare_uninitialized_buffer(&mut buffer);
|
||||
|
||||
{
|
||||
// Convert to MaybeUninit
|
||||
let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit<u8>]);
|
||||
inner.prepare_uninitialized_buffer(b);
|
||||
}
|
||||
Self {
|
||||
inner,
|
||||
buf: buffer.into_boxed_slice(),
|
||||
@ -120,7 +126,7 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
|
||||
}
|
||||
|
||||
// we can't skip unconditionally because of the large buffer case in read.
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
}
|
||||
|
@ -2,11 +2,10 @@ use crate::io::util::{BufReader, BufWriter};
|
||||
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
use std::io::{self};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use std::io;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
/// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output.
|
||||
@ -126,7 +125,7 @@ impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
|
||||
}
|
||||
|
||||
// we can't skip unconditionally because of the large buffer case in read.
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
|
||||
use pin_project_lite::pin_project;
|
||||
use std::fmt;
|
||||
use std::io::{self, Write};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@ -152,7 +153,7 @@ impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> {
|
||||
}
|
||||
|
||||
// we can't skip unconditionally because of the large buffer case in read.
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
self.get_ref().prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ use crate::io::AsyncRead;
|
||||
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@ -64,7 +65,10 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
|
||||
g.buf.reserve(32);
|
||||
let capacity = g.buf.capacity();
|
||||
g.buf.set_len(capacity);
|
||||
rd.prepare_uninitialized_buffer(&mut g.buf[g.len..]);
|
||||
|
||||
let b = &mut *(&mut g.buf[g.len..] as *mut [u8] as *mut [MaybeUninit<u8>]);
|
||||
|
||||
rd.prepare_uninitialized_buffer(b);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
use crate::io::{AsyncBufRead, AsyncRead};
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{cmp, io};
|
||||
@ -74,7 +75,7 @@ impl<R: AsyncRead> Take<R> {
|
||||
}
|
||||
|
||||
impl<R: AsyncRead> AsyncRead for Take<R> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
|
@ -11,8 +11,8 @@
|
||||
use crate::io::{AsyncRead, AsyncWrite};
|
||||
use crate::net::TcpStream;
|
||||
|
||||
use bytes::{Buf, BufMut};
|
||||
use std::io;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::net::Shutdown;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
@ -33,7 +33,7 @@ pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
|
||||
}
|
||||
|
||||
impl AsyncRead for ReadHalf<'_> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
@ -44,14 +44,6 @@ impl AsyncRead for ReadHalf<'_> {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.0.poll_read_priv(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_read_buf<B: BufMut>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.0.poll_read_buf_priv(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for WriteHalf<'_> {
|
||||
@ -73,14 +65,6 @@ impl AsyncWrite for WriteHalf<'_> {
|
||||
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.0.shutdown(Shutdown::Write).into()
|
||||
}
|
||||
|
||||
fn poll_write_buf<B: Buf>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.0.poll_write_buf_priv(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<TcpStream> for ReadHalf<'_> {
|
||||
|
@ -3,11 +3,10 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented};
|
||||
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
|
||||
use crate::net::ToSocketAddrs;
|
||||
|
||||
use bytes::{Buf, BufMut};
|
||||
use iovec::IoVec;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::net::{self, Shutdown, SocketAddr};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
@ -601,70 +600,6 @@ impl TcpStream {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_read_buf_priv<B: BufMut>(
|
||||
&self,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
|
||||
|
||||
let r = unsafe {
|
||||
// The `IoVec` type can't have a 0-length size, so we create a bunch
|
||||
// of dummy versions on the stack with 1 length which we'll quickly
|
||||
// overwrite.
|
||||
let b1: &mut [u8] = &mut [0];
|
||||
let b2: &mut [u8] = &mut [0];
|
||||
let b3: &mut [u8] = &mut [0];
|
||||
let b4: &mut [u8] = &mut [0];
|
||||
let b5: &mut [u8] = &mut [0];
|
||||
let b6: &mut [u8] = &mut [0];
|
||||
let b7: &mut [u8] = &mut [0];
|
||||
let b8: &mut [u8] = &mut [0];
|
||||
let b9: &mut [u8] = &mut [0];
|
||||
let b10: &mut [u8] = &mut [0];
|
||||
let b11: &mut [u8] = &mut [0];
|
||||
let b12: &mut [u8] = &mut [0];
|
||||
let b13: &mut [u8] = &mut [0];
|
||||
let b14: &mut [u8] = &mut [0];
|
||||
let b15: &mut [u8] = &mut [0];
|
||||
let b16: &mut [u8] = &mut [0];
|
||||
let mut bufs: [&mut IoVec; 16] = [
|
||||
b1.into(),
|
||||
b2.into(),
|
||||
b3.into(),
|
||||
b4.into(),
|
||||
b5.into(),
|
||||
b6.into(),
|
||||
b7.into(),
|
||||
b8.into(),
|
||||
b9.into(),
|
||||
b10.into(),
|
||||
b11.into(),
|
||||
b12.into(),
|
||||
b13.into(),
|
||||
b14.into(),
|
||||
b15.into(),
|
||||
b16.into(),
|
||||
];
|
||||
let n = buf.bytes_vec_mut(&mut bufs);
|
||||
self.io.get_ref().read_bufs(&mut bufs[..n])
|
||||
};
|
||||
|
||||
match r {
|
||||
Ok(n) => {
|
||||
unsafe {
|
||||
buf.advance_mut(n);
|
||||
}
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
self.io.clear_read_ready(cx, mio::Ready::readable())?;
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_write_priv(
|
||||
&self,
|
||||
cx: &mut Context<'_>,
|
||||
@ -680,36 +615,6 @@ impl TcpStream {
|
||||
x => Poll::Ready(x),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_write_buf_priv<B: Buf>(
|
||||
&self,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
ready!(self.io.poll_write_ready(cx))?;
|
||||
|
||||
let r = {
|
||||
// The `IoVec` type can't have a zero-length size, so create a dummy
|
||||
// version from a 1-length slice which we'll overwrite with the
|
||||
// `bytes_vec` method.
|
||||
static DUMMY: &[u8] = &[0];
|
||||
let iovec = <&IoVec>::from(DUMMY);
|
||||
let mut bufs = [iovec; 64];
|
||||
let n = buf.bytes_vec(&mut bufs);
|
||||
self.io.get_ref().write_bufs(&bufs[..n])
|
||||
};
|
||||
match r {
|
||||
Ok(n) => {
|
||||
buf.advance(n);
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
self.io.clear_write_ready(cx)?;
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<TcpStream> for mio::net::TcpStream {
|
||||
@ -741,7 +646,7 @@ impl TryFrom<net::TcpStream> for TcpStream {
|
||||
// ===== impl Read / Write =====
|
||||
|
||||
impl AsyncRead for TcpStream {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
@ -752,14 +657,6 @@ impl AsyncRead for TcpStream {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_read_priv(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_read_buf<B: BufMut>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_read_buf_priv(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for TcpStream {
|
||||
@ -781,14 +678,6 @@ impl AsyncWrite for TcpStream {
|
||||
self.shutdown(std::net::Shutdown::Write)?;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_write_buf<B: Buf>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_write_buf_priv(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for TcpStream {
|
||||
|
@ -11,8 +11,8 @@
|
||||
use crate::io::{AsyncRead, AsyncWrite};
|
||||
use crate::net::UnixStream;
|
||||
|
||||
use bytes::{Buf, BufMut};
|
||||
use std::io;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::net::Shutdown;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
@ -30,7 +30,7 @@ pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
|
||||
}
|
||||
|
||||
impl AsyncRead for ReadHalf<'_> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
@ -41,14 +41,6 @@ impl AsyncRead for ReadHalf<'_> {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.0.poll_read_priv(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_read_buf<B: BufMut>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.0.poll_read_buf_priv(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for WriteHalf<'_> {
|
||||
@ -67,14 +59,6 @@ impl AsyncWrite for WriteHalf<'_> {
|
||||
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.0.shutdown(Shutdown::Write).into()
|
||||
}
|
||||
|
||||
fn poll_write_buf<B: Buf>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.0.poll_write_buf_priv(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<UnixStream> for ReadHalf<'_> {
|
||||
|
@ -3,11 +3,10 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented};
|
||||
use crate::net::unix::split::{split, ReadHalf, WriteHalf};
|
||||
use crate::net::unix::ucred::{self, UCred};
|
||||
|
||||
use bytes::{Buf, BufMut};
|
||||
use iovec::IoVec;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::mem::MaybeUninit;
|
||||
use std::net::Shutdown;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::os::unix::net::{self, SocketAddr};
|
||||
@ -137,7 +136,7 @@ impl TryFrom<net::UnixStream> for UnixStream {
|
||||
}
|
||||
|
||||
impl AsyncRead for UnixStream {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
@ -148,14 +147,6 @@ impl AsyncRead for UnixStream {
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_read_priv(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_read_buf<B: BufMut>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_read_buf_priv(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for UnixStream {
|
||||
@ -174,14 +165,6 @@ impl AsyncWrite for UnixStream {
|
||||
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_write_buf<B: Buf>(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.poll_write_buf_priv(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl UnixStream {
|
||||
@ -212,70 +195,6 @@ impl UnixStream {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_read_buf_priv<B: BufMut>(
|
||||
&self,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
|
||||
|
||||
let r = unsafe {
|
||||
// The `IoVec` type can't have a 0-length size, so we create a bunch
|
||||
// of dummy versions on the stack with 1 length which we'll quickly
|
||||
// overwrite.
|
||||
let b1: &mut [u8] = &mut [0];
|
||||
let b2: &mut [u8] = &mut [0];
|
||||
let b3: &mut [u8] = &mut [0];
|
||||
let b4: &mut [u8] = &mut [0];
|
||||
let b5: &mut [u8] = &mut [0];
|
||||
let b6: &mut [u8] = &mut [0];
|
||||
let b7: &mut [u8] = &mut [0];
|
||||
let b8: &mut [u8] = &mut [0];
|
||||
let b9: &mut [u8] = &mut [0];
|
||||
let b10: &mut [u8] = &mut [0];
|
||||
let b11: &mut [u8] = &mut [0];
|
||||
let b12: &mut [u8] = &mut [0];
|
||||
let b13: &mut [u8] = &mut [0];
|
||||
let b14: &mut [u8] = &mut [0];
|
||||
let b15: &mut [u8] = &mut [0];
|
||||
let b16: &mut [u8] = &mut [0];
|
||||
let mut bufs: [&mut IoVec; 16] = [
|
||||
b1.into(),
|
||||
b2.into(),
|
||||
b3.into(),
|
||||
b4.into(),
|
||||
b5.into(),
|
||||
b6.into(),
|
||||
b7.into(),
|
||||
b8.into(),
|
||||
b9.into(),
|
||||
b10.into(),
|
||||
b11.into(),
|
||||
b12.into(),
|
||||
b13.into(),
|
||||
b14.into(),
|
||||
b15.into(),
|
||||
b16.into(),
|
||||
];
|
||||
let n = buf.bytes_vec_mut(&mut bufs);
|
||||
self.io.get_ref().read_bufs(&mut bufs[..n])
|
||||
};
|
||||
|
||||
match r {
|
||||
Ok(n) => {
|
||||
unsafe {
|
||||
buf.advance_mut(n);
|
||||
}
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
self.io.clear_read_ready(cx, mio::Ready::readable())?;
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_write_priv(
|
||||
&self,
|
||||
cx: &mut Context<'_>,
|
||||
@ -291,36 +210,6 @@ impl UnixStream {
|
||||
x => Poll::Ready(x),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_write_buf_priv<B: Buf>(
|
||||
&self,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut B,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
ready!(self.io.poll_write_ready(cx))?;
|
||||
|
||||
let r = {
|
||||
// The `IoVec` type can't have a zero-length size, so create a dummy
|
||||
// version from a 1-length slice which we'll overwrite with the
|
||||
// `bytes_vec` method.
|
||||
static DUMMY: &[u8] = &[0];
|
||||
let iovec = <&IoVec>::from(DUMMY);
|
||||
let mut bufs = [iovec; 64];
|
||||
let n = buf.bytes_vec(&mut bufs);
|
||||
self.io.get_ref().write_bufs(&bufs[..n])
|
||||
};
|
||||
match r {
|
||||
Ok(n) => {
|
||||
buf.advance(n);
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
self.io.clear_write_ready(cx)?;
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for UnixStream {
|
||||
|
@ -4,6 +4,7 @@ use tokio_test::{assert_ready_err, assert_ready_ok};
|
||||
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use std::io;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@ -75,12 +76,10 @@ fn read_buf_no_capacity() {
|
||||
}
|
||||
}
|
||||
|
||||
// Can't create BytesMut w/ zero capacity, so fill it up
|
||||
let mut buf = BytesMut::with_capacity(64);
|
||||
buf.put(&[0; 64][..]);
|
||||
let mut buf = [0u8; 0];
|
||||
|
||||
task::spawn(Rd).enter(|cx, rd| {
|
||||
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf));
|
||||
let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut &mut buf[..]));
|
||||
assert_eq!(0, n);
|
||||
});
|
||||
}
|
||||
@ -116,7 +115,7 @@ fn read_buf_uninitialized_ok() {
|
||||
struct Rd;
|
||||
|
||||
impl AsyncRead for Rd {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
@ -134,7 +133,8 @@ fn read_buf_uninitialized_ok() {
|
||||
let mut buf = BytesMut::with_capacity(64);
|
||||
|
||||
unsafe {
|
||||
buf.bytes_mut()[0..11].copy_from_slice(b"hello world");
|
||||
let b: &mut [u8] = std::mem::transmute(buf.bytes_mut());
|
||||
b[0..11].copy_from_slice(b"hello world");
|
||||
}
|
||||
|
||||
task::spawn(Rd).enter(|cx, rd| {
|
||||
|
Loading…
x
Reference in New Issue
Block a user