net: add ready/try methods to NamedPipeClient (#3866)

This commit is contained in:
Jake Shadle 2021-06-28 10:43:57 +02:00 committed by GitHub
parent 845626410a
commit 959c5c997f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 724 additions and 3 deletions

View File

@ -84,6 +84,10 @@ path = "custom-executor-tokio-context.rs"
name = "named-pipe"
path = "named-pipe.rs"
[[example]]
name = "named-pipe-ready"
path = "named-pipe-ready.rs"
[[example]]
name = "named-pipe-multi-client"
path = "named-pipe-multi-client.rs"

View File

@ -0,0 +1,107 @@
use std::io;
#[cfg(windows)]
async fn windows_main() -> io::Result<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Interest};
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client";
let server = ServerOptions::new().create(PIPE_NAME)?;
let server = tokio::spawn(async move {
// Note: we wait for a client to connect.
server.connect().await?;
let mut server = BufReader::new(server);
let mut buf = String::new();
server.read_line(&mut buf).await?;
server.write_all(b"pong\n").await?;
Ok::<_, io::Error>(buf)
});
let client = tokio::spawn(async move {
// There's no need to use a connect loop here, since we know that the
// server is already up - `open` was called before spawning any of the
// tasks.
let client = ClientOptions::new().open(PIPE_NAME)?;
let mut read_buf = [0u8; 5];
let mut read_buf_cursor = 0;
let write_buf = b"ping\n";
let mut write_buf_cursor = 0;
loop {
let ready = client
.ready(Interest::READABLE | Interest::WRITABLE)
.await?;
if ready.is_readable() {
let buf = &mut read_buf[read_buf_cursor..];
match client.try_read(buf) {
Ok(n) => {
read_buf_cursor += n;
if read_buf_cursor == read_buf.len() {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
if ready.is_writable() {
let buf = &write_buf[write_buf_cursor..];
if buf.is_empty() {
continue;
}
match client.try_write(buf) {
Ok(n) => {
write_buf_cursor += n;
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
}
let buf = String::from_utf8_lossy(&read_buf).into_owned();
Ok::<_, io::Error>(buf)
});
let (server, client) = tokio::try_join!(server, client)?;
assert_eq!(server?, "ping\n");
assert_eq!(client?, "pong\n");
Ok(())
}
#[tokio::main]
async fn main() -> io::Result<()> {
#[cfg(windows)]
{
windows_main().await?;
}
#[cfg(not(windows))]
{
println!("Named pipes are only supported on Windows!");
}
Ok(())
}

View File

@ -4,12 +4,12 @@
use std::ffi::c_void;
use std::ffi::OsStr;
use std::io;
use std::io::{self, Read, Write};
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll};
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf};
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
// Hide imports which are not used when generating documentation.
@ -362,6 +362,489 @@ impl NamedPipeClient {
// Safety: we're ensuring the lifetime of the named pipe.
unsafe { named_pipe_info(self.io.as_raw_handle()) }
}
/// Wait for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same pipe on a single
/// task without splitting the pipe.
///
/// # Examples
///
/// Concurrently read and write to the pipe on the same task without
/// splitting.
///
/// ```no_run
/// use tokio::io::Interest;
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
///
/// loop {
/// let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
///
/// if ready.is_readable() {
/// let mut data = vec![0; 1024];
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_read(&mut data) {
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// if ready.is_writable() {
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_write(b"hello world") {
/// Ok(n) => {
/// println!("write {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
/// }
/// }
/// ```
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
let event = self.io.registration().readiness(interest).await?;
Ok(event.ready)
}
/// Wait for the pipe to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
///
/// let mut msg = vec![0; 1024];
///
/// loop {
/// // Wait for the pipe to be readable
/// client.readable().await?;
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_read(&mut msg) {
/// Ok(n) => {
/// msg.truncate(n);
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// println!("GOT = {:?}", msg);
/// Ok(())
/// }
/// ```
pub async fn readable(&self) -> io::Result<()> {
self.ready(Interest::READABLE).await?;
Ok(())
}
/// Polls for read readiness.
///
/// If the pipe is not currently ready for reading, this method will
/// store a clone of the `Waker` from the provided `Context`. When the pipe
/// becomes ready for reading, `Waker::wake` will be called on the waker.
///
/// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the pipe is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`readable`]: method@Self::readable
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}
/// Try to read data from the pipe into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the pipe but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: NamedPipeClient::readable()
/// [`ready()`]: NamedPipeClient::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
/// and will no longer yield data. If the pipe is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
///
/// loop {
/// // Wait for the pipe to be readable
/// client.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf = [0; 4096];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_read(&mut buf) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
/// Try to read data from the pipe into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
/// written to possibly being only partially filled. This method behaves
/// equivalently to a single call to [`try_read()`] with concatenated
/// buffers.
///
/// Receives any pending data from the pipe but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
/// stored by the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`try_read()`]: NamedPipeClient::try_read()
/// [`readable()`]: NamedPipeClient::readable()
/// [`ready()`]: NamedPipeClient::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
/// and will no longer yield data. If the pipe is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io::{self, IoSliceMut};
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
///
/// loop {
/// // Wait for the pipe to be readable
/// client.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf_a = [0; 512];
/// let mut buf_b = [0; 1024];
/// let mut bufs = [
/// IoSliceMut::new(&mut buf_a),
/// IoSliceMut::new(&mut buf_b),
/// ];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_read_vectored(&mut bufs) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}
/// Wait for the pipe to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
///
/// loop {
/// // Wait for the pipe to be writable
/// client.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_write(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub async fn writable(&self) -> io::Result<()> {
self.ready(Interest::WRITABLE).await?;
Ok(())
}
/// Polls for write readiness.
///
/// If the pipe is not currently ready for writing, this method will
/// store a clone of the `Waker` from the provided `Context`. When the pipe
/// becomes ready for writing, `Waker::wake` will be called on the waker.
///
/// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the pipe is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`writable`]: method@Self::writable
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}
/// Try to write a buffer to the pipe, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
/// only part of the buffer may be written.
///
/// This function is usually paired with `writable()`.
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the pipe is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
///
/// loop {
/// // Wait for the pipe to be writable
/// client.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_write(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
/// Try to write several buffers to the pipe, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
/// from possible being only partially consumed. This method behaves
/// equivalently to a single call to [`try_write()`] with concatenated
/// buffers.
///
/// This function is usually paired with `writable()`.
///
/// [`try_write()`]: NamedPipeClient::try_write()
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the pipe is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
///
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
///
/// loop {
/// // Wait for the pipe to be writable
/// client.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_write_vectored(&bufs) {
/// Ok(n) => {
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}
}
impl AsyncRead for NamedPipeClient {
@ -1017,7 +1500,7 @@ impl ClientOptions {
/// [enabled I/O]: crate::runtime::Builder::enable_io
/// [Tokio Runtime]: crate::runtime::Runtime
///
/// A connect loop that waits until a socket becomes available looks like
/// A connect loop that waits until a pipe becomes available looks like
/// this:
///
/// ```no_run

View File

@ -148,6 +148,133 @@ async fn test_named_pipe_multi_client() -> io::Result<()> {
Ok(())
}
#[tokio::test]
async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
use tokio::io::{AsyncBufReadExt as _, BufReader, Interest};
const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready";
const N: usize = 10;
// The first server needs to be constructed early so that clients can
// be correctly connected. Otherwise calling .wait will cause the client to
// error.
let mut server = ServerOptions::new().create(PIPE_NAME)?;
let server = tokio::spawn(async move {
for _ in 0..N {
// Wait for client to connect.
server.connect().await?;
let mut inner = BufReader::new(server);
// Construct the next server to be connected before sending the one
// we already have of onto a task. This ensures that the server
// isn't closed (after it's done in the task) before a new one is
// available. Otherwise the client might error with
// `io::ErrorKind::NotFound`.
server = ServerOptions::new().create(PIPE_NAME)?;
let _ = tokio::spawn(async move {
let mut buf = String::new();
inner.read_line(&mut buf).await?;
inner.write_all(b"pong\n").await?;
inner.flush().await?;
Ok::<_, io::Error>(())
});
}
Ok::<_, io::Error>(())
});
let mut clients = Vec::new();
for _ in 0..N {
clients.push(tokio::spawn(async move {
// This showcases a generic connect loop.
//
// We immediately try to create a client, if it's not found or the
// pipe is busy we use the specialized wait function on the client
// builder.
let client = loop {
match ClientOptions::new().open(PIPE_NAME) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
Err(e) if e.kind() == io::ErrorKind::NotFound => (),
Err(e) => return Err(e),
}
// Wait for a named pipe to become available.
time::sleep(Duration::from_millis(50)).await;
};
let mut read_buf = [0u8; 5];
let mut read_buf_cursor = 0;
let write_buf = b"ping\n";
let mut write_buf_cursor = 0;
loop {
let mut interest = Interest::READABLE;
if write_buf_cursor < write_buf.len() {
interest |= Interest::WRITABLE;
}
let ready = client.ready(interest).await?;
if ready.is_readable() {
let buf = &mut read_buf[read_buf_cursor..];
match client.try_read(buf) {
Ok(n) => {
read_buf_cursor += n;
if read_buf_cursor == read_buf.len() {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
if ready.is_writable() {
let buf = &write_buf[write_buf_cursor..];
if buf.is_empty() {
continue;
}
match client.try_write(buf) {
Ok(n) => {
write_buf_cursor += n;
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
}
let buf = String::from_utf8_lossy(&read_buf).into_owned();
Ok::<_, io::Error>(buf)
}));
}
for client in clients {
let result = client.await?;
assert_eq!(result?, "pong\n");
}
server.await??;
Ok(())
}
// This tests what happens when a client tries to disconnect.
#[tokio::test]
async fn test_named_pipe_mode_message() -> io::Result<()> {