net: add read/try_read etc methods to NamedPipeServer (#3899)

This commit is contained in:
Jake Shadle 2021-06-29 10:05:20 +02:00 committed by GitHub
parent d35ff7064f
commit 57c90c9750
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 611 additions and 15 deletions

View File

@ -2,7 +2,7 @@ use std::io;
#[cfg(windows)]
async fn windows_main() -> io::Result<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Interest};
use tokio::io::Interest;
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client";
@ -13,11 +13,62 @@ async fn windows_main() -> io::Result<()> {
// Note: we wait for a client to connect.
server.connect().await?;
let mut server = BufReader::new(server);
let buf = {
let mut read_buf = [0u8; 5];
let mut read_buf_cursor = 0;
loop {
server.readable().await?;
let buf = &mut read_buf[read_buf_cursor..];
match server.try_read(buf) {
Ok(n) => {
read_buf_cursor += n;
if read_buf_cursor == read_buf.len() {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
read_buf
};
{
let write_buf = b"pong\n";
let mut write_buf_cursor = 0;
loop {
let buf = &write_buf[write_buf_cursor..];
if buf.is_empty() {
break;
}
server.writable().await?;
match server.try_write(buf) {
Ok(n) => {
write_buf_cursor += n;
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
}
let mut buf = String::new();
server.read_line(&mut buf).await?;
server.write_all(b"pong\n").await?;
Ok::<_, io::Error>(buf)
});
@ -33,9 +84,12 @@ async fn windows_main() -> io::Result<()> {
let mut write_buf_cursor = 0;
loop {
let ready = client
.ready(Interest::READABLE | Interest::WRITABLE)
.await?;
let mut interest = Interest::READABLE;
if write_buf_cursor < write_buf.len() {
interest |= Interest::WRITABLE;
}
let ready = client.ready(interest).await?;
if ready.is_readable() {
let buf = &mut read_buf[read_buf_cursor..];
@ -85,7 +139,7 @@ async fn windows_main() -> io::Result<()> {
let (server, client) = tokio::try_join!(server, client)?;
assert_eq!(server?, "ping\n");
assert_eq!(server?, *b"ping\n");
assert_eq!(client?, "pong\n");
Ok(())

View File

@ -225,6 +225,496 @@ impl NamedPipeServer {
pub fn disconnect(&self) -> io::Result<()> {
self.io.disconnect()
}
/// Wait for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same pipe on a single
/// task without splitting the pipe.
///
/// # Examples
///
/// Concurrently read and write to the pipe on the same task without
/// splitting.
///
/// ```no_run
/// use tokio::io::Interest;
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let server = named_pipe::ServerOptions::new()
/// .create(PIPE_NAME)?;
///
/// loop {
/// let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
///
/// if ready.is_readable() {
/// let mut data = vec![0; 1024];
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_read(&mut data) {
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// if ready.is_writable() {
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_write(b"hello world") {
/// Ok(n) => {
/// println!("write {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
/// }
/// }
/// ```
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
let event = self.io.registration().readiness(interest).await?;
Ok(event.ready)
}
/// Wait for the pipe to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let server = named_pipe::ServerOptions::new()
/// .create(PIPE_NAME)?;
///
/// let mut msg = vec![0; 1024];
///
/// loop {
/// // Wait for the pipe to be readable
/// server.readable().await?;
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_read(&mut msg) {
/// Ok(n) => {
/// msg.truncate(n);
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// println!("GOT = {:?}", msg);
/// Ok(())
/// }
/// ```
pub async fn readable(&self) -> io::Result<()> {
self.ready(Interest::READABLE).await?;
Ok(())
}
/// Polls for read readiness.
///
/// If the pipe is not currently ready for reading, this method will
/// store a clone of the `Waker` from the provided `Context`. When the pipe
/// becomes ready for reading, `Waker::wake` will be called on the waker.
///
/// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the pipe is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`readable`]: method@Self::readable
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}
/// Try to read data from the pipe into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the pipe but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: NamedPipeServer::readable()
/// [`ready()`]: NamedPipeServer::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
/// and will no longer yield data. If the pipe is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let server = named_pipe::ServerOptions::new()
/// .create(PIPE_NAME)?;
///
/// loop {
/// // Wait for the pipe to be readable
/// server.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf = [0; 4096];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_read(&mut buf) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
/// Try to read data from the pipe into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
/// written to possibly being only partially filled. This method behaves
/// equivalently to a single call to [`try_read()`] with concatenated
/// buffers.
///
/// Receives any pending data from the pipe but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
/// stored by the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`try_read()`]: NamedPipeServer::try_read()
/// [`readable()`]: NamedPipeServer::readable()
/// [`ready()`]: NamedPipeServer::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
/// and will no longer yield data. If the pipe is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io::{self, IoSliceMut};
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let server = named_pipe::ServerOptions::new()
/// .create(PIPE_NAME)?;
///
/// loop {
/// // Wait for the pipe to be readable
/// server.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf_a = [0; 512];
/// let mut buf_b = [0; 1024];
/// let mut bufs = [
/// IoSliceMut::new(&mut buf_a),
/// IoSliceMut::new(&mut buf_b),
/// ];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_read_vectored(&mut bufs) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}
/// Wait for the pipe to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let server = named_pipe::ServerOptions::new()
/// .create(PIPE_NAME)?;
///
/// loop {
/// // Wait for the pipe to be writable
/// server.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_write(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub async fn writable(&self) -> io::Result<()> {
self.ready(Interest::WRITABLE).await?;
Ok(())
}
/// Polls for write readiness.
///
/// If the pipe is not currently ready for writing, this method will
/// store a clone of the `Waker` from the provided `Context`. When the pipe
/// becomes ready for writing, `Waker::wake` will be called on the waker.
///
/// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the pipe is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`writable`]: method@Self::writable
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}
/// Try to write a buffer to the pipe, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
/// only part of the buffer may be written.
///
/// This function is usually paired with `writable()`.
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the pipe is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let server = named_pipe::ServerOptions::new()
/// .create(PIPE_NAME)?;
///
/// loop {
/// // Wait for the pipe to be writable
/// server.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_write(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
/// Try to write several buffers to the pipe, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
/// from possible being only partially consumed. This method behaves
/// equivalently to a single call to [`try_write()`] with concatenated
/// buffers.
///
/// This function is usually paired with `writable()`.
///
/// [`try_write()`]: NamedPipeServer::try_write()
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the pipe is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let server = named_pipe::ServerOptions::new()
/// .create(PIPE_NAME)?;
///
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
///
/// loop {
/// // Wait for the pipe to be writable
/// server.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_write_vectored(&bufs) {
/// Ok(n) => {
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}
}
impl AsyncRead for NamedPipeServer {

View File

@ -150,7 +150,7 @@ async fn test_named_pipe_multi_client() -> io::Result<()> {
#[tokio::test]
async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
use tokio::io::{AsyncBufReadExt as _, BufReader, Interest};
use tokio::io::Interest;
const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready";
const N: usize = 10;
@ -164,7 +164,8 @@ async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
for _ in 0..N {
// Wait for client to connect.
server.connect().await?;
let mut inner = BufReader::new(server);
let inner_server = server;
// Construct the next server to be connected before sending the one
// we already have of onto a task. This ensures that the server
@ -174,10 +175,61 @@ async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
server = ServerOptions::new().create(PIPE_NAME)?;
let _ = tokio::spawn(async move {
let mut buf = String::new();
inner.read_line(&mut buf).await?;
inner.write_all(b"pong\n").await?;
inner.flush().await?;
let server = inner_server;
{
let mut read_buf = [0u8; 5];
let mut read_buf_cursor = 0;
loop {
server.readable().await?;
let buf = &mut read_buf[read_buf_cursor..];
match server.try_read(buf) {
Ok(n) => {
read_buf_cursor += n;
if read_buf_cursor == read_buf.len() {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
};
{
let write_buf = b"pong\n";
let mut write_buf_cursor = 0;
loop {
server.writable().await?;
let buf = &write_buf[write_buf_cursor..];
match server.try_write(buf) {
Ok(n) => {
write_buf_cursor += n;
if write_buf_cursor == write_buf.len() {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
}
Ok::<_, io::Error>(())
});
}