rt: split runtime::context into multiple files (#5768)

This PR restructures `runtime::context` into multiple files by component and feature flag. The goal is to reduce code defined in macros and make each context component more manageable.

There should be no behavior changes except tweaking how the RNG seed is set. Instead of putting it in `set_current`, we set it when entering the runtime. This aligns better with the feature's original intent, enabling users to make a runtime's RNG deterministic. The seed should not be changed by `Handle::enter()`, so there is no need to have the code in `context::set_current`.
This commit is contained in:
Carl Lerche 2023-06-06 08:37:11 -07:00 committed by GitHub
parent e75ca93d30
commit 1204da7300
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 383 additions and 346 deletions

View File

@ -7,21 +7,33 @@ use std::cell::Cell;
use crate::util::rand::FastRand;
cfg_rt! {
mod blocking;
pub(crate) use blocking::{disallow_block_in_place, try_enter_blocking_region, BlockingRegionGuard};
mod current;
pub(crate) use current::{with_current, try_set_current, SetCurrentGuard};
mod runtime;
pub(crate) use runtime::{EnterRuntime, enter_runtime};
mod scoped;
use scoped::Scoped;
use crate::runtime::{scheduler, task::Id};
use std::cell::RefCell;
use std::marker::PhantomData;
use std::task::Waker;
use std::time::Duration;
cfg_taskdump! {
use crate::runtime::task::trace;
}
}
cfg_rt_multi_thread! {
mod runtime_mt;
pub(crate) use runtime_mt::{current_enter_context, exit_runtime};
}
struct Context {
/// Uniquely identifies the current thread
#[cfg(feature = "rt")]
@ -125,10 +137,7 @@ pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, A
}
cfg_rt! {
use crate::runtime::{ThreadId, TryCurrentError};
use crate::util::rand::RngSeed;
use std::fmt;
use crate::runtime::ThreadId;
pub(crate) fn thread_id() -> Result<ThreadId, AccessError> {
CONTEXT.try_with(|ctx| {
@ -143,45 +152,6 @@ cfg_rt! {
})
}
#[derive(Debug, Clone, Copy)]
#[must_use]
pub(crate) enum EnterRuntime {
/// Currently in a runtime context.
#[cfg_attr(not(feature = "rt"), allow(dead_code))]
Entered { allow_block_in_place: bool },
/// Not in a runtime context **or** a blocking region.
NotEntered,
}
#[derive(Debug)]
#[must_use]
pub(crate) struct SetCurrentGuard {
old_handle: Option<scheduler::Handle>,
old_seed: RngSeed,
// Should not be `Send` since it must be *dropped* on the same thread as
// created, but there is no issue with sync access.
_p: PhantomData<crate::util::markers::SyncNotSend>,
}
/// Guard tracking that a caller has entered a runtime context.
#[must_use]
pub(crate) struct EnterRuntimeGuard {
/// Tracks that the current thread has entered a blocking function call.
pub(crate) blocking: BlockingRegionGuard,
#[allow(dead_code)] // Only tracking the guard.
pub(crate) handle: SetCurrentGuard,
}
/// Guard tracking that a caller has entered a blocking region.
#[must_use]
pub(crate) struct BlockingRegionGuard {
_p: PhantomData<RefCell<()>>,
}
pub(crate) struct DisallowBlockInPlaceGuard(bool);
pub(crate) fn set_current_task_id(id: Option<Id>) -> Option<Id> {
CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None)
}
@ -190,93 +160,6 @@ cfg_rt! {
CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None)
}
pub(crate) fn with_current<F, R>(f: F) -> Result<R, TryCurrentError>
where
F: FnOnce(&scheduler::Handle) -> R,
{
match CONTEXT.try_with(|ctx| ctx.handle.borrow().as_ref().map(f)) {
Ok(Some(ret)) => Ok(ret),
Ok(None) => Err(TryCurrentError::new_no_context()),
Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()),
}
}
/// Sets this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: crate::runtime::scheduler::Handle
pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option<SetCurrentGuard> {
CONTEXT.try_with(|ctx| ctx.set_current(handle)).ok()
}
/// Marks the current thread as being within the dynamic extent of an
/// executor.
#[track_caller]
pub(crate) fn enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool) -> EnterRuntimeGuard {
if let Some(enter) = try_enter_runtime(handle, allow_block_in_place) {
return enter;
}
panic!(
"Cannot start a runtime from within a runtime. This happens \
because a function (like `block_on`) attempted to block the \
current thread while the thread is being used to drive \
asynchronous tasks."
);
}
/// Tries to enter a runtime context, returns `None` if already in a runtime
/// context.
fn try_enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool) -> Option<EnterRuntimeGuard> {
CONTEXT.with(|c| {
if c.runtime.get().is_entered() {
None
} else {
// Set the entered flag
c.runtime.set(EnterRuntime::Entered { allow_block_in_place });
Some(EnterRuntimeGuard {
blocking: BlockingRegionGuard::new(),
handle: c.set_current(handle),
})
}
})
}
pub(crate) fn try_enter_blocking_region() -> Option<BlockingRegionGuard> {
CONTEXT.try_with(|c| {
if c.runtime.get().is_entered() {
None
} else {
Some(BlockingRegionGuard::new())
}
// If accessing the thread-local fails, the thread is terminating
// and thread-locals are being destroyed. Because we don't know if
// we are currently in a runtime or not, we default to being
// permissive.
}).unwrap_or_else(|_| Some(BlockingRegionGuard::new()))
}
/// Disallows blocking in the current runtime context until the guard is dropped.
pub(crate) fn disallow_block_in_place() -> DisallowBlockInPlaceGuard {
let reset = CONTEXT.with(|c| {
if let EnterRuntime::Entered {
allow_block_in_place: true,
} = c.runtime.get()
{
c.runtime.set(EnterRuntime::Entered {
allow_block_in_place: false,
});
true
} else {
false
}
});
DisallowBlockInPlaceGuard(reset)
}
#[track_caller]
pub(crate) fn defer(waker: &Waker) {
with_scheduler(|maybe_scheduler| {
@ -299,127 +182,6 @@ cfg_rt! {
CONTEXT.with(|c| c.scheduler.with(f))
}
impl Context {
fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard {
let rng_seed = handle.seed_generator().next_seed();
let old_handle = self.handle.borrow_mut().replace(handle.clone());
let mut rng = self.rng.get().unwrap_or_else(FastRand::new);
let old_seed = rng.replace_seed(rng_seed);
self.rng.set(Some(rng));
SetCurrentGuard {
old_handle,
old_seed,
_p: PhantomData,
}
}
}
impl Drop for SetCurrentGuard {
fn drop(&mut self) {
CONTEXT.with(|ctx| {
*ctx.handle.borrow_mut() = self.old_handle.take();
let mut rng = ctx.rng.get().unwrap_or_else(FastRand::new);
rng.replace_seed(self.old_seed.clone());
ctx.rng.set(Some(rng));
});
}
}
impl fmt::Debug for EnterRuntimeGuard {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Enter").finish()
}
}
impl Drop for EnterRuntimeGuard {
fn drop(&mut self) {
CONTEXT.with(|c| {
assert!(c.runtime.get().is_entered());
c.runtime.set(EnterRuntime::NotEntered);
});
}
}
impl BlockingRegionGuard {
fn new() -> BlockingRegionGuard {
BlockingRegionGuard { _p: PhantomData }
}
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, AccessError>
where
F: std::future::Future,
{
use crate::runtime::park::CachedParkThread;
let mut park = CachedParkThread::new();
park.block_on(f)
}
/// Blocks the thread on the specified future for **at most** `timeout`
///
/// If the future completes before `timeout`, the result is returned. If
/// `timeout` elapses, then `Err` is returned.
pub(crate) fn block_on_timeout<F>(&mut self, f: F, timeout: Duration) -> Result<F::Output, ()>
where
F: std::future::Future,
{
use crate::runtime::park::CachedParkThread;
use std::task::Context;
use std::task::Poll::Ready;
use std::time::Instant;
let mut park = CachedParkThread::new();
let waker = park.waker().map_err(|_| ())?;
let mut cx = Context::from_waker(&waker);
pin!(f);
let when = Instant::now() + timeout;
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
let now = Instant::now();
if now >= when {
return Err(());
}
park.park_timeout(when - now);
}
}
}
impl Drop for DisallowBlockInPlaceGuard {
fn drop(&mut self) {
if self.0 {
// XXX: Do we want some kind of assertion here, or is "best effort" okay?
CONTEXT.with(|c| {
if let EnterRuntime::Entered {
allow_block_in_place: false,
} = c.runtime.get()
{
c.runtime.set(EnterRuntime::Entered {
allow_block_in_place: true,
});
}
})
}
}
}
impl EnterRuntime {
pub(crate) fn is_entered(self) -> bool {
matches!(self, EnterRuntime::Entered { .. })
}
}
cfg_taskdump! {
/// SAFETY: Callers of this function must ensure that trace frames always
/// form a valid linked list.
@ -428,42 +190,3 @@ cfg_rt! {
}
}
}
// Forces the current "entered" state to be cleared while the closure
// is executed.
//
// # Warning
//
// This is hidden for a reason. Do not use without fully understanding
// executors. Misusing can easily cause your program to deadlock.
cfg_rt_multi_thread! {
/// Returns true if in a runtime context.
pub(crate) fn current_enter_context() -> EnterRuntime {
CONTEXT.with(|c| c.runtime.get())
}
pub(crate) fn exit_runtime<F: FnOnce() -> R, R>(f: F) -> R {
// Reset in case the closure panics
struct Reset(EnterRuntime);
impl Drop for Reset {
fn drop(&mut self) {
CONTEXT.with(|c| {
assert!(!c.runtime.get().is_entered(), "closure claimed permanent executor");
c.runtime.set(self.0);
});
}
}
let was = CONTEXT.with(|c| {
let e = c.runtime.get();
assert!(e.is_entered(), "asked to exit when not entered");
c.runtime.set(EnterRuntime::NotEntered);
e
});
let _reset = Reset(was);
// dropping _reset after f() will reset ENTERED
f()
}
}

View File

@ -0,0 +1,121 @@
use super::{EnterRuntime, CONTEXT};
use crate::loom::thread::AccessError;
use crate::util::markers::NotSendOrSync;
use std::marker::PhantomData;
use std::time::Duration;
/// Guard tracking that a caller has entered a blocking region.
#[must_use]
pub(crate) struct BlockingRegionGuard {
_p: PhantomData<NotSendOrSync>,
}
pub(crate) struct DisallowBlockInPlaceGuard(bool);
pub(crate) fn try_enter_blocking_region() -> Option<BlockingRegionGuard> {
CONTEXT
.try_with(|c| {
if c.runtime.get().is_entered() {
None
} else {
Some(BlockingRegionGuard::new())
}
// If accessing the thread-local fails, the thread is terminating
// and thread-locals are being destroyed. Because we don't know if
// we are currently in a runtime or not, we default to being
// permissive.
})
.unwrap_or_else(|_| Some(BlockingRegionGuard::new()))
}
/// Disallows blocking in the current runtime context until the guard is dropped.
pub(crate) fn disallow_block_in_place() -> DisallowBlockInPlaceGuard {
let reset = CONTEXT.with(|c| {
if let EnterRuntime::Entered {
allow_block_in_place: true,
} = c.runtime.get()
{
c.runtime.set(EnterRuntime::Entered {
allow_block_in_place: false,
});
true
} else {
false
}
});
DisallowBlockInPlaceGuard(reset)
}
impl BlockingRegionGuard {
pub(super) fn new() -> BlockingRegionGuard {
BlockingRegionGuard { _p: PhantomData }
}
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, AccessError>
where
F: std::future::Future,
{
use crate::runtime::park::CachedParkThread;
let mut park = CachedParkThread::new();
park.block_on(f)
}
/// Blocks the thread on the specified future for **at most** `timeout`
///
/// If the future completes before `timeout`, the result is returned. If
/// `timeout` elapses, then `Err` is returned.
pub(crate) fn block_on_timeout<F>(&mut self, f: F, timeout: Duration) -> Result<F::Output, ()>
where
F: std::future::Future,
{
use crate::runtime::park::CachedParkThread;
use std::task::Context;
use std::task::Poll::Ready;
use std::time::Instant;
let mut park = CachedParkThread::new();
let waker = park.waker().map_err(|_| ())?;
let mut cx = Context::from_waker(&waker);
pin!(f);
let when = Instant::now() + timeout;
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
let now = Instant::now();
if now >= when {
return Err(());
}
park.park_timeout(when - now);
}
}
}
impl Drop for DisallowBlockInPlaceGuard {
fn drop(&mut self) {
if self.0 {
// XXX: Do we want some kind of assertion here, or is "best effort" okay?
CONTEXT.with(|c| {
if let EnterRuntime::Entered {
allow_block_in_place: false,
} = c.runtime.get()
{
c.runtime.set(EnterRuntime::Entered {
allow_block_in_place: true,
});
}
})
}
}
}

View File

@ -0,0 +1,59 @@
use super::{Context, CONTEXT};
use crate::runtime::{scheduler, TryCurrentError};
use crate::util::markers::SyncNotSend;
use std::marker::PhantomData;
#[derive(Debug)]
#[must_use]
pub(crate) struct SetCurrentGuard {
old_handle: Option<scheduler::Handle>,
_p: PhantomData<SyncNotSend>,
}
/// Sets this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: crate::runtime::scheduler::Handle
pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option<SetCurrentGuard> {
CONTEXT
.try_with(|ctx| {
let old_handle = ctx.handle.borrow_mut().replace(handle.clone());
SetCurrentGuard {
old_handle,
_p: PhantomData,
}
})
.ok()
}
pub(crate) fn with_current<F, R>(f: F) -> Result<R, TryCurrentError>
where
F: FnOnce(&scheduler::Handle) -> R,
{
match CONTEXT.try_with(|ctx| ctx.handle.borrow().as_ref().map(f)) {
Ok(Some(ret)) => Ok(ret),
Ok(None) => Err(TryCurrentError::new_no_context()),
Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()),
}
}
impl Context {
pub(super) fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard {
let old_handle = self.handle.borrow_mut().replace(handle.clone());
SetCurrentGuard {
old_handle,
_p: PhantomData,
}
}
}
impl Drop for SetCurrentGuard {
fn drop(&mut self) {
CONTEXT.with(|ctx| {
*ctx.handle.borrow_mut() = self.old_handle.take();
});
}
}

View File

@ -0,0 +1,99 @@
use super::{BlockingRegionGuard, SetCurrentGuard, CONTEXT};
use crate::runtime::scheduler;
use crate::util::rand::{FastRand, RngSeed};
use std::fmt;
#[derive(Debug, Clone, Copy)]
#[must_use]
pub(crate) enum EnterRuntime {
/// Currently in a runtime context.
#[cfg_attr(not(feature = "rt"), allow(dead_code))]
Entered { allow_block_in_place: bool },
/// Not in a runtime context **or** a blocking region.
NotEntered,
}
/// Guard tracking that a caller has entered a runtime context.
#[must_use]
pub(crate) struct EnterRuntimeGuard {
/// Tracks that the current thread has entered a blocking function call.
pub(crate) blocking: BlockingRegionGuard,
#[allow(dead_code)] // Only tracking the guard.
pub(crate) handle: SetCurrentGuard,
// Tracks the previous random number generator seed
old_seed: RngSeed,
}
/// Marks the current thread as being within the dynamic extent of an
/// executor.
#[track_caller]
pub(crate) fn enter_runtime<F, R>(handle: &scheduler::Handle, allow_block_in_place: bool, f: F) -> R
where
F: FnOnce(&mut BlockingRegionGuard) -> R,
{
let maybe_guard = CONTEXT.with(|c| {
if c.runtime.get().is_entered() {
None
} else {
// Set the entered flag
c.runtime.set(EnterRuntime::Entered {
allow_block_in_place,
});
// Generate a new seed
let rng_seed = handle.seed_generator().next_seed();
// Swap the RNG seed
let mut rng = c.rng.get().unwrap_or_else(FastRand::new);
let old_seed = rng.replace_seed(rng_seed);
c.rng.set(Some(rng));
Some(EnterRuntimeGuard {
blocking: BlockingRegionGuard::new(),
handle: c.set_current(handle),
old_seed,
})
}
});
if let Some(mut guard) = maybe_guard {
return f(&mut guard.blocking);
}
panic!(
"Cannot start a runtime from within a runtime. This happens \
because a function (like `block_on`) attempted to block the \
current thread while the thread is being used to drive \
asynchronous tasks."
);
}
impl fmt::Debug for EnterRuntimeGuard {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Enter").finish()
}
}
impl Drop for EnterRuntimeGuard {
fn drop(&mut self) {
CONTEXT.with(|c| {
assert!(c.runtime.get().is_entered());
c.runtime.set(EnterRuntime::NotEntered);
// Replace the previous RNG seed
let mut rng = c.rng.get().unwrap_or_else(FastRand::new);
rng.replace_seed(self.old_seed.clone());
c.rng.set(Some(rng));
});
}
}
impl EnterRuntime {
pub(crate) fn is_entered(self) -> bool {
matches!(self, EnterRuntime::Entered { .. })
}
}

View File

@ -0,0 +1,36 @@
use super::{EnterRuntime, CONTEXT};
/// Returns true if in a runtime context.
pub(crate) fn current_enter_context() -> EnterRuntime {
CONTEXT.with(|c| c.runtime.get())
}
/// Forces the current "entered" state to be cleared while the closure
/// is executed.
pub(crate) fn exit_runtime<F: FnOnce() -> R, R>(f: F) -> R {
// Reset in case the closure panics
struct Reset(EnterRuntime);
impl Drop for Reset {
fn drop(&mut self) {
CONTEXT.with(|c| {
assert!(
!c.runtime.get().is_entered(),
"closure claimed permanent executor"
);
c.runtime.set(self.0);
});
}
}
let was = CONTEXT.with(|c| {
let e = c.runtime.get();
assert!(e.is_entered(), "asked to exit when not entered");
c.runtime.set(EnterRuntime::NotEntered);
e
});
let _reset = Reset(was);
// dropping _reset after f() will reset ENTERED
f()
}

View File

@ -269,13 +269,9 @@ impl Handle {
// Enter the runtime context. This sets the current driver handles and
// prevents blocking an existing runtime.
let mut enter = context::enter_runtime(&self.inner, true);
// Block on the future
enter
.blocking
.block_on(future)
.expect("failed to park thread")
context::enter_runtime(&self.inner, true, |blocking| {
blocking.block_on(future).expect("failed to park thread")
})
}
#[track_caller]

View File

@ -164,38 +164,38 @@ impl CurrentThread {
pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
pin!(future);
let mut enter = crate::runtime::context::enter_runtime(handle, false);
let handle = handle.as_current_thread();
crate::runtime::context::enter_runtime(handle, false, |blocking| {
let handle = handle.as_current_thread();
// Attempt to steal the scheduler core and block_on the future if we can
// there, otherwise, lets select on a notification that the core is
// available or the future is complete.
loop {
if let Some(core) = self.take_core(handle) {
return core.block_on(future);
} else {
let notified = self.notify.notified();
pin!(notified);
// Attempt to steal the scheduler core and block_on the future if we can
// there, otherwise, lets select on a notification that the core is
// available or the future is complete.
loop {
if let Some(core) = self.take_core(handle) {
return core.block_on(future);
} else {
let notified = self.notify.notified();
pin!(notified);
if let Some(out) = enter
.blocking
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}
if let Some(out) = blocking
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}
if let Ready(out) = future.as_mut().poll(cx) {
return Ready(Some(out));
}
if let Ready(out) = future.as_mut().poll(cx) {
return Ready(Some(out));
}
Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
}
})
}
fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {

View File

@ -71,11 +71,9 @@ impl MultiThread {
where
F: Future,
{
let mut enter = crate::runtime::context::enter_runtime(handle, true);
enter
.blocking
.block_on(future)
.expect("failed to park thread")
crate::runtime::context::enter_runtime(handle, true, |blocking| {
blocking.block_on(future).expect("failed to park thread")
})
}
pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {

View File

@ -440,26 +440,27 @@ fn run(worker: Arc<Worker>) {
};
let handle = scheduler::Handle::MultiThread(worker.handle.clone());
let _enter = crate::runtime::context::enter_runtime(&handle, true);
// Set the worker context.
let cx = scheduler::Context::MultiThread(Context {
worker,
core: RefCell::new(None),
defer: Defer::new(),
});
crate::runtime::context::enter_runtime(&handle, true, |_| {
// Set the worker context.
let cx = scheduler::Context::MultiThread(Context {
worker,
core: RefCell::new(None),
defer: Defer::new(),
});
context::set_scheduler(&cx, || {
let cx = cx.expect_multi_thread();
context::set_scheduler(&cx, || {
let cx = cx.expect_multi_thread();
// This should always be an error. It only returns a `Result` to support
// using `?` to short circuit.
assert!(cx.run(core).is_err());
// This should always be an error. It only returns a `Result` to support
// using `?` to short circuit.
assert!(cx.run(core).is_err());
// Check if there are any deferred tasks to notify. This can happen when
// the worker core is lost due to `block_in_place()` being called from
// within the task.
cx.defer.wake();
// Check if there are any deferred tasks to notify. This can happen when
// the worker core is lost due to `block_in_place()` being called from
// within the task.
cx.defer.wake();
});
});
}

View File

@ -2,3 +2,7 @@
pub(crate) struct SyncNotSend(*mut ());
unsafe impl Sync for SyncNotSend {}
cfg_rt! {
pub(crate) struct NotSendOrSync(*mut ());
}