diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 543f97fd4..c70a19788 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -72,7 +72,9 @@ async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { let mut buf_offset: usize = 0; let mut file_offset: u64 = 0; while buf_offset < total { - let (n, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await?; + let (n, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await; + // TODO: handle EINT here + let n = n?; if n == 0 { return Err(io::ErrorKind::WriteZero.into()); } diff --git a/tokio/src/io/uring/open.rs b/tokio/src/io/uring/open.rs index 68f434ff1..913588c66 100644 --- a/tokio/src/io/uring/open.rs +++ b/tokio/src/io/uring/open.rs @@ -1,10 +1,13 @@ use super::utils::cstr; -use crate::{ - fs::UringOpenOptions, - runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}, -}; + +use crate::fs::UringOpenOptions; +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; + use io_uring::{opcode, types}; -use std::{ffi::CString, io, os::fd::FromRawFd, path::Path}; +use std::ffi::CString; +use std::io::{self, Error}; +use std::os::fd::FromRawFd; +use std::path::Path; #[derive(Debug)] pub(crate) struct Open { @@ -15,11 +18,14 @@ pub(crate) struct Open { } impl Completable for Open { - type Output = crate::fs::File; - fn complete(self, cqe: CqeResult) -> io::Result { - let fd = cqe.result? as i32; - let file = unsafe { crate::fs::File::from_raw_fd(fd) }; - Ok(file) + type Output = io::Result; + fn complete(self, cqe: CqeResult) -> Self::Output { + cqe.result + .map(|fd| unsafe { crate::fs::File::from_raw_fd(fd as i32) }) + } + + fn complete_with_error(self, err: Error) -> Self::Output { + Err(err) } } diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 9332bd007..7341f7622 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -1,12 +1,9 @@ -use crate::{ - runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}, - util::as_ref::OwnedBuf, -}; +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use crate::util::as_ref::OwnedBuf; + use io_uring::{opcode, types}; -use std::{ - io, - os::fd::{AsRawFd, OwnedFd}, -}; +use std::io::{self, Error}; +use std::os::fd::{AsRawFd, OwnedFd}; #[derive(Debug)] pub(crate) struct Write { @@ -15,9 +12,13 @@ pub(crate) struct Write { } impl Completable for Write { - type Output = (u32, OwnedBuf, OwnedFd); - fn complete(self, cqe: CqeResult) -> io::Result { - Ok((cqe.result?, self.buf, self.fd)) + type Output = (io::Result, OwnedBuf, OwnedFd); + fn complete(self, cqe: CqeResult) -> Self::Output { + (cqe.result, self.buf, self.fd) + } + + fn complete_with_error(self, err: Error) -> Self::Output { + (Err(err), self.buf, self.fd) } } diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 413dd4a83..37945cf5a 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -1,14 +1,14 @@ use crate::io::uring::open::Open; use crate::io::uring::write::Write; use crate::runtime::Handle; + use io_uring::cqueue; use io_uring::squeue::Entry; use std::future::Future; +use std::io::{self, Error}; +use std::mem; use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use std::task::Waker; -use std::{io, mem}; +use std::task::{Context, Poll, Waker}; // This field isn't accessed directly, but it holds cancellation data, // so `#[allow(dead_code)]` is needed. @@ -110,7 +110,13 @@ impl From for CqeResult { /// A trait that converts a CQE result into a usable value for each operation. pub(crate) trait Completable { type Output; - fn complete(self, cqe: CqeResult) -> io::Result; + fn complete(self, cqe: CqeResult) -> Self::Output; + + // This is used when you want to terminate an operation with an error. + // + // The `Op` type that implements this trait can return the passed error + // upstream by embedding it in the `Output`. + fn complete_with_error(self, error: Error) -> Self::Output; } /// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation. @@ -121,7 +127,7 @@ pub(crate) trait Cancellable { impl Unpin for Op {} impl Future for Op { - type Output = io::Result; + type Output = T::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -132,9 +138,21 @@ impl Future for Op { State::Initialize(entry_opt) => { let entry = entry_opt.take().expect("Entry must be present"); let waker = cx.waker().clone(); + // SAFETY: entry is valid for the entire duration of the operation - let idx = unsafe { driver.register_op(entry, waker)? }; - this.state = State::Polled(idx); + match unsafe { driver.register_op(entry, waker) } { + Ok(idx) => this.state = State::Polled(idx), + Err(err) => { + let data = this + .take_data() + .expect("Data must be present on Initialization"); + + this.state = State::Complete; + + return Poll::Ready(data.complete_with_error(err)); + } + }; + Poll::Pending }