mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
io: add vectored writes to AsyncWrite
(#3149)
This adds `AsyncWrite::poll_write_vectored`, and implements it for `TcpStream` and `UnixStream`. Refs: #3135.
This commit is contained in:
parent
7d11aa8668
commit
34fcef258b
@ -1,4 +1,4 @@
|
|||||||
use std::io;
|
use std::io::{self, IoSlice};
|
||||||
use std::ops::DerefMut;
|
use std::ops::DerefMut;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
@ -127,6 +127,55 @@ pub trait AsyncWrite {
|
|||||||
/// This function will panic if not called within the context of a future's
|
/// This function will panic if not called within the context of a future's
|
||||||
/// task.
|
/// task.
|
||||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
|
||||||
|
|
||||||
|
/// Like [`poll_write`], except that it writes from a slice of buffers.
|
||||||
|
///
|
||||||
|
/// Data is copied from each buffer in order, with the final buffer
|
||||||
|
/// read from possibly being only partially consumed. This method must
|
||||||
|
/// behave as a call to [`write`] with the buffers concatenated would.
|
||||||
|
///
|
||||||
|
/// The default implementation calls [`poll_write`] with either the first nonempty
|
||||||
|
/// buffer provided, or an empty one if none exists.
|
||||||
|
///
|
||||||
|
/// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
|
||||||
|
///
|
||||||
|
/// If the object is not ready for writing, the method returns
|
||||||
|
/// `Poll::Pending` and arranges for the current task (via
|
||||||
|
/// `cx.waker()`) to receive a notification when the object becomes
|
||||||
|
/// writable or is closed.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
///
|
||||||
|
/// This should be implemented as a single "atomic" write action. If any
|
||||||
|
/// data has been partially written, it is wrong to return an error or
|
||||||
|
/// pending.
|
||||||
|
///
|
||||||
|
/// [`poll_write`]: AsyncWrite::poll_write
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<Result<usize, io::Error>> {
|
||||||
|
let buf = bufs
|
||||||
|
.iter()
|
||||||
|
.find(|b| !b.is_empty())
|
||||||
|
.map_or(&[][..], |b| &**b);
|
||||||
|
self.poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Determines if this writer has an efficient [`poll_write_vectored`]
|
||||||
|
/// implementation.
|
||||||
|
///
|
||||||
|
/// If a writer does not override the default [`poll_write_vectored`]
|
||||||
|
/// implementation, code using it may want to avoid the method all together
|
||||||
|
/// and coalesce writes into a single buffer for higher performance.
|
||||||
|
///
|
||||||
|
/// The default implementation returns `false`.
|
||||||
|
///
|
||||||
|
/// [`poll_write_vectored`]: AsyncWrite::poll_write_vectored
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! deref_async_write {
|
macro_rules! deref_async_write {
|
||||||
@ -139,6 +188,18 @@ macro_rules! deref_async_write {
|
|||||||
Pin::new(&mut **self).poll_write(cx, buf)
|
Pin::new(&mut **self).poll_write(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self).poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
(**self).is_write_vectored()
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Pin::new(&mut **self).poll_flush(cx)
|
Pin::new(&mut **self).poll_flush(cx)
|
||||||
}
|
}
|
||||||
@ -170,6 +231,18 @@ where
|
|||||||
self.get_mut().as_mut().poll_write(cx, buf)
|
self.get_mut().as_mut().poll_write(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.get_mut().as_mut().poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
(**self).is_write_vectored()
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
self.get_mut().as_mut().poll_flush(cx)
|
self.get_mut().as_mut().poll_flush(cx)
|
||||||
}
|
}
|
||||||
@ -189,6 +262,18 @@ impl AsyncWrite for Vec<u8> {
|
|||||||
Poll::Ready(Ok(buf.len()))
|
Poll::Ready(Ok(buf.len()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
@ -207,6 +292,18 @@ impl AsyncWrite for io::Cursor<&mut [u8]> {
|
|||||||
Poll::Ready(io::Write::write(&mut *self, buf))
|
Poll::Ready(io::Write::write(&mut *self, buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Poll::Ready(io::Write::flush(&mut *self))
|
Poll::Ready(io::Write::flush(&mut *self))
|
||||||
}
|
}
|
||||||
@ -225,6 +322,18 @@ impl AsyncWrite for io::Cursor<&mut Vec<u8>> {
|
|||||||
Poll::Ready(io::Write::write(&mut *self, buf))
|
Poll::Ready(io::Write::write(&mut *self, buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Poll::Ready(io::Write::flush(&mut *self))
|
Poll::Ready(io::Write::flush(&mut *self))
|
||||||
}
|
}
|
||||||
@ -243,6 +352,18 @@ impl AsyncWrite for io::Cursor<Vec<u8>> {
|
|||||||
Poll::Ready(io::Write::write(&mut *self, buf))
|
Poll::Ready(io::Write::write(&mut *self, buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Poll::Ready(io::Write::flush(&mut *self))
|
Poll::Ready(io::Write::flush(&mut *self))
|
||||||
}
|
}
|
||||||
@ -261,6 +382,18 @@ impl AsyncWrite for io::Cursor<Box<[u8]>> {
|
|||||||
Poll::Ready(io::Write::write(&mut *self, buf))
|
Poll::Ready(io::Write::write(&mut *self, buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Poll::Ready(io::Write::flush(&mut *self))
|
Poll::Ready(io::Write::flush(&mut *self))
|
||||||
}
|
}
|
||||||
|
@ -163,6 +163,19 @@ feature! {
|
|||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf))
|
self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "net")]
|
||||||
|
pub(crate) fn poll_write_vectored<'a>(
|
||||||
|
&'a self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>>
|
||||||
|
where
|
||||||
|
&'a E: io::Write + 'a,
|
||||||
|
{
|
||||||
|
use std::io::Write;
|
||||||
|
self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,6 +147,18 @@ impl AsyncWrite for WriteHalf<'_> {
|
|||||||
self.0.poll_write_priv(cx, buf)
|
self.0.poll_write_priv(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.0.poll_write_vectored_priv(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
self.0.is_write_vectored()
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
// tcp flush is a no-op
|
// tcp flush is a no-op
|
||||||
|
@ -229,6 +229,18 @@ impl AsyncWrite for OwnedWriteHalf {
|
|||||||
self.inner.poll_write_priv(cx, buf)
|
self.inner.poll_write_priv(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.inner.poll_write_vectored_priv(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
self.inner.is_write_vectored()
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
// tcp flush is a no-op
|
// tcp flush is a no-op
|
||||||
|
@ -832,6 +832,14 @@ impl TcpStream {
|
|||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<usize>> {
|
||||||
self.io.poll_write(cx, buf)
|
self.io.poll_write(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn poll_write_vectored_priv(
|
||||||
|
&self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.io.poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<std::net::TcpStream> for TcpStream {
|
impl TryFrom<std::net::TcpStream> for TcpStream {
|
||||||
@ -867,6 +875,18 @@ impl AsyncWrite for TcpStream {
|
|||||||
self.poll_write_priv(cx, buf)
|
self.poll_write_priv(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_write_vectored_priv(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
// tcp flush is a no-op
|
// tcp flush is a no-op
|
||||||
|
@ -68,6 +68,18 @@ impl AsyncWrite for WriteHalf<'_> {
|
|||||||
self.0.poll_write_priv(cx, buf)
|
self.0.poll_write_priv(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.0.poll_write_vectored_priv(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
self.0.is_write_vectored()
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
@ -153,6 +153,18 @@ impl AsyncWrite for OwnedWriteHalf {
|
|||||||
self.inner.poll_write_priv(cx, buf)
|
self.inner.poll_write_priv(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.inner.poll_write_vectored_priv(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
self.inner.is_write_vectored()
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
// flush is a no-op
|
// flush is a no-op
|
||||||
|
@ -172,6 +172,18 @@ impl AsyncWrite for UnixStream {
|
|||||||
self.poll_write_priv(cx, buf)
|
self.poll_write_priv(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.poll_write_vectored_priv(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
@ -199,7 +211,7 @@ impl UnixStream {
|
|||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut ReadBuf<'_>,
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<()>> {
|
) -> Poll<io::Result<()>> {
|
||||||
// Safety: `UdpStream::read` correctly handles reads into uninitialized memory
|
// Safety: `UnixStream::read` correctly handles reads into uninitialized memory
|
||||||
unsafe { self.io.poll_read(cx, buf) }
|
unsafe { self.io.poll_read(cx, buf) }
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,6 +222,14 @@ impl UnixStream {
|
|||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<usize>> {
|
||||||
self.io.poll_write(cx, buf)
|
self.io.poll_write(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn poll_write_vectored_priv(
|
||||||
|
&self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[io::IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.io.poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for UnixStream {
|
impl fmt::Debug for UnixStream {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user