mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
fs: add io_uring open
operation (#7321)
This commit is contained in:
parent
7497561fed
commit
3e84a198e4
11
.github/workflows/ci.yml
vendored
11
.github/workflows/ci.yml
vendored
@ -359,6 +359,17 @@ jobs:
|
||||
include:
|
||||
- os: ubuntu-latest
|
||||
steps:
|
||||
- name: check if io-uring is supported in the CI environment
|
||||
run: |
|
||||
# Try to read the io-uring setting in the kernel config file.
|
||||
# https://github.com/torvalds/linux/blob/75f5f23f8787c5e184fcb2fbcd02d8e9317dc5e7/init/Kconfig#L1782-L1789
|
||||
CONFIG_FILE="/boot/config-$(uname -r)"
|
||||
echo "Checking $CONFIG_FILE for io-uring support"
|
||||
if ! grep -q "CONFIG_IO_URING=y" "$CONFIG_FILE"; then
|
||||
echo "Error: io_uring is not supported"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Rust ${{ env.rust_stable }}
|
||||
uses: dtolnay/rust-toolchain@stable
|
||||
|
@ -136,6 +136,7 @@ features = [
|
||||
[dev-dependencies]
|
||||
tokio-test = { version = "0.4.0", path = "../tokio-test" }
|
||||
tokio-stream = { version = "0.1", path = "../tokio-stream" }
|
||||
tokio-util = { version = "0.7", path = "../tokio-util", features = ["rt"] }
|
||||
futures = { version = "0.3.0", features = ["async-await"] }
|
||||
mockall = "0.13.0"
|
||||
async-stream = "0.3"
|
||||
@ -164,10 +165,10 @@ tracing-mock = "= 0.1.0-beta.1"
|
||||
[package.metadata.docs.rs]
|
||||
all-features = true
|
||||
# enable unstable features in the documentation
|
||||
rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
|
||||
rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable", "--cfg", "tokio_taskdump", "--cfg", "tokio_uring"]
|
||||
# it's necessary to _also_ pass `--cfg tokio_unstable` and `--cfg tokio_taskdump`
|
||||
# to rustc, or else dependencies will not be enabled, and the docs build will fail.
|
||||
rustc-args = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
|
||||
rustc-args = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump", "--cfg", "tokio_uring"]
|
||||
|
||||
[package.metadata.playground]
|
||||
features = ["full", "test-util"]
|
||||
|
@ -237,6 +237,9 @@ pub use self::metadata::metadata;
|
||||
|
||||
mod open_options;
|
||||
pub use self::open_options::OpenOptions;
|
||||
cfg_tokio_uring! {
|
||||
pub(crate) use self::open_options::UringOpenOptions;
|
||||
}
|
||||
|
||||
mod read;
|
||||
pub use self::read::read;
|
||||
|
@ -3,6 +3,12 @@ use crate::fs::{asyncify, File};
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
|
||||
cfg_tokio_uring! {
|
||||
mod uring_open_options;
|
||||
pub(crate) use uring_open_options::UringOpenOptions;
|
||||
use crate::runtime::driver::op::Op;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod mock_open_options;
|
||||
#[cfg(test)]
|
||||
@ -79,7 +85,16 @@ use std::os::windows::fs::OpenOptionsExt;
|
||||
/// }
|
||||
/// ```
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OpenOptions(StdOpenOptions);
|
||||
pub struct OpenOptions {
|
||||
inner: Kind,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum Kind {
|
||||
Std(StdOpenOptions),
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Uring(UringOpenOptions),
|
||||
}
|
||||
|
||||
impl OpenOptions {
|
||||
/// Creates a blank new set of options ready for configuration.
|
||||
@ -99,7 +114,12 @@ impl OpenOptions {
|
||||
/// let future = options.read(true).open("foo.txt");
|
||||
/// ```
|
||||
pub fn new() -> OpenOptions {
|
||||
OpenOptions(StdOpenOptions::new())
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
let inner = Kind::Uring(UringOpenOptions::new());
|
||||
#[cfg(not(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux")))]
|
||||
let inner = Kind::Std(StdOpenOptions::new());
|
||||
|
||||
OpenOptions { inner }
|
||||
}
|
||||
|
||||
/// Sets the option for read access.
|
||||
@ -128,7 +148,15 @@ impl OpenOptions {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
|
||||
self.0.read(read);
|
||||
match &mut self.inner {
|
||||
Kind::Std(opts) => {
|
||||
opts.read(read);
|
||||
}
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
opts.read(read);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@ -158,7 +186,15 @@ impl OpenOptions {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
|
||||
self.0.write(write);
|
||||
match &mut self.inner {
|
||||
Kind::Std(opts) => {
|
||||
opts.write(write);
|
||||
}
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
opts.write(write);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@ -217,7 +253,15 @@ impl OpenOptions {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn append(&mut self, append: bool) -> &mut OpenOptions {
|
||||
self.0.append(append);
|
||||
match &mut self.inner {
|
||||
Kind::Std(opts) => {
|
||||
opts.append(append);
|
||||
}
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
opts.append(append);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@ -250,7 +294,15 @@ impl OpenOptions {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
|
||||
self.0.truncate(truncate);
|
||||
match &mut self.inner {
|
||||
Kind::Std(opts) => {
|
||||
opts.truncate(truncate);
|
||||
}
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
opts.truncate(truncate);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@ -286,7 +338,15 @@ impl OpenOptions {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
|
||||
self.0.create(create);
|
||||
match &mut self.inner {
|
||||
Kind::Std(opts) => {
|
||||
opts.create(create);
|
||||
}
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
opts.create(create);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@ -329,7 +389,15 @@ impl OpenOptions {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
|
||||
self.0.create_new(create_new);
|
||||
match &mut self.inner {
|
||||
Kind::Std(opts) => {
|
||||
opts.create_new(create_new);
|
||||
}
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
opts.create_new(create_new);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@ -366,6 +434,15 @@ impl OpenOptions {
|
||||
/// open files, too long filename, too many symbolic links in the
|
||||
/// specified path (Unix-like systems only), etc.
|
||||
///
|
||||
/// # io_uring support
|
||||
///
|
||||
/// On Linux, you can also use `io_uring` for executing system calls.
|
||||
/// To enable `io_uring`, you need to specify the `--cfg tokio_uring` flag
|
||||
/// at compile time and set the `Builder::enable_io_uring` runtime option.
|
||||
///
|
||||
/// Support for `io_uring` is currently experimental, so its behavior may
|
||||
/// change or it may be removed in future versions.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
@ -386,17 +463,36 @@ impl OpenOptions {
|
||||
/// [`Other`]: std::io::ErrorKind::Other
|
||||
/// [`PermissionDenied`]: std::io::ErrorKind::PermissionDenied
|
||||
pub async fn open(&self, path: impl AsRef<Path>) -> io::Result<File> {
|
||||
match &self.inner {
|
||||
Kind::Std(opts) => Self::std_open(opts, path).await,
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
let handle = crate::runtime::Handle::current();
|
||||
let driver_handle = handle.inner.driver().io();
|
||||
|
||||
if driver_handle.check_and_init()? {
|
||||
Op::open(path.as_ref(), opts)?.await
|
||||
} else {
|
||||
let opts = opts.clone().into();
|
||||
Self::std_open(&opts, path).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn std_open(opts: &StdOpenOptions, path: impl AsRef<Path>) -> io::Result<File> {
|
||||
let path = path.as_ref().to_owned();
|
||||
let opts = self.0.clone();
|
||||
let opts = opts.clone();
|
||||
|
||||
let std = asyncify(move || opts.open(path)).await?;
|
||||
Ok(File::from_std(std))
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the underlying `std::fs::OpenOptions`
|
||||
#[cfg(any(windows, unix))]
|
||||
#[cfg(windows)]
|
||||
pub(super) fn as_inner_mut(&mut self) -> &mut StdOpenOptions {
|
||||
&mut self.0
|
||||
match &mut self.inner {
|
||||
Kind::Std(ref mut opts) => opts,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -428,7 +524,15 @@ feature! {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn mode(&mut self, mode: u32) -> &mut OpenOptions {
|
||||
self.as_inner_mut().mode(mode);
|
||||
match &mut self.inner {
|
||||
Kind::Std(opts) => {
|
||||
opts.mode(mode);
|
||||
}
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
opts.mode(mode);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@ -459,7 +563,15 @@ feature! {
|
||||
/// }
|
||||
/// ```
|
||||
pub fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
|
||||
self.as_inner_mut().custom_flags(flags);
|
||||
match &mut self.inner {
|
||||
Kind::Std(opts) => {
|
||||
opts.custom_flags(flags);
|
||||
}
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
Kind::Uring(opts) => {
|
||||
opts.custom_flags(flags);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
@ -651,7 +763,13 @@ cfg_windows! {
|
||||
|
||||
impl From<StdOpenOptions> for OpenOptions {
|
||||
fn from(options: StdOpenOptions) -> OpenOptions {
|
||||
OpenOptions(options)
|
||||
OpenOptions {
|
||||
inner: Kind::Std(options),
|
||||
// TODO: Add support for converting `StdOpenOptions` to `UringOpenOptions`
|
||||
// if user enables the `--cfg tokio_uring`. It is blocked by:
|
||||
// * https://github.com/rust-lang/rust/issues/74943
|
||||
// * https://github.com/rust-lang/rust/issues/76801
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
128
tokio/src/fs/open_options/uring_open_options.rs
Normal file
128
tokio/src/fs/open_options/uring_open_options.rs
Normal file
@ -0,0 +1,128 @@
|
||||
use std::{io, os::unix::fs::OpenOptionsExt};
|
||||
|
||||
#[cfg(test)]
|
||||
use super::mock_open_options::MockOpenOptions as StdOpenOptions;
|
||||
#[cfg(not(test))]
|
||||
use std::fs::OpenOptions as StdOpenOptions;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct UringOpenOptions {
|
||||
pub(crate) read: bool,
|
||||
pub(crate) write: bool,
|
||||
pub(crate) append: bool,
|
||||
pub(crate) truncate: bool,
|
||||
pub(crate) create: bool,
|
||||
pub(crate) create_new: bool,
|
||||
pub(crate) mode: libc::mode_t,
|
||||
pub(crate) custom_flags: libc::c_int,
|
||||
}
|
||||
|
||||
impl UringOpenOptions {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
read: false,
|
||||
write: false,
|
||||
append: false,
|
||||
truncate: false,
|
||||
create: false,
|
||||
create_new: false,
|
||||
mode: 0o666,
|
||||
custom_flags: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn append(&mut self, append: bool) -> &mut Self {
|
||||
self.append = append;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn create(&mut self, create: bool) -> &mut Self {
|
||||
self.create = create;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn create_new(&mut self, create_new: bool) -> &mut Self {
|
||||
self.create_new = create_new;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn read(&mut self, read: bool) -> &mut Self {
|
||||
self.read = read;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn write(&mut self, write: bool) -> &mut Self {
|
||||
self.write = write;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn truncate(&mut self, truncate: bool) -> &mut Self {
|
||||
self.truncate = truncate;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn mode(&mut self, mode: u32) -> &mut Self {
|
||||
self.mode = mode as libc::mode_t;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn custom_flags(&mut self, flags: i32) -> &mut Self {
|
||||
self.custom_flags = flags;
|
||||
self
|
||||
}
|
||||
|
||||
// Equivalent to https://github.com/rust-lang/rust/blob/64c81fd10509924ca4da5d93d6052a65b75418a5/library/std/src/sys/fs/unix.rs#L1118-L1127
|
||||
pub(crate) fn access_mode(&self) -> io::Result<libc::c_int> {
|
||||
match (self.read, self.write, self.append) {
|
||||
(true, false, false) => Ok(libc::O_RDONLY),
|
||||
(false, true, false) => Ok(libc::O_WRONLY),
|
||||
(true, true, false) => Ok(libc::O_RDWR),
|
||||
(false, _, true) => Ok(libc::O_WRONLY | libc::O_APPEND),
|
||||
(true, _, true) => Ok(libc::O_RDWR | libc::O_APPEND),
|
||||
(false, false, false) => Err(io::Error::from_raw_os_error(libc::EINVAL)),
|
||||
}
|
||||
}
|
||||
|
||||
// Equivalent to https://github.com/rust-lang/rust/blob/64c81fd10509924ca4da5d93d6052a65b75418a5/library/std/src/sys/fs/unix.rs#L1129-L1151
|
||||
pub(crate) fn creation_mode(&self) -> io::Result<libc::c_int> {
|
||||
match (self.write, self.append) {
|
||||
(true, false) => {}
|
||||
(false, false) => {
|
||||
if self.truncate || self.create || self.create_new {
|
||||
return Err(io::Error::from_raw_os_error(libc::EINVAL));
|
||||
}
|
||||
}
|
||||
(_, true) => {
|
||||
if self.truncate && !self.create_new {
|
||||
return Err(io::Error::from_raw_os_error(libc::EINVAL));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(match (self.create, self.truncate, self.create_new) {
|
||||
(false, false, false) => 0,
|
||||
(true, false, false) => libc::O_CREAT,
|
||||
(false, true, false) => libc::O_TRUNC,
|
||||
(true, true, false) => libc::O_CREAT | libc::O_TRUNC,
|
||||
(_, _, true) => libc::O_CREAT | libc::O_EXCL,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<UringOpenOptions> for StdOpenOptions {
|
||||
fn from(value: UringOpenOptions) -> Self {
|
||||
let mut std = StdOpenOptions::new();
|
||||
|
||||
std.append(value.append);
|
||||
std.create(value.create);
|
||||
std.create_new(value.create_new);
|
||||
std.read(value.read);
|
||||
std.truncate(value.truncate);
|
||||
std.write(value.write);
|
||||
|
||||
std.mode(value.mode);
|
||||
std.custom_flags(value.custom_flags);
|
||||
|
||||
std
|
||||
}
|
||||
}
|
@ -293,3 +293,7 @@ cfg_io_blocking! {
|
||||
pub(crate) use crate::blocking::JoinHandle as Blocking;
|
||||
}
|
||||
}
|
||||
|
||||
cfg_tokio_uring! {
|
||||
pub(crate) mod uring;
|
||||
}
|
||||
|
2
tokio/src/io/uring/mod.rs
Normal file
2
tokio/src/io/uring/mod.rs
Normal file
@ -0,0 +1,2 @@
|
||||
pub(crate) mod open;
|
||||
pub(crate) mod utils;
|
53
tokio/src/io/uring/open.rs
Normal file
53
tokio/src/io/uring/open.rs
Normal file
@ -0,0 +1,53 @@
|
||||
use super::utils::cstr;
|
||||
use crate::{
|
||||
fs::UringOpenOptions,
|
||||
runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op},
|
||||
};
|
||||
use io_uring::{opcode, types};
|
||||
use std::{ffi::CString, io, os::fd::FromRawFd, path::Path};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Open {
|
||||
/// This field will be read by the kernel during the operation, so we
|
||||
/// need to ensure it is valid for the entire duration of the operation.
|
||||
#[allow(dead_code)]
|
||||
path: CString,
|
||||
}
|
||||
|
||||
impl Completable for Open {
|
||||
type Output = crate::fs::File;
|
||||
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
|
||||
let fd = cqe.result? as i32;
|
||||
let file = unsafe { crate::fs::File::from_raw_fd(fd) };
|
||||
Ok(file)
|
||||
}
|
||||
}
|
||||
|
||||
impl Cancellable for Open {
|
||||
fn cancel(self) -> CancelData {
|
||||
CancelData::Open(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Op<Open> {
|
||||
/// Submit a request to open a file.
|
||||
pub(crate) fn open(path: &Path, options: &UringOpenOptions) -> io::Result<Op<Open>> {
|
||||
let inner_opt = options;
|
||||
let path = cstr(path)?;
|
||||
|
||||
let custom_flags = inner_opt.custom_flags;
|
||||
let flags = libc::O_CLOEXEC
|
||||
| options.access_mode()?
|
||||
| options.creation_mode()?
|
||||
| (custom_flags & !libc::O_ACCMODE);
|
||||
|
||||
let open_op = opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path.as_ptr())
|
||||
.flags(flags)
|
||||
.mode(inner_opt.mode)
|
||||
.build();
|
||||
|
||||
// SAFETY: Parameters are valid for the entire duration of the operation
|
||||
let op = unsafe { Op::new(open_op, Open { path }) };
|
||||
Ok(op)
|
||||
}
|
||||
}
|
6
tokio/src/io/uring/utils.rs
Normal file
6
tokio/src/io/uring/utils.rs
Normal file
@ -0,0 +1,6 @@
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::{ffi::CString, io, path::Path};
|
||||
|
||||
pub(crate) fn cstr(p: &Path) -> io::Result<CString> {
|
||||
Ok(CString::new(p.as_os_str().as_bytes())?)
|
||||
}
|
@ -338,6 +338,10 @@ impl Builder {
|
||||
all(unix, feature = "signal")
|
||||
))]
|
||||
self.enable_io();
|
||||
|
||||
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
|
||||
self.enable_io_uring();
|
||||
|
||||
#[cfg(feature = "time")]
|
||||
self.enable_time();
|
||||
|
||||
@ -1578,6 +1582,31 @@ cfg_time! {
|
||||
}
|
||||
}
|
||||
|
||||
cfg_tokio_uring! {
|
||||
impl Builder {
|
||||
/// Enables the tokio's io_uring driver.
|
||||
///
|
||||
/// Doing this enables using io_uring operations on the runtime.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::runtime;
|
||||
///
|
||||
/// let rt = runtime::Builder::new_multi_thread()
|
||||
/// .enable_io_uring()
|
||||
/// .build()
|
||||
/// .unwrap();
|
||||
/// ```
|
||||
#[cfg_attr(docsrs, doc(cfg(tokio_uring)))]
|
||||
pub fn enable_io_uring(&mut self) -> &mut Self {
|
||||
// Currently, the uring flag is equivalent to `enable_io`.
|
||||
self.enable_io = true;
|
||||
self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cfg_test_util! {
|
||||
impl Builder {
|
||||
/// Controls if the runtime's clock starts paused or advancing.
|
||||
|
@ -1,3 +1,4 @@
|
||||
use crate::io::uring::open::Open;
|
||||
use crate::runtime::Handle;
|
||||
use io_uring::cqueue;
|
||||
use io_uring::squeue::Entry;
|
||||
@ -9,7 +10,13 @@ use std::task::Waker;
|
||||
use std::{io, mem};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum CancelData {}
|
||||
pub(crate) enum CancelData {
|
||||
Open(
|
||||
// This field isn't accessed directly, but it holds cancellation data,
|
||||
// so `#[allow(dead_code)]` is needed.
|
||||
#[allow(dead_code)] Open,
|
||||
),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum Lifecycle {
|
||||
@ -21,14 +28,17 @@ pub(crate) enum Lifecycle {
|
||||
|
||||
/// The submitter no longer has interest in the operation result. The state
|
||||
/// must be passed to the driver and held until the operation completes.
|
||||
Cancelled(CancelData),
|
||||
Cancelled(
|
||||
// This field isn't accessed directly, but it holds cancellation data,
|
||||
// so `#[allow(dead_code)]` is needed.
|
||||
#[allow(dead_code)] CancelData,
|
||||
),
|
||||
|
||||
/// The operation has completed with a single cqe result
|
||||
Completed(io_uring::cqueue::Entry),
|
||||
}
|
||||
|
||||
pub(crate) enum State {
|
||||
#[allow(dead_code)]
|
||||
Initialize(Option<Entry>),
|
||||
Polled(usize),
|
||||
Complete,
|
||||
@ -48,7 +58,6 @@ impl<T: Cancellable> Op<T> {
|
||||
///
|
||||
/// Callers must ensure that parameters of the entry (such as buffer) are valid and will
|
||||
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
|
||||
let handle = Handle::current();
|
||||
Self {
|
||||
@ -82,7 +91,6 @@ impl<T: Cancellable> Drop for Op<T> {
|
||||
|
||||
/// A single CQE result
|
||||
pub(crate) struct CqeResult {
|
||||
#[allow(dead_code)]
|
||||
pub(crate) result: io::Result<u32>,
|
||||
}
|
||||
|
||||
|
@ -147,26 +147,12 @@ impl Drop for UringContext {
|
||||
self.submit().expect("Internal error when dropping driver");
|
||||
}
|
||||
|
||||
let mut cancel_ops = Slab::new();
|
||||
let mut keys_to_move = Vec::new();
|
||||
let mut ops = std::mem::take(&mut self.ops);
|
||||
|
||||
for (key, lifecycle) in self.ops.iter() {
|
||||
match lifecycle {
|
||||
Lifecycle::Waiting(_) | Lifecycle::Submitted | Lifecycle::Cancelled(_) => {
|
||||
// these should be cancelled
|
||||
keys_to_move.push(key);
|
||||
}
|
||||
// We don't wait for completed ops.
|
||||
Lifecycle::Completed(_) => {}
|
||||
}
|
||||
}
|
||||
// Remove all completed ops since we don't need to wait for them.
|
||||
ops.retain(|_, lifecycle| !matches!(lifecycle, Lifecycle::Completed(_)));
|
||||
|
||||
for key in keys_to_move {
|
||||
let lifecycle = self.remove_op(key);
|
||||
cancel_ops.insert(lifecycle);
|
||||
}
|
||||
|
||||
while !cancel_ops.is_empty() {
|
||||
while !ops.is_empty() {
|
||||
// Wait until at least one completion is available.
|
||||
self.ring_mut()
|
||||
.submit_and_wait(1)
|
||||
@ -174,14 +160,13 @@ impl Drop for UringContext {
|
||||
|
||||
for cqe in self.ring_mut().completion() {
|
||||
let idx = cqe.user_data() as usize;
|
||||
cancel_ops.remove(idx);
|
||||
ops.remove(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handle {
|
||||
#[allow(dead_code)]
|
||||
fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
|
||||
let mut source = SourceFd(&uringfd);
|
||||
self.registry
|
||||
@ -265,14 +250,17 @@ impl Handle {
|
||||
submit_or_remove(ctx)?;
|
||||
}
|
||||
|
||||
// Ensure that the completion queue is not full before submitting the entry.
|
||||
while ctx.ring_mut().completion().is_full() {
|
||||
ctx.dispatch_completions();
|
||||
}
|
||||
|
||||
// Note: For now, we submit the entry immediately without utilizing batching.
|
||||
submit_or_remove(ctx)?;
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
// TODO: Remove this annotation when operations are actually supported
|
||||
#[allow(unused_variables, unreachable_code)]
|
||||
pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
|
||||
let mut guard = self.get_uring().lock();
|
||||
let ctx = &mut *guard;
|
||||
|
@ -323,7 +323,7 @@ pub(crate) mod context;
|
||||
|
||||
pub(crate) mod park;
|
||||
|
||||
mod driver;
|
||||
pub(crate) mod driver;
|
||||
|
||||
pub(crate) mod scheduler;
|
||||
|
||||
|
@ -58,7 +58,7 @@ async fn open_options_mode() {
|
||||
let mode = format!("{:?}", OpenOptions::new().mode(0o644));
|
||||
// TESTING HACK: use Debug output to check the stored data
|
||||
assert!(
|
||||
mode.contains("mode: 420 ") || mode.contains("mode: 0o000644 "),
|
||||
mode.contains("mode: 420") || mode.contains("mode: 0o000644"),
|
||||
"mode is: {mode}"
|
||||
);
|
||||
}
|
||||
@ -69,7 +69,7 @@ async fn open_options_custom_flags_linux() {
|
||||
// TESTING HACK: use Debug output to check the stored data
|
||||
assert!(
|
||||
format!("{:?}", OpenOptions::new().custom_flags(libc::O_TRUNC))
|
||||
.contains("custom_flags: 512,")
|
||||
.contains("custom_flags: 512")
|
||||
);
|
||||
}
|
||||
|
||||
|
151
tokio/tests/fs_uring.rs
Normal file
151
tokio/tests/fs_uring.rs
Normal file
@ -0,0 +1,151 @@
|
||||
//! Uring file operations tests.
|
||||
|
||||
#![cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
|
||||
|
||||
use futures::future::FutureExt;
|
||||
use std::sync::mpsc;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
use std::{future::poll_fn, path::PathBuf};
|
||||
use tempfile::NamedTempFile;
|
||||
use tokio::{
|
||||
fs::OpenOptions,
|
||||
runtime::{Builder, Runtime},
|
||||
};
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
|
||||
Box::new(move || {
|
||||
Builder::new_multi_thread()
|
||||
.worker_threads(n)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
})
|
||||
}
|
||||
|
||||
fn current_rt() -> Box<dyn Fn() -> Runtime> {
|
||||
Box::new(|| Builder::new_current_thread().enable_all().build().unwrap())
|
||||
}
|
||||
|
||||
fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
|
||||
vec![
|
||||
current_rt(),
|
||||
multi_rt(1),
|
||||
multi_rt(2),
|
||||
multi_rt(8),
|
||||
multi_rt(64),
|
||||
multi_rt(256),
|
||||
]
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shutdown_runtime_while_performing_io_uring_ops() {
|
||||
fn run(rt: Runtime) {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let (done_tx, done_rx) = mpsc::channel();
|
||||
|
||||
let (_tmp, path) = create_tmp_files(1);
|
||||
rt.spawn(async move {
|
||||
let path = path[0].clone();
|
||||
|
||||
// spawning a bunch of uring operations.
|
||||
loop {
|
||||
let path = path.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut opt = OpenOptions::new();
|
||||
opt.read(true);
|
||||
opt.open(&path).await.unwrap();
|
||||
});
|
||||
|
||||
// Avoid busy looping.
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
});
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let rt: Runtime = rx.recv().unwrap();
|
||||
rt.shutdown_timeout(Duration::from_millis(300));
|
||||
done_tx.send(()).unwrap();
|
||||
});
|
||||
|
||||
tx.send(rt).unwrap();
|
||||
done_rx.recv().unwrap();
|
||||
}
|
||||
|
||||
for rt in rt_combinations() {
|
||||
run(rt());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_many_files() {
|
||||
fn run(rt: Runtime) {
|
||||
const NUM_FILES: usize = 512;
|
||||
|
||||
let (_tmp_files, paths): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(NUM_FILES);
|
||||
|
||||
rt.block_on(async move {
|
||||
let tracker = TaskTracker::new();
|
||||
|
||||
for i in 0..10_000 {
|
||||
let path = paths.get(i % NUM_FILES).unwrap().clone();
|
||||
tracker.spawn(async move {
|
||||
let _file = OpenOptions::new().read(true).open(path).await.unwrap();
|
||||
});
|
||||
}
|
||||
tracker.close();
|
||||
tracker.wait().await;
|
||||
});
|
||||
}
|
||||
|
||||
for rt in rt_combinations() {
|
||||
run(rt());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cancel_op_future() {
|
||||
let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let handle = tokio::spawn(async move {
|
||||
poll_fn(|cx| {
|
||||
let opt = {
|
||||
let mut opt = tokio::fs::OpenOptions::new();
|
||||
opt.read(true);
|
||||
opt
|
||||
};
|
||||
|
||||
let fut = opt.open(&path[0]);
|
||||
|
||||
// If io_uring is enabled (and not falling back to the thread pool),
|
||||
// the first poll should return Pending.
|
||||
let _pending = Box::pin(fut).poll_unpin(cx);
|
||||
|
||||
tx.send(()).unwrap();
|
||||
|
||||
Poll::<()>::Pending
|
||||
})
|
||||
.await;
|
||||
});
|
||||
|
||||
// Wait for the first poll
|
||||
rx.recv().await.unwrap();
|
||||
|
||||
handle.abort();
|
||||
|
||||
let res = handle.await.unwrap_err();
|
||||
assert!(res.is_cancelled());
|
||||
}
|
||||
|
||||
fn create_tmp_files(num_files: usize) -> (Vec<NamedTempFile>, Vec<PathBuf>) {
|
||||
let mut files = Vec::with_capacity(num_files);
|
||||
for _ in 0..num_files {
|
||||
let tmp = NamedTempFile::new().unwrap();
|
||||
let path = tmp.path().to_path_buf();
|
||||
files.push((tmp, path));
|
||||
}
|
||||
|
||||
files.into_iter().unzip()
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user