diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f6680967e..6626f64c7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -350,6 +350,39 @@ jobs: # the unstable cfg to RustDoc RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_taskdump + test-uring: + name: test tokio full --cfg tokio_uring + needs: basics + runs-on: ${{ matrix.os }} + strategy: + matrix: + include: + - os: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust ${{ env.rust_stable }} + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.rust_stable }} + + - name: Install cargo-nextest + uses: taiki-e/install-action@v2 + with: + tool: cargo-nextest + + - uses: Swatinem/rust-cache@v2 + - name: test tokio full --cfg tokio_uring + run: | + set -euxo pipefail + cargo nextest run --all-features + cargo test --doc --all-features + working-directory: tokio + env: + RUSTFLAGS: --cfg tokio_uring -Dwarnings + # in order to run doctests for unstable features, we must also pass + # the unstable cfg to RustDoc + RUSTDOCFLAGS: --cfg tokio_uring + check-unstable-mt-counters: name: check tokio full --internal-mt-counters needs: basics @@ -703,6 +736,8 @@ jobs: - { name: "--unstable", rustflags: "--cfg tokio_unstable -Dwarnings" } # Try with unstable and taskdump feature flags - { name: "--unstable --taskdump", rustflags: "--cfg tokio_unstable -Dwarnings --cfg tokio_taskdump" } + - { name: "--tokio_uring", rustflags: "-Dwarnings --cfg tokio_uring" } + - { name: "--unstable --taskdump --tokio_uring", rustflags: "--cfg tokio_unstable -Dwarnings --cfg tokio_taskdump --cfg tokio_uring" } steps: - uses: actions/checkout@v4 - name: Install Rust ${{ env.rust_nightly }} @@ -765,7 +800,7 @@ jobs: cargo hack check --all-features --ignore-private - name: "check --all-features --unstable -Z minimal-versions" env: - RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings + RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump --cfg tokio_uring -Dwarnings run: | # Remove dev-dependencies from Cargo.toml to prevent the next `cargo update` # from determining minimal versions based on dev-dependencies. @@ -817,8 +852,8 @@ jobs: run: - os: windows-latest - os: ubuntu-latest - RUSTFLAGS: --cfg tokio_taskdump - RUSTDOCFLAGS: --cfg tokio_taskdump + RUSTFLAGS: --cfg tokio_taskdump --cfg tokio_uring + RUSTDOCFLAGS: --cfg tokio_taskdump --cfg tokio_uring steps: - uses: actions/checkout@v4 diff --git a/Cargo.toml b/Cargo.toml index 618b310e3..7ecb6eea0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,4 +29,5 @@ unexpected_cfgs = { level = "warn", check-cfg = [ 'cfg(tokio_no_tuning_tests)', 'cfg(tokio_taskdump)', 'cfg(tokio_unstable)', + 'cfg(tokio_uring)', ] } diff --git a/spellcheck.dic b/spellcheck.dic index 39b98a7ee..a8140e3b7 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -302 +306 & + < @@ -70,6 +70,9 @@ connectionless coroutines cpu cpus +cqe +CQE +cqe's customizable Customizable datagram @@ -287,6 +290,7 @@ unsets Unsets unsynchronized untrusted +uring usecases Valgrind Varghese diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 44eb58d40..fd7223445 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -103,6 +103,12 @@ socket2 = { version = "0.5.5", optional = true, features = ["all"] } [target.'cfg(tokio_unstable)'.dependencies] tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } # Not in full +[target.'cfg(all(tokio_uring, target_os = "linux"))'.dependencies] +io-uring = { version = "0.7.6", default-features = false } +libc = { version = "0.2.168" } +mio = { version = "1.0.1", default-features = false, features = ["os-poll", "os-ext"] } +slab = "0.4.9" + # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. [target.'cfg(tokio_taskdump)'.dependencies] diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index dc2c4309e..bfdd1ccfd 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -218,7 +218,7 @@ cfg_io_driver_impl! { pub(crate) mod interest; pub(crate) mod ready; - cfg_net! { + cfg_net_or_uring! { pub use interest::Interest; pub use ready::Ready; } diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index c9bd644bd..10da95708 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -120,11 +120,23 @@ macro_rules! cfg_io_driver { feature = "net", all(unix, feature = "process"), all(unix, feature = "signal"), + all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux" + ) ))] #[cfg_attr(docsrs, doc(cfg(any( feature = "net", all(unix, feature = "process"), all(unix, feature = "signal"), + all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux" + ) ))))] $item )* @@ -138,6 +150,12 @@ macro_rules! cfg_io_driver_impl { feature = "net", all(unix, feature = "process"), all(unix, feature = "signal"), + all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux" + ) ))] $item )* @@ -151,6 +169,12 @@ macro_rules! cfg_not_io_driver { feature = "net", all(unix, feature = "process"), all(unix, feature = "signal"), + all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux" + ) )))] $item )* @@ -279,6 +303,35 @@ macro_rules! cfg_net { } } +macro_rules! cfg_net_or_uring { + ($($item:item)*) => { + $( + #[cfg(any( + feature = "net", + all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux", + ) + ))] + #[cfg_attr( + docsrs, + doc(cfg(any( + feature = "net", + all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux", + ) + ))) + )] + $item + )* + } +} + macro_rules! cfg_net_unix { ($($item:item)*) => { $( @@ -616,3 +669,17 @@ macro_rules! cfg_metrics_variant { } } } + +macro_rules! cfg_tokio_uring { + ($($item:item)*) => { + $( + #[cfg(all( + tokio_uring, + feature = "rt", + feature = "fs", + target_os = "linux", + ))] + $item + )* + }; +} diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 3b84a8669..f06b70427 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -347,3 +347,7 @@ cfg_not_time! { (io_stack, ()) } } + +cfg_tokio_uring! { + pub(crate) mod op; +} diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs new file mode 100644 index 000000000..0d7ca9455 --- /dev/null +++ b/tokio/src/runtime/driver/op.rs @@ -0,0 +1,178 @@ +use crate::runtime::Handle; +use io_uring::cqueue; +use io_uring::squeue::Entry; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::task::Waker; +use std::{io, mem}; + +#[derive(Debug)] +pub(crate) enum CancelData {} + +#[derive(Debug)] +pub(crate) enum Lifecycle { + /// The operation has been submitted to uring and is currently in-flight + Submitted, + + /// The submitter is waiting for the completion of the operation + Waiting(Waker), + + /// The submitter no longer has interest in the operation result. The state + /// must be passed to the driver and held until the operation completes. + Cancelled(CancelData), + + /// The operation has completed with a single cqe result + Completed(io_uring::cqueue::Entry), +} + +pub(crate) enum State { + #[allow(dead_code)] + Initialize(Option), + Polled(usize), + Complete, +} + +pub(crate) struct Op { + // Handle to the runtime + handle: Handle, + // State of this Op + state: State, + // Per operation data. + data: Option, +} + +impl Op { + /// # Safety + /// + /// Callers must ensure that parameters of the entry (such as buffer) are valid and will + /// be valid for the entire duration of the operation, otherwise it may cause memory problems. + #[allow(dead_code)] + pub(crate) unsafe fn new(entry: Entry, data: T) -> Self { + let handle = Handle::current(); + Self { + handle, + data: Some(data), + state: State::Initialize(Some(entry)), + } + } + pub(crate) fn take_data(&mut self) -> Option { + self.data.take() + } +} + +impl Drop for Op { + fn drop(&mut self) { + match self.state { + // We've already dropped this Op. + State::Complete => (), + // We will cancel this Op. + State::Polled(index) => { + let data = self.take_data(); + let handle = &mut self.handle; + handle.inner.driver().io().cancel_op(index, data); + } + // This Op has not been polled yet. + // We don't need to do anything here. + State::Initialize(_) => (), + } + } +} + +/// A single CQE result +pub(crate) struct CqeResult { + #[allow(dead_code)] + pub(crate) result: io::Result, +} + +impl From for CqeResult { + fn from(cqe: cqueue::Entry) -> Self { + let res = cqe.result(); + let result = if res >= 0 { + Ok(res as u32) + } else { + Err(io::Error::from_raw_os_error(-res)) + }; + CqeResult { result } + } +} + +/// A trait that converts a CQE result into a usable value for each operation. +pub(crate) trait Completable { + type Output; + fn complete(self, cqe: CqeResult) -> io::Result; +} + +/// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation. +pub(crate) trait Cancellable { + fn cancell(self) -> CancelData; +} + +impl Unpin for Op {} + +impl Future for Op { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + let handle = &mut this.handle; + let driver = handle.inner.driver().io(); + + match &mut this.state { + State::Initialize(entry_opt) => { + let entry = entry_opt.take().expect("Entry must be present"); + let waker = cx.waker().clone(); + // SAFETY: entry is valid for the entire duration of the operation + let idx = unsafe { driver.register_op(entry, waker)? }; + this.state = State::Polled(idx); + Poll::Pending + } + + State::Polled(idx) => { + let mut ctx = driver.get_uring().lock(); + let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present"); + + match mem::replace(lifecycle, Lifecycle::Submitted) { + // Only replace the stored waker if it wouldn't wake the new one + Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => { + let waker = cx.waker().clone(); + *lifecycle = Lifecycle::Waiting(waker); + Poll::Pending + } + + Lifecycle::Waiting(prev) => { + *lifecycle = Lifecycle::Waiting(prev); + Poll::Pending + } + + Lifecycle::Completed(cqe) => { + // Clean up and complete the future + ctx.remove_op(*idx); + + this.state = State::Complete; + + drop(ctx); + + let data = this + .take_data() + .expect("Data must be present on completion"); + Poll::Ready(data.complete(cqe.into())) + } + + Lifecycle::Submitted => { + unreachable!("Submitted lifecycle should never be seen here"); + } + + Lifecycle::Cancelled(_) => { + unreachable!("Cancelled lifecycle should never be seen here"); + } + } + } + + State::Complete => { + panic!("Future polled after completion"); + } + } + } +} diff --git a/tokio/src/runtime/io/driver.rs b/tokio/src/runtime/io/driver.rs index 1139cbf58..fb496f140 100644 --- a/tokio/src/runtime/io/driver.rs +++ b/tokio/src/runtime/io/driver.rs @@ -2,6 +2,10 @@ cfg_signal_internal_and_unix! { mod signal; } +cfg_tokio_uring! { + mod uring; + use uring::UringContext; +} use crate::io::interest::Interest; use crate::io::ready::Ready; @@ -45,6 +49,9 @@ pub(crate) struct Handle { waker: mio::Waker, pub(crate) metrics: IoDriverMetrics, + + #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))] + pub(crate) uring_context: Mutex, } #[derive(Debug)] @@ -112,8 +119,15 @@ impl Driver { #[cfg(not(target_os = "wasi"))] waker, metrics: IoDriverMetrics::default(), + #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))] + uring_context: Mutex::new(UringContext::new()), }; + #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))] + { + handle.add_uring_source(Interest::READABLE)?; + } + Ok((driver, handle)) } @@ -183,6 +197,13 @@ impl Driver { } } + #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))] + { + let mut guard = handle.get_uring().lock(); + let ctx = &mut *guard; + ctx.dispatch_completions(); + } + handle.metrics.incr_ready_count_by(ready_count); } } diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs new file mode 100644 index 000000000..8a129bc49 --- /dev/null +++ b/tokio/src/runtime/io/driver/uring.rs @@ -0,0 +1,191 @@ +use io_uring::{squeue::Entry, IoUring}; +use mio::unix::SourceFd; +use slab::Slab; + +use crate::runtime::driver::op::{Cancellable, Lifecycle}; +use crate::{io::Interest, loom::sync::Mutex}; + +use super::{Handle, TOKEN_WAKEUP}; + +use std::os::fd::AsRawFd; +use std::{io, mem, task::Waker}; + +const DEFAULT_RING_SIZE: u32 = 256; + +pub(crate) struct UringContext { + pub(crate) uring: io_uring::IoUring, + pub(crate) ops: slab::Slab, +} + +impl UringContext { + pub(crate) fn new() -> Self { + Self { + ops: Slab::new(), + // TODO: make configurable + uring: IoUring::new(DEFAULT_RING_SIZE).unwrap(), + } + } + + pub(crate) fn dispatch_completions(&mut self) { + let ops = &mut self.ops; + let cq = self.uring.completion(); + + for cqe in cq { + let idx = cqe.user_data() as usize; + + match ops.get_mut(idx) { + Some(Lifecycle::Waiting(waker)) => { + waker.wake_by_ref(); + *ops.get_mut(idx).unwrap() = Lifecycle::Completed(cqe); + } + Some(Lifecycle::Cancelled(_)) => { + // Op future was cancelled, so we discard the result. + // We just remove the entry from the slab. + ops.remove(idx); + } + Some(other) => { + panic!("unexpected lifecycle for slot {}: {:?}", idx, other); + } + None => { + panic!("no op at index {}", idx); + } + } + } + + // `cq`'s drop gets called here, updating the latest head pointer + } + + pub(crate) fn submit(&mut self) -> io::Result<()> { + loop { + // Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS + match self.uring.submit() { + Ok(_) => { + return Ok(()); + } + + // If the submission queue is full, we dispatch completions and try again. + Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => { + self.dispatch_completions(); + } + // For other errors, we currently return the error as is. + Err(e) => { + return Err(e); + } + } + } + } + + pub(crate) fn remove_op(&mut self, index: usize) -> Lifecycle { + self.ops.remove(index) + } +} + +/// Drop the driver, cancelling any in-progress ops and waiting for them to terminate. +impl Drop for UringContext { + fn drop(&mut self) { + // Make sure we flush the submission queue before dropping the driver. + while !self.uring.submission().is_empty() { + self.submit().expect("Internal error when dropping driver"); + } + + let mut cancel_ops = Slab::new(); + let mut keys_to_move = Vec::new(); + + for (key, lifecycle) in self.ops.iter() { + match lifecycle { + Lifecycle::Waiting(_) | Lifecycle::Submitted | Lifecycle::Cancelled(_) => { + // these should be cancelled + keys_to_move.push(key); + } + // We don't wait for completed ops. + Lifecycle::Completed(_) => {} + } + } + + for key in keys_to_move { + let lifecycle = self.remove_op(key); + cancel_ops.insert(lifecycle); + } + + while !cancel_ops.is_empty() { + // Wait until at least one completion is available. + self.uring + .submit_and_wait(1) + .expect("Internal error when dropping driver"); + + for cqe in self.uring.completion() { + let idx = cqe.user_data() as usize; + cancel_ops.remove(idx); + } + } + } +} + +impl Handle { + #[allow(dead_code)] + pub(crate) fn add_uring_source(&self, interest: Interest) -> io::Result<()> { + // setup for io_uring + let uringfd = self.get_uring().lock().uring.as_raw_fd(); + let mut source = SourceFd(&uringfd); + self.registry + .register(&mut source, TOKEN_WAKEUP, interest.to_mio()) + } + + pub(crate) fn get_uring(&self) -> &Mutex { + &self.uring_context + } + + /// # Safety + /// + /// Callers must ensure that parameters of the entry (such as buffer) are valid and will + /// be valid for the entire duration of the operation, otherwise it may cause memory problems. + pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result { + let mut guard = self.get_uring().lock(); + let ctx = &mut *guard; + let index = ctx.ops.insert(Lifecycle::Waiting(waker)); + let entry = entry.user_data(index as u64); + + let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> { + if let Err(e) = ctx.submit() { + // Submission failed, remove the entry from the slab and return the error + ctx.remove_op(index); + return Err(e); + } + Ok(()) + }; + + // SAFETY: entry is valid for the entire duration of the operation + while unsafe { ctx.uring.submission().push(&entry).is_err() } { + // If the submission queue is full, flush it to the kernel + submit_or_remove(ctx)?; + } + + // Note: For now, we submit the entry immediately without utilizing batching. + submit_or_remove(ctx)?; + + Ok(index) + } + + // TODO: Remove this annotation when operations are actually supported + #[allow(unused_variables, unreachable_code)] + pub(crate) fn cancel_op(&self, index: usize, data: Option) { + let mut guard = self.get_uring().lock(); + let ctx = &mut *guard; + let ops = &mut ctx.ops; + let Some(lifecycle) = ops.get_mut(index) else { + // The corresponding index doesn't exist anymore, so this Op is already complete. + return; + }; + + // This Op will be cancelled. Here, we don't remove the lifecycle from the slab to keep + // uring data alive until the operation completes. + + let cancell_data = data.expect("Data should be present").cancell(); + match mem::replace(lifecycle, Lifecycle::Cancelled(cancell_data)) { + Lifecycle::Submitted | Lifecycle::Waiting(_) => (), + // The driver saw the completion, but it was never polled. + Lifecycle::Completed(_) => (), + prev => panic!("Unexpected state: {:?}", prev), + }; + } +} diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index 404359bf5..f58dd6b30 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -1,4 +1,7 @@ -#![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))] +#![cfg_attr( + not(all(feature = "rt", feature = "net", tokio_uring)), + allow(dead_code) +)] mod driver; use driver::{Direction, Tick}; pub(crate) use driver::{Driver, Handle, ReadyEvent};