rt: move driver unpark out of multi-thread parker (#5026)

This patch removes the driver Unpark handle out of the multi-thread
parker and passes a reference in when it is needed. This is a first step
towards getting rid of the separate driver unpark handle in favor of
just using the regular driver handle. Because the regular driver handle
is owned at a higher level (at the top of the worker struct).
This commit is contained in:
Carl Lerche 2022-09-17 16:46:44 -07:00 committed by GitHub
parent cdd6eeaf70
commit cba5c1009e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 14 additions and 17 deletions

View File

@ -39,11 +39,13 @@ impl MultiThread {
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, Launch) {
let driver_unpark = driver.unpark();
let parker = Parker::new(driver);
let (handle, launch) = worker::create(
size,
parker,
driver_handle,
driver_unpark,
blocking_spawner,
seed_generator,
config,

View File

@ -5,7 +5,7 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::driver::{Driver, Unpark};
use crate::runtime::driver::{self, Driver};
use crate::util::TryLock;
use std::sync::atomic::Ordering::SeqCst;
@ -42,15 +42,10 @@ const NOTIFIED: usize = 3;
struct Shared {
/// Shared driver. Only one thread at a time can use this
driver: TryLock<Driver>,
/// Unpark handle
handle: Unpark,
}
impl Parker {
pub(crate) fn new(driver: Driver) -> Parker {
let handle = driver.unpark();
Parker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
@ -58,7 +53,6 @@ impl Parker {
condvar: Condvar::new(),
shared: Arc::new(Shared {
driver: TryLock::new(driver),
handle,
}),
}),
}
@ -102,8 +96,8 @@ impl Clone for Parker {
}
impl Unparker {
pub(crate) fn unpark(&self) {
self.inner.unpark();
pub(crate) fn unpark(&self, driver: &driver::Unpark) {
self.inner.unpark(driver);
}
}
@ -201,7 +195,7 @@ impl Inner {
}
}
fn unpark(&self) {
fn unpark(&self, driver: &driver::Unpark) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
@ -211,7 +205,7 @@ impl Inner {
EMPTY => {} // no one was waiting
NOTIFIED => {} // already unparked
PARKED_CONDVAR => self.unpark_condvar(),
PARKED_DRIVER => self.unpark_driver(),
PARKED_DRIVER => driver.unpark(),
actual => panic!("inconsistent state in unpark; actual = {}", actual),
}
}
@ -233,10 +227,6 @@ impl Inner {
self.condvar.notify_one()
}
fn unpark_driver(&self) {
self.shared.handle.unpark();
}
fn shutdown(&self) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();

View File

@ -124,6 +124,9 @@ pub(super) struct Shared {
/// how they communicate between each other.
remotes: Box<[Remote]>,
/// Used to unpark threads blocked on the I/O driver
driver: driver::Unpark,
/// Global task queue used for:
/// 1. Submit work to the scheduler while **not** currently on a worker thread.
/// 2. Submit work to the scheduler when a worker run queue is saturated
@ -190,6 +193,7 @@ pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
driver_unpark: driver::Unpark,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
@ -223,6 +227,7 @@ pub(super) fn create(
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
driver: driver_unpark,
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
@ -774,13 +779,13 @@ impl Shared {
fn notify_parked(&self) {
if let Some(index) = self.idle.worker_to_notify() {
self.remotes[index].unpark.unpark();
self.remotes[index].unpark.unpark(&self.driver);
}
}
fn notify_all(&self) {
for remote in &self.remotes[..] {
remote.unpark.unpark();
remote.unpark.unpark(&self.driver);
}
}