rt: add infrastructure code for io_uring (#7320)

This commit is contained in:
Motoyuki Kimura 2025-05-21 02:36:52 +09:00 committed by GitHub
parent ea30a5ea5e
commit 327bec2caf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 516 additions and 6 deletions

View File

@ -350,6 +350,39 @@ jobs:
# the unstable cfg to RustDoc
RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_taskdump
test-uring:
name: test tokio full --cfg tokio_uring
needs: basics
runs-on: ${{ matrix.os }}
strategy:
matrix:
include:
- os: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ env.rust_stable }}
- name: Install cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: cargo-nextest
- uses: Swatinem/rust-cache@v2
- name: test tokio full --cfg tokio_uring
run: |
set -euxo pipefail
cargo nextest run --all-features
cargo test --doc --all-features
working-directory: tokio
env:
RUSTFLAGS: --cfg tokio_uring -Dwarnings
# in order to run doctests for unstable features, we must also pass
# the unstable cfg to RustDoc
RUSTDOCFLAGS: --cfg tokio_uring
check-unstable-mt-counters:
name: check tokio full --internal-mt-counters
needs: basics
@ -703,6 +736,8 @@ jobs:
- { name: "--unstable", rustflags: "--cfg tokio_unstable -Dwarnings" }
# Try with unstable and taskdump feature flags
- { name: "--unstable --taskdump", rustflags: "--cfg tokio_unstable -Dwarnings --cfg tokio_taskdump" }
- { name: "--tokio_uring", rustflags: "-Dwarnings --cfg tokio_uring" }
- { name: "--unstable --taskdump --tokio_uring", rustflags: "--cfg tokio_unstable -Dwarnings --cfg tokio_taskdump --cfg tokio_uring" }
steps:
- uses: actions/checkout@v4
- name: Install Rust ${{ env.rust_nightly }}
@ -765,7 +800,7 @@ jobs:
cargo hack check --all-features --ignore-private
- name: "check --all-features --unstable -Z minimal-versions"
env:
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump --cfg tokio_uring -Dwarnings
run: |
# Remove dev-dependencies from Cargo.toml to prevent the next `cargo update`
# from determining minimal versions based on dev-dependencies.
@ -817,8 +852,8 @@ jobs:
run:
- os: windows-latest
- os: ubuntu-latest
RUSTFLAGS: --cfg tokio_taskdump
RUSTDOCFLAGS: --cfg tokio_taskdump
RUSTFLAGS: --cfg tokio_taskdump --cfg tokio_uring
RUSTDOCFLAGS: --cfg tokio_taskdump --cfg tokio_uring
steps:
- uses: actions/checkout@v4

View File

@ -29,4 +29,5 @@ unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(tokio_no_tuning_tests)',
'cfg(tokio_taskdump)',
'cfg(tokio_unstable)',
'cfg(tokio_uring)',
] }

View File

@ -1,4 +1,4 @@
302
306
&
+
<
@ -70,6 +70,9 @@ connectionless
coroutines
cpu
cpus
cqe
CQE
cqe's
customizable
Customizable
datagram
@ -287,6 +290,7 @@ unsets
Unsets
unsynchronized
untrusted
uring
usecases
Valgrind
Varghese

View File

@ -103,6 +103,12 @@ socket2 = { version = "0.5.5", optional = true, features = ["all"] }
[target.'cfg(tokio_unstable)'.dependencies]
tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } # Not in full
[target.'cfg(all(tokio_uring, target_os = "linux"))'.dependencies]
io-uring = { version = "0.7.6", default-features = false }
libc = { version = "0.2.168" }
mio = { version = "1.0.1", default-features = false, features = ["os-poll", "os-ext"] }
slab = "0.4.9"
# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_taskdump)'.dependencies]

View File

@ -218,7 +218,7 @@ cfg_io_driver_impl! {
pub(crate) mod interest;
pub(crate) mod ready;
cfg_net! {
cfg_net_or_uring! {
pub use interest::Interest;
pub use ready::Ready;
}

View File

@ -120,11 +120,23 @@ macro_rules! cfg_io_driver {
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
all(
tokio_uring,
feature = "rt",
feature = "fs",
target_os = "linux"
)
))]
#[cfg_attr(docsrs, doc(cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
all(
tokio_uring,
feature = "rt",
feature = "fs",
target_os = "linux"
)
))))]
$item
)*
@ -138,6 +150,12 @@ macro_rules! cfg_io_driver_impl {
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
all(
tokio_uring,
feature = "rt",
feature = "fs",
target_os = "linux"
)
))]
$item
)*
@ -151,6 +169,12 @@ macro_rules! cfg_not_io_driver {
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
all(
tokio_uring,
feature = "rt",
feature = "fs",
target_os = "linux"
)
)))]
$item
)*
@ -279,6 +303,35 @@ macro_rules! cfg_net {
}
}
macro_rules! cfg_net_or_uring {
($($item:item)*) => {
$(
#[cfg(any(
feature = "net",
all(
tokio_uring,
feature = "rt",
feature = "fs",
target_os = "linux",
)
))]
#[cfg_attr(
docsrs,
doc(cfg(any(
feature = "net",
all(
tokio_uring,
feature = "rt",
feature = "fs",
target_os = "linux",
)
)))
)]
$item
)*
}
}
macro_rules! cfg_net_unix {
($($item:item)*) => {
$(
@ -616,3 +669,17 @@ macro_rules! cfg_metrics_variant {
}
}
}
macro_rules! cfg_tokio_uring {
($($item:item)*) => {
$(
#[cfg(all(
tokio_uring,
feature = "rt",
feature = "fs",
target_os = "linux",
))]
$item
)*
};
}

View File

@ -347,3 +347,7 @@ cfg_not_time! {
(io_stack, ())
}
}
cfg_tokio_uring! {
pub(crate) mod op;
}

View File

@ -0,0 +1,178 @@
use crate::runtime::Handle;
use io_uring::cqueue;
use io_uring::squeue::Entry;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::{io, mem};
#[derive(Debug)]
pub(crate) enum CancelData {}
#[derive(Debug)]
pub(crate) enum Lifecycle {
/// The operation has been submitted to uring and is currently in-flight
Submitted,
/// The submitter is waiting for the completion of the operation
Waiting(Waker),
/// The submitter no longer has interest in the operation result. The state
/// must be passed to the driver and held until the operation completes.
Cancelled(CancelData),
/// The operation has completed with a single cqe result
Completed(io_uring::cqueue::Entry),
}
pub(crate) enum State {
#[allow(dead_code)]
Initialize(Option<Entry>),
Polled(usize),
Complete,
}
pub(crate) struct Op<T: Cancellable> {
// Handle to the runtime
handle: Handle,
// State of this Op
state: State,
// Per operation data.
data: Option<T>,
}
impl<T: Cancellable> Op<T> {
/// # Safety
///
/// Callers must ensure that parameters of the entry (such as buffer) are valid and will
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
#[allow(dead_code)]
pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
let handle = Handle::current();
Self {
handle,
data: Some(data),
state: State::Initialize(Some(entry)),
}
}
pub(crate) fn take_data(&mut self) -> Option<T> {
self.data.take()
}
}
impl<T: Cancellable> Drop for Op<T> {
fn drop(&mut self) {
match self.state {
// We've already dropped this Op.
State::Complete => (),
// We will cancel this Op.
State::Polled(index) => {
let data = self.take_data();
let handle = &mut self.handle;
handle.inner.driver().io().cancel_op(index, data);
}
// This Op has not been polled yet.
// We don't need to do anything here.
State::Initialize(_) => (),
}
}
}
/// A single CQE result
pub(crate) struct CqeResult {
#[allow(dead_code)]
pub(crate) result: io::Result<u32>,
}
impl From<cqueue::Entry> for CqeResult {
fn from(cqe: cqueue::Entry) -> Self {
let res = cqe.result();
let result = if res >= 0 {
Ok(res as u32)
} else {
Err(io::Error::from_raw_os_error(-res))
};
CqeResult { result }
}
}
/// A trait that converts a CQE result into a usable value for each operation.
pub(crate) trait Completable {
type Output;
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output>;
}
/// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation.
pub(crate) trait Cancellable {
fn cancell(self) -> CancelData;
}
impl<T: Cancellable> Unpin for Op<T> {}
impl<T: Cancellable + Completable + Send> Future for Op<T> {
type Output = io::Result<T::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let handle = &mut this.handle;
let driver = handle.inner.driver().io();
match &mut this.state {
State::Initialize(entry_opt) => {
let entry = entry_opt.take().expect("Entry must be present");
let waker = cx.waker().clone();
// SAFETY: entry is valid for the entire duration of the operation
let idx = unsafe { driver.register_op(entry, waker)? };
this.state = State::Polled(idx);
Poll::Pending
}
State::Polled(idx) => {
let mut ctx = driver.get_uring().lock();
let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present");
match mem::replace(lifecycle, Lifecycle::Submitted) {
// Only replace the stored waker if it wouldn't wake the new one
Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => {
let waker = cx.waker().clone();
*lifecycle = Lifecycle::Waiting(waker);
Poll::Pending
}
Lifecycle::Waiting(prev) => {
*lifecycle = Lifecycle::Waiting(prev);
Poll::Pending
}
Lifecycle::Completed(cqe) => {
// Clean up and complete the future
ctx.remove_op(*idx);
this.state = State::Complete;
drop(ctx);
let data = this
.take_data()
.expect("Data must be present on completion");
Poll::Ready(data.complete(cqe.into()))
}
Lifecycle::Submitted => {
unreachable!("Submitted lifecycle should never be seen here");
}
Lifecycle::Cancelled(_) => {
unreachable!("Cancelled lifecycle should never be seen here");
}
}
}
State::Complete => {
panic!("Future polled after completion");
}
}
}
}

View File

@ -2,6 +2,10 @@
cfg_signal_internal_and_unix! {
mod signal;
}
cfg_tokio_uring! {
mod uring;
use uring::UringContext;
}
use crate::io::interest::Interest;
use crate::io::ready::Ready;
@ -45,6 +49,9 @@ pub(crate) struct Handle {
waker: mio::Waker,
pub(crate) metrics: IoDriverMetrics,
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
pub(crate) uring_context: Mutex<UringContext>,
}
#[derive(Debug)]
@ -112,8 +119,15 @@ impl Driver {
#[cfg(not(target_os = "wasi"))]
waker,
metrics: IoDriverMetrics::default(),
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
uring_context: Mutex::new(UringContext::new()),
};
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
{
handle.add_uring_source(Interest::READABLE)?;
}
Ok((driver, handle))
}
@ -183,6 +197,13 @@ impl Driver {
}
}
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
{
let mut guard = handle.get_uring().lock();
let ctx = &mut *guard;
ctx.dispatch_completions();
}
handle.metrics.incr_ready_count_by(ready_count);
}
}

View File

@ -0,0 +1,191 @@
use io_uring::{squeue::Entry, IoUring};
use mio::unix::SourceFd;
use slab::Slab;
use crate::runtime::driver::op::{Cancellable, Lifecycle};
use crate::{io::Interest, loom::sync::Mutex};
use super::{Handle, TOKEN_WAKEUP};
use std::os::fd::AsRawFd;
use std::{io, mem, task::Waker};
const DEFAULT_RING_SIZE: u32 = 256;
pub(crate) struct UringContext {
pub(crate) uring: io_uring::IoUring,
pub(crate) ops: slab::Slab<Lifecycle>,
}
impl UringContext {
pub(crate) fn new() -> Self {
Self {
ops: Slab::new(),
// TODO: make configurable
uring: IoUring::new(DEFAULT_RING_SIZE).unwrap(),
}
}
pub(crate) fn dispatch_completions(&mut self) {
let ops = &mut self.ops;
let cq = self.uring.completion();
for cqe in cq {
let idx = cqe.user_data() as usize;
match ops.get_mut(idx) {
Some(Lifecycle::Waiting(waker)) => {
waker.wake_by_ref();
*ops.get_mut(idx).unwrap() = Lifecycle::Completed(cqe);
}
Some(Lifecycle::Cancelled(_)) => {
// Op future was cancelled, so we discard the result.
// We just remove the entry from the slab.
ops.remove(idx);
}
Some(other) => {
panic!("unexpected lifecycle for slot {}: {:?}", idx, other);
}
None => {
panic!("no op at index {}", idx);
}
}
}
// `cq`'s drop gets called here, updating the latest head pointer
}
pub(crate) fn submit(&mut self) -> io::Result<()> {
loop {
// Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS
match self.uring.submit() {
Ok(_) => {
return Ok(());
}
// If the submission queue is full, we dispatch completions and try again.
Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => {
self.dispatch_completions();
}
// For other errors, we currently return the error as is.
Err(e) => {
return Err(e);
}
}
}
}
pub(crate) fn remove_op(&mut self, index: usize) -> Lifecycle {
self.ops.remove(index)
}
}
/// Drop the driver, cancelling any in-progress ops and waiting for them to terminate.
impl Drop for UringContext {
fn drop(&mut self) {
// Make sure we flush the submission queue before dropping the driver.
while !self.uring.submission().is_empty() {
self.submit().expect("Internal error when dropping driver");
}
let mut cancel_ops = Slab::new();
let mut keys_to_move = Vec::new();
for (key, lifecycle) in self.ops.iter() {
match lifecycle {
Lifecycle::Waiting(_) | Lifecycle::Submitted | Lifecycle::Cancelled(_) => {
// these should be cancelled
keys_to_move.push(key);
}
// We don't wait for completed ops.
Lifecycle::Completed(_) => {}
}
}
for key in keys_to_move {
let lifecycle = self.remove_op(key);
cancel_ops.insert(lifecycle);
}
while !cancel_ops.is_empty() {
// Wait until at least one completion is available.
self.uring
.submit_and_wait(1)
.expect("Internal error when dropping driver");
for cqe in self.uring.completion() {
let idx = cqe.user_data() as usize;
cancel_ops.remove(idx);
}
}
}
}
impl Handle {
#[allow(dead_code)]
pub(crate) fn add_uring_source(&self, interest: Interest) -> io::Result<()> {
// setup for io_uring
let uringfd = self.get_uring().lock().uring.as_raw_fd();
let mut source = SourceFd(&uringfd);
self.registry
.register(&mut source, TOKEN_WAKEUP, interest.to_mio())
}
pub(crate) fn get_uring(&self) -> &Mutex<UringContext> {
&self.uring_context
}
/// # Safety
///
/// Callers must ensure that parameters of the entry (such as buffer) are valid and will
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result<usize> {
let mut guard = self.get_uring().lock();
let ctx = &mut *guard;
let index = ctx.ops.insert(Lifecycle::Waiting(waker));
let entry = entry.user_data(index as u64);
let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> {
if let Err(e) = ctx.submit() {
// Submission failed, remove the entry from the slab and return the error
ctx.remove_op(index);
return Err(e);
}
Ok(())
};
// SAFETY: entry is valid for the entire duration of the operation
while unsafe { ctx.uring.submission().push(&entry).is_err() } {
// If the submission queue is full, flush it to the kernel
submit_or_remove(ctx)?;
}
// Note: For now, we submit the entry immediately without utilizing batching.
submit_or_remove(ctx)?;
Ok(index)
}
// TODO: Remove this annotation when operations are actually supported
#[allow(unused_variables, unreachable_code)]
pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
let mut guard = self.get_uring().lock();
let ctx = &mut *guard;
let ops = &mut ctx.ops;
let Some(lifecycle) = ops.get_mut(index) else {
// The corresponding index doesn't exist anymore, so this Op is already complete.
return;
};
// This Op will be cancelled. Here, we don't remove the lifecycle from the slab to keep
// uring data alive until the operation completes.
let cancell_data = data.expect("Data should be present").cancell();
match mem::replace(lifecycle, Lifecycle::Cancelled(cancell_data)) {
Lifecycle::Submitted | Lifecycle::Waiting(_) => (),
// The driver saw the completion, but it was never polled.
Lifecycle::Completed(_) => (),
prev => panic!("Unexpected state: {:?}", prev),
};
}
}

View File

@ -1,4 +1,7 @@
#![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))]
#![cfg_attr(
not(all(feature = "rt", feature = "net", tokio_uring)),
allow(dead_code)
)]
mod driver;
use driver::{Direction, Tick};
pub(crate) use driver::{Driver, Handle, ReadyEvent};