mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
rt: clean up arguments passed to basic scheduler (#4767)
Extracts the refactor from #4518. The basic scheduler takes many configuration options as arguments to the constructor. This cleans it up a bit by defining a `Config` struct and using that to pass arguments to the constructor.
This commit is contained in:
parent
8d29edca24
commit
f7a64538f7
@ -57,12 +57,6 @@ struct Core {
|
|||||||
|
|
||||||
/// Metrics batch
|
/// Metrics batch
|
||||||
metrics: MetricsBatch,
|
metrics: MetricsBatch,
|
||||||
|
|
||||||
/// How many ticks before pulling a task from the global/remote queue?
|
|
||||||
global_queue_interval: u32,
|
|
||||||
|
|
||||||
/// How many ticks before yielding to the driver for timer and I/O events?
|
|
||||||
event_interval: u32,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@ -70,6 +64,20 @@ pub(crate) struct Spawner {
|
|||||||
shared: Arc<Shared>,
|
shared: Arc<Shared>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) struct Config {
|
||||||
|
/// How many ticks before pulling a task from the global/remote queue?
|
||||||
|
pub(crate) global_queue_interval: u32,
|
||||||
|
|
||||||
|
/// How many ticks before yielding to the driver for timer and I/O events?
|
||||||
|
pub(crate) event_interval: u32,
|
||||||
|
|
||||||
|
/// Callback for a worker parking itself
|
||||||
|
pub(crate) before_park: Option<Callback>,
|
||||||
|
|
||||||
|
/// Callback for a worker unparking itself
|
||||||
|
pub(crate) after_unpark: Option<Callback>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Scheduler state shared between threads.
|
/// Scheduler state shared between threads.
|
||||||
struct Shared {
|
struct Shared {
|
||||||
/// Remote run queue. None if the `Runtime` has been dropped.
|
/// Remote run queue. None if the `Runtime` has been dropped.
|
||||||
@ -87,11 +95,8 @@ struct Shared {
|
|||||||
/// Handle to I/O driver, timer, blocking pool, ...
|
/// Handle to I/O driver, timer, blocking pool, ...
|
||||||
handle_inner: HandleInner,
|
handle_inner: HandleInner,
|
||||||
|
|
||||||
/// Callback for a worker parking itself
|
/// Scheduler configuration options
|
||||||
before_park: Option<Callback>,
|
config: Config,
|
||||||
|
|
||||||
/// Callback for a worker unparking itself
|
|
||||||
after_unpark: Option<Callback>,
|
|
||||||
|
|
||||||
/// Keeps track of various runtime metrics.
|
/// Keeps track of various runtime metrics.
|
||||||
scheduler_metrics: SchedulerMetrics,
|
scheduler_metrics: SchedulerMetrics,
|
||||||
@ -117,14 +122,7 @@ const INITIAL_CAPACITY: usize = 64;
|
|||||||
scoped_thread_local!(static CURRENT: Context);
|
scoped_thread_local!(static CURRENT: Context);
|
||||||
|
|
||||||
impl BasicScheduler {
|
impl BasicScheduler {
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> BasicScheduler {
|
||||||
driver: Driver,
|
|
||||||
handle_inner: HandleInner,
|
|
||||||
before_park: Option<Callback>,
|
|
||||||
after_unpark: Option<Callback>,
|
|
||||||
global_queue_interval: u32,
|
|
||||||
event_interval: u32,
|
|
||||||
) -> BasicScheduler {
|
|
||||||
let unpark = driver.unpark();
|
let unpark = driver.unpark();
|
||||||
|
|
||||||
let spawner = Spawner {
|
let spawner = Spawner {
|
||||||
@ -134,8 +132,7 @@ impl BasicScheduler {
|
|||||||
unpark,
|
unpark,
|
||||||
woken: AtomicBool::new(false),
|
woken: AtomicBool::new(false),
|
||||||
handle_inner,
|
handle_inner,
|
||||||
before_park,
|
config,
|
||||||
after_unpark,
|
|
||||||
scheduler_metrics: SchedulerMetrics::new(),
|
scheduler_metrics: SchedulerMetrics::new(),
|
||||||
worker_metrics: WorkerMetrics::new(),
|
worker_metrics: WorkerMetrics::new(),
|
||||||
}),
|
}),
|
||||||
@ -147,8 +144,6 @@ impl BasicScheduler {
|
|||||||
tick: 0,
|
tick: 0,
|
||||||
driver: Some(driver),
|
driver: Some(driver),
|
||||||
metrics: MetricsBatch::new(),
|
metrics: MetricsBatch::new(),
|
||||||
global_queue_interval,
|
|
||||||
event_interval,
|
|
||||||
})));
|
})));
|
||||||
|
|
||||||
BasicScheduler {
|
BasicScheduler {
|
||||||
@ -302,7 +297,7 @@ impl Context {
|
|||||||
fn park(&self, mut core: Box<Core>) -> Box<Core> {
|
fn park(&self, mut core: Box<Core>) -> Box<Core> {
|
||||||
let mut driver = core.driver.take().expect("driver missing");
|
let mut driver = core.driver.take().expect("driver missing");
|
||||||
|
|
||||||
if let Some(f) = &self.spawner.shared.before_park {
|
if let Some(f) = &self.spawner.shared.config.before_park {
|
||||||
// Incorrect lint, the closures are actually different types so `f`
|
// Incorrect lint, the closures are actually different types so `f`
|
||||||
// cannot be passed as an argument to `enter`.
|
// cannot be passed as an argument to `enter`.
|
||||||
#[allow(clippy::redundant_closure)]
|
#[allow(clippy::redundant_closure)]
|
||||||
@ -325,7 +320,7 @@ impl Context {
|
|||||||
core.metrics.returned_from_park();
|
core.metrics.returned_from_park();
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(f) = &self.spawner.shared.after_unpark {
|
if let Some(f) = &self.spawner.shared.config.after_unpark {
|
||||||
// Incorrect lint, the closures are actually different types so `f`
|
// Incorrect lint, the closures are actually different types so `f`
|
||||||
// cannot be passed as an argument to `enter`.
|
// cannot be passed as an argument to `enter`.
|
||||||
#[allow(clippy::redundant_closure)]
|
#[allow(clippy::redundant_closure)]
|
||||||
@ -515,12 +510,12 @@ impl CoreGuard<'_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _ in 0..core.event_interval {
|
for _ in 0..core.spawner.shared.config.event_interval {
|
||||||
// Get and increment the current tick
|
// Get and increment the current tick
|
||||||
let tick = core.tick;
|
let tick = core.tick;
|
||||||
core.tick = core.tick.wrapping_add(1);
|
core.tick = core.tick.wrapping_add(1);
|
||||||
|
|
||||||
let entry = if tick % core.global_queue_interval == 0 {
|
let entry = if tick % core.spawner.shared.config.global_queue_interval == 0 {
|
||||||
core.spawner.pop().or_else(|| core.tasks.pop_front())
|
core.spawner.pop().or_else(|| core.tasks.pop_front())
|
||||||
} else {
|
} else {
|
||||||
core.tasks.pop_front().or_else(|| core.spawner.pop())
|
core.tasks.pop_front().or_else(|| core.spawner.pop())
|
||||||
|
@ -632,6 +632,7 @@ impl Builder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
|
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
|
||||||
|
use crate::runtime::basic_scheduler::Config;
|
||||||
use crate::runtime::{BasicScheduler, HandleInner, Kind};
|
use crate::runtime::{BasicScheduler, HandleInner, Kind};
|
||||||
|
|
||||||
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
|
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
|
||||||
@ -655,10 +656,12 @@ impl Builder {
|
|||||||
let scheduler = BasicScheduler::new(
|
let scheduler = BasicScheduler::new(
|
||||||
driver,
|
driver,
|
||||||
handle_inner,
|
handle_inner,
|
||||||
self.before_park.clone(),
|
Config {
|
||||||
self.after_unpark.clone(),
|
before_park: self.before_park.clone(),
|
||||||
self.global_queue_interval,
|
after_unpark: self.after_unpark.clone(),
|
||||||
self.event_interval,
|
global_queue_interval: self.global_queue_interval,
|
||||||
|
event_interval: self.event_interval,
|
||||||
|
},
|
||||||
);
|
);
|
||||||
let spawner = Spawner::Basic(scheduler.spawner().clone());
|
let spawner = Spawner::Basic(scheduler.spawner().clone());
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user