rt: allow configuring I/O events capacity (#5186)

Adds a method `Builder::max_io_events_per_tick()` to the runtime builder. This can be used to configure the capacity of events that may be processed per OS poll.
This commit is contained in:
Divy Srivastava 2022-12-07 10:15:03 -08:00 committed by GitHub
parent 22cff80048
commit 36039d0bb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 7 deletions

View File

@ -294,7 +294,7 @@ pub(crate) mod test {
#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
let (io_driver, io_handle) = IoDriver::new().unwrap();
let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
let handle = signal_driver.handle();

View File

@ -44,6 +44,7 @@ pub struct Builder {
/// Whether or not to enable the I/O driver
enable_io: bool,
nevents: usize,
/// Whether or not to enable the time driver
enable_time: bool,
@ -228,6 +229,7 @@ impl Builder {
// I/O defaults to "off"
enable_io: false,
nevents: 1024,
// Time defaults to "off"
enable_time: false,
@ -647,6 +649,7 @@ impl Builder {
enable_io: self.enable_io,
enable_time: self.enable_time,
start_paused: self.start_paused,
nevents: self.nevents,
}
}
@ -938,6 +941,25 @@ cfg_io_driver! {
self.enable_io = true;
self
}
/// Enables the I/O driver and configures the max number of events to be
/// processed per tick.
///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_current_thread()
/// .enable_io()
/// .max_io_events_per_tick(1024)
/// .build()
/// .unwrap();
/// ```
pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
self.nevents = capacity;
self
}
}
}

View File

@ -36,11 +36,12 @@ pub(crate) struct Cfg {
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
pub(crate) nevents: usize,
}
impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
@ -135,12 +136,12 @@ cfg_io_driver! {
Disabled(UnparkThread),
}
fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
#[cfg(loom)]
assert!(!enabled);
let ret = if enabled {
let (io_driver, io_handle) = crate::runtime::io::Driver::new()?;
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);
@ -201,7 +202,7 @@ cfg_not_io_driver! {
#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);
fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((IoStack(park_thread), unpark_thread, Default::default()))

View File

@ -104,7 +104,7 @@ fn _assert_kinds() {
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<(Driver, Handle)> {
pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
@ -116,7 +116,7 @@ impl Driver {
let driver = Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(1024),
events: mio::Events::with_capacity(nevents),
poll,
resources: slab,
};