mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
net: add try_read_buf
for named pipes (#4626)
This commit is contained in:
parent
711d9d0156
commit
d397e77c90
@ -12,6 +12,10 @@ use std::task::{Context, Poll};
|
||||
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
|
||||
use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
|
||||
|
||||
cfg_io_util! {
|
||||
use bytes::BufMut;
|
||||
}
|
||||
|
||||
// Hide imports which are not used when generating documentation.
|
||||
#[cfg(not(docsrs))]
|
||||
mod doc {
|
||||
@ -528,6 +532,86 @@ impl NamedPipeServer {
|
||||
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
|
||||
}
|
||||
|
||||
cfg_io_util! {
|
||||
/// Tries to read data from the stream into the provided buffer, advancing the
|
||||
/// buffer's internal cursor, returning how many bytes were read.
|
||||
///
|
||||
/// Receives any pending data from the socket but does not wait for new data
|
||||
/// to arrive. On success, returns the number of bytes read. Because
|
||||
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
|
||||
/// the async task and can exist entirely on the stack.
|
||||
///
|
||||
/// Usually, [`readable()`] or [`ready()`] is used with this function.
|
||||
///
|
||||
/// [`readable()`]: NamedPipeServer::readable()
|
||||
/// [`ready()`]: NamedPipeServer::ready()
|
||||
///
|
||||
/// # Return
|
||||
///
|
||||
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
|
||||
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
|
||||
/// and will no longer yield data. If the stream is not ready to read data
|
||||
/// `Err(io::ErrorKind::WouldBlock)` is returned.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::windows::named_pipe;
|
||||
/// use std::error::Error;
|
||||
/// use std::io;
|
||||
///
|
||||
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> Result<(), Box<dyn Error>> {
|
||||
/// let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
|
||||
///
|
||||
/// loop {
|
||||
/// // Wait for the socket to be readable
|
||||
/// server.readable().await?;
|
||||
///
|
||||
/// let mut buf = Vec::with_capacity(4096);
|
||||
///
|
||||
/// // Try to read data, this may still fail with `WouldBlock`
|
||||
/// // if the readiness event is a false positive.
|
||||
/// match server.try_read_buf(&mut buf) {
|
||||
/// Ok(0) => break,
|
||||
/// Ok(n) => {
|
||||
/// println!("read {} bytes", n);
|
||||
/// }
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
/// continue;
|
||||
/// }
|
||||
/// Err(e) => {
|
||||
/// return Err(e.into());
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
|
||||
self.io.registration().try_io(Interest::READABLE, || {
|
||||
use std::io::Read;
|
||||
|
||||
let dst = buf.chunk_mut();
|
||||
let dst =
|
||||
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
|
||||
|
||||
// Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
|
||||
// buffer.
|
||||
let n = (&*self.io).read(dst)?;
|
||||
|
||||
unsafe {
|
||||
buf.advance_mut(n);
|
||||
}
|
||||
|
||||
Ok(n)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for the pipe to become writable.
|
||||
///
|
||||
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
|
||||
@ -1186,6 +1270,86 @@ impl NamedPipeClient {
|
||||
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
|
||||
}
|
||||
|
||||
cfg_io_util! {
|
||||
/// Tries to read data from the stream into the provided buffer, advancing the
|
||||
/// buffer's internal cursor, returning how many bytes were read.
|
||||
///
|
||||
/// Receives any pending data from the socket but does not wait for new data
|
||||
/// to arrive. On success, returns the number of bytes read. Because
|
||||
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
|
||||
/// the async task and can exist entirely on the stack.
|
||||
///
|
||||
/// Usually, [`readable()`] or [`ready()`] is used with this function.
|
||||
///
|
||||
/// [`readable()`]: NamedPipeClient::readable()
|
||||
/// [`ready()`]: NamedPipeClient::ready()
|
||||
///
|
||||
/// # Return
|
||||
///
|
||||
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
|
||||
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
|
||||
/// and will no longer yield data. If the stream is not ready to read data
|
||||
/// `Err(io::ErrorKind::WouldBlock)` is returned.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use tokio::net::windows::named_pipe;
|
||||
/// use std::error::Error;
|
||||
/// use std::io;
|
||||
///
|
||||
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> Result<(), Box<dyn Error>> {
|
||||
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
|
||||
///
|
||||
/// loop {
|
||||
/// // Wait for the socket to be readable
|
||||
/// client.readable().await?;
|
||||
///
|
||||
/// let mut buf = Vec::with_capacity(4096);
|
||||
///
|
||||
/// // Try to read data, this may still fail with `WouldBlock`
|
||||
/// // if the readiness event is a false positive.
|
||||
/// match client.try_read_buf(&mut buf) {
|
||||
/// Ok(0) => break,
|
||||
/// Ok(n) => {
|
||||
/// println!("read {} bytes", n);
|
||||
/// }
|
||||
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
/// continue;
|
||||
/// }
|
||||
/// Err(e) => {
|
||||
/// return Err(e.into());
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
|
||||
self.io.registration().try_io(Interest::READABLE, || {
|
||||
use std::io::Read;
|
||||
|
||||
let dst = buf.chunk_mut();
|
||||
let dst =
|
||||
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
|
||||
|
||||
// Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
|
||||
// buffer.
|
||||
let n = (&*self.io).read(dst)?;
|
||||
|
||||
unsafe {
|
||||
buf.advance_mut(n);
|
||||
}
|
||||
|
||||
Ok(n)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits for the pipe to become writable.
|
||||
///
|
||||
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
|
||||
|
Loading…
x
Reference in New Issue
Block a user