mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
io: add POSIX AIO on FreeBSD (#4054)
This commit is contained in:
parent
57563e218b
commit
8b298d9ed4
@ -131,6 +131,9 @@ tempfile = "3.1.0"
|
||||
async-stream = "0.3"
|
||||
socket2 = "0.4"
|
||||
|
||||
[target.'cfg(target_os = "freebsd")'.dev-dependencies]
|
||||
mio-aio = { git = "https://github.com/asomers/mio-aio", rev = "2f56696", features = ["tokio"] }
|
||||
|
||||
[target.'cfg(loom)'.dev-dependencies]
|
||||
loom = { version = "0.5", features = ["futures", "checkpoint"] }
|
||||
|
||||
|
195
tokio/src/io/bsd/poll_aio.rs
Normal file
195
tokio/src/io/bsd/poll_aio.rs
Normal file
@ -0,0 +1,195 @@
|
||||
//! Use POSIX AIO futures with Tokio
|
||||
|
||||
use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
|
||||
use mio::event::Source;
|
||||
use mio::Registry;
|
||||
use mio::Token;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::os::unix::prelude::RawFd;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// Like [`mio::event::Source`], but for POSIX AIO only.
|
||||
///
|
||||
/// Tokio's consumer must pass an implementor of this trait to create a
|
||||
/// [`Aio`] object.
|
||||
pub trait AioSource {
|
||||
/// Register this AIO event source with Tokio's reactor
|
||||
fn register(&mut self, kq: RawFd, token: usize);
|
||||
|
||||
/// Deregister this AIO event source with Tokio's reactor
|
||||
fn deregister(&mut self);
|
||||
}
|
||||
|
||||
/// Wrap the user's AioSource in order to implement mio::event::Source, which
|
||||
/// is what the rest of the crate wants.
|
||||
struct MioSource<T>(T);
|
||||
|
||||
impl<T: AioSource> Source for MioSource<T> {
|
||||
fn register(
|
||||
&mut self,
|
||||
registry: &Registry,
|
||||
token: Token,
|
||||
interests: mio::Interest,
|
||||
) -> io::Result<()> {
|
||||
assert!(interests.is_aio() || interests.is_lio());
|
||||
self.0.register(registry.as_raw_fd(), usize::from(token));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
|
||||
self.0.deregister();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn reregister(
|
||||
&mut self,
|
||||
registry: &Registry,
|
||||
token: Token,
|
||||
interests: mio::Interest,
|
||||
) -> io::Result<()> {
|
||||
assert!(interests.is_aio() || interests.is_lio());
|
||||
self.0.register(registry.as_raw_fd(), usize::from(token));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Associates a POSIX AIO control block with the reactor that drives it.
|
||||
///
|
||||
/// `Aio`'s wrapped type must implement [`AioSource`] to be driven
|
||||
/// by the reactor.
|
||||
///
|
||||
/// The wrapped source may be accessed through the `Aio` via the `Deref` and
|
||||
/// `DerefMut` traits.
|
||||
///
|
||||
/// ## Clearing readiness
|
||||
///
|
||||
/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the
|
||||
/// Source is not completely ready and must return to the Pending state,
|
||||
/// [`Aio::clear_ready`] may be used. This can be useful with
|
||||
/// [`lio_listio`], which may generate a kevent when only a portion of the
|
||||
/// operations have completed.
|
||||
///
|
||||
/// ## Platforms
|
||||
///
|
||||
/// Only FreeBSD implements POSIX AIO with kqueue notification, so
|
||||
/// `Aio` is only available for that operating system.
|
||||
///
|
||||
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
|
||||
// Note: Unlike every other kqueue event source, POSIX AIO registers events not
|
||||
// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
|
||||
// aio_write, etc. It needs the kqueue's file descriptor to do that. So
|
||||
// AsyncFd can't be used for POSIX AIO.
|
||||
//
|
||||
// Note that Aio doesn't implement Drop. There's no need. Unlike other
|
||||
// kqueue sources, simply dropping the object effectively deregisters it.
|
||||
pub struct Aio<E> {
|
||||
io: MioSource<E>,
|
||||
registration: Registration,
|
||||
}
|
||||
|
||||
// ===== impl Aio =====
|
||||
|
||||
impl<E: AioSource> Aio<E> {
|
||||
/// Creates a new `Aio` suitable for use with POSIX AIO functions.
|
||||
///
|
||||
/// It will be associated with the default reactor. The runtime is usually
|
||||
/// set implicitly when this function is called from a future driven by a
|
||||
/// Tokio runtime, otherwise runtime can be set explicitly with
|
||||
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
|
||||
pub fn new_for_aio(io: E) -> io::Result<Self> {
|
||||
Self::new_with_interest(io, Interest::AIO)
|
||||
}
|
||||
|
||||
/// Creates a new `Aio` suitable for use with [`lio_listio`].
|
||||
///
|
||||
/// It will be associated with the default reactor. The runtime is usually
|
||||
/// set implicitly when this function is called from a future driven by a
|
||||
/// Tokio runtime, otherwise runtime can be set explicitly with
|
||||
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
|
||||
///
|
||||
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
|
||||
pub fn new_for_lio(io: E) -> io::Result<Self> {
|
||||
Self::new_with_interest(io, Interest::LIO)
|
||||
}
|
||||
|
||||
fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
|
||||
let mut io = MioSource(io);
|
||||
let handle = Handle::current();
|
||||
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
|
||||
Ok(Self { io, registration })
|
||||
}
|
||||
|
||||
/// Indicates to Tokio that the source is no longer ready. The internal
|
||||
/// readiness flag will be cleared, and tokio will wait for the next
|
||||
/// edge-triggered readiness notification from the OS.
|
||||
///
|
||||
/// It is critical that this method not be called unless your code
|
||||
/// _actually observes_ that the source is _not_ ready. The OS must
|
||||
/// deliver a subsequent notification, or this source will block
|
||||
/// forever. It is equally critical that you `do` call this method if you
|
||||
/// resubmit the same structure to the kernel and poll it again.
|
||||
///
|
||||
/// This method is not very useful with AIO readiness, since each `aiocb`
|
||||
/// structure is typically only used once. It's main use with
|
||||
/// [`lio_listio`], which will sometimes send notification when only a
|
||||
/// portion of its elements are complete. In that case, the caller must
|
||||
/// call `clear_ready` before resubmitting it.
|
||||
///
|
||||
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
|
||||
pub fn clear_ready(&self, ev: AioEvent) {
|
||||
self.registration.clear_readiness(ev.0)
|
||||
}
|
||||
|
||||
/// Destroy the [`Aio`] and return its inner source.
|
||||
pub fn into_inner(self) -> E {
|
||||
self.io.0
|
||||
}
|
||||
|
||||
/// Polls for readiness. Either AIO or LIO counts.
|
||||
///
|
||||
/// This method returns:
|
||||
/// * `Poll::Pending` if the underlying operation is not complete, whether
|
||||
/// or not it completed successfully. This will be true if the OS is
|
||||
/// still processing it, or if it has not yet been submitted to the OS.
|
||||
/// * `Poll::Ready(Ok(_))` if the underlying operation is complete.
|
||||
/// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does
|
||||
/// _not_ indicate that the underlying operation encountered an error.
|
||||
///
|
||||
/// When the method returns `Poll::Pending`, the `Waker` in the provided `Context`
|
||||
/// is scheduled to receive a wakeup when the underlying operation
|
||||
/// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the
|
||||
/// `Context` passed to the most recent call is scheduled to receive a wakeup.
|
||||
pub fn poll_ready<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
|
||||
let ev = ready!(self.registration.poll_read_ready(cx))?;
|
||||
Poll::Ready(Ok(AioEvent(ev)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: AioSource> Deref for Aio<E> {
|
||||
type Target = E;
|
||||
|
||||
fn deref(&self) -> &E {
|
||||
&self.io.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: AioSource> DerefMut for Aio<E> {
|
||||
fn deref_mut(&mut self) -> &mut E {
|
||||
&mut self.io.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Aio").field("io", &self.io.0).finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Opaque data returned by [`Aio::poll_ready`].
|
||||
///
|
||||
/// It can be fed back to [`Aio::clear_ready`].
|
||||
#[derive(Debug)]
|
||||
pub struct AioEvent(ReadyEvent);
|
@ -14,6 +14,26 @@ use std::ops;
|
||||
pub struct Interest(mio::Interest);
|
||||
|
||||
impl Interest {
|
||||
// The non-FreeBSD definitions in this block are active only when
|
||||
// building documentation.
|
||||
cfg_aio! {
|
||||
/// Interest for POSIX AIO
|
||||
#[cfg(target_os = "freebsd")]
|
||||
pub const AIO: Interest = Interest(mio::Interest::AIO);
|
||||
|
||||
/// Interest for POSIX AIO
|
||||
#[cfg(not(target_os = "freebsd"))]
|
||||
pub const AIO: Interest = Interest(mio::Interest::READABLE);
|
||||
|
||||
/// Interest for POSIX AIO lio_listio events
|
||||
#[cfg(target_os = "freebsd")]
|
||||
pub const LIO: Interest = Interest(mio::Interest::LIO);
|
||||
|
||||
/// Interest for POSIX AIO lio_listio events
|
||||
#[cfg(not(target_os = "freebsd"))]
|
||||
pub const LIO: Interest = Interest(mio::Interest::READABLE);
|
||||
}
|
||||
|
||||
/// Interest in all readable events.
|
||||
///
|
||||
/// Readable interest includes read-closed events.
|
||||
|
@ -51,6 +51,7 @@ pub(crate) struct Handle {
|
||||
inner: Weak<Inner>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ReadyEvent {
|
||||
tick: u8,
|
||||
pub(crate) ready: Ready,
|
||||
|
@ -38,6 +38,17 @@ impl Ready {
|
||||
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
|
||||
let mut ready = Ready::EMPTY;
|
||||
|
||||
#[cfg(all(target_os = "freebsd", feature = "net"))]
|
||||
{
|
||||
if event.is_aio() {
|
||||
ready |= Ready::READABLE;
|
||||
}
|
||||
|
||||
if event.is_lio() {
|
||||
ready |= Ready::READABLE;
|
||||
}
|
||||
}
|
||||
|
||||
if event.is_readable() {
|
||||
ready |= Ready::READABLE;
|
||||
}
|
||||
|
@ -217,6 +217,15 @@ cfg_io_driver_impl! {
|
||||
pub(crate) use poll_evented::PollEvented;
|
||||
}
|
||||
|
||||
cfg_aio! {
|
||||
/// BSD-specific I/O types
|
||||
pub mod bsd {
|
||||
mod poll_aio;
|
||||
|
||||
pub use poll_aio::{Aio, AioEvent, AioSource};
|
||||
}
|
||||
}
|
||||
|
||||
cfg_net_unix! {
|
||||
mod async_fd;
|
||||
|
||||
|
@ -314,8 +314,9 @@
|
||||
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
|
||||
//! - `io-util`: Enables the IO based `Ext` traits.
|
||||
//! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types.
|
||||
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`,
|
||||
//! as well as (on Unix-like systems) `AsyncFd`
|
||||
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and
|
||||
//! `UdpSocket`, as well as (on Unix-like systems) `AsyncFd` and (on
|
||||
//! FreeBSD) `PollAio`.
|
||||
//! - `time`: Enables `tokio::time` types and allows the schedulers to enable
|
||||
//! the built in timer.
|
||||
//! - `process`: Enables `tokio::process` types.
|
||||
|
@ -45,6 +45,18 @@ macro_rules! cfg_atomic_waker_impl {
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! cfg_aio {
|
||||
($($item:item)*) => {
|
||||
$(
|
||||
#[cfg(all(any(docsrs, target_os = "freebsd"), feature = "net"))]
|
||||
#[cfg_attr(docsrs,
|
||||
doc(cfg(all(target_os = "freebsd", feature = "net")))
|
||||
)]
|
||||
$item
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! cfg_fs {
|
||||
($($item:item)*) => {
|
||||
$(
|
||||
|
375
tokio/tests/io_poll_aio.rs
Normal file
375
tokio/tests/io_poll_aio.rs
Normal file
@ -0,0 +1,375 @@
|
||||
#![warn(rust_2018_idioms)]
|
||||
#![cfg(all(target_os = "freebsd", feature = "net"))]
|
||||
|
||||
use mio_aio::{AioCb, AioFsyncMode, LioCb};
|
||||
use std::{
|
||||
future::Future,
|
||||
mem,
|
||||
os::unix::io::{AsRawFd, RawFd},
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tempfile::tempfile;
|
||||
use tokio::io::bsd::{Aio, AioSource};
|
||||
use tokio_test::assert_pending;
|
||||
|
||||
mod aio {
|
||||
use super::*;
|
||||
|
||||
/// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
|
||||
struct WrappedAioCb<'a>(AioCb<'a>);
|
||||
impl<'a> AioSource for WrappedAioCb<'a> {
|
||||
fn register(&mut self, kq: RawFd, token: usize) {
|
||||
self.0.register_raw(kq, token)
|
||||
}
|
||||
fn deregister(&mut self) {
|
||||
self.0.deregister_raw()
|
||||
}
|
||||
}
|
||||
|
||||
/// A very crude implementation of an AIO-based future
|
||||
struct FsyncFut(Aio<WrappedAioCb<'static>>);
|
||||
|
||||
impl Future for FsyncFut {
|
||||
type Output = std::io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let poll_result = self.0.poll_ready(cx);
|
||||
match poll_result {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||
Poll::Ready(Ok(_ev)) => {
|
||||
// At this point, we could clear readiness. But there's no
|
||||
// point, since we're about to drop the Aio.
|
||||
let result = (*self.0).0.aio_return();
|
||||
match result {
|
||||
Ok(_) => Poll::Ready(Ok(())),
|
||||
Err(e) => Poll::Ready(Err(e.into())),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Low-level AIO Source
|
||||
///
|
||||
/// An example bypassing mio_aio and Nix to demonstrate how the kevent
|
||||
/// registration actually works, under the hood.
|
||||
struct LlSource(Pin<Box<libc::aiocb>>);
|
||||
|
||||
impl AioSource for LlSource {
|
||||
fn register(&mut self, kq: RawFd, token: usize) {
|
||||
let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
|
||||
sev.sigev_notify = libc::SIGEV_KEVENT;
|
||||
sev.sigev_signo = kq;
|
||||
sev.sigev_value = libc::sigval {
|
||||
sival_ptr: token as *mut libc::c_void,
|
||||
};
|
||||
self.0.aio_sigevent = sev;
|
||||
}
|
||||
|
||||
fn deregister(&mut self) {
|
||||
unsafe {
|
||||
self.0.aio_sigevent = mem::zeroed();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct LlFut(Aio<LlSource>);
|
||||
|
||||
impl Future for LlFut {
|
||||
type Output = std::io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let poll_result = self.0.poll_ready(cx);
|
||||
match poll_result {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||
Poll::Ready(Ok(_ev)) => {
|
||||
let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
|
||||
assert_eq!(0, r);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A very simple object that can implement AioSource and can be reused.
|
||||
///
|
||||
/// mio_aio normally assumes that each AioCb will be consumed on completion.
|
||||
/// This somewhat contrived example shows how an Aio object can be reused
|
||||
/// anyway.
|
||||
struct ReusableFsyncSource {
|
||||
aiocb: Pin<Box<AioCb<'static>>>,
|
||||
fd: RawFd,
|
||||
token: usize,
|
||||
}
|
||||
impl ReusableFsyncSource {
|
||||
fn fsync(&mut self) {
|
||||
self.aiocb.register_raw(self.fd, self.token);
|
||||
self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
|
||||
}
|
||||
fn new(aiocb: AioCb<'static>) -> Self {
|
||||
ReusableFsyncSource {
|
||||
aiocb: Box::pin(aiocb),
|
||||
fd: 0,
|
||||
token: 0,
|
||||
}
|
||||
}
|
||||
fn reset(&mut self, aiocb: AioCb<'static>) {
|
||||
self.aiocb = Box::pin(aiocb);
|
||||
}
|
||||
}
|
||||
impl AioSource for ReusableFsyncSource {
|
||||
fn register(&mut self, kq: RawFd, token: usize) {
|
||||
self.fd = kq;
|
||||
self.token = token;
|
||||
}
|
||||
fn deregister(&mut self) {
|
||||
self.fd = 0;
|
||||
}
|
||||
}
|
||||
|
||||
struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);
|
||||
impl<'a> Future for ReusableFsyncFut<'a> {
|
||||
type Output = std::io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let poll_result = self.0.poll_ready(cx);
|
||||
match poll_result {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||
Poll::Ready(Ok(ev)) => {
|
||||
// Since this future uses a reusable Aio, we must clear
|
||||
// its readiness here. That makes the future
|
||||
// non-idempotent; the caller can't poll it repeatedly after
|
||||
// it has already returned Ready. But that's ok; most
|
||||
// futures behave this way.
|
||||
self.0.clear_ready(ev);
|
||||
let result = (*self.0).aiocb.aio_return();
|
||||
match result {
|
||||
Ok(_) => Poll::Ready(Ok(())),
|
||||
Err(e) => Poll::Ready(Err(e.into())),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fsync() {
|
||||
let f = tempfile().unwrap();
|
||||
let fd = f.as_raw_fd();
|
||||
let aiocb = AioCb::from_fd(fd, 0);
|
||||
let source = WrappedAioCb(aiocb);
|
||||
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
||||
(*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
|
||||
let fut = FsyncFut(poll_aio);
|
||||
fut.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ll_fsync() {
|
||||
let f = tempfile().unwrap();
|
||||
let fd = f.as_raw_fd();
|
||||
let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
|
||||
aiocb.aio_fildes = fd;
|
||||
let source = LlSource(Box::pin(aiocb));
|
||||
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
||||
let r = unsafe {
|
||||
let p = (*poll_aio).0.as_mut().get_unchecked_mut();
|
||||
libc::aio_fsync(libc::O_SYNC, p)
|
||||
};
|
||||
assert_eq!(0, r);
|
||||
let fut = LlFut(poll_aio);
|
||||
fut.await.unwrap();
|
||||
}
|
||||
|
||||
/// A suitably crafted future type can reuse an Aio object
|
||||
#[tokio::test]
|
||||
async fn reuse() {
|
||||
let f = tempfile().unwrap();
|
||||
let fd = f.as_raw_fd();
|
||||
let aiocb0 = AioCb::from_fd(fd, 0);
|
||||
let source = ReusableFsyncSource::new(aiocb0);
|
||||
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
||||
poll_aio.fsync();
|
||||
let fut0 = ReusableFsyncFut(&mut poll_aio);
|
||||
fut0.await.unwrap();
|
||||
|
||||
let aiocb1 = AioCb::from_fd(fd, 0);
|
||||
poll_aio.reset(aiocb1);
|
||||
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
|
||||
assert_pending!(poll_aio.poll_ready(&mut ctx));
|
||||
poll_aio.fsync();
|
||||
let fut1 = ReusableFsyncFut(&mut poll_aio);
|
||||
fut1.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
mod lio {
|
||||
use super::*;
|
||||
|
||||
struct WrappedLioCb<'a>(LioCb<'a>);
|
||||
impl<'a> AioSource for WrappedLioCb<'a> {
|
||||
fn register(&mut self, kq: RawFd, token: usize) {
|
||||
self.0.register_raw(kq, token)
|
||||
}
|
||||
fn deregister(&mut self) {
|
||||
self.0.deregister_raw()
|
||||
}
|
||||
}
|
||||
|
||||
/// A very crude lio_listio-based Future
|
||||
struct LioFut(Option<Aio<WrappedLioCb<'static>>>);
|
||||
|
||||
impl Future for LioFut {
|
||||
type Output = std::io::Result<Vec<isize>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
|
||||
match poll_result {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||
Poll::Ready(Ok(_ev)) => {
|
||||
// At this point, we could clear readiness. But there's no
|
||||
// point, since we're about to drop the Aio.
|
||||
let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
|
||||
iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
|
||||
});
|
||||
Poll::Ready(Ok(r))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal example demonstrating reuse of an Aio object with lio
|
||||
/// readiness. mio_aio::LioCb actually does something similar under the
|
||||
/// hood.
|
||||
struct ReusableLioSource {
|
||||
liocb: Option<LioCb<'static>>,
|
||||
fd: RawFd,
|
||||
token: usize,
|
||||
}
|
||||
impl ReusableLioSource {
|
||||
fn new(liocb: LioCb<'static>) -> Self {
|
||||
ReusableLioSource {
|
||||
liocb: Some(liocb),
|
||||
fd: 0,
|
||||
token: 0,
|
||||
}
|
||||
}
|
||||
fn reset(&mut self, liocb: LioCb<'static>) {
|
||||
self.liocb = Some(liocb);
|
||||
}
|
||||
fn submit(&mut self) {
|
||||
self.liocb
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.register_raw(self.fd, self.token);
|
||||
self.liocb.as_mut().unwrap().submit().unwrap();
|
||||
}
|
||||
}
|
||||
impl AioSource for ReusableLioSource {
|
||||
fn register(&mut self, kq: RawFd, token: usize) {
|
||||
self.fd = kq;
|
||||
self.token = token;
|
||||
}
|
||||
fn deregister(&mut self) {
|
||||
self.fd = 0;
|
||||
}
|
||||
}
|
||||
struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);
|
||||
impl<'a> Future for ReusableLioFut<'a> {
|
||||
type Output = std::io::Result<Vec<isize>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let poll_result = self.0.poll_ready(cx);
|
||||
match poll_result {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||
Poll::Ready(Ok(ev)) => {
|
||||
// Since this future uses a reusable Aio, we must clear
|
||||
// its readiness here. That makes the future
|
||||
// non-idempotent; the caller can't poll it repeatedly after
|
||||
// it has already returned Ready. But that's ok; most
|
||||
// futures behave this way.
|
||||
self.0.clear_ready(ev);
|
||||
let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
|
||||
iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
|
||||
});
|
||||
Poll::Ready(Ok(r))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An lio_listio operation with one write element
|
||||
#[tokio::test]
|
||||
async fn onewrite() {
|
||||
const WBUF: &[u8] = b"abcdef";
|
||||
let f = tempfile().unwrap();
|
||||
|
||||
let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
|
||||
builder = builder.emplace_slice(
|
||||
f.as_raw_fd(),
|
||||
0,
|
||||
&WBUF[..],
|
||||
0,
|
||||
mio_aio::LioOpcode::LIO_WRITE,
|
||||
);
|
||||
let liocb = builder.finish();
|
||||
let source = WrappedLioCb(liocb);
|
||||
let mut poll_aio = Aio::new_for_lio(source).unwrap();
|
||||
|
||||
// Send the operation to the kernel
|
||||
(*poll_aio).0.submit().unwrap();
|
||||
let fut = LioFut(Some(poll_aio));
|
||||
let v = fut.await.unwrap();
|
||||
assert_eq!(v.len(), 1);
|
||||
assert_eq!(v[0] as usize, WBUF.len());
|
||||
}
|
||||
|
||||
/// A suitably crafted future type can reuse an Aio object
|
||||
#[tokio::test]
|
||||
async fn reuse() {
|
||||
const WBUF: &[u8] = b"abcdef";
|
||||
let f = tempfile().unwrap();
|
||||
|
||||
let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
|
||||
builder0 = builder0.emplace_slice(
|
||||
f.as_raw_fd(),
|
||||
0,
|
||||
&WBUF[..],
|
||||
0,
|
||||
mio_aio::LioOpcode::LIO_WRITE,
|
||||
);
|
||||
let liocb0 = builder0.finish();
|
||||
let source = ReusableLioSource::new(liocb0);
|
||||
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
||||
poll_aio.submit();
|
||||
let fut0 = ReusableLioFut(&mut poll_aio);
|
||||
let v = fut0.await.unwrap();
|
||||
assert_eq!(v.len(), 1);
|
||||
assert_eq!(v[0] as usize, WBUF.len());
|
||||
|
||||
// Now reuse the same Aio
|
||||
let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
|
||||
builder1 = builder1.emplace_slice(
|
||||
f.as_raw_fd(),
|
||||
0,
|
||||
&WBUF[..],
|
||||
0,
|
||||
mio_aio::LioOpcode::LIO_WRITE,
|
||||
);
|
||||
let liocb1 = builder1.finish();
|
||||
poll_aio.reset(liocb1);
|
||||
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
|
||||
assert_pending!(poll_aio.poll_ready(&mut ctx));
|
||||
poll_aio.submit();
|
||||
let fut1 = ReusableLioFut(&mut poll_aio);
|
||||
let v = fut1.await.unwrap();
|
||||
assert_eq!(v.len(), 1);
|
||||
assert_eq!(v[0] as usize, WBUF.len());
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user