From cbdceb91ac8d863d6f02be0a1ed34f13abaa91c0 Mon Sep 17 00:00:00 2001 From: Maarten de Vries Date: Tue, 19 Nov 2024 17:55:54 +0100 Subject: [PATCH] io: add `AsyncFd::try_io()` and `try_io_mut()` (#6967) --- tokio/src/io/async_fd.rs | 50 ++++++++++++++++++ tokio/tests/io_async_fd.rs | 104 ++++++++++++++++++++++++++++++++++++- 2 files changed, 153 insertions(+), 1 deletion(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 0fda5da40..8ecc6b952 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -872,6 +872,56 @@ impl AsyncFd { .async_io(interest, || f(self.inner.as_mut().unwrap())) .await } + + /// Tries to read or write from the file descriptor using a user-provided IO operation. + /// + /// If the file descriptor is ready, the provided closure is called. The closure + /// should attempt to perform IO operation on the file descriptor by manually + /// calling the appropriate syscall. If the operation fails because the + /// file descriptor is not actually ready, then the closure should return a + /// `WouldBlock` error and the readiness flag is cleared. The return value + /// of the closure is then returned by `try_io`. + /// + /// If the file descriptor is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the file descriptor that failed due to the file descriptor not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the file descriptor to + /// behave incorrectly. + /// + /// The closure should not perform the IO operation using any of the methods + /// defined on the Tokio `AsyncFd` type, as this will mess with the + /// readiness flag and can cause the file descriptor to behave incorrectly. + /// + /// This method is not intended to be used with combined interests. + /// The closure should perform only one type of IO operation, so it should not + /// require more than one ready state. This method may panic or sleep forever + /// if it is called with a combined interest. + pub fn try_io( + &self, + interest: Interest, + f: impl FnOnce(&T) -> io::Result, + ) -> io::Result { + self.registration + .try_io(interest, || f(self.inner.as_ref().unwrap())) + } + + /// Tries to read or write from the file descriptor using a user-provided IO operation. + /// + /// The behavior is the same as [`try_io`], except that the closure can mutate the inner + /// value of the [`AsyncFd`]. + /// + /// [`try_io`]: AsyncFd::try_io + pub fn try_io_mut( + &mut self, + interest: Interest, + f: impl FnOnce(&mut T) -> io::Result, + ) -> io::Result { + self.registration + .try_io(interest, || f(self.inner.as_mut().unwrap())) + } } impl AsRawFd for AsyncFd { diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index 5e4da3191..ab8893f23 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -302,7 +302,7 @@ async fn reregister() { #[tokio::test] #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri. -async fn try_io() { +async fn guard_try_io() { let (a, mut b) = socketpair(); b.write_all(b"0").unwrap(); @@ -336,6 +336,108 @@ async fn try_io() { let _ = readable.await.unwrap(); } +#[tokio::test] +#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri. +async fn try_io_readable() { + let (a, mut b) = socketpair(); + let mut afd_a = AsyncFd::new(a).unwrap(); + + // Give the runtime some time to update bookkeeping. + tokio::task::yield_now().await; + + { + let mut called = false; + let _ = afd_a.try_io_mut(Interest::READABLE, |_| { + called = true; + Ok(()) + }); + assert!( + !called, + "closure should not have been called, since socket should not be readable" + ); + } + + // Make `a` readable by writing to `b`. + // Give the runtime some time to update bookkeeping. + b.write_all(&[0]).unwrap(); + tokio::task::yield_now().await; + + { + let mut called = false; + let _ = afd_a.try_io(Interest::READABLE, |_| { + called = true; + Ok(()) + }); + assert!( + called, + "closure should have been called, since socket should have data available to read" + ); + } + + { + let mut called = false; + let _ = afd_a.try_io(Interest::READABLE, |_| { + called = true; + io::Result::<()>::Err(ErrorKind::WouldBlock.into()) + }); + assert!( + called, + "closure should have been called, since socket should have data available to read" + ); + } + + { + let mut called = false; + let _ = afd_a.try_io(Interest::READABLE, |_| { + called = true; + Ok(()) + }); + assert!(!called, "closure should not have been called, since socket readable state should have been cleared"); + } +} + +#[tokio::test] +#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri. +async fn try_io_writable() { + let (a, _b) = socketpair(); + let afd_a = AsyncFd::new(a).unwrap(); + + // Give the runtime some time to update bookkeeping. + tokio::task::yield_now().await; + + { + let mut called = false; + let _ = afd_a.try_io(Interest::WRITABLE, |_| { + called = true; + Ok(()) + }); + assert!( + called, + "closure should have been called, since socket should still be marked as writable" + ); + } + { + let mut called = false; + let _ = afd_a.try_io(Interest::WRITABLE, |_| { + called = true; + io::Result::<()>::Err(ErrorKind::WouldBlock.into()) + }); + assert!( + called, + "closure should have been called, since socket should still be marked as writable" + ); + } + + { + let mut called = false; + let _ = afd_a.try_io(Interest::WRITABLE, |_| { + called = true; + Ok(()) + }); + assert!(!called, "closure should not have been called, since socket writable state should have been cleared"); + } +} + #[tokio::test] #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri. async fn multiple_waiters() {