From 1204da730000f2eab19d2c05eea12ee3071b3f31 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 6 Jun 2023 08:37:11 -0700 Subject: [PATCH] rt: split `runtime::context` into multiple files (#5768) This PR restructures `runtime::context` into multiple files by component and feature flag. The goal is to reduce code defined in macros and make each context component more manageable. There should be no behavior changes except tweaking how the RNG seed is set. Instead of putting it in `set_current`, we set it when entering the runtime. This aligns better with the feature's original intent, enabling users to make a runtime's RNG deterministic. The seed should not be changed by `Handle::enter()`, so there is no need to have the code in `context::set_current`. --- tokio/src/runtime/context.rs | 307 +----------------- tokio/src/runtime/context/blocking.rs | 121 +++++++ tokio/src/runtime/context/current.rs | 59 ++++ tokio/src/runtime/context/runtime.rs | 99 ++++++ tokio/src/runtime/context/runtime_mt.rs | 36 ++ tokio/src/runtime/handle.rs | 10 +- tokio/src/runtime/scheduler/current_thread.rs | 52 +-- .../src/runtime/scheduler/multi_thread/mod.rs | 8 +- .../runtime/scheduler/multi_thread/worker.rs | 33 +- tokio/src/util/markers.rs | 4 + 10 files changed, 383 insertions(+), 346 deletions(-) create mode 100644 tokio/src/runtime/context/blocking.rs create mode 100644 tokio/src/runtime/context/current.rs create mode 100644 tokio/src/runtime/context/runtime.rs create mode 100644 tokio/src/runtime/context/runtime_mt.rs diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 388171d01..008686d3f 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -7,21 +7,33 @@ use std::cell::Cell; use crate::util::rand::FastRand; cfg_rt! { + mod blocking; + pub(crate) use blocking::{disallow_block_in_place, try_enter_blocking_region, BlockingRegionGuard}; + + mod current; + pub(crate) use current::{with_current, try_set_current, SetCurrentGuard}; + + mod runtime; + pub(crate) use runtime::{EnterRuntime, enter_runtime}; + mod scoped; use scoped::Scoped; use crate::runtime::{scheduler, task::Id}; use std::cell::RefCell; - use std::marker::PhantomData; use std::task::Waker; - use std::time::Duration; cfg_taskdump! { use crate::runtime::task::trace; } } +cfg_rt_multi_thread! { + mod runtime_mt; + pub(crate) use runtime_mt::{current_enter_context, exit_runtime}; +} + struct Context { /// Uniquely identifies the current thread #[cfg(feature = "rt")] @@ -125,10 +137,7 @@ pub(super) fn budget(f: impl FnOnce(&Cell) -> R) -> Result Result { CONTEXT.try_with(|ctx| { @@ -143,45 +152,6 @@ cfg_rt! { }) } - #[derive(Debug, Clone, Copy)] - #[must_use] - pub(crate) enum EnterRuntime { - /// Currently in a runtime context. - #[cfg_attr(not(feature = "rt"), allow(dead_code))] - Entered { allow_block_in_place: bool }, - - /// Not in a runtime context **or** a blocking region. - NotEntered, - } - - #[derive(Debug)] - #[must_use] - pub(crate) struct SetCurrentGuard { - old_handle: Option, - old_seed: RngSeed, - // Should not be `Send` since it must be *dropped* on the same thread as - // created, but there is no issue with sync access. - _p: PhantomData, - } - - /// Guard tracking that a caller has entered a runtime context. - #[must_use] - pub(crate) struct EnterRuntimeGuard { - /// Tracks that the current thread has entered a blocking function call. - pub(crate) blocking: BlockingRegionGuard, - - #[allow(dead_code)] // Only tracking the guard. - pub(crate) handle: SetCurrentGuard, - } - - /// Guard tracking that a caller has entered a blocking region. - #[must_use] - pub(crate) struct BlockingRegionGuard { - _p: PhantomData>, - } - - pub(crate) struct DisallowBlockInPlaceGuard(bool); - pub(crate) fn set_current_task_id(id: Option) -> Option { CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None) } @@ -190,93 +160,6 @@ cfg_rt! { CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None) } - pub(crate) fn with_current(f: F) -> Result - where - F: FnOnce(&scheduler::Handle) -> R, - { - - match CONTEXT.try_with(|ctx| ctx.handle.borrow().as_ref().map(f)) { - Ok(Some(ret)) => Ok(ret), - Ok(None) => Err(TryCurrentError::new_no_context()), - Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), - } - } - - /// Sets this [`Handle`] as the current active [`Handle`]. - /// - /// [`Handle`]: crate::runtime::scheduler::Handle - pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option { - CONTEXT.try_with(|ctx| ctx.set_current(handle)).ok() - } - - - /// Marks the current thread as being within the dynamic extent of an - /// executor. - #[track_caller] - pub(crate) fn enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool) -> EnterRuntimeGuard { - if let Some(enter) = try_enter_runtime(handle, allow_block_in_place) { - return enter; - } - - panic!( - "Cannot start a runtime from within a runtime. This happens \ - because a function (like `block_on`) attempted to block the \ - current thread while the thread is being used to drive \ - asynchronous tasks." - ); - } - - /// Tries to enter a runtime context, returns `None` if already in a runtime - /// context. - fn try_enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool) -> Option { - CONTEXT.with(|c| { - if c.runtime.get().is_entered() { - None - } else { - // Set the entered flag - c.runtime.set(EnterRuntime::Entered { allow_block_in_place }); - - Some(EnterRuntimeGuard { - blocking: BlockingRegionGuard::new(), - handle: c.set_current(handle), - }) - } - }) - } - - pub(crate) fn try_enter_blocking_region() -> Option { - CONTEXT.try_with(|c| { - if c.runtime.get().is_entered() { - None - } else { - Some(BlockingRegionGuard::new()) - } - // If accessing the thread-local fails, the thread is terminating - // and thread-locals are being destroyed. Because we don't know if - // we are currently in a runtime or not, we default to being - // permissive. - }).unwrap_or_else(|_| Some(BlockingRegionGuard::new())) - } - - /// Disallows blocking in the current runtime context until the guard is dropped. - pub(crate) fn disallow_block_in_place() -> DisallowBlockInPlaceGuard { - let reset = CONTEXT.with(|c| { - if let EnterRuntime::Entered { - allow_block_in_place: true, - } = c.runtime.get() - { - c.runtime.set(EnterRuntime::Entered { - allow_block_in_place: false, - }); - true - } else { - false - } - }); - - DisallowBlockInPlaceGuard(reset) - } - #[track_caller] pub(crate) fn defer(waker: &Waker) { with_scheduler(|maybe_scheduler| { @@ -299,127 +182,6 @@ cfg_rt! { CONTEXT.with(|c| c.scheduler.with(f)) } - impl Context { - fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard { - let rng_seed = handle.seed_generator().next_seed(); - - let old_handle = self.handle.borrow_mut().replace(handle.clone()); - let mut rng = self.rng.get().unwrap_or_else(FastRand::new); - let old_seed = rng.replace_seed(rng_seed); - self.rng.set(Some(rng)); - - SetCurrentGuard { - old_handle, - old_seed, - _p: PhantomData, - } - } - } - - impl Drop for SetCurrentGuard { - fn drop(&mut self) { - CONTEXT.with(|ctx| { - *ctx.handle.borrow_mut() = self.old_handle.take(); - - let mut rng = ctx.rng.get().unwrap_or_else(FastRand::new); - rng.replace_seed(self.old_seed.clone()); - ctx.rng.set(Some(rng)); - }); - } - } - - impl fmt::Debug for EnterRuntimeGuard { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Enter").finish() - } - } - - impl Drop for EnterRuntimeGuard { - fn drop(&mut self) { - CONTEXT.with(|c| { - assert!(c.runtime.get().is_entered()); - c.runtime.set(EnterRuntime::NotEntered); - }); - } - } - - impl BlockingRegionGuard { - fn new() -> BlockingRegionGuard { - BlockingRegionGuard { _p: PhantomData } - } - - /// Blocks the thread on the specified future, returning the value with - /// which that future completes. - pub(crate) fn block_on(&mut self, f: F) -> Result - where - F: std::future::Future, - { - use crate::runtime::park::CachedParkThread; - - let mut park = CachedParkThread::new(); - park.block_on(f) - } - - /// Blocks the thread on the specified future for **at most** `timeout` - /// - /// If the future completes before `timeout`, the result is returned. If - /// `timeout` elapses, then `Err` is returned. - pub(crate) fn block_on_timeout(&mut self, f: F, timeout: Duration) -> Result - where - F: std::future::Future, - { - use crate::runtime::park::CachedParkThread; - use std::task::Context; - use std::task::Poll::Ready; - use std::time::Instant; - - let mut park = CachedParkThread::new(); - let waker = park.waker().map_err(|_| ())?; - let mut cx = Context::from_waker(&waker); - - pin!(f); - let when = Instant::now() + timeout; - - loop { - if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { - return Ok(v); - } - - let now = Instant::now(); - - if now >= when { - return Err(()); - } - - park.park_timeout(when - now); - } - } - } - - impl Drop for DisallowBlockInPlaceGuard { - fn drop(&mut self) { - if self.0 { - // XXX: Do we want some kind of assertion here, or is "best effort" okay? - CONTEXT.with(|c| { - if let EnterRuntime::Entered { - allow_block_in_place: false, - } = c.runtime.get() - { - c.runtime.set(EnterRuntime::Entered { - allow_block_in_place: true, - }); - } - }) - } - } - } - - impl EnterRuntime { - pub(crate) fn is_entered(self) -> bool { - matches!(self, EnterRuntime::Entered { .. }) - } - } - cfg_taskdump! { /// SAFETY: Callers of this function must ensure that trace frames always /// form a valid linked list. @@ -428,42 +190,3 @@ cfg_rt! { } } } - -// Forces the current "entered" state to be cleared while the closure -// is executed. -// -// # Warning -// -// This is hidden for a reason. Do not use without fully understanding -// executors. Misusing can easily cause your program to deadlock. -cfg_rt_multi_thread! { - /// Returns true if in a runtime context. - pub(crate) fn current_enter_context() -> EnterRuntime { - CONTEXT.with(|c| c.runtime.get()) - } - - pub(crate) fn exit_runtime R, R>(f: F) -> R { - // Reset in case the closure panics - struct Reset(EnterRuntime); - - impl Drop for Reset { - fn drop(&mut self) { - CONTEXT.with(|c| { - assert!(!c.runtime.get().is_entered(), "closure claimed permanent executor"); - c.runtime.set(self.0); - }); - } - } - - let was = CONTEXT.with(|c| { - let e = c.runtime.get(); - assert!(e.is_entered(), "asked to exit when not entered"); - c.runtime.set(EnterRuntime::NotEntered); - e - }); - - let _reset = Reset(was); - // dropping _reset after f() will reset ENTERED - f() - } -} diff --git a/tokio/src/runtime/context/blocking.rs b/tokio/src/runtime/context/blocking.rs new file mode 100644 index 000000000..8ae4f570e --- /dev/null +++ b/tokio/src/runtime/context/blocking.rs @@ -0,0 +1,121 @@ +use super::{EnterRuntime, CONTEXT}; + +use crate::loom::thread::AccessError; +use crate::util::markers::NotSendOrSync; + +use std::marker::PhantomData; +use std::time::Duration; + +/// Guard tracking that a caller has entered a blocking region. +#[must_use] +pub(crate) struct BlockingRegionGuard { + _p: PhantomData, +} + +pub(crate) struct DisallowBlockInPlaceGuard(bool); + +pub(crate) fn try_enter_blocking_region() -> Option { + CONTEXT + .try_with(|c| { + if c.runtime.get().is_entered() { + None + } else { + Some(BlockingRegionGuard::new()) + } + // If accessing the thread-local fails, the thread is terminating + // and thread-locals are being destroyed. Because we don't know if + // we are currently in a runtime or not, we default to being + // permissive. + }) + .unwrap_or_else(|_| Some(BlockingRegionGuard::new())) +} + +/// Disallows blocking in the current runtime context until the guard is dropped. +pub(crate) fn disallow_block_in_place() -> DisallowBlockInPlaceGuard { + let reset = CONTEXT.with(|c| { + if let EnterRuntime::Entered { + allow_block_in_place: true, + } = c.runtime.get() + { + c.runtime.set(EnterRuntime::Entered { + allow_block_in_place: false, + }); + true + } else { + false + } + }); + + DisallowBlockInPlaceGuard(reset) +} + +impl BlockingRegionGuard { + pub(super) fn new() -> BlockingRegionGuard { + BlockingRegionGuard { _p: PhantomData } + } + + /// Blocks the thread on the specified future, returning the value with + /// which that future completes. + pub(crate) fn block_on(&mut self, f: F) -> Result + where + F: std::future::Future, + { + use crate::runtime::park::CachedParkThread; + + let mut park = CachedParkThread::new(); + park.block_on(f) + } + + /// Blocks the thread on the specified future for **at most** `timeout` + /// + /// If the future completes before `timeout`, the result is returned. If + /// `timeout` elapses, then `Err` is returned. + pub(crate) fn block_on_timeout(&mut self, f: F, timeout: Duration) -> Result + where + F: std::future::Future, + { + use crate::runtime::park::CachedParkThread; + use std::task::Context; + use std::task::Poll::Ready; + use std::time::Instant; + + let mut park = CachedParkThread::new(); + let waker = park.waker().map_err(|_| ())?; + let mut cx = Context::from_waker(&waker); + + pin!(f); + let when = Instant::now() + timeout; + + loop { + if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { + return Ok(v); + } + + let now = Instant::now(); + + if now >= when { + return Err(()); + } + + park.park_timeout(when - now); + } + } +} + +impl Drop for DisallowBlockInPlaceGuard { + fn drop(&mut self) { + if self.0 { + // XXX: Do we want some kind of assertion here, or is "best effort" okay? + CONTEXT.with(|c| { + if let EnterRuntime::Entered { + allow_block_in_place: false, + } = c.runtime.get() + { + c.runtime.set(EnterRuntime::Entered { + allow_block_in_place: true, + }); + } + }) + } + } +} diff --git a/tokio/src/runtime/context/current.rs b/tokio/src/runtime/context/current.rs new file mode 100644 index 000000000..a19a73224 --- /dev/null +++ b/tokio/src/runtime/context/current.rs @@ -0,0 +1,59 @@ +use super::{Context, CONTEXT}; + +use crate::runtime::{scheduler, TryCurrentError}; +use crate::util::markers::SyncNotSend; + +use std::marker::PhantomData; + +#[derive(Debug)] +#[must_use] +pub(crate) struct SetCurrentGuard { + old_handle: Option, + _p: PhantomData, +} + +/// Sets this [`Handle`] as the current active [`Handle`]. +/// +/// [`Handle`]: crate::runtime::scheduler::Handle +pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option { + CONTEXT + .try_with(|ctx| { + let old_handle = ctx.handle.borrow_mut().replace(handle.clone()); + + SetCurrentGuard { + old_handle, + _p: PhantomData, + } + }) + .ok() +} + +pub(crate) fn with_current(f: F) -> Result +where + F: FnOnce(&scheduler::Handle) -> R, +{ + match CONTEXT.try_with(|ctx| ctx.handle.borrow().as_ref().map(f)) { + Ok(Some(ret)) => Ok(ret), + Ok(None) => Err(TryCurrentError::new_no_context()), + Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), + } +} + +impl Context { + pub(super) fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard { + let old_handle = self.handle.borrow_mut().replace(handle.clone()); + + SetCurrentGuard { + old_handle, + _p: PhantomData, + } + } +} + +impl Drop for SetCurrentGuard { + fn drop(&mut self) { + CONTEXT.with(|ctx| { + *ctx.handle.borrow_mut() = self.old_handle.take(); + }); + } +} diff --git a/tokio/src/runtime/context/runtime.rs b/tokio/src/runtime/context/runtime.rs new file mode 100644 index 000000000..f2e29899a --- /dev/null +++ b/tokio/src/runtime/context/runtime.rs @@ -0,0 +1,99 @@ +use super::{BlockingRegionGuard, SetCurrentGuard, CONTEXT}; + +use crate::runtime::scheduler; +use crate::util::rand::{FastRand, RngSeed}; + +use std::fmt; + +#[derive(Debug, Clone, Copy)] +#[must_use] +pub(crate) enum EnterRuntime { + /// Currently in a runtime context. + #[cfg_attr(not(feature = "rt"), allow(dead_code))] + Entered { allow_block_in_place: bool }, + + /// Not in a runtime context **or** a blocking region. + NotEntered, +} + +/// Guard tracking that a caller has entered a runtime context. +#[must_use] +pub(crate) struct EnterRuntimeGuard { + /// Tracks that the current thread has entered a blocking function call. + pub(crate) blocking: BlockingRegionGuard, + + #[allow(dead_code)] // Only tracking the guard. + pub(crate) handle: SetCurrentGuard, + + // Tracks the previous random number generator seed + old_seed: RngSeed, +} + +/// Marks the current thread as being within the dynamic extent of an +/// executor. +#[track_caller] +pub(crate) fn enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool, f: F) -> R +where + F: FnOnce(&mut BlockingRegionGuard) -> R, +{ + let maybe_guard = CONTEXT.with(|c| { + if c.runtime.get().is_entered() { + None + } else { + // Set the entered flag + c.runtime.set(EnterRuntime::Entered { + allow_block_in_place, + }); + + // Generate a new seed + let rng_seed = handle.seed_generator().next_seed(); + + // Swap the RNG seed + let mut rng = c.rng.get().unwrap_or_else(FastRand::new); + let old_seed = rng.replace_seed(rng_seed); + c.rng.set(Some(rng)); + + Some(EnterRuntimeGuard { + blocking: BlockingRegionGuard::new(), + handle: c.set_current(handle), + old_seed, + }) + } + }); + + if let Some(mut guard) = maybe_guard { + return f(&mut guard.blocking); + } + + panic!( + "Cannot start a runtime from within a runtime. This happens \ + because a function (like `block_on`) attempted to block the \ + current thread while the thread is being used to drive \ + asynchronous tasks." + ); +} + +impl fmt::Debug for EnterRuntimeGuard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Enter").finish() + } +} + +impl Drop for EnterRuntimeGuard { + fn drop(&mut self) { + CONTEXT.with(|c| { + assert!(c.runtime.get().is_entered()); + c.runtime.set(EnterRuntime::NotEntered); + // Replace the previous RNG seed + let mut rng = c.rng.get().unwrap_or_else(FastRand::new); + rng.replace_seed(self.old_seed.clone()); + c.rng.set(Some(rng)); + }); + } +} + +impl EnterRuntime { + pub(crate) fn is_entered(self) -> bool { + matches!(self, EnterRuntime::Entered { .. }) + } +} diff --git a/tokio/src/runtime/context/runtime_mt.rs b/tokio/src/runtime/context/runtime_mt.rs new file mode 100644 index 000000000..728caeae9 --- /dev/null +++ b/tokio/src/runtime/context/runtime_mt.rs @@ -0,0 +1,36 @@ +use super::{EnterRuntime, CONTEXT}; + +/// Returns true if in a runtime context. +pub(crate) fn current_enter_context() -> EnterRuntime { + CONTEXT.with(|c| c.runtime.get()) +} + +/// Forces the current "entered" state to be cleared while the closure +/// is executed. +pub(crate) fn exit_runtime R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset(EnterRuntime); + + impl Drop for Reset { + fn drop(&mut self) { + CONTEXT.with(|c| { + assert!( + !c.runtime.get().is_entered(), + "closure claimed permanent executor" + ); + c.runtime.set(self.0); + }); + } + } + + let was = CONTEXT.with(|c| { + let e = c.runtime.get(); + assert!(e.is_entered(), "asked to exit when not entered"); + c.runtime.set(EnterRuntime::NotEntered); + e + }); + + let _reset = Reset(was); + // dropping _reset after f() will reset ENTERED + f() +} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index a4dc437db..920c26521 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -269,13 +269,9 @@ impl Handle { // Enter the runtime context. This sets the current driver handles and // prevents blocking an existing runtime. - let mut enter = context::enter_runtime(&self.inner, true); - - // Block on the future - enter - .blocking - .block_on(future) - .expect("failed to park thread") + context::enter_runtime(&self.inner, true, |blocking| { + blocking.block_on(future).expect("failed to park thread") + }) } #[track_caller] diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index ac859de67..ac4a8d6fa 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -164,38 +164,38 @@ impl CurrentThread { pub(crate) fn block_on(&self, handle: &scheduler::Handle, future: F) -> F::Output { pin!(future); - let mut enter = crate::runtime::context::enter_runtime(handle, false); - let handle = handle.as_current_thread(); + crate::runtime::context::enter_runtime(handle, false, |blocking| { + let handle = handle.as_current_thread(); - // Attempt to steal the scheduler core and block_on the future if we can - // there, otherwise, lets select on a notification that the core is - // available or the future is complete. - loop { - if let Some(core) = self.take_core(handle) { - return core.block_on(future); - } else { - let notified = self.notify.notified(); - pin!(notified); + // Attempt to steal the scheduler core and block_on the future if we can + // there, otherwise, lets select on a notification that the core is + // available or the future is complete. + loop { + if let Some(core) = self.take_core(handle) { + return core.block_on(future); + } else { + let notified = self.notify.notified(); + pin!(notified); - if let Some(out) = enter - .blocking - .block_on(poll_fn(|cx| { - if notified.as_mut().poll(cx).is_ready() { - return Ready(None); - } + if let Some(out) = blocking + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } - if let Ready(out) = future.as_mut().poll(cx) { - return Ready(Some(out)); - } + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } - Pending - })) - .expect("Failed to `Enter::block_on`") - { - return out; + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } } } - } + }) } fn take_core(&self, handle: &Arc) -> Option> { diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 306a622b3..04ec2a671 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -71,11 +71,9 @@ impl MultiThread { where F: Future, { - let mut enter = crate::runtime::context::enter_runtime(handle, true); - enter - .blocking - .block_on(future) - .expect("failed to park thread") + crate::runtime::context::enter_runtime(handle, true, |blocking| { + blocking.block_on(future).expect("failed to park thread") + }) } pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 947b6fb7f..c48a726af 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -440,26 +440,27 @@ fn run(worker: Arc) { }; let handle = scheduler::Handle::MultiThread(worker.handle.clone()); - let _enter = crate::runtime::context::enter_runtime(&handle, true); - // Set the worker context. - let cx = scheduler::Context::MultiThread(Context { - worker, - core: RefCell::new(None), - defer: Defer::new(), - }); + crate::runtime::context::enter_runtime(&handle, true, |_| { + // Set the worker context. + let cx = scheduler::Context::MultiThread(Context { + worker, + core: RefCell::new(None), + defer: Defer::new(), + }); - context::set_scheduler(&cx, || { - let cx = cx.expect_multi_thread(); + context::set_scheduler(&cx, || { + let cx = cx.expect_multi_thread(); - // This should always be an error. It only returns a `Result` to support - // using `?` to short circuit. - assert!(cx.run(core).is_err()); + // This should always be an error. It only returns a `Result` to support + // using `?` to short circuit. + assert!(cx.run(core).is_err()); - // Check if there are any deferred tasks to notify. This can happen when - // the worker core is lost due to `block_in_place()` being called from - // within the task. - cx.defer.wake(); + // Check if there are any deferred tasks to notify. This can happen when + // the worker core is lost due to `block_in_place()` being called from + // within the task. + cx.defer.wake(); + }); }); } diff --git a/tokio/src/util/markers.rs b/tokio/src/util/markers.rs index 1da09da94..7031fb6bc 100644 --- a/tokio/src/util/markers.rs +++ b/tokio/src/util/markers.rs @@ -2,3 +2,7 @@ pub(crate) struct SyncNotSend(*mut ()); unsafe impl Sync for SyncNotSend {} + +cfg_rt! { + pub(crate) struct NotSendOrSync(*mut ()); +}