mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
net: support non-blocking vectored I/O (#3761)
This commit is contained in:
parent
d846bf24b1
commit
0b93bd511d
@ -563,6 +563,84 @@ impl TcpStream {
|
|||||||
.try_io(Interest::READABLE, || (&*self.io).read(buf))
|
.try_io(Interest::READABLE, || (&*self.io).read(buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Try to read data from the stream into the provided buffers, returning
|
||||||
|
/// how many bytes were read.
|
||||||
|
///
|
||||||
|
/// Data is copied to fill each buffer in order, with the final buffer
|
||||||
|
/// written to possibly being only partially filled. This method behaves
|
||||||
|
/// equivalently to a single call to [`try_read()`] with concatenated
|
||||||
|
/// buffers.
|
||||||
|
///
|
||||||
|
/// Receives any pending data from the socket but does not wait for new data
|
||||||
|
/// to arrive. On success, returns the number of bytes read. Because
|
||||||
|
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
|
||||||
|
/// stored by the async task and can exist entirely on the stack.
|
||||||
|
///
|
||||||
|
/// Usually, [`readable()`] or [`ready()`] is used with this function.
|
||||||
|
///
|
||||||
|
/// [`try_read()`]: TcpStream::try_read()
|
||||||
|
/// [`readable()`]: TcpStream::readable()
|
||||||
|
/// [`ready()`]: TcpStream::ready()
|
||||||
|
///
|
||||||
|
/// # Return
|
||||||
|
///
|
||||||
|
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
|
||||||
|
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
|
||||||
|
/// and will no longer yield data. If the stream is not ready to read data
|
||||||
|
/// `Err(io::ErrorKind::WouldBlock)` is returned.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```no_run
|
||||||
|
/// use tokio::net::TcpStream;
|
||||||
|
/// use std::error::Error;
|
||||||
|
/// use std::io::{self, IoSliceMut};
|
||||||
|
///
|
||||||
|
/// #[tokio::main]
|
||||||
|
/// async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
/// // Connect to a peer
|
||||||
|
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
|
||||||
|
///
|
||||||
|
/// loop {
|
||||||
|
/// // Wait for the socket to be readable
|
||||||
|
/// stream.readable().await?;
|
||||||
|
///
|
||||||
|
/// // Creating the buffer **after** the `await` prevents it from
|
||||||
|
/// // being stored in the async task.
|
||||||
|
/// let mut buf_a = [0; 512];
|
||||||
|
/// let mut buf_b = [0; 1024];
|
||||||
|
/// let mut bufs = [
|
||||||
|
/// IoSliceMut::new(&mut buf_a),
|
||||||
|
/// IoSliceMut::new(&mut buf_b),
|
||||||
|
/// ];
|
||||||
|
///
|
||||||
|
/// // Try to read data, this may still fail with `WouldBlock`
|
||||||
|
/// // if the readiness event is a false positive.
|
||||||
|
/// match stream.try_read_vectored(&mut bufs) {
|
||||||
|
/// Ok(0) => break,
|
||||||
|
/// Ok(n) => {
|
||||||
|
/// println!("read {} bytes", n);
|
||||||
|
/// }
|
||||||
|
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
/// continue;
|
||||||
|
/// }
|
||||||
|
/// Err(e) => {
|
||||||
|
/// return Err(e.into());
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
|
self.io
|
||||||
|
.registration()
|
||||||
|
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
|
||||||
|
}
|
||||||
|
|
||||||
cfg_io_util! {
|
cfg_io_util! {
|
||||||
/// Try to read data from the stream into the provided buffer, advancing the
|
/// Try to read data from the stream into the provided buffer, advancing the
|
||||||
/// buffer's internal cursor, returning how many bytes were read.
|
/// buffer's internal cursor, returning how many bytes were read.
|
||||||
@ -775,6 +853,68 @@ impl TcpStream {
|
|||||||
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
|
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Try to write several buffers to the stream, returning how many bytes
|
||||||
|
/// were written.
|
||||||
|
///
|
||||||
|
/// Data is written from each buffer in order, with the final buffer read
|
||||||
|
/// from possible being only partially consumed. This method behaves
|
||||||
|
/// equivalently to a single call to [`try_write()`] with concatenated
|
||||||
|
/// buffers.
|
||||||
|
///
|
||||||
|
/// This function is usually paired with `writable()`.
|
||||||
|
///
|
||||||
|
/// [`try_write()`]: TcpStream::try_write()
|
||||||
|
///
|
||||||
|
/// # Return
|
||||||
|
///
|
||||||
|
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
|
||||||
|
/// number of bytes written. If the stream is not ready to write data,
|
||||||
|
/// `Err(io::ErrorKind::WouldBlock)` is returned.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```no_run
|
||||||
|
/// use tokio::net::TcpStream;
|
||||||
|
/// use std::error::Error;
|
||||||
|
/// use std::io;
|
||||||
|
///
|
||||||
|
/// #[tokio::main]
|
||||||
|
/// async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
/// // Connect to a peer
|
||||||
|
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
|
||||||
|
///
|
||||||
|
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
|
||||||
|
///
|
||||||
|
/// loop {
|
||||||
|
/// // Wait for the socket to be writable
|
||||||
|
/// stream.writable().await?;
|
||||||
|
///
|
||||||
|
/// // Try to write data, this may still fail with `WouldBlock`
|
||||||
|
/// // if the readiness event is a false positive.
|
||||||
|
/// match stream.try_write_vectored(&bufs) {
|
||||||
|
/// Ok(n) => {
|
||||||
|
/// break;
|
||||||
|
/// }
|
||||||
|
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
/// continue;
|
||||||
|
/// }
|
||||||
|
/// Err(e) => {
|
||||||
|
/// return Err(e.into());
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
|
||||||
|
use std::io::Write;
|
||||||
|
|
||||||
|
self.io
|
||||||
|
.registration()
|
||||||
|
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
|
||||||
|
}
|
||||||
|
|
||||||
/// Receives data on the socket from the remote address to which it is
|
/// Receives data on the socket from the remote address to which it is
|
||||||
/// connected, without removing that data from the queue. On success,
|
/// connected, without removing that data from the queue. On success,
|
||||||
/// returns the number of bytes peeked.
|
/// returns the number of bytes peeked.
|
||||||
|
@ -271,6 +271,84 @@ impl UnixStream {
|
|||||||
.try_io(Interest::READABLE, || (&*self.io).read(buf))
|
.try_io(Interest::READABLE, || (&*self.io).read(buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Try to read data from the stream into the provided buffers, returning
|
||||||
|
/// how many bytes were read.
|
||||||
|
///
|
||||||
|
/// Data is copied to fill each buffer in order, with the final buffer
|
||||||
|
/// written to possibly being only partially filled. This method behaves
|
||||||
|
/// equivalently to a single call to [`try_read()`] with concatenated
|
||||||
|
/// buffers.
|
||||||
|
///
|
||||||
|
/// Receives any pending data from the socket but does not wait for new data
|
||||||
|
/// to arrive. On success, returns the number of bytes read. Because
|
||||||
|
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
|
||||||
|
/// stored by the async task and can exist entirely on the stack.
|
||||||
|
///
|
||||||
|
/// Usually, [`readable()`] or [`ready()`] is used with this function.
|
||||||
|
///
|
||||||
|
/// [`try_read()`]: UnixStream::try_read()
|
||||||
|
/// [`readable()`]: UnixStream::readable()
|
||||||
|
/// [`ready()`]: UnixStream::ready()
|
||||||
|
///
|
||||||
|
/// # Return
|
||||||
|
///
|
||||||
|
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
|
||||||
|
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
|
||||||
|
/// and will no longer yield data. If the stream is not ready to read data
|
||||||
|
/// `Err(io::ErrorKind::WouldBlock)` is returned.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```no_run
|
||||||
|
/// use tokio::net::UnixStream;
|
||||||
|
/// use std::error::Error;
|
||||||
|
/// use std::io::{self, IoSliceMut};
|
||||||
|
///
|
||||||
|
/// #[tokio::main]
|
||||||
|
/// async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
/// // Connect to a peer
|
||||||
|
/// let dir = tempfile::tempdir().unwrap();
|
||||||
|
/// let bind_path = dir.path().join("bind_path");
|
||||||
|
/// let stream = UnixStream::connect(bind_path).await?;
|
||||||
|
///
|
||||||
|
/// loop {
|
||||||
|
/// // Wait for the socket to be readable
|
||||||
|
/// stream.readable().await?;
|
||||||
|
///
|
||||||
|
/// // Creating the buffer **after** the `await` prevents it from
|
||||||
|
/// // being stored in the async task.
|
||||||
|
/// let mut buf_a = [0; 512];
|
||||||
|
/// let mut buf_b = [0; 1024];
|
||||||
|
/// let mut bufs = [
|
||||||
|
/// IoSliceMut::new(&mut buf_a),
|
||||||
|
/// IoSliceMut::new(&mut buf_b),
|
||||||
|
/// ];
|
||||||
|
///
|
||||||
|
/// // Try to read data, this may still fail with `WouldBlock`
|
||||||
|
/// // if the readiness event is a false positive.
|
||||||
|
/// match stream.try_read_vectored(&mut bufs) {
|
||||||
|
/// Ok(0) => break,
|
||||||
|
/// Ok(n) => {
|
||||||
|
/// println!("read {} bytes", n);
|
||||||
|
/// }
|
||||||
|
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
/// continue;
|
||||||
|
/// }
|
||||||
|
/// Err(e) => {
|
||||||
|
/// return Err(e.into());
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
|
||||||
|
self.io
|
||||||
|
.registration()
|
||||||
|
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
|
||||||
|
}
|
||||||
|
|
||||||
cfg_io_util! {
|
cfg_io_util! {
|
||||||
/// Try to read data from the stream into the provided buffer, advancing the
|
/// Try to read data from the stream into the provided buffer, advancing the
|
||||||
/// buffer's internal cursor, returning how many bytes were read.
|
/// buffer's internal cursor, returning how many bytes were read.
|
||||||
@ -487,6 +565,68 @@ impl UnixStream {
|
|||||||
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
|
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Try to write several buffers to the stream, returning how many bytes
|
||||||
|
/// were written.
|
||||||
|
///
|
||||||
|
/// Data is written from each buffer in order, with the final buffer read
|
||||||
|
/// from possible being only partially consumed. This method behaves
|
||||||
|
/// equivalently to a single call to [`try_write()`] with concatenated
|
||||||
|
/// buffers.
|
||||||
|
///
|
||||||
|
/// This function is usually paired with `writable()`.
|
||||||
|
///
|
||||||
|
/// [`try_write()`]: UnixStream::try_write()
|
||||||
|
///
|
||||||
|
/// # Return
|
||||||
|
///
|
||||||
|
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
|
||||||
|
/// number of bytes written. If the stream is not ready to write data,
|
||||||
|
/// `Err(io::ErrorKind::WouldBlock)` is returned.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```no_run
|
||||||
|
/// use tokio::net::UnixStream;
|
||||||
|
/// use std::error::Error;
|
||||||
|
/// use std::io;
|
||||||
|
///
|
||||||
|
/// #[tokio::main]
|
||||||
|
/// async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
/// // Connect to a peer
|
||||||
|
/// let dir = tempfile::tempdir().unwrap();
|
||||||
|
/// let bind_path = dir.path().join("bind_path");
|
||||||
|
/// let stream = UnixStream::connect(bind_path).await?;
|
||||||
|
///
|
||||||
|
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
|
||||||
|
///
|
||||||
|
/// loop {
|
||||||
|
/// // Wait for the socket to be writable
|
||||||
|
/// stream.writable().await?;
|
||||||
|
///
|
||||||
|
/// // Try to write data, this may still fail with `WouldBlock`
|
||||||
|
/// // if the readiness event is a false positive.
|
||||||
|
/// match stream.try_write_vectored(&bufs) {
|
||||||
|
/// Ok(n) => {
|
||||||
|
/// break;
|
||||||
|
/// }
|
||||||
|
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
/// continue;
|
||||||
|
/// }
|
||||||
|
/// Err(e) => {
|
||||||
|
/// return Err(e.into());
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
|
||||||
|
self.io
|
||||||
|
.registration()
|
||||||
|
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
|
/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
|
||||||
///
|
///
|
||||||
/// This function is intended to be used to wrap a UnixStream from the
|
/// This function is intended to be used to wrap a UnixStream from the
|
||||||
|
@ -55,7 +55,7 @@ async fn try_read_write() {
|
|||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fill the write buffer
|
// Fill the write buffer using non-vectored I/O
|
||||||
loop {
|
loop {
|
||||||
// Still ready
|
// Still ready
|
||||||
let mut writable = task::spawn(client.writable());
|
let mut writable = task::spawn(client.writable());
|
||||||
@ -75,7 +75,7 @@ async fn try_read_write() {
|
|||||||
let mut writable = task::spawn(client.writable());
|
let mut writable = task::spawn(client.writable());
|
||||||
assert_pending!(writable.poll());
|
assert_pending!(writable.poll());
|
||||||
|
|
||||||
// Drain the socket from the server end
|
// Drain the socket from the server end using non-vectored I/O
|
||||||
let mut read = vec![0; written.len()];
|
let mut read = vec![0; written.len()];
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
|
|
||||||
@ -92,6 +92,51 @@ async fn try_read_write() {
|
|||||||
assert_eq!(read, written);
|
assert_eq!(read, written);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
written.clear();
|
||||||
|
client.writable().await.unwrap();
|
||||||
|
|
||||||
|
// Fill the write buffer using vectored I/O
|
||||||
|
let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect();
|
||||||
|
loop {
|
||||||
|
// Still ready
|
||||||
|
let mut writable = task::spawn(client.writable());
|
||||||
|
assert_ready_ok!(writable.poll());
|
||||||
|
|
||||||
|
match client.try_write_vectored(&data_bufs) {
|
||||||
|
Ok(n) => written.extend(&DATA[..n]),
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => panic!("error = {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Write buffer full
|
||||||
|
let mut writable = task::spawn(client.writable());
|
||||||
|
assert_pending!(writable.poll());
|
||||||
|
|
||||||
|
// Drain the socket from the server end using vectored I/O
|
||||||
|
let mut read = vec![0; written.len()];
|
||||||
|
let mut i = 0;
|
||||||
|
|
||||||
|
while i < read.len() {
|
||||||
|
server.readable().await.unwrap();
|
||||||
|
|
||||||
|
let mut bufs: Vec<_> = read[i..]
|
||||||
|
.chunks_mut(0x10000)
|
||||||
|
.map(io::IoSliceMut::new)
|
||||||
|
.collect();
|
||||||
|
match server.try_read_vectored(&mut bufs) {
|
||||||
|
Ok(n) => i += n,
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||||
|
Err(e) => panic!("error = {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(read, written);
|
||||||
|
}
|
||||||
|
|
||||||
// Now, we listen for shutdown
|
// Now, we listen for shutdown
|
||||||
drop(client);
|
drop(client);
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ async fn try_read_write() -> std::io::Result<()> {
|
|||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fill the write buffer
|
// Fill the write buffer using non-vectored I/O
|
||||||
loop {
|
loop {
|
||||||
// Still ready
|
// Still ready
|
||||||
let mut writable = task::spawn(client.writable());
|
let mut writable = task::spawn(client.writable());
|
||||||
@ -110,7 +110,7 @@ async fn try_read_write() -> std::io::Result<()> {
|
|||||||
let mut writable = task::spawn(client.writable());
|
let mut writable = task::spawn(client.writable());
|
||||||
assert_pending!(writable.poll());
|
assert_pending!(writable.poll());
|
||||||
|
|
||||||
// Drain the socket from the server end
|
// Drain the socket from the server end using non-vectored I/O
|
||||||
let mut read = vec![0; written.len()];
|
let mut read = vec![0; written.len()];
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
|
|
||||||
@ -127,6 +127,51 @@ async fn try_read_write() -> std::io::Result<()> {
|
|||||||
assert_eq!(read, written);
|
assert_eq!(read, written);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
written.clear();
|
||||||
|
client.writable().await.unwrap();
|
||||||
|
|
||||||
|
// Fill the write buffer using vectored I/O
|
||||||
|
let msg_bufs: Vec<_> = msg.chunks(3).map(io::IoSlice::new).collect();
|
||||||
|
loop {
|
||||||
|
// Still ready
|
||||||
|
let mut writable = task::spawn(client.writable());
|
||||||
|
assert_ready_ok!(writable.poll());
|
||||||
|
|
||||||
|
match client.try_write_vectored(&msg_bufs) {
|
||||||
|
Ok(n) => written.extend(&msg[..n]),
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => panic!("error = {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Write buffer full
|
||||||
|
let mut writable = task::spawn(client.writable());
|
||||||
|
assert_pending!(writable.poll());
|
||||||
|
|
||||||
|
// Drain the socket from the server end using vectored I/O
|
||||||
|
let mut read = vec![0; written.len()];
|
||||||
|
let mut i = 0;
|
||||||
|
|
||||||
|
while i < read.len() {
|
||||||
|
server.readable().await?;
|
||||||
|
|
||||||
|
let mut bufs: Vec<_> = read[i..]
|
||||||
|
.chunks_mut(0x10000)
|
||||||
|
.map(io::IoSliceMut::new)
|
||||||
|
.collect();
|
||||||
|
match server.try_read_vectored(&mut bufs) {
|
||||||
|
Ok(n) => i += n,
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||||
|
Err(e) => panic!("error = {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(read, written);
|
||||||
|
}
|
||||||
|
|
||||||
// Now, we listen for shutdown
|
// Now, we listen for shutdown
|
||||||
drop(client);
|
drop(client);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user