From e929d0e8b9aa46d4b7cad0f5d692ad94f5b2da45 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Sat, 6 Jan 2024 04:14:32 -0700 Subject: [PATCH] tests: update mio-aio to 0.8 (#6269) This is a test-only dependency. The main reason for the update is to avoid transitively depending on a Nix version with a CVE. mio-aio 0.8.0 has a substantially different API than 0.7.0. Notably, it no longer includes any lio_listio functionality. So to test Tokio's handling of EVFILT_LIO events we must go low-level and call libc::lio_listio directly. --- .cargo/audit.toml | 8 - tokio/Cargo.toml | 2 +- tokio/tests/io_poll_aio.rs | 397 +++++++++++++++++-------------------- 3 files changed, 181 insertions(+), 226 deletions(-) delete mode 100644 .cargo/audit.toml diff --git a/.cargo/audit.toml b/.cargo/audit.toml deleted file mode 100644 index 25e764be2..000000000 --- a/.cargo/audit.toml +++ /dev/null @@ -1,8 +0,0 @@ -# See https://github.com/rustsec/rustsec/blob/59e1d2ad0b9cbc6892c26de233d4925074b4b97b/cargo-audit/audit.toml.example for example. - -[advisories] -ignore = [ - # We depend on nix 0.22 only via mio-aio, a dev-dependency. - # https://github.com/tokio-rs/tokio/pull/4255#issuecomment-974786349 - "RUSTSEC-2021-0119", -] diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 05157cdb5..87d2a460c 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -149,7 +149,7 @@ rand = "0.8.0" wasm-bindgen-test = "0.3.0" [target.'cfg(target_os = "freebsd")'.dev-dependencies] -mio-aio = { version = "0.7.0", features = ["tokio"] } +mio-aio = { version = "0.8.0", features = ["tokio"] } [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.7", features = ["futures", "checkpoint"] } diff --git a/tokio/tests/io_poll_aio.rs b/tokio/tests/io_poll_aio.rs index f044af5cc..e83859f5c 100644 --- a/tokio/tests/io_poll_aio.rs +++ b/tokio/tests/io_poll_aio.rs @@ -1,12 +1,12 @@ #![warn(rust_2018_idioms)] #![cfg(all(target_os = "freebsd", feature = "net"))] -use mio_aio::{AioCb, AioFsyncMode, LioCb}; +use mio_aio::{AioFsyncMode, SourceApi}; use std::{ future::Future, - mem, + io, mem, os::unix::io::{AsRawFd, RawFd}, - pin::Pin, + pin::{pin, Pin}, task::{Context, Poll}, }; use tempfile::tempfile; @@ -16,9 +16,10 @@ use tokio_test::assert_pending; mod aio { use super::*; - /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource - struct WrappedAioCb<'a>(AioCb<'a>); - impl<'a> AioSource for WrappedAioCb<'a> { + #[derive(Debug)] + struct TokioSource(mio_aio::Source); + + impl AioSource for TokioSource { fn register(&mut self, kq: RawFd, token: usize) { self.0.register_raw(kq, token) } @@ -28,12 +29,22 @@ mod aio { } /// A very crude implementation of an AIO-based future - struct FsyncFut(Aio>); + struct FsyncFut(Aio); + + impl FsyncFut { + pub fn submit(self: Pin<&mut Self>) -> io::Result<()> { + let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) }; + match p.submit() { + Ok(()) => Ok(()), + Err(e) => Err(io::Error::from_raw_os_error(e as i32)), + } + } + } impl Future for FsyncFut { - type Output = std::io::Result<()>; + type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let poll_result = self.0.poll_ready(cx); match poll_result { Poll::Pending => Poll::Pending, @@ -41,10 +52,11 @@ mod aio { Poll::Ready(Ok(_ev)) => { // At this point, we could clear readiness. But there's no // point, since we're about to drop the Aio. - let result = (*self.0).0.aio_return(); + let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) }; + let result = p.aio_return(); match result { - Ok(_) => Poll::Ready(Ok(())), - Err(e) => Poll::Ready(Err(e.into())), + Ok(r) => Poll::Ready(Ok(r)), + Err(e) => Poll::Ready(Err(io::Error::from_raw_os_error(e as i32))), } } } @@ -57,6 +69,16 @@ mod aio { /// registration actually works, under the hood. struct LlSource(Pin>); + impl LlSource { + fn fsync(mut self: Pin<&mut Self>) { + let r = unsafe { + let p = self.0.as_mut().get_unchecked_mut(); + libc::aio_fsync(libc::O_SYNC, p) + }; + assert_eq!(0, r); + } + } + impl AioSource for LlSource { fn register(&mut self, kq: RawFd, token: usize) { let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; @@ -77,62 +99,15 @@ mod aio { struct LlFut(Aio); + impl LlFut { + pub fn fsync(self: Pin<&mut Self>) { + let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) }; + p.fsync(); + } + } + impl Future for LlFut { - type Output = std::io::Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let poll_result = self.0.poll_ready(cx); - match poll_result { - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Ready(Ok(_ev)) => { - let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; - assert_eq!(0, r); - Poll::Ready(Ok(())) - } - } - } - } - - /// A very simple object that can implement AioSource and can be reused. - /// - /// mio_aio normally assumes that each AioCb will be consumed on completion. - /// This somewhat contrived example shows how an Aio object can be reused - /// anyway. - struct ReusableFsyncSource { - aiocb: Pin>>, - fd: RawFd, - token: usize, - } - impl ReusableFsyncSource { - fn fsync(&mut self) { - self.aiocb.register_raw(self.fd, self.token); - self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap(); - } - fn new(aiocb: AioCb<'static>) -> Self { - ReusableFsyncSource { - aiocb: Box::pin(aiocb), - fd: 0, - token: 0, - } - } - fn reset(&mut self, aiocb: AioCb<'static>) { - self.aiocb = Box::pin(aiocb); - } - } - impl AioSource for ReusableFsyncSource { - fn register(&mut self, kq: RawFd, token: usize) { - self.fd = kq; - self.token = token; - } - fn deregister(&mut self) { - self.fd = 0; - } - } - - struct ReusableFsyncFut<'a>(&'a mut Aio); - impl<'a> Future for ReusableFsyncFut<'a> { - type Output = std::io::Result<()>; + type Output = std::io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let poll_result = self.0.poll_ready(cx); @@ -140,16 +115,16 @@ mod aio { Poll::Pending => Poll::Pending, Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Ready(Ok(ev)) => { - // Since this future uses a reusable Aio, we must clear - // its readiness here. That makes the future - // non-idempotent; the caller can't poll it repeatedly after - // it has already returned Ready. But that's ok; most - // futures behave this way. + // Clearing readiness makes the future non-idempotent; the + // caller can't poll it repeatedly after it has already + // returned Ready. But that's ok; most futures behave this + // way. self.0.clear_ready(ev); - let result = (*self.0).aiocb.aio_return(); - match result { - Ok(_) => Poll::Ready(Ok(())), - Err(e) => Poll::Ready(Err(e.into())), + let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; + if r >= 0 { + Poll::Ready(Ok(r as usize)) + } else { + Poll::Ready(Err(io::Error::last_os_error())) } } } @@ -160,11 +135,11 @@ mod aio { async fn fsync() { let f = tempfile().unwrap(); let fd = f.as_raw_fd(); - let aiocb = AioCb::from_fd(fd, 0); - let source = WrappedAioCb(aiocb); - let mut poll_aio = Aio::new_for_aio(source).unwrap(); - (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap(); - let fut = FsyncFut(poll_aio); + let mode = AioFsyncMode::O_SYNC; + let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0)); + let poll_aio = Aio::new_for_aio(source).unwrap(); + let mut fut = pin!(FsyncFut(poll_aio)); + fut.as_mut().submit().unwrap(); fut.await.unwrap(); } @@ -177,7 +152,7 @@ mod aio { let source = LlSource(Box::pin(aiocb)); let mut poll_aio = Aio::new_for_aio(source).unwrap(); let r = unsafe { - let p = (*poll_aio).0.as_mut().get_unchecked_mut(); + let p = poll_aio.0.as_mut().get_unchecked_mut(); libc::aio_fsync(libc::O_SYNC, p) }; assert_eq!(0, r); @@ -190,144 +165,140 @@ mod aio { async fn reuse() { let f = tempfile().unwrap(); let fd = f.as_raw_fd(); - let aiocb0 = AioCb::from_fd(fd, 0); - let source = ReusableFsyncSource::new(aiocb0); - let mut poll_aio = Aio::new_for_aio(source).unwrap(); - poll_aio.fsync(); - let fut0 = ReusableFsyncFut(&mut poll_aio); - fut0.await.unwrap(); + let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + aiocb.aio_fildes = fd; + let source = LlSource(Box::pin(aiocb)); + let poll_aio = Aio::new_for_aio(source).unwrap(); - let aiocb1 = AioCb::from_fd(fd, 0); - poll_aio.reset(aiocb1); + // Send the operation to the kernel the first time + let mut fut = LlFut(poll_aio); + { + let mut pfut = Pin::new(&mut fut); + pfut.as_mut().fsync(); + pfut.as_mut().await.unwrap(); + } + + // Check that readiness was cleared let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); - assert_pending!(poll_aio.poll_ready(&mut ctx)); - poll_aio.fsync(); - let fut1 = ReusableFsyncFut(&mut poll_aio); - fut1.await.unwrap(); + assert_pending!(fut.0.poll_ready(&mut ctx)); + + // and reuse the future and its Aio object + { + let mut pfut = Pin::new(&mut fut); + pfut.as_mut().fsync(); + pfut.as_mut().await.unwrap(); + } } } mod lio { use super::*; - struct WrappedLioCb<'a>(LioCb<'a>); - impl<'a> AioSource for WrappedLioCb<'a> { - fn register(&mut self, kq: RawFd, token: usize) { - self.0.register_raw(kq, token) + /// Low-level source based on lio_listio + /// + /// An example demonstrating using AIO with `Interest::Lio`. mio_aio 0.8 + /// doesn't include any bindings for lio_listio, so we've got to go + /// low-level. + struct LioSource<'a> { + aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>, + sev: libc::sigevent, + } + + impl<'a> LioSource<'a> { + fn new(aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>) -> Self { + LioSource { + aiocb, + sev: unsafe { mem::zeroed() }, + } } - fn deregister(&mut self) { - self.0.deregister_raw() + + fn submit(mut self: Pin<&mut Self>) { + let p: *const *mut libc::aiocb = + unsafe { self.aiocb.as_mut().get_unchecked_mut() } as *const _ as *const *mut _; + let r = unsafe { libc::lio_listio(libc::LIO_NOWAIT, p, 1, &mut self.sev) }; + assert_eq!(r, 0); } } - /// A very crude lio_listio-based Future - struct LioFut(Option>>); + impl<'a> AioSource for LioSource<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + sev.sigev_notify = libc::SIGEV_KEVENT; + sev.sigev_signo = kq; + sev.sigev_value = libc::sigval { + sival_ptr: token as *mut libc::c_void, + }; + self.sev = sev; + } - impl Future for LioFut { - type Output = std::io::Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let poll_result = self.0.as_mut().unwrap().poll_ready(cx); - match poll_result { - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Ready(Ok(_ev)) => { - // At this point, we could clear readiness. But there's no - // point, since we're about to drop the Aio. - let r = self.0.take().unwrap().into_inner().0.into_results(|iter| { - iter.map(|lr| lr.result.unwrap()).collect::>() - }); - Poll::Ready(Ok(r)) - } + fn deregister(&mut self) { + unsafe { + self.sev = mem::zeroed(); } } } - /// Minimal example demonstrating reuse of an Aio object with lio - /// readiness. mio_aio::LioCb actually does something similar under the - /// hood. - struct ReusableLioSource { - liocb: Option>, - fd: RawFd, - token: usize, - } - impl ReusableLioSource { - fn new(liocb: LioCb<'static>) -> Self { - ReusableLioSource { - liocb: Some(liocb), - fd: 0, - token: 0, - } - } - fn reset(&mut self, liocb: LioCb<'static>) { - self.liocb = Some(liocb); - } - fn submit(&mut self) { - self.liocb - .as_mut() - .unwrap() - .register_raw(self.fd, self.token); - self.liocb.as_mut().unwrap().submit().unwrap(); - } - } - impl AioSource for ReusableLioSource { - fn register(&mut self, kq: RawFd, token: usize) { - self.fd = kq; - self.token = token; - } - fn deregister(&mut self) { - self.fd = 0; - } - } - struct ReusableLioFut<'a>(&'a mut Aio); - impl<'a> Future for ReusableLioFut<'a> { - type Output = std::io::Result>; + struct LioFut<'a>(Aio>); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + impl<'a> LioFut<'a> { + pub fn submit(self: Pin<&mut Self>) { + let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) }; + p.submit(); + } + } + + impl<'a> Future for LioFut<'a> { + type Output = std::io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let poll_result = self.0.poll_ready(cx); match poll_result { Poll::Pending => Poll::Pending, Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Ready(Ok(ev)) => { - // Since this future uses a reusable Aio, we must clear - // its readiness here. That makes the future - // non-idempotent; the caller can't poll it repeatedly after - // it has already returned Ready. But that's ok; most - // futures behave this way. + // Clearing readiness makes the future non-idempotent; the + // caller can't poll it repeatedly after it has already + // returned Ready. But that's ok; most futures behave this + // way. Clearing readiness is especially useful for + // lio_listio, because sometimes some operations will be + // ready but not all. self.0.clear_ready(ev); - let r = (*self.0).liocb.take().unwrap().into_results(|iter| { - iter.map(|lr| lr.result.unwrap()).collect::>() - }); - Poll::Ready(Ok(r)) + let r = unsafe { + let p1 = self.get_unchecked_mut(); + let p2: &mut [&mut libc::aiocb; 1] = + p1.0.aiocb.as_mut().get_unchecked_mut(); + let p3: &mut libc::aiocb = p2[0]; + libc::aio_return(p3) + }; + if r >= 0 { + Poll::Ready(Ok(r as usize)) + } else { + Poll::Ready(Err(io::Error::last_os_error())) + } } } } } - /// An lio_listio operation with one write element + /// An lio_listio operation with one fsync element #[tokio::test] async fn onewrite() { const WBUF: &[u8] = b"abcdef"; let f = tempfile().unwrap(); - let mut builder = mio_aio::LioCbBuilder::with_capacity(1); - builder = builder.emplace_slice( - f.as_raw_fd(), - 0, - &WBUF[..], - 0, - mio_aio::LioOpcode::LIO_WRITE, - ); - let liocb = builder.finish(); - let source = WrappedLioCb(liocb); - let mut poll_aio = Aio::new_for_lio(source).unwrap(); + let mut aiocb: libc::aiocb = unsafe { mem::zeroed() }; + aiocb.aio_fildes = f.as_raw_fd(); + aiocb.aio_lio_opcode = libc::LIO_WRITE; + aiocb.aio_nbytes = WBUF.len(); + aiocb.aio_buf = WBUF.as_ptr() as *mut _; + let aiocb = pin!([&mut aiocb]); + let source = LioSource::new(aiocb); + let poll_aio = Aio::new_for_lio(source).unwrap(); // Send the operation to the kernel - (*poll_aio).0.submit().unwrap(); - let fut = LioFut(Some(poll_aio)); - let v = fut.await.unwrap(); - assert_eq!(v.len(), 1); - assert_eq!(v[0] as usize, WBUF.len()); + let mut fut = pin!(LioFut(poll_aio)); + fut.as_mut().submit(); + fut.await.unwrap(); } /// A suitably crafted future type can reuse an Aio object @@ -336,40 +307,32 @@ mod lio { const WBUF: &[u8] = b"abcdef"; let f = tempfile().unwrap(); - let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1); - builder0 = builder0.emplace_slice( - f.as_raw_fd(), - 0, - &WBUF[..], - 0, - mio_aio::LioOpcode::LIO_WRITE, - ); - let liocb0 = builder0.finish(); - let source = ReusableLioSource::new(liocb0); - let mut poll_aio = Aio::new_for_aio(source).unwrap(); - poll_aio.submit(); - let fut0 = ReusableLioFut(&mut poll_aio); - let v = fut0.await.unwrap(); - assert_eq!(v.len(), 1); - assert_eq!(v[0] as usize, WBUF.len()); + let mut aiocb: libc::aiocb = unsafe { mem::zeroed() }; + aiocb.aio_fildes = f.as_raw_fd(); + aiocb.aio_lio_opcode = libc::LIO_WRITE; + aiocb.aio_nbytes = WBUF.len(); + aiocb.aio_buf = WBUF.as_ptr() as *mut _; + let aiocb = pin!([&mut aiocb]); + let source = LioSource::new(aiocb); + let poll_aio = Aio::new_for_lio(source).unwrap(); - // Now reuse the same Aio - let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1); - builder1 = builder1.emplace_slice( - f.as_raw_fd(), - 0, - &WBUF[..], - 0, - mio_aio::LioOpcode::LIO_WRITE, - ); - let liocb1 = builder1.finish(); - poll_aio.reset(liocb1); + // Send the operation to the kernel the first time + let mut fut = LioFut(poll_aio); + { + let mut pfut = Pin::new(&mut fut); + pfut.as_mut().submit(); + pfut.as_mut().await.unwrap(); + } + + // Check that readiness was cleared let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); - assert_pending!(poll_aio.poll_ready(&mut ctx)); - poll_aio.submit(); - let fut1 = ReusableLioFut(&mut poll_aio); - let v = fut1.await.unwrap(); - assert_eq!(v.len(), 1); - assert_eq!(v[0] as usize, WBUF.len()); + assert_pending!(fut.0.poll_ready(&mut ctx)); + + // and reuse the future and its Aio object + { + let mut pfut = Pin::new(&mut fut); + pfut.as_mut().submit(); + pfut.as_mut().await.unwrap(); + } } }