io: add AsyncFd::try_io() and try_io_mut() (#6967)

This commit is contained in:
Maarten de Vries 2024-11-19 17:55:54 +01:00 committed by GitHub
parent d4178cf349
commit cbdceb91ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 153 additions and 1 deletions

View File

@ -872,6 +872,56 @@ impl<T: AsRawFd> AsyncFd<T> {
.async_io(interest, || f(self.inner.as_mut().unwrap()))
.await
}
/// Tries to read or write from the file descriptor using a user-provided IO operation.
///
/// If the file descriptor is ready, the provided closure is called. The closure
/// should attempt to perform IO operation on the file descriptor by manually
/// calling the appropriate syscall. If the operation fails because the
/// file descriptor is not actually ready, then the closure should return a
/// `WouldBlock` error and the readiness flag is cleared. The return value
/// of the closure is then returned by `try_io`.
///
/// If the file descriptor is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the file descriptor that failed due to the file descriptor not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the file descriptor to
/// behave incorrectly.
///
/// The closure should not perform the IO operation using any of the methods
/// defined on the Tokio `AsyncFd` type, as this will mess with the
/// readiness flag and can cause the file descriptor to behave incorrectly.
///
/// This method is not intended to be used with combined interests.
/// The closure should perform only one type of IO operation, so it should not
/// require more than one ready state. This method may panic or sleep forever
/// if it is called with a combined interest.
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce(&T) -> io::Result<R>,
) -> io::Result<R> {
self.registration
.try_io(interest, || f(self.inner.as_ref().unwrap()))
}
/// Tries to read or write from the file descriptor using a user-provided IO operation.
///
/// The behavior is the same as [`try_io`], except that the closure can mutate the inner
/// value of the [`AsyncFd`].
///
/// [`try_io`]: AsyncFd::try_io
pub fn try_io_mut<R>(
&mut self,
interest: Interest,
f: impl FnOnce(&mut T) -> io::Result<R>,
) -> io::Result<R> {
self.registration
.try_io(interest, || f(self.inner.as_mut().unwrap()))
}
}
impl<T: AsRawFd> AsRawFd for AsyncFd<T> {

View File

@ -302,7 +302,7 @@ async fn reregister() {
#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn try_io() {
async fn guard_try_io() {
let (a, mut b) = socketpair();
b.write_all(b"0").unwrap();
@ -336,6 +336,108 @@ async fn try_io() {
let _ = readable.await.unwrap();
}
#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn try_io_readable() {
let (a, mut b) = socketpair();
let mut afd_a = AsyncFd::new(a).unwrap();
// Give the runtime some time to update bookkeeping.
tokio::task::yield_now().await;
{
let mut called = false;
let _ = afd_a.try_io_mut(Interest::READABLE, |_| {
called = true;
Ok(())
});
assert!(
!called,
"closure should not have been called, since socket should not be readable"
);
}
// Make `a` readable by writing to `b`.
// Give the runtime some time to update bookkeeping.
b.write_all(&[0]).unwrap();
tokio::task::yield_now().await;
{
let mut called = false;
let _ = afd_a.try_io(Interest::READABLE, |_| {
called = true;
Ok(())
});
assert!(
called,
"closure should have been called, since socket should have data available to read"
);
}
{
let mut called = false;
let _ = afd_a.try_io(Interest::READABLE, |_| {
called = true;
io::Result::<()>::Err(ErrorKind::WouldBlock.into())
});
assert!(
called,
"closure should have been called, since socket should have data available to read"
);
}
{
let mut called = false;
let _ = afd_a.try_io(Interest::READABLE, |_| {
called = true;
Ok(())
});
assert!(!called, "closure should not have been called, since socket readable state should have been cleared");
}
}
#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn try_io_writable() {
let (a, _b) = socketpair();
let afd_a = AsyncFd::new(a).unwrap();
// Give the runtime some time to update bookkeeping.
tokio::task::yield_now().await;
{
let mut called = false;
let _ = afd_a.try_io(Interest::WRITABLE, |_| {
called = true;
Ok(())
});
assert!(
called,
"closure should have been called, since socket should still be marked as writable"
);
}
{
let mut called = false;
let _ = afd_a.try_io(Interest::WRITABLE, |_| {
called = true;
io::Result::<()>::Err(ErrorKind::WouldBlock.into())
});
assert!(
called,
"closure should have been called, since socket should still be marked as writable"
);
}
{
let mut called = false;
let _ = afd_a.try_io(Interest::WRITABLE, |_| {
called = true;
Ok(())
});
assert!(!called, "closure should not have been called, since socket writable state should have been cleared");
}
}
#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn multiple_waiters() {