mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
tests: update mio-aio to 0.8 (#6269)
This is a test-only dependency. The main reason for the update is to avoid transitively depending on a Nix version with a CVE. mio-aio 0.8.0 has a substantially different API than 0.7.0. Notably, it no longer includes any lio_listio functionality. So to test Tokio's handling of EVFILT_LIO events we must go low-level and call libc::lio_listio directly.
This commit is contained in:
parent
2f730d4d5a
commit
e929d0e8b9
@ -1,8 +0,0 @@
|
|||||||
# See https://github.com/rustsec/rustsec/blob/59e1d2ad0b9cbc6892c26de233d4925074b4b97b/cargo-audit/audit.toml.example for example.
|
|
||||||
|
|
||||||
[advisories]
|
|
||||||
ignore = [
|
|
||||||
# We depend on nix 0.22 only via mio-aio, a dev-dependency.
|
|
||||||
# https://github.com/tokio-rs/tokio/pull/4255#issuecomment-974786349
|
|
||||||
"RUSTSEC-2021-0119",
|
|
||||||
]
|
|
@ -149,7 +149,7 @@ rand = "0.8.0"
|
|||||||
wasm-bindgen-test = "0.3.0"
|
wasm-bindgen-test = "0.3.0"
|
||||||
|
|
||||||
[target.'cfg(target_os = "freebsd")'.dev-dependencies]
|
[target.'cfg(target_os = "freebsd")'.dev-dependencies]
|
||||||
mio-aio = { version = "0.7.0", features = ["tokio"] }
|
mio-aio = { version = "0.8.0", features = ["tokio"] }
|
||||||
|
|
||||||
[target.'cfg(loom)'.dev-dependencies]
|
[target.'cfg(loom)'.dev-dependencies]
|
||||||
loom = { version = "0.7", features = ["futures", "checkpoint"] }
|
loom = { version = "0.7", features = ["futures", "checkpoint"] }
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
#![warn(rust_2018_idioms)]
|
#![warn(rust_2018_idioms)]
|
||||||
#![cfg(all(target_os = "freebsd", feature = "net"))]
|
#![cfg(all(target_os = "freebsd", feature = "net"))]
|
||||||
|
|
||||||
use mio_aio::{AioCb, AioFsyncMode, LioCb};
|
use mio_aio::{AioFsyncMode, SourceApi};
|
||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
mem,
|
io, mem,
|
||||||
os::unix::io::{AsRawFd, RawFd},
|
os::unix::io::{AsRawFd, RawFd},
|
||||||
pin::Pin,
|
pin::{pin, Pin},
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
@ -16,9 +16,10 @@ use tokio_test::assert_pending;
|
|||||||
mod aio {
|
mod aio {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
/// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
|
#[derive(Debug)]
|
||||||
struct WrappedAioCb<'a>(AioCb<'a>);
|
struct TokioSource(mio_aio::Source<nix::sys::aio::AioFsync>);
|
||||||
impl<'a> AioSource for WrappedAioCb<'a> {
|
|
||||||
|
impl AioSource for TokioSource {
|
||||||
fn register(&mut self, kq: RawFd, token: usize) {
|
fn register(&mut self, kq: RawFd, token: usize) {
|
||||||
self.0.register_raw(kq, token)
|
self.0.register_raw(kq, token)
|
||||||
}
|
}
|
||||||
@ -28,12 +29,22 @@ mod aio {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// A very crude implementation of an AIO-based future
|
/// A very crude implementation of an AIO-based future
|
||||||
struct FsyncFut(Aio<WrappedAioCb<'static>>);
|
struct FsyncFut(Aio<TokioSource>);
|
||||||
|
|
||||||
|
impl FsyncFut {
|
||||||
|
pub fn submit(self: Pin<&mut Self>) -> io::Result<()> {
|
||||||
|
let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
|
||||||
|
match p.submit() {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(e) => Err(io::Error::from_raw_os_error(e as i32)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Future for FsyncFut {
|
impl Future for FsyncFut {
|
||||||
type Output = std::io::Result<()>;
|
type Output = io::Result<()>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let poll_result = self.0.poll_ready(cx);
|
let poll_result = self.0.poll_ready(cx);
|
||||||
match poll_result {
|
match poll_result {
|
||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => Poll::Pending,
|
||||||
@ -41,10 +52,11 @@ mod aio {
|
|||||||
Poll::Ready(Ok(_ev)) => {
|
Poll::Ready(Ok(_ev)) => {
|
||||||
// At this point, we could clear readiness. But there's no
|
// At this point, we could clear readiness. But there's no
|
||||||
// point, since we're about to drop the Aio.
|
// point, since we're about to drop the Aio.
|
||||||
let result = (*self.0).0.aio_return();
|
let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
|
||||||
|
let result = p.aio_return();
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => Poll::Ready(Ok(())),
|
Ok(r) => Poll::Ready(Ok(r)),
|
||||||
Err(e) => Poll::Ready(Err(e.into())),
|
Err(e) => Poll::Ready(Err(io::Error::from_raw_os_error(e as i32))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -57,6 +69,16 @@ mod aio {
|
|||||||
/// registration actually works, under the hood.
|
/// registration actually works, under the hood.
|
||||||
struct LlSource(Pin<Box<libc::aiocb>>);
|
struct LlSource(Pin<Box<libc::aiocb>>);
|
||||||
|
|
||||||
|
impl LlSource {
|
||||||
|
fn fsync(mut self: Pin<&mut Self>) {
|
||||||
|
let r = unsafe {
|
||||||
|
let p = self.0.as_mut().get_unchecked_mut();
|
||||||
|
libc::aio_fsync(libc::O_SYNC, p)
|
||||||
|
};
|
||||||
|
assert_eq!(0, r);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl AioSource for LlSource {
|
impl AioSource for LlSource {
|
||||||
fn register(&mut self, kq: RawFd, token: usize) {
|
fn register(&mut self, kq: RawFd, token: usize) {
|
||||||
let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
|
let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
|
||||||
@ -77,62 +99,15 @@ mod aio {
|
|||||||
|
|
||||||
struct LlFut(Aio<LlSource>);
|
struct LlFut(Aio<LlSource>);
|
||||||
|
|
||||||
|
impl LlFut {
|
||||||
|
pub fn fsync(self: Pin<&mut Self>) {
|
||||||
|
let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) };
|
||||||
|
p.fsync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Future for LlFut {
|
impl Future for LlFut {
|
||||||
type Output = std::io::Result<()>;
|
type Output = std::io::Result<usize>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let poll_result = self.0.poll_ready(cx);
|
|
||||||
match poll_result {
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
|
||||||
Poll::Ready(Ok(_ev)) => {
|
|
||||||
let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
|
|
||||||
assert_eq!(0, r);
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A very simple object that can implement AioSource and can be reused.
|
|
||||||
///
|
|
||||||
/// mio_aio normally assumes that each AioCb will be consumed on completion.
|
|
||||||
/// This somewhat contrived example shows how an Aio object can be reused
|
|
||||||
/// anyway.
|
|
||||||
struct ReusableFsyncSource {
|
|
||||||
aiocb: Pin<Box<AioCb<'static>>>,
|
|
||||||
fd: RawFd,
|
|
||||||
token: usize,
|
|
||||||
}
|
|
||||||
impl ReusableFsyncSource {
|
|
||||||
fn fsync(&mut self) {
|
|
||||||
self.aiocb.register_raw(self.fd, self.token);
|
|
||||||
self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
|
|
||||||
}
|
|
||||||
fn new(aiocb: AioCb<'static>) -> Self {
|
|
||||||
ReusableFsyncSource {
|
|
||||||
aiocb: Box::pin(aiocb),
|
|
||||||
fd: 0,
|
|
||||||
token: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn reset(&mut self, aiocb: AioCb<'static>) {
|
|
||||||
self.aiocb = Box::pin(aiocb);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl AioSource for ReusableFsyncSource {
|
|
||||||
fn register(&mut self, kq: RawFd, token: usize) {
|
|
||||||
self.fd = kq;
|
|
||||||
self.token = token;
|
|
||||||
}
|
|
||||||
fn deregister(&mut self) {
|
|
||||||
self.fd = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);
|
|
||||||
impl<'a> Future for ReusableFsyncFut<'a> {
|
|
||||||
type Output = std::io::Result<()>;
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let poll_result = self.0.poll_ready(cx);
|
let poll_result = self.0.poll_ready(cx);
|
||||||
@ -140,16 +115,16 @@ mod aio {
|
|||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => Poll::Pending,
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||||
Poll::Ready(Ok(ev)) => {
|
Poll::Ready(Ok(ev)) => {
|
||||||
// Since this future uses a reusable Aio, we must clear
|
// Clearing readiness makes the future non-idempotent; the
|
||||||
// its readiness here. That makes the future
|
// caller can't poll it repeatedly after it has already
|
||||||
// non-idempotent; the caller can't poll it repeatedly after
|
// returned Ready. But that's ok; most futures behave this
|
||||||
// it has already returned Ready. But that's ok; most
|
// way.
|
||||||
// futures behave this way.
|
|
||||||
self.0.clear_ready(ev);
|
self.0.clear_ready(ev);
|
||||||
let result = (*self.0).aiocb.aio_return();
|
let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
|
||||||
match result {
|
if r >= 0 {
|
||||||
Ok(_) => Poll::Ready(Ok(())),
|
Poll::Ready(Ok(r as usize))
|
||||||
Err(e) => Poll::Ready(Err(e.into())),
|
} else {
|
||||||
|
Poll::Ready(Err(io::Error::last_os_error()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -160,11 +135,11 @@ mod aio {
|
|||||||
async fn fsync() {
|
async fn fsync() {
|
||||||
let f = tempfile().unwrap();
|
let f = tempfile().unwrap();
|
||||||
let fd = f.as_raw_fd();
|
let fd = f.as_raw_fd();
|
||||||
let aiocb = AioCb::from_fd(fd, 0);
|
let mode = AioFsyncMode::O_SYNC;
|
||||||
let source = WrappedAioCb(aiocb);
|
let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0));
|
||||||
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
let poll_aio = Aio::new_for_aio(source).unwrap();
|
||||||
(*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
|
let mut fut = pin!(FsyncFut(poll_aio));
|
||||||
let fut = FsyncFut(poll_aio);
|
fut.as_mut().submit().unwrap();
|
||||||
fut.await.unwrap();
|
fut.await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,7 +152,7 @@ mod aio {
|
|||||||
let source = LlSource(Box::pin(aiocb));
|
let source = LlSource(Box::pin(aiocb));
|
||||||
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
||||||
let r = unsafe {
|
let r = unsafe {
|
||||||
let p = (*poll_aio).0.as_mut().get_unchecked_mut();
|
let p = poll_aio.0.as_mut().get_unchecked_mut();
|
||||||
libc::aio_fsync(libc::O_SYNC, p)
|
libc::aio_fsync(libc::O_SYNC, p)
|
||||||
};
|
};
|
||||||
assert_eq!(0, r);
|
assert_eq!(0, r);
|
||||||
@ -190,144 +165,140 @@ mod aio {
|
|||||||
async fn reuse() {
|
async fn reuse() {
|
||||||
let f = tempfile().unwrap();
|
let f = tempfile().unwrap();
|
||||||
let fd = f.as_raw_fd();
|
let fd = f.as_raw_fd();
|
||||||
let aiocb0 = AioCb::from_fd(fd, 0);
|
let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
|
||||||
let source = ReusableFsyncSource::new(aiocb0);
|
aiocb.aio_fildes = fd;
|
||||||
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
let source = LlSource(Box::pin(aiocb));
|
||||||
poll_aio.fsync();
|
let poll_aio = Aio::new_for_aio(source).unwrap();
|
||||||
let fut0 = ReusableFsyncFut(&mut poll_aio);
|
|
||||||
fut0.await.unwrap();
|
|
||||||
|
|
||||||
let aiocb1 = AioCb::from_fd(fd, 0);
|
// Send the operation to the kernel the first time
|
||||||
poll_aio.reset(aiocb1);
|
let mut fut = LlFut(poll_aio);
|
||||||
|
{
|
||||||
|
let mut pfut = Pin::new(&mut fut);
|
||||||
|
pfut.as_mut().fsync();
|
||||||
|
pfut.as_mut().await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that readiness was cleared
|
||||||
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
|
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
|
||||||
assert_pending!(poll_aio.poll_ready(&mut ctx));
|
assert_pending!(fut.0.poll_ready(&mut ctx));
|
||||||
poll_aio.fsync();
|
|
||||||
let fut1 = ReusableFsyncFut(&mut poll_aio);
|
// and reuse the future and its Aio object
|
||||||
fut1.await.unwrap();
|
{
|
||||||
|
let mut pfut = Pin::new(&mut fut);
|
||||||
|
pfut.as_mut().fsync();
|
||||||
|
pfut.as_mut().await.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mod lio {
|
mod lio {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
struct WrappedLioCb<'a>(LioCb<'a>);
|
/// Low-level source based on lio_listio
|
||||||
impl<'a> AioSource for WrappedLioCb<'a> {
|
///
|
||||||
fn register(&mut self, kq: RawFd, token: usize) {
|
/// An example demonstrating using AIO with `Interest::Lio`. mio_aio 0.8
|
||||||
self.0.register_raw(kq, token)
|
/// doesn't include any bindings for lio_listio, so we've got to go
|
||||||
|
/// low-level.
|
||||||
|
struct LioSource<'a> {
|
||||||
|
aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>,
|
||||||
|
sev: libc::sigevent,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> LioSource<'a> {
|
||||||
|
fn new(aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>) -> Self {
|
||||||
|
LioSource {
|
||||||
|
aiocb,
|
||||||
|
sev: unsafe { mem::zeroed() },
|
||||||
|
}
|
||||||
}
|
}
|
||||||
fn deregister(&mut self) {
|
|
||||||
self.0.deregister_raw()
|
fn submit(mut self: Pin<&mut Self>) {
|
||||||
|
let p: *const *mut libc::aiocb =
|
||||||
|
unsafe { self.aiocb.as_mut().get_unchecked_mut() } as *const _ as *const *mut _;
|
||||||
|
let r = unsafe { libc::lio_listio(libc::LIO_NOWAIT, p, 1, &mut self.sev) };
|
||||||
|
assert_eq!(r, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A very crude lio_listio-based Future
|
impl<'a> AioSource for LioSource<'a> {
|
||||||
struct LioFut(Option<Aio<WrappedLioCb<'static>>>);
|
fn register(&mut self, kq: RawFd, token: usize) {
|
||||||
|
let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
|
||||||
|
sev.sigev_notify = libc::SIGEV_KEVENT;
|
||||||
|
sev.sigev_signo = kq;
|
||||||
|
sev.sigev_value = libc::sigval {
|
||||||
|
sival_ptr: token as *mut libc::c_void,
|
||||||
|
};
|
||||||
|
self.sev = sev;
|
||||||
|
}
|
||||||
|
|
||||||
impl Future for LioFut {
|
fn deregister(&mut self) {
|
||||||
type Output = std::io::Result<Vec<isize>>;
|
unsafe {
|
||||||
|
self.sev = mem::zeroed();
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
|
|
||||||
match poll_result {
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
|
||||||
Poll::Ready(Ok(_ev)) => {
|
|
||||||
// At this point, we could clear readiness. But there's no
|
|
||||||
// point, since we're about to drop the Aio.
|
|
||||||
let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
|
|
||||||
iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
|
|
||||||
});
|
|
||||||
Poll::Ready(Ok(r))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Minimal example demonstrating reuse of an Aio object with lio
|
struct LioFut<'a>(Aio<LioSource<'a>>);
|
||||||
/// readiness. mio_aio::LioCb actually does something similar under the
|
|
||||||
/// hood.
|
|
||||||
struct ReusableLioSource {
|
|
||||||
liocb: Option<LioCb<'static>>,
|
|
||||||
fd: RawFd,
|
|
||||||
token: usize,
|
|
||||||
}
|
|
||||||
impl ReusableLioSource {
|
|
||||||
fn new(liocb: LioCb<'static>) -> Self {
|
|
||||||
ReusableLioSource {
|
|
||||||
liocb: Some(liocb),
|
|
||||||
fd: 0,
|
|
||||||
token: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn reset(&mut self, liocb: LioCb<'static>) {
|
|
||||||
self.liocb = Some(liocb);
|
|
||||||
}
|
|
||||||
fn submit(&mut self) {
|
|
||||||
self.liocb
|
|
||||||
.as_mut()
|
|
||||||
.unwrap()
|
|
||||||
.register_raw(self.fd, self.token);
|
|
||||||
self.liocb.as_mut().unwrap().submit().unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl AioSource for ReusableLioSource {
|
|
||||||
fn register(&mut self, kq: RawFd, token: usize) {
|
|
||||||
self.fd = kq;
|
|
||||||
self.token = token;
|
|
||||||
}
|
|
||||||
fn deregister(&mut self) {
|
|
||||||
self.fd = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);
|
|
||||||
impl<'a> Future for ReusableLioFut<'a> {
|
|
||||||
type Output = std::io::Result<Vec<isize>>;
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
impl<'a> LioFut<'a> {
|
||||||
|
pub fn submit(self: Pin<&mut Self>) {
|
||||||
|
let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) };
|
||||||
|
p.submit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Future for LioFut<'a> {
|
||||||
|
type Output = std::io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let poll_result = self.0.poll_ready(cx);
|
let poll_result = self.0.poll_ready(cx);
|
||||||
match poll_result {
|
match poll_result {
|
||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => Poll::Pending,
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
||||||
Poll::Ready(Ok(ev)) => {
|
Poll::Ready(Ok(ev)) => {
|
||||||
// Since this future uses a reusable Aio, we must clear
|
// Clearing readiness makes the future non-idempotent; the
|
||||||
// its readiness here. That makes the future
|
// caller can't poll it repeatedly after it has already
|
||||||
// non-idempotent; the caller can't poll it repeatedly after
|
// returned Ready. But that's ok; most futures behave this
|
||||||
// it has already returned Ready. But that's ok; most
|
// way. Clearing readiness is especially useful for
|
||||||
// futures behave this way.
|
// lio_listio, because sometimes some operations will be
|
||||||
|
// ready but not all.
|
||||||
self.0.clear_ready(ev);
|
self.0.clear_ready(ev);
|
||||||
let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
|
let r = unsafe {
|
||||||
iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
|
let p1 = self.get_unchecked_mut();
|
||||||
});
|
let p2: &mut [&mut libc::aiocb; 1] =
|
||||||
Poll::Ready(Ok(r))
|
p1.0.aiocb.as_mut().get_unchecked_mut();
|
||||||
|
let p3: &mut libc::aiocb = p2[0];
|
||||||
|
libc::aio_return(p3)
|
||||||
|
};
|
||||||
|
if r >= 0 {
|
||||||
|
Poll::Ready(Ok(r as usize))
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Err(io::Error::last_os_error()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An lio_listio operation with one write element
|
/// An lio_listio operation with one fsync element
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn onewrite() {
|
async fn onewrite() {
|
||||||
const WBUF: &[u8] = b"abcdef";
|
const WBUF: &[u8] = b"abcdef";
|
||||||
let f = tempfile().unwrap();
|
let f = tempfile().unwrap();
|
||||||
|
|
||||||
let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
|
let mut aiocb: libc::aiocb = unsafe { mem::zeroed() };
|
||||||
builder = builder.emplace_slice(
|
aiocb.aio_fildes = f.as_raw_fd();
|
||||||
f.as_raw_fd(),
|
aiocb.aio_lio_opcode = libc::LIO_WRITE;
|
||||||
0,
|
aiocb.aio_nbytes = WBUF.len();
|
||||||
&WBUF[..],
|
aiocb.aio_buf = WBUF.as_ptr() as *mut _;
|
||||||
0,
|
let aiocb = pin!([&mut aiocb]);
|
||||||
mio_aio::LioOpcode::LIO_WRITE,
|
let source = LioSource::new(aiocb);
|
||||||
);
|
let poll_aio = Aio::new_for_lio(source).unwrap();
|
||||||
let liocb = builder.finish();
|
|
||||||
let source = WrappedLioCb(liocb);
|
|
||||||
let mut poll_aio = Aio::new_for_lio(source).unwrap();
|
|
||||||
|
|
||||||
// Send the operation to the kernel
|
// Send the operation to the kernel
|
||||||
(*poll_aio).0.submit().unwrap();
|
let mut fut = pin!(LioFut(poll_aio));
|
||||||
let fut = LioFut(Some(poll_aio));
|
fut.as_mut().submit();
|
||||||
let v = fut.await.unwrap();
|
fut.await.unwrap();
|
||||||
assert_eq!(v.len(), 1);
|
|
||||||
assert_eq!(v[0] as usize, WBUF.len());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A suitably crafted future type can reuse an Aio object
|
/// A suitably crafted future type can reuse an Aio object
|
||||||
@ -336,40 +307,32 @@ mod lio {
|
|||||||
const WBUF: &[u8] = b"abcdef";
|
const WBUF: &[u8] = b"abcdef";
|
||||||
let f = tempfile().unwrap();
|
let f = tempfile().unwrap();
|
||||||
|
|
||||||
let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
|
let mut aiocb: libc::aiocb = unsafe { mem::zeroed() };
|
||||||
builder0 = builder0.emplace_slice(
|
aiocb.aio_fildes = f.as_raw_fd();
|
||||||
f.as_raw_fd(),
|
aiocb.aio_lio_opcode = libc::LIO_WRITE;
|
||||||
0,
|
aiocb.aio_nbytes = WBUF.len();
|
||||||
&WBUF[..],
|
aiocb.aio_buf = WBUF.as_ptr() as *mut _;
|
||||||
0,
|
let aiocb = pin!([&mut aiocb]);
|
||||||
mio_aio::LioOpcode::LIO_WRITE,
|
let source = LioSource::new(aiocb);
|
||||||
);
|
let poll_aio = Aio::new_for_lio(source).unwrap();
|
||||||
let liocb0 = builder0.finish();
|
|
||||||
let source = ReusableLioSource::new(liocb0);
|
|
||||||
let mut poll_aio = Aio::new_for_aio(source).unwrap();
|
|
||||||
poll_aio.submit();
|
|
||||||
let fut0 = ReusableLioFut(&mut poll_aio);
|
|
||||||
let v = fut0.await.unwrap();
|
|
||||||
assert_eq!(v.len(), 1);
|
|
||||||
assert_eq!(v[0] as usize, WBUF.len());
|
|
||||||
|
|
||||||
// Now reuse the same Aio
|
// Send the operation to the kernel the first time
|
||||||
let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
|
let mut fut = LioFut(poll_aio);
|
||||||
builder1 = builder1.emplace_slice(
|
{
|
||||||
f.as_raw_fd(),
|
let mut pfut = Pin::new(&mut fut);
|
||||||
0,
|
pfut.as_mut().submit();
|
||||||
&WBUF[..],
|
pfut.as_mut().await.unwrap();
|
||||||
0,
|
}
|
||||||
mio_aio::LioOpcode::LIO_WRITE,
|
|
||||||
);
|
// Check that readiness was cleared
|
||||||
let liocb1 = builder1.finish();
|
|
||||||
poll_aio.reset(liocb1);
|
|
||||||
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
|
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
|
||||||
assert_pending!(poll_aio.poll_ready(&mut ctx));
|
assert_pending!(fut.0.poll_ready(&mut ctx));
|
||||||
poll_aio.submit();
|
|
||||||
let fut1 = ReusableLioFut(&mut poll_aio);
|
// and reuse the future and its Aio object
|
||||||
let v = fut1.await.unwrap();
|
{
|
||||||
assert_eq!(v.len(), 1);
|
let mut pfut = Pin::new(&mut fut);
|
||||||
assert_eq!(v[0] as usize, WBUF.len());
|
pfut.as_mut().submit();
|
||||||
|
pfut.as_mut().await.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user