rt: extract basic_scheduler::Config (#4935)

Carl Lerche 2022-08-24 04:49:30 -07:00 committed by GitHub
parent d720770b07
commit df28ac092f
6 changed files with 45 additions and 63 deletions

@@ -5,7 +5,7 @@ use crate::park::{Park, Unpark};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::Driver;
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{Callback, HandleInner};
use crate::runtime::{Config, HandleInner};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
@@ -68,24 +68,6 @@ pub(crate) struct Spawner {
shared: Arc<Shared>,
}
pub(crate) struct Config {
/// How many ticks before pulling a task from the global/remote queue?
pub(crate) global_queue_interval: u32,
/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,
/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,
/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,
#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}
/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue. None if the `Runtime` has been dropped.

@@ -784,8 +784,7 @@ impl Builder {
}
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::basic_scheduler::Config;
use crate::runtime::{BasicScheduler, HandleInner, Kind};
use crate::runtime::{BasicScheduler, Config, HandleInner, Kind};
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
@@ -903,7 +902,7 @@ cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{HandleInner, Kind, ThreadPool};
use crate::runtime::{Config, HandleInner, Kind, ThreadPool};
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
@@ -926,10 +925,14 @@ cfg_rt_multi_thread! {
core_threads,
driver,
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
self.global_queue_interval,
self.event_interval,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
},
);
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

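For context on the `unhandled_panic` field that the builder now forwards into `Config`: it is fed by the unstable `Builder::unhandled_panic` option. A minimal sketch of setting it from user code, assuming a build with `RUSTFLAGS="--cfg tokio_unstable"` and shown on a current-thread runtime:

```rust
// Hedged sketch, not part of this diff: the user-facing knob behind
// Config::unhandled_panic. Requires compiling with --cfg tokio_unstable.
#[cfg(tokio_unstable)]
fn build_shutdown_on_panic() -> std::io::Result<tokio::runtime::Runtime> {
    use tokio::runtime::{Builder, UnhandledPanic};

    Builder::new_current_thread()
        .enable_all()
        // Shut the runtime down when a spawned task panics instead of ignoring it.
        .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        .build()
}
```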
@@ -0,0 +1,19 @@
use crate::runtime::Callback;
pub(crate) struct Config {
/// How many ticks before pulling a task from the global/remote queue?
pub(crate) global_queue_interval: u32,
/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,
/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,
/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,
#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}

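Most of these fields are driven by existing public `tokio::runtime::Builder` options. A minimal sketch of that mapping, assuming tokio with the `full` feature; the interval value is arbitrary:

```rust
// Minimal sketch, not part of this diff. `event_interval`, `on_thread_park`,
// and `on_thread_unpark` are the public Builder knobs behind
// Config::event_interval, Config::before_park, and Config::after_unpark.
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    let rt = Builder::new_multi_thread()
        .worker_threads(2)
        .event_interval(61) // ticks between driver (I/O + timer) polls
        .on_thread_park(|| println!("worker parking"))
        .on_thread_unpark(|| println!("worker unparked"))
        .enable_all()
        .build()?;

    rt.block_on(async {
        println!("running on the configured runtime");
    });
    Ok(())
}
```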
@@ -202,6 +202,9 @@ cfg_rt! {
mod basic_scheduler;
use basic_scheduler::BasicScheduler;
mod config;
use config::Config;
mod blocking;
use blocking::BlockingPool;
#[cfg_attr(tokio_wasi, allow(unused_imports))]

@@ -15,7 +15,7 @@ pub(crate) use worker::block_in_place;
use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Callback, Driver, HandleInner};
use crate::runtime::{Config, Driver, HandleInner};
use std::fmt;
use std::future::Future;
@@ -49,21 +49,10 @@ impl ThreadPool {
size: usize,
driver: Driver,
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
config: Config,
) -> (ThreadPool, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(
size,
parker,
handle_inner,
before_park,
after_unpark,
global_queue_interval,
event_interval,
);
let (shared, launch) = worker::create(size, parker, handle_inner, config);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };

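The signature change above is a parameter-object refactor: four loose arguments collapse into one `Config`, so call sites name what they set and adding an option later only touches the struct. A generic sketch of the pattern with hypothetical names (not tokio code):

```rust
// Hypothetical before/after, purely illustrative; none of these names are tokio APIs.
struct PoolConfig {
    before_park: Option<Box<dyn Fn() + Send + Sync>>,
    event_interval: u32,
}

// Before: every new tuning knob widens this signature and every caller.
fn create_pool(size: usize, before_park: Option<Box<dyn Fn() + Send + Sync>>, event_interval: u32) {
    let _ = (size, before_park, event_interval);
}

// After: callers name the fields; new options leave this signature alone.
fn create_pool_with_config(size: usize, config: PoolConfig) {
    let _ = (size, config.event_interval, config.before_park);
}

fn main() {
    create_pool(4, None, 61);
    create_pool_with_config(4, PoolConfig { before_park: None, event_interval: 61 });
}
```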
@@ -65,7 +65,7 @@ use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{queue, Idle, Parker, Unparker};
use crate::runtime::{task, Callback, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{task, Config, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;
@@ -117,12 +117,6 @@ struct Core {
/// Fast random number generator.
rand: FastRand,
/// How many ticks before pulling a task from the global/remote queue?
global_queue_interval: u32,
/// How many ticks before yielding to the driver for timer and I/O events?
event_interval: u32,
}
/// State shared across all workers
@@ -152,10 +146,8 @@ pub(super) struct Shared {
#[allow(clippy::vec_box)] // we're moving an already-boxed value
shutdown_cores: Mutex<Vec<Box<Core>>>,
/// Callback for a worker parking itself
before_park: Option<Callback>,
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
/// Scheduler configuration options
config: Config,
/// Collects metrics from the runtime.
pub(super) scheduler_metrics: SchedulerMetrics,
@@ -202,10 +194,7 @@ pub(super) fn create(
size: usize,
park: Parker,
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
config: Config,
) -> (Arc<Shared>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
@@ -227,8 +216,6 @@ pub(super) fn create(
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(seed()),
global_queue_interval,
event_interval,
}));
remotes.push(Remote { steal, unpark });
@@ -242,8 +229,7 @@ pub(super) fn create(
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
before_park,
after_unpark,
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
});
@@ -468,7 +454,7 @@ impl Context {
}
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % core.event_interval == 0 {
if core.tick % self.worker.shared.config.event_interval == 0 {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
core = self.park_timeout(core, Some(Duration::from_millis(0)));
@@ -492,7 +478,7 @@ impl Context {
/// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
/// after all the IOs get dispatched
fn park(&self, mut core: Box<Core>) -> Box<Core> {
if let Some(f) = &self.worker.shared.before_park {
if let Some(f) = &self.worker.shared.config.before_park {
f();
}
@@ -511,7 +497,7 @@ impl Context {
}
}
if let Some(f) = &self.worker.shared.after_unpark {
if let Some(f) = &self.worker.shared.config.after_unpark {
f();
}
core
@@ -555,7 +541,7 @@ impl Core {
/// Return the next notified task available to this worker.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % self.global_queue_interval == 0 {
if self.tick % worker.shared.config.global_queue_interval == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| worker.inject().pop())
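
The two interval checks above are the policy the new `Config` carries: every `global_queue_interval` ticks a worker looks at the shared injection queue before its local queue so remote tasks are not starved, and every `event_interval` ticks it parks with a zero timeout so the I/O driver and timer can run. A self-contained sketch of that policy using toy types (not tokio internals; the interval values are arbitrary):

```rust
// Toy model of the tick-based policy; real workers use lock-free queues and a
// driver, not Vec and println!.
struct ToyConfig {
    global_queue_interval: u32,
    event_interval: u32,
}

struct ToyWorker {
    tick: u32,
    local: Vec<&'static str>,
    global: Vec<&'static str>,
}

impl ToyWorker {
    fn next_task(&mut self, cfg: &ToyConfig) -> Option<&'static str> {
        self.tick = self.tick.wrapping_add(1);
        if self.tick % cfg.global_queue_interval == 0 {
            // Periodically prefer the shared queue so injected tasks are not starved.
            self.global.pop().or_else(|| self.local.pop())
        } else {
            self.local.pop().or_else(|| self.global.pop())
        }
    }

    fn maybe_poll_driver(&self, cfg: &ToyConfig) {
        if self.tick % cfg.event_interval == 0 {
            // In the real worker this is a zero-timeout park that lets the I/O
            // driver and timer advance without putting the thread to sleep.
            println!("tick {}: poll driver", self.tick);
        }
    }
}

fn main() {
    let cfg = ToyConfig { global_queue_interval: 31, event_interval: 61 };
    let mut worker = ToyWorker { tick: 0, local: vec!["a", "b", "c"], global: vec!["d"] };
    while let Some(task) = worker.next_task(&cfg) {
        println!("run {task}");
        worker.maybe_poll_driver(&cfg);
    }
}
```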