From 8b298d9ed41930c071f023a43bf0ded64ea1129e Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Wed, 15 Sep 2021 10:55:50 -0600 Subject: [PATCH] io: add POSIX AIO on FreeBSD (#4054) --- tokio/Cargo.toml | 3 + tokio/src/io/bsd/poll_aio.rs | 195 +++++++++++++++++ tokio/src/io/driver/interest.rs | 20 ++ tokio/src/io/driver/mod.rs | 1 + tokio/src/io/driver/ready.rs | 11 + tokio/src/io/mod.rs | 9 + tokio/src/lib.rs | 5 +- tokio/src/macros/cfg.rs | 12 + tokio/tests/io_poll_aio.rs | 375 ++++++++++++++++++++++++++++++++ 9 files changed, 629 insertions(+), 2 deletions(-) create mode 100644 tokio/src/io/bsd/poll_aio.rs create mode 100644 tokio/tests/io_poll_aio.rs diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 3b027049d..080e000c8 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -131,6 +131,9 @@ tempfile = "3.1.0" async-stream = "0.3" socket2 = "0.4" +[target.'cfg(target_os = "freebsd")'.dev-dependencies] +mio-aio = { git = "https://github.com/asomers/mio-aio", rev = "2f56696", features = ["tokio"] } + [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.5", features = ["futures", "checkpoint"] } diff --git a/tokio/src/io/bsd/poll_aio.rs b/tokio/src/io/bsd/poll_aio.rs new file mode 100644 index 000000000..a765d7677 --- /dev/null +++ b/tokio/src/io/bsd/poll_aio.rs @@ -0,0 +1,195 @@ +//! Use POSIX AIO futures with Tokio + +use crate::io::driver::{Handle, Interest, ReadyEvent, Registration}; +use mio::event::Source; +use mio::Registry; +use mio::Token; +use std::fmt; +use std::io; +use std::ops::{Deref, DerefMut}; +use std::os::unix::io::AsRawFd; +use std::os::unix::prelude::RawFd; +use std::task::{Context, Poll}; + +/// Like [`mio::event::Source`], but for POSIX AIO only. +/// +/// Tokio's consumer must pass an implementor of this trait to create a +/// [`Aio`] object. +pub trait AioSource { + /// Register this AIO event source with Tokio's reactor + fn register(&mut self, kq: RawFd, token: usize); + + /// Deregister this AIO event source with Tokio's reactor + fn deregister(&mut self); +} + +/// Wrap the user's AioSource in order to implement mio::event::Source, which +/// is what the rest of the crate wants. +struct MioSource(T); + +impl Source for MioSource { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: mio::Interest, + ) -> io::Result<()> { + assert!(interests.is_aio() || interests.is_lio()); + self.0.register(registry.as_raw_fd(), usize::from(token)); + Ok(()) + } + + fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { + self.0.deregister(); + Ok(()) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: mio::Interest, + ) -> io::Result<()> { + assert!(interests.is_aio() || interests.is_lio()); + self.0.register(registry.as_raw_fd(), usize::from(token)); + Ok(()) + } +} + +/// Associates a POSIX AIO control block with the reactor that drives it. +/// +/// `Aio`'s wrapped type must implement [`AioSource`] to be driven +/// by the reactor. +/// +/// The wrapped source may be accessed through the `Aio` via the `Deref` and +/// `DerefMut` traits. +/// +/// ## Clearing readiness +/// +/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the +/// Source is not completely ready and must return to the Pending state, +/// [`Aio::clear_ready`] may be used. This can be useful with +/// [`lio_listio`], which may generate a kevent when only a portion of the +/// operations have completed. +/// +/// ## Platforms +/// +/// Only FreeBSD implements POSIX AIO with kqueue notification, so +/// `Aio` is only available for that operating system. +/// +/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html +// Note: Unlike every other kqueue event source, POSIX AIO registers events not +// via kevent(2) but when the aiocb is submitted to the kernel via aio_read, +// aio_write, etc. It needs the kqueue's file descriptor to do that. So +// AsyncFd can't be used for POSIX AIO. +// +// Note that Aio doesn't implement Drop. There's no need. Unlike other +// kqueue sources, simply dropping the object effectively deregisters it. +pub struct Aio { + io: MioSource, + registration: Registration, +} + +// ===== impl Aio ===== + +impl Aio { + /// Creates a new `Aio` suitable for use with POSIX AIO functions. + /// + /// It will be associated with the default reactor. The runtime is usually + /// set implicitly when this function is called from a future driven by a + /// Tokio runtime, otherwise runtime can be set explicitly with + /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn new_for_aio(io: E) -> io::Result { + Self::new_with_interest(io, Interest::AIO) + } + + /// Creates a new `Aio` suitable for use with [`lio_listio`]. + /// + /// It will be associated with the default reactor. The runtime is usually + /// set implicitly when this function is called from a future driven by a + /// Tokio runtime, otherwise runtime can be set explicitly with + /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. + /// + /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + pub fn new_for_lio(io: E) -> io::Result { + Self::new_with_interest(io, Interest::LIO) + } + + fn new_with_interest(io: E, interest: Interest) -> io::Result { + let mut io = MioSource(io); + let handle = Handle::current(); + let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; + Ok(Self { io, registration }) + } + + /// Indicates to Tokio that the source is no longer ready. The internal + /// readiness flag will be cleared, and tokio will wait for the next + /// edge-triggered readiness notification from the OS. + /// + /// It is critical that this method not be called unless your code + /// _actually observes_ that the source is _not_ ready. The OS must + /// deliver a subsequent notification, or this source will block + /// forever. It is equally critical that you `do` call this method if you + /// resubmit the same structure to the kernel and poll it again. + /// + /// This method is not very useful with AIO readiness, since each `aiocb` + /// structure is typically only used once. It's main use with + /// [`lio_listio`], which will sometimes send notification when only a + /// portion of its elements are complete. In that case, the caller must + /// call `clear_ready` before resubmitting it. + /// + /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + pub fn clear_ready(&self, ev: AioEvent) { + self.registration.clear_readiness(ev.0) + } + + /// Destroy the [`Aio`] and return its inner source. + pub fn into_inner(self) -> E { + self.io.0 + } + + /// Polls for readiness. Either AIO or LIO counts. + /// + /// This method returns: + /// * `Poll::Pending` if the underlying operation is not complete, whether + /// or not it completed successfully. This will be true if the OS is + /// still processing it, or if it has not yet been submitted to the OS. + /// * `Poll::Ready(Ok(_))` if the underlying operation is complete. + /// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does + /// _not_ indicate that the underlying operation encountered an error. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` + /// is scheduled to receive a wakeup when the underlying operation + /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the + /// `Context` passed to the most recent call is scheduled to receive a wakeup. + pub fn poll_ready<'a>(&'a self, cx: &mut Context<'_>) -> Poll> { + let ev = ready!(self.registration.poll_read_ready(cx))?; + Poll::Ready(Ok(AioEvent(ev))) + } +} + +impl Deref for Aio { + type Target = E; + + fn deref(&self) -> &E { + &self.io.0 + } +} + +impl DerefMut for Aio { + fn deref_mut(&mut self) -> &mut E { + &mut self.io.0 + } +} + +impl fmt::Debug for Aio { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Aio").field("io", &self.io.0).finish() + } +} + +/// Opaque data returned by [`Aio::poll_ready`]. +/// +/// It can be fed back to [`Aio::clear_ready`]. +#[derive(Debug)] +pub struct AioEvent(ReadyEvent); diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs index 36951cf5a..c5b18edd4 100644 --- a/tokio/src/io/driver/interest.rs +++ b/tokio/src/io/driver/interest.rs @@ -14,6 +14,26 @@ use std::ops; pub struct Interest(mio::Interest); impl Interest { + // The non-FreeBSD definitions in this block are active only when + // building documentation. + cfg_aio! { + /// Interest for POSIX AIO + #[cfg(target_os = "freebsd")] + pub const AIO: Interest = Interest(mio::Interest::AIO); + + /// Interest for POSIX AIO + #[cfg(not(target_os = "freebsd"))] + pub const AIO: Interest = Interest(mio::Interest::READABLE); + + /// Interest for POSIX AIO lio_listio events + #[cfg(target_os = "freebsd")] + pub const LIO: Interest = Interest(mio::Interest::LIO); + + /// Interest for POSIX AIO lio_listio events + #[cfg(not(target_os = "freebsd"))] + pub const LIO: Interest = Interest(mio::Interest::READABLE); + } + /// Interest in all readable events. /// /// Readable interest includes read-closed events. diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 3aa0cfbb2..1511884ce 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -51,6 +51,7 @@ pub(crate) struct Handle { inner: Weak, } +#[derive(Debug)] pub(crate) struct ReadyEvent { tick: u8, pub(crate) ready: Ready, diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 2ac01bdbe..305dc91f0 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -38,6 +38,17 @@ impl Ready { pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { let mut ready = Ready::EMPTY; + #[cfg(all(target_os = "freebsd", feature = "net"))] + { + if event.is_aio() { + ready |= Ready::READABLE; + } + + if event.is_lio() { + ready |= Ready::READABLE; + } + } + if event.is_readable() { ready |= Ready::READABLE; } diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 14a4a6304..a5ee10800 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -217,6 +217,15 @@ cfg_io_driver_impl! { pub(crate) use poll_evented::PollEvented; } +cfg_aio! { + /// BSD-specific I/O types + pub mod bsd { + mod poll_aio; + + pub use poll_aio::{Aio, AioEvent, AioSource}; + } +} + cfg_net_unix! { mod async_fd; diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index f3687df35..9689223d2 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -314,8 +314,9 @@ //! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler. //! - `io-util`: Enables the IO based `Ext` traits. //! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types. -//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`, -//! as well as (on Unix-like systems) `AsyncFd` +//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and +//! `UdpSocket`, as well as (on Unix-like systems) `AsyncFd` and (on +//! FreeBSD) `PollAio`. //! - `time`: Enables `tokio::time` types and allows the schedulers to enable //! the built in timer. //! - `process`: Enables `tokio::process` types. diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 510d8a629..193bcd7d9 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -45,6 +45,18 @@ macro_rules! cfg_atomic_waker_impl { } } +macro_rules! cfg_aio { + ($($item:item)*) => { + $( + #[cfg(all(any(docsrs, target_os = "freebsd"), feature = "net"))] + #[cfg_attr(docsrs, + doc(cfg(all(target_os = "freebsd", feature = "net"))) + )] + $item + )* + } +} + macro_rules! cfg_fs { ($($item:item)*) => { $( diff --git a/tokio/tests/io_poll_aio.rs b/tokio/tests/io_poll_aio.rs new file mode 100644 index 000000000..f044af5cc --- /dev/null +++ b/tokio/tests/io_poll_aio.rs @@ -0,0 +1,375 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(target_os = "freebsd", feature = "net"))] + +use mio_aio::{AioCb, AioFsyncMode, LioCb}; +use std::{ + future::Future, + mem, + os::unix::io::{AsRawFd, RawFd}, + pin::Pin, + task::{Context, Poll}, +}; +use tempfile::tempfile; +use tokio::io::bsd::{Aio, AioSource}; +use tokio_test::assert_pending; + +mod aio { + use super::*; + + /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource + struct WrappedAioCb<'a>(AioCb<'a>); + impl<'a> AioSource for WrappedAioCb<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + self.0.register_raw(kq, token) + } + fn deregister(&mut self) { + self.0.deregister_raw() + } + } + + /// A very crude implementation of an AIO-based future + struct FsyncFut(Aio>); + + impl Future for FsyncFut { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + // At this point, we could clear readiness. But there's no + // point, since we're about to drop the Aio. + let result = (*self.0).0.aio_return(); + match result { + Ok(_) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(e.into())), + } + } + } + } + } + + /// Low-level AIO Source + /// + /// An example bypassing mio_aio and Nix to demonstrate how the kevent + /// registration actually works, under the hood. + struct LlSource(Pin>); + + impl AioSource for LlSource { + fn register(&mut self, kq: RawFd, token: usize) { + let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + sev.sigev_notify = libc::SIGEV_KEVENT; + sev.sigev_signo = kq; + sev.sigev_value = libc::sigval { + sival_ptr: token as *mut libc::c_void, + }; + self.0.aio_sigevent = sev; + } + + fn deregister(&mut self) { + unsafe { + self.0.aio_sigevent = mem::zeroed(); + } + } + } + + struct LlFut(Aio); + + impl Future for LlFut { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; + assert_eq!(0, r); + Poll::Ready(Ok(())) + } + } + } + } + + /// A very simple object that can implement AioSource and can be reused. + /// + /// mio_aio normally assumes that each AioCb will be consumed on completion. + /// This somewhat contrived example shows how an Aio object can be reused + /// anyway. + struct ReusableFsyncSource { + aiocb: Pin>>, + fd: RawFd, + token: usize, + } + impl ReusableFsyncSource { + fn fsync(&mut self) { + self.aiocb.register_raw(self.fd, self.token); + self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap(); + } + fn new(aiocb: AioCb<'static>) -> Self { + ReusableFsyncSource { + aiocb: Box::pin(aiocb), + fd: 0, + token: 0, + } + } + fn reset(&mut self, aiocb: AioCb<'static>) { + self.aiocb = Box::pin(aiocb); + } + } + impl AioSource for ReusableFsyncSource { + fn register(&mut self, kq: RawFd, token: usize) { + self.fd = kq; + self.token = token; + } + fn deregister(&mut self) { + self.fd = 0; + } + } + + struct ReusableFsyncFut<'a>(&'a mut Aio); + impl<'a> Future for ReusableFsyncFut<'a> { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(ev)) => { + // Since this future uses a reusable Aio, we must clear + // its readiness here. That makes the future + // non-idempotent; the caller can't poll it repeatedly after + // it has already returned Ready. But that's ok; most + // futures behave this way. + self.0.clear_ready(ev); + let result = (*self.0).aiocb.aio_return(); + match result { + Ok(_) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(e.into())), + } + } + } + } + } + + #[tokio::test] + async fn fsync() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let aiocb = AioCb::from_fd(fd, 0); + let source = WrappedAioCb(aiocb); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap(); + let fut = FsyncFut(poll_aio); + fut.await.unwrap(); + } + + #[tokio::test] + async fn ll_fsync() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + aiocb.aio_fildes = fd; + let source = LlSource(Box::pin(aiocb)); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + let r = unsafe { + let p = (*poll_aio).0.as_mut().get_unchecked_mut(); + libc::aio_fsync(libc::O_SYNC, p) + }; + assert_eq!(0, r); + let fut = LlFut(poll_aio); + fut.await.unwrap(); + } + + /// A suitably crafted future type can reuse an Aio object + #[tokio::test] + async fn reuse() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let aiocb0 = AioCb::from_fd(fd, 0); + let source = ReusableFsyncSource::new(aiocb0); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + poll_aio.fsync(); + let fut0 = ReusableFsyncFut(&mut poll_aio); + fut0.await.unwrap(); + + let aiocb1 = AioCb::from_fd(fd, 0); + poll_aio.reset(aiocb1); + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + assert_pending!(poll_aio.poll_ready(&mut ctx)); + poll_aio.fsync(); + let fut1 = ReusableFsyncFut(&mut poll_aio); + fut1.await.unwrap(); + } +} + +mod lio { + use super::*; + + struct WrappedLioCb<'a>(LioCb<'a>); + impl<'a> AioSource for WrappedLioCb<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + self.0.register_raw(kq, token) + } + fn deregister(&mut self) { + self.0.deregister_raw() + } + } + + /// A very crude lio_listio-based Future + struct LioFut(Option>>); + + impl Future for LioFut { + type Output = std::io::Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.as_mut().unwrap().poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + // At this point, we could clear readiness. But there's no + // point, since we're about to drop the Aio. + let r = self.0.take().unwrap().into_inner().0.into_results(|iter| { + iter.map(|lr| lr.result.unwrap()).collect::>() + }); + Poll::Ready(Ok(r)) + } + } + } + } + + /// Minimal example demonstrating reuse of an Aio object with lio + /// readiness. mio_aio::LioCb actually does something similar under the + /// hood. + struct ReusableLioSource { + liocb: Option>, + fd: RawFd, + token: usize, + } + impl ReusableLioSource { + fn new(liocb: LioCb<'static>) -> Self { + ReusableLioSource { + liocb: Some(liocb), + fd: 0, + token: 0, + } + } + fn reset(&mut self, liocb: LioCb<'static>) { + self.liocb = Some(liocb); + } + fn submit(&mut self) { + self.liocb + .as_mut() + .unwrap() + .register_raw(self.fd, self.token); + self.liocb.as_mut().unwrap().submit().unwrap(); + } + } + impl AioSource for ReusableLioSource { + fn register(&mut self, kq: RawFd, token: usize) { + self.fd = kq; + self.token = token; + } + fn deregister(&mut self) { + self.fd = 0; + } + } + struct ReusableLioFut<'a>(&'a mut Aio); + impl<'a> Future for ReusableLioFut<'a> { + type Output = std::io::Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(ev)) => { + // Since this future uses a reusable Aio, we must clear + // its readiness here. That makes the future + // non-idempotent; the caller can't poll it repeatedly after + // it has already returned Ready. But that's ok; most + // futures behave this way. + self.0.clear_ready(ev); + let r = (*self.0).liocb.take().unwrap().into_results(|iter| { + iter.map(|lr| lr.result.unwrap()).collect::>() + }); + Poll::Ready(Ok(r)) + } + } + } + } + + /// An lio_listio operation with one write element + #[tokio::test] + async fn onewrite() { + const WBUF: &[u8] = b"abcdef"; + let f = tempfile().unwrap(); + + let mut builder = mio_aio::LioCbBuilder::with_capacity(1); + builder = builder.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb = builder.finish(); + let source = WrappedLioCb(liocb); + let mut poll_aio = Aio::new_for_lio(source).unwrap(); + + // Send the operation to the kernel + (*poll_aio).0.submit().unwrap(); + let fut = LioFut(Some(poll_aio)); + let v = fut.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + } + + /// A suitably crafted future type can reuse an Aio object + #[tokio::test] + async fn reuse() { + const WBUF: &[u8] = b"abcdef"; + let f = tempfile().unwrap(); + + let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1); + builder0 = builder0.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb0 = builder0.finish(); + let source = ReusableLioSource::new(liocb0); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + poll_aio.submit(); + let fut0 = ReusableLioFut(&mut poll_aio); + let v = fut0.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + + // Now reuse the same Aio + let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1); + builder1 = builder1.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb1 = builder1.finish(); + poll_aio.reset(liocb1); + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + assert_pending!(poll_aio.poll_ready(&mut ctx)); + poll_aio.submit(); + let fut1 = ReusableLioFut(&mut poll_aio); + let v = fut1.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + } +}