rt: refactor current-thread scheduler (take 2) (#4395)

Re-applies #4377 and fixes the bug resulting in Hyper's double panic.

Revert: #4394

Original PR:

This PR does some refactoring to the current-thread scheduler bringing it closer to the structure of the
multi-threaded scheduler. More specifically, the core scheduler data is stored in a Core struct and that
struct is passed around as a "token" indicating permission to do work. The Core structure is also stored
in the thread-local context.

This refactor is intended to support #4373, making it easier to track counters in more locations in the
current-thread scheduler.

I tried to keep commits small, but the "set Core in thread-local context" is both the biggest commit and
the key one.
This commit is contained in:
Carl Lerche 2022-01-11 18:39:56 -08:00 committed by GitHub
parent 1d698b5a90
commit e951d55720
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 280 additions and 220 deletions

View File

@ -3,10 +3,12 @@ use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::Driver;
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::Callback;
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, Wake, WakerRef};
use std::cell::RefCell;
@ -19,13 +21,12 @@ use std::task::Poll::{Pending, Ready};
use std::time::Duration;
/// Executes tasks on the current thread
pub(crate) struct BasicScheduler<P: Park> {
/// Inner state guarded by a mutex that is shared
/// between all `block_on` calls.
inner: Mutex<Option<Inner<P>>>,
pub(crate) struct BasicScheduler {
/// Core scheduler data is acquired by a thread entering `block_on`.
core: AtomicCell<Core>,
/// Notifier for waking up other threads to steal the
/// parker.
/// driver.
notify: Notify,
/// Sendable task spawner
@ -38,15 +39,11 @@ pub(crate) struct BasicScheduler<P: Park> {
context_guard: Option<EnterGuard>,
}
/// The inner scheduler that owns the task queue and the main parker P.
struct Inner<P: Park> {
/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
/// Scheduler run queue
///
/// When the scheduler is executed, the queue is removed from `self` and
/// moved into `Context`.
///
/// This indirection is to allow `BasicScheduler` to be `Send`.
tasks: Option<Tasks>,
tasks: VecDeque<task::Notified<Arc<Shared>>>,
/// Sendable task spawner
spawner: Spawner,
@ -54,13 +51,10 @@ struct Inner<P: Park> {
/// Current tick
tick: u8,
/// Thread park handle
park: P,
/// Callback for a worker parking itself
before_park: Option<Callback>,
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
/// Runtime driver
///
/// The driver is removed before starting to park the thread
driver: Option<Driver>,
/// Stats batcher
stats: WorkerStatsBatcher,
@ -71,13 +65,6 @@ pub(crate) struct Spawner {
shared: Arc<Shared>,
}
struct Tasks {
/// Local run queue.
///
/// Tasks notified from the current thread are pushed into this queue.
queue: VecDeque<task::Notified<Arc<Shared>>>,
}
/// A remote scheduler entry.
///
/// These are filled in by remote threads sending instructions to the scheduler.
@ -100,22 +87,29 @@ struct Shared {
owned: OwnedTasks<Arc<Shared>>,
/// Unpark the blocked thread.
unpark: Box<dyn Unpark>,
unpark: <Driver as Park>::Unpark,
/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,
/// Callback for a worker parking itself
before_park: Option<Callback>,
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
/// Keeps track of various runtime stats.
stats: RuntimeStats,
}
/// Thread-local context.
struct Context {
/// Shared scheduler state
shared: Arc<Shared>,
/// Handle to the spawner
spawner: Spawner,
/// Local queue
tasks: RefCell<Tasks>,
/// Scheduler core, enabling the holder of `Context` to execute the
/// scheduler.
core: RefCell<Option<Box<Core>>>,
}
/// Initial queue capacity.
@ -133,38 +127,36 @@ const REMOTE_FIRST_INTERVAL: u8 = 31;
// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);
impl<P: Park> BasicScheduler<P> {
impl BasicScheduler {
pub(crate) fn new(
park: P,
driver: Driver,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());
) -> BasicScheduler {
let unpark = driver.unpark();
let spawner = Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
owned: OwnedTasks::new(),
unpark: unpark as Box<dyn Unpark>,
unpark,
woken: AtomicBool::new(false),
before_park,
after_unpark,
stats: RuntimeStats::new(1),
}),
};
let inner = Mutex::new(Some(Inner {
tasks: Some(Tasks {
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
let core = AtomicCell::new(Some(Box::new(Core {
tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
spawner: spawner.clone(),
tick: 0,
park,
before_park,
after_unpark,
driver: Some(driver),
stats: WorkerStatsBatcher::new(0),
}));
})));
BasicScheduler {
inner,
core,
notify: Notify::new(),
spawner,
context_guard: None,
@ -178,12 +170,12 @@ impl<P: Park> BasicScheduler<P> {
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
pin!(future);
// Attempt to steal the dedicated parker and block_on the future if we can there,
// otherwise, lets select on a notification that the parker is available
// or the future is complete.
// Attempt to steal the scheduler core and block_on the future if we can
// there, otherwise, lets select on a notification that the core is
// available or the future is complete.
loop {
if let Some(inner) = &mut self.take_inner() {
return inner.block_on(future);
if let Some(core) = self.take_core() {
return core.block_on(future);
} else {
let mut enter = crate::runtime::enter(false);
@ -210,11 +202,14 @@ impl<P: Park> BasicScheduler<P> {
}
}
fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
let inner = self.inner.lock().take()?;
fn take_core(&self) -> Option<CoreGuard<'_>> {
let core = self.core.take()?;
Some(InnerGuard {
inner: Some(inner),
Some(CoreGuard {
context: Context {
spawner: self.spawner.clone(),
core: RefCell::new(Some(core)),
},
basic_scheduler: self,
})
}
@ -224,156 +219,109 @@ impl<P: Park> BasicScheduler<P> {
}
}
impl<P: Park> Inner<P> {
/// Blocks on the provided future and drives the runtime's driver.
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
enter(self, |scheduler, context| {
let _enter = crate::runtime::enter(false);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
pin!(future);
'outer: loop {
if scheduler.spawner.reset_woken() {
scheduler.stats.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}
}
for _ in 0..MAX_TASKS_PER_TICK {
// Get and increment the current tick
let tick = scheduler.tick;
scheduler.tick = scheduler.tick.wrapping_add(1);
let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
scheduler.spawner.pop().or_else(|| {
context
.tasks
.borrow_mut()
.queue
.pop_front()
.map(RemoteMsg::Schedule)
})
} else {
context
.tasks
.borrow_mut()
.queue
.pop_front()
.map(RemoteMsg::Schedule)
.or_else(|| scheduler.spawner.pop())
};
let entry = match entry {
Some(entry) => entry,
None => {
if let Some(f) = &scheduler.before_park {
f();
}
// This check will fail if `before_park` spawns a task for us to run
// instead of parking the thread
if context.tasks.borrow_mut().queue.is_empty() {
// Park until the thread is signaled
scheduler.stats.about_to_park();
scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler.park.park().expect("failed to park");
scheduler.stats.returned_from_park();
}
if let Some(f) = &scheduler.after_unpark {
f();
}
// Try polling the `block_on` future next
continue 'outer;
}
};
match entry {
RemoteMsg::Schedule(task) => {
scheduler.stats.incr_poll_count();
let task = context.shared.owned.assert_owner(task);
crate::coop::budget(|| task.run())
}
}
}
// Yield to the park, this drives the timer and pulls any pending
// I/O events.
scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler
.park
.park_timeout(Duration::from_millis(0))
.expect("failed to park");
}
})
}
}
/// Enters the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local.
fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
F: FnOnce(&mut Inner<P>, &Context) -> R,
P: Park,
{
// Ensures the run queue is placed back in the `BasicScheduler` instance
// once `block_on` returns.`
struct Guard<'a, P: Park> {
context: Option<Context>,
scheduler: &'a mut Inner<P>,
impl Context {
/// Execute the closure with the given scheduler core stored in the
/// thread-local context.
fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
core.stats.incr_poll_count();
self.enter(core, || crate::coop::budget(f))
}
impl<P: Park> Drop for Guard<'_, P> {
fn drop(&mut self) {
let Context { tasks, .. } = self.context.take().expect("context missing");
self.scheduler.tasks = Some(tasks.into_inner());
/// Blocks the current thread until an event is received by the driver,
/// including I/O events, timer events, ...
fn park(&self, mut core: Box<Core>) -> Box<Core> {
let mut driver = core.driver.take().expect("driver missing");
if let Some(f) = &self.spawner.shared.before_park {
// Incorrect lint, the closures are actually different types so `f`
// cannot be passed as an argument to `enter`.
#[allow(clippy::redundant_closure)]
let (c, _) = self.enter(core, || f());
core = c;
}
// This check will fail if `before_park` spawns a task for us to run
// instead of parking the thread
if core.tasks.is_empty() {
// Park until the thread is signaled
core.stats.about_to_park();
core.stats.submit(&core.spawner.shared.stats);
let (c, _) = self.enter(core, || {
driver.park().expect("failed to park");
});
core = c;
core.stats.returned_from_park();
}
if let Some(f) = &self.spawner.shared.after_unpark {
// Incorrect lint, the closures are actually different types so `f`
// cannot be passed as an argument to `enter`.
#[allow(clippy::redundant_closure)]
let (c, _) = self.enter(core, || f());
core = c;
}
core.driver = Some(driver);
core
}
// Remove `tasks` from `self` and place it in a `Context`.
let tasks = scheduler.tasks.take().expect("invalid state");
/// Checks the driver for new events without blocking the thread.
fn park_yield(&self, mut core: Box<Core>) -> Box<Core> {
let mut driver = core.driver.take().expect("driver missing");
let guard = Guard {
context: Some(Context {
shared: scheduler.spawner.shared.clone(),
tasks: RefCell::new(tasks),
}),
scheduler,
};
core.stats.submit(&core.spawner.shared.stats);
let (mut core, _) = self.enter(core, || {
driver
.park_timeout(Duration::from_millis(0))
.expect("failed to park");
});
let context = guard.context.as_ref().unwrap();
let scheduler = &mut *guard.scheduler;
core.driver = Some(driver);
core
}
CURRENT.set(context, || f(scheduler, context))
fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
// Store the scheduler core in the thread-local context
//
// A drop-guard is employed at a higher level.
*self.core.borrow_mut() = Some(core);
// Execute the closure while tracking the execution budget
let ret = f();
// Take the scheduler core back
let core = self.core.borrow_mut().take().expect("core missing");
(core, ret)
}
}
impl<P: Park> Drop for BasicScheduler<P> {
impl Drop for BasicScheduler {
fn drop(&mut self) {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.
let mut inner = match self.inner.lock().take() {
Some(inner) => inner,
let core = match self.take_core() {
Some(core) => core,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
None => panic!("Oh no! We never placed the Core back, this is a bug!"),
};
enter(&mut inner, |scheduler, context| {
core.enter(|mut core, context| {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
context.shared.owned.close_and_shutdown_all();
context.spawner.shared.owned.close_and_shutdown_all();
// Drain local queue
// We already shut down every task, so we just need to drop the task.
for task in context.tasks.borrow_mut().queue.drain(..) {
while let Some(task) = core.tasks.pop_front() {
drop(task);
}
// Drain remote queue and set it to None
let remote_queue = scheduler.spawner.shared.queue.lock().take();
let remote_queue = core.spawner.shared.queue.lock().take();
// Using `Option::take` to replace the shared queue with `None`.
// We already shut down every task, so we just need to drop the task.
@ -387,12 +335,14 @@ impl<P: Park> Drop for BasicScheduler<P> {
}
}
assert!(context.shared.owned.is_empty());
assert!(context.spawner.shared.owned.is_empty());
(core, ())
});
}
}
impl<P: Park> fmt::Debug for BasicScheduler<P> {
impl fmt::Debug for BasicScheduler {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BasicScheduler").finish()
}
@ -455,8 +405,14 @@ impl Schedule for Arc<Shared> {
fn schedule(&self, task: task::Notified<Self>) {
CURRENT.with(|maybe_cx| match maybe_cx {
Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
cx.tasks.borrow_mut().queue.push_back(task);
Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => {
let mut core = cx.core.borrow_mut();
// If `None`, the runtime is shutting down, so there is no need
// to schedule the task.
if let Some(core) = core.as_mut() {
core.tasks.push_back(task);
}
}
_ => {
// If the queue is None, then the runtime has shut down. We
@ -484,35 +440,107 @@ impl Wake for Shared {
}
}
// ===== InnerGuard =====
// ===== CoreGuard =====
/// Used to ensure we always place the Inner value
/// back into its slot in `BasicScheduler`, even if the
/// future panics.
struct InnerGuard<'a, P: Park> {
inner: Option<Inner<P>>,
basic_scheduler: &'a BasicScheduler<P>,
/// Used to ensure we always place the `Core` value back into its slot in
/// `BasicScheduler`, even if the future panics.
struct CoreGuard<'a> {
context: Context,
basic_scheduler: &'a BasicScheduler,
}
impl<P: Park> InnerGuard<'_, P> {
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
// The only time inner gets set to `None` is if we have dropped
// already so this unwrap is safe.
self.inner.as_mut().unwrap().block_on(future)
impl CoreGuard<'_> {
fn block_on<F: Future>(self, future: F) -> F::Output {
self.enter(|mut core, context| {
let _enter = crate::runtime::enter(false);
let waker = context.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
pin!(future);
'outer: loop {
if core.spawner.reset_woken() {
let (c, res) = context.run_task(core, || future.as_mut().poll(&mut cx));
core = c;
if let Ready(v) = res {
return (core, v);
}
}
for _ in 0..MAX_TASKS_PER_TICK {
// Get and increment the current tick
let tick = core.tick;
core.tick = core.tick.wrapping_add(1);
let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
core.spawner
.pop()
.or_else(|| core.tasks.pop_front().map(RemoteMsg::Schedule))
} else {
core.tasks
.pop_front()
.map(RemoteMsg::Schedule)
.or_else(|| core.spawner.pop())
};
let entry = match entry {
Some(entry) => entry,
None => {
core = context.park(core);
// Try polling the `block_on` future next
continue 'outer;
}
};
match entry {
RemoteMsg::Schedule(task) => {
let task = context.spawner.shared.owned.assert_owner(task);
let (c, _) = context.run_task(core, || {
task.run();
});
core = c;
}
}
}
// Yield to the driver, this drives the timer and pulls any
// pending I/O events.
core = context.park_yield(core);
}
})
}
/// Enters the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local.
fn enter<F, R>(self, f: F) -> R
where
F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
{
// Remove `core` from `context` to pass into the closure.
let core = self.context.core.borrow_mut().take().expect("core missing");
// Call the closure and place `core` back
let (core, ret) = CURRENT.set(&self.context, || f(core, &self.context));
*self.context.core.borrow_mut() = Some(core);
ret
}
}
impl<P: Park> Drop for InnerGuard<'_, P> {
impl Drop for CoreGuard<'_> {
fn drop(&mut self) {
if let Some(scheduler) = self.inner.take() {
let mut lock = self.basic_scheduler.inner.lock();
if let Some(core) = self.context.core.borrow_mut().take() {
// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
lock.replace(scheduler);
self.basic_scheduler.core.set(core);
// Wake up other possible threads that could steal
// the dedicated parker P.
// Wake up other possible threads that could steal the driver.
self.basic_scheduler.notify.notify_one()
}
}

View File

@ -283,7 +283,7 @@ cfg_rt! {
#[derive(Debug)]
enum Kind {
/// Execute all tasks on the current-thread.
CurrentThread(BasicScheduler<driver::Driver>),
CurrentThread(BasicScheduler),
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-multi-thread")]

View File

@ -34,20 +34,22 @@ fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
#[test]
fn block_on_num_polls() {
loom::model(|| {
// we expect at most 3 number of polls because there are
// three points at which we poll the future. At any of these
// points it can be ready:
// we expect at most 4 number of polls because there are three points at
// which we poll the future and an opportunity for a false-positive.. At
// any of these points it can be ready:
//
// - when we fail to steal the parker and we block on a
// notification that it is available.
// - when we fail to steal the parker and we block on a notification
// that it is available.
//
// - when we steal the parker and we schedule the future
//
// - when the future is woken up and we have ran the max
// number of tasks for the current tick or there are no
// more tasks to run.
// - when the future is woken up and we have ran the max number of tasks
// for the current tick or there are no more tasks to run.
//
let at_most = 3;
// - a thread is notified that the parker is available but a third
// thread acquires it before the notified thread can.
//
let at_most = 4;
let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
let rt2 = rt1.clone();

View File

@ -1,8 +1,5 @@
//! Threadpool
mod atomic_cell;
use atomic_cell::AtomicCell;
mod idle;
use self::idle::Idle;

View File

@ -66,8 +66,9 @@ use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::thread_pool::Idle;
use crate::runtime::{queue, task, Callback};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;
use std::cell::RefCell;

View File

@ -3,7 +3,7 @@ use crate::loom::sync::atomic::AtomicPtr;
use std::ptr;
use std::sync::atomic::Ordering::AcqRel;
pub(super) struct AtomicCell<T> {
pub(crate) struct AtomicCell<T> {
data: AtomicPtr<T>,
}
@ -11,22 +11,22 @@ unsafe impl<T: Send> Send for AtomicCell<T> {}
unsafe impl<T: Send> Sync for AtomicCell<T> {}
impl<T> AtomicCell<T> {
pub(super) fn new(data: Option<Box<T>>) -> AtomicCell<T> {
pub(crate) fn new(data: Option<Box<T>>) -> AtomicCell<T> {
AtomicCell {
data: AtomicPtr::new(to_raw(data)),
}
}
pub(super) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
pub(crate) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
let old = self.data.swap(to_raw(val), AcqRel);
from_raw(old)
}
pub(super) fn set(&self, val: Box<T>) {
pub(crate) fn set(&self, val: Box<T>) {
let _ = self.swap(Some(val));
}
pub(super) fn take(&self) -> Option<Box<T>> {
pub(crate) fn take(&self) -> Option<Box<T>> {
self.swap(None)
}
}

View File

@ -3,6 +3,9 @@ cfg_io_driver! {
pub(crate) mod slab;
}
#[cfg(feature = "rt")]
pub(crate) mod atomic_cell;
#[cfg(any(
// io driver uses `WakeList` directly
feature = "net",

View File

@ -168,6 +168,35 @@ fn drop_tasks_in_context() {
assert!(SUCCESS.load(Ordering::SeqCst));
}
#[test]
#[should_panic(expected = "boom")]
fn wake_in_drop_after_panic() {
let (tx, rx) = oneshot::channel::<()>();
struct WakeOnDrop(Option<oneshot::Sender<()>>);
impl Drop for WakeOnDrop {
fn drop(&mut self) {
self.0.take().unwrap().send(()).unwrap();
}
}
let rt = rt();
rt.spawn(async move {
let _wake_on_drop = WakeOnDrop(Some(tx));
// wait forever
futures::future::pending::<()>().await;
});
let _join = rt.spawn(async move { rx.await });
rt.block_on(async {
tokio::task::yield_now().await;
panic!("boom");
});
}
#[test]
#[should_panic(
expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."