diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 388171d01..008686d3f 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -7,21 +7,33 @@ use std::cell::Cell; use crate::util::rand::FastRand; cfg_rt! { + mod blocking; + pub(crate) use blocking::{disallow_block_in_place, try_enter_blocking_region, BlockingRegionGuard}; + + mod current; + pub(crate) use current::{with_current, try_set_current, SetCurrentGuard}; + + mod runtime; + pub(crate) use runtime::{EnterRuntime, enter_runtime}; + mod scoped; use scoped::Scoped; use crate::runtime::{scheduler, task::Id}; use std::cell::RefCell; - use std::marker::PhantomData; use std::task::Waker; - use std::time::Duration; cfg_taskdump! { use crate::runtime::task::trace; } } +cfg_rt_multi_thread! { + mod runtime_mt; + pub(crate) use runtime_mt::{current_enter_context, exit_runtime}; +} + struct Context { /// Uniquely identifies the current thread #[cfg(feature = "rt")] @@ -125,10 +137,7 @@ pub(super) fn budget(f: impl FnOnce(&Cell) -> R) -> Result Result { CONTEXT.try_with(|ctx| { @@ -143,45 +152,6 @@ cfg_rt! { }) } - #[derive(Debug, Clone, Copy)] - #[must_use] - pub(crate) enum EnterRuntime { - /// Currently in a runtime context. - #[cfg_attr(not(feature = "rt"), allow(dead_code))] - Entered { allow_block_in_place: bool }, - - /// Not in a runtime context **or** a blocking region. - NotEntered, - } - - #[derive(Debug)] - #[must_use] - pub(crate) struct SetCurrentGuard { - old_handle: Option, - old_seed: RngSeed, - // Should not be `Send` since it must be *dropped* on the same thread as - // created, but there is no issue with sync access. - _p: PhantomData, - } - - /// Guard tracking that a caller has entered a runtime context. - #[must_use] - pub(crate) struct EnterRuntimeGuard { - /// Tracks that the current thread has entered a blocking function call. - pub(crate) blocking: BlockingRegionGuard, - - #[allow(dead_code)] // Only tracking the guard. - pub(crate) handle: SetCurrentGuard, - } - - /// Guard tracking that a caller has entered a blocking region. - #[must_use] - pub(crate) struct BlockingRegionGuard { - _p: PhantomData>, - } - - pub(crate) struct DisallowBlockInPlaceGuard(bool); - pub(crate) fn set_current_task_id(id: Option) -> Option { CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None) } @@ -190,93 +160,6 @@ cfg_rt! { CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None) } - pub(crate) fn with_current(f: F) -> Result - where - F: FnOnce(&scheduler::Handle) -> R, - { - - match CONTEXT.try_with(|ctx| ctx.handle.borrow().as_ref().map(f)) { - Ok(Some(ret)) => Ok(ret), - Ok(None) => Err(TryCurrentError::new_no_context()), - Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), - } - } - - /// Sets this [`Handle`] as the current active [`Handle`]. - /// - /// [`Handle`]: crate::runtime::scheduler::Handle - pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option { - CONTEXT.try_with(|ctx| ctx.set_current(handle)).ok() - } - - - /// Marks the current thread as being within the dynamic extent of an - /// executor. - #[track_caller] - pub(crate) fn enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool) -> EnterRuntimeGuard { - if let Some(enter) = try_enter_runtime(handle, allow_block_in_place) { - return enter; - } - - panic!( - "Cannot start a runtime from within a runtime. This happens \ - because a function (like `block_on`) attempted to block the \ - current thread while the thread is being used to drive \ - asynchronous tasks." - ); - } - - /// Tries to enter a runtime context, returns `None` if already in a runtime - /// context. - fn try_enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool) -> Option { - CONTEXT.with(|c| { - if c.runtime.get().is_entered() { - None - } else { - // Set the entered flag - c.runtime.set(EnterRuntime::Entered { allow_block_in_place }); - - Some(EnterRuntimeGuard { - blocking: BlockingRegionGuard::new(), - handle: c.set_current(handle), - }) - } - }) - } - - pub(crate) fn try_enter_blocking_region() -> Option { - CONTEXT.try_with(|c| { - if c.runtime.get().is_entered() { - None - } else { - Some(BlockingRegionGuard::new()) - } - // If accessing the thread-local fails, the thread is terminating - // and thread-locals are being destroyed. Because we don't know if - // we are currently in a runtime or not, we default to being - // permissive. - }).unwrap_or_else(|_| Some(BlockingRegionGuard::new())) - } - - /// Disallows blocking in the current runtime context until the guard is dropped. - pub(crate) fn disallow_block_in_place() -> DisallowBlockInPlaceGuard { - let reset = CONTEXT.with(|c| { - if let EnterRuntime::Entered { - allow_block_in_place: true, - } = c.runtime.get() - { - c.runtime.set(EnterRuntime::Entered { - allow_block_in_place: false, - }); - true - } else { - false - } - }); - - DisallowBlockInPlaceGuard(reset) - } - #[track_caller] pub(crate) fn defer(waker: &Waker) { with_scheduler(|maybe_scheduler| { @@ -299,127 +182,6 @@ cfg_rt! { CONTEXT.with(|c| c.scheduler.with(f)) } - impl Context { - fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard { - let rng_seed = handle.seed_generator().next_seed(); - - let old_handle = self.handle.borrow_mut().replace(handle.clone()); - let mut rng = self.rng.get().unwrap_or_else(FastRand::new); - let old_seed = rng.replace_seed(rng_seed); - self.rng.set(Some(rng)); - - SetCurrentGuard { - old_handle, - old_seed, - _p: PhantomData, - } - } - } - - impl Drop for SetCurrentGuard { - fn drop(&mut self) { - CONTEXT.with(|ctx| { - *ctx.handle.borrow_mut() = self.old_handle.take(); - - let mut rng = ctx.rng.get().unwrap_or_else(FastRand::new); - rng.replace_seed(self.old_seed.clone()); - ctx.rng.set(Some(rng)); - }); - } - } - - impl fmt::Debug for EnterRuntimeGuard { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Enter").finish() - } - } - - impl Drop for EnterRuntimeGuard { - fn drop(&mut self) { - CONTEXT.with(|c| { - assert!(c.runtime.get().is_entered()); - c.runtime.set(EnterRuntime::NotEntered); - }); - } - } - - impl BlockingRegionGuard { - fn new() -> BlockingRegionGuard { - BlockingRegionGuard { _p: PhantomData } - } - - /// Blocks the thread on the specified future, returning the value with - /// which that future completes. - pub(crate) fn block_on(&mut self, f: F) -> Result - where - F: std::future::Future, - { - use crate::runtime::park::CachedParkThread; - - let mut park = CachedParkThread::new(); - park.block_on(f) - } - - /// Blocks the thread on the specified future for **at most** `timeout` - /// - /// If the future completes before `timeout`, the result is returned. If - /// `timeout` elapses, then `Err` is returned. - pub(crate) fn block_on_timeout(&mut self, f: F, timeout: Duration) -> Result - where - F: std::future::Future, - { - use crate::runtime::park::CachedParkThread; - use std::task::Context; - use std::task::Poll::Ready; - use std::time::Instant; - - let mut park = CachedParkThread::new(); - let waker = park.waker().map_err(|_| ())?; - let mut cx = Context::from_waker(&waker); - - pin!(f); - let when = Instant::now() + timeout; - - loop { - if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { - return Ok(v); - } - - let now = Instant::now(); - - if now >= when { - return Err(()); - } - - park.park_timeout(when - now); - } - } - } - - impl Drop for DisallowBlockInPlaceGuard { - fn drop(&mut self) { - if self.0 { - // XXX: Do we want some kind of assertion here, or is "best effort" okay? - CONTEXT.with(|c| { - if let EnterRuntime::Entered { - allow_block_in_place: false, - } = c.runtime.get() - { - c.runtime.set(EnterRuntime::Entered { - allow_block_in_place: true, - }); - } - }) - } - } - } - - impl EnterRuntime { - pub(crate) fn is_entered(self) -> bool { - matches!(self, EnterRuntime::Entered { .. }) - } - } - cfg_taskdump! { /// SAFETY: Callers of this function must ensure that trace frames always /// form a valid linked list. @@ -428,42 +190,3 @@ cfg_rt! { } } } - -// Forces the current "entered" state to be cleared while the closure -// is executed. -// -// # Warning -// -// This is hidden for a reason. Do not use without fully understanding -// executors. Misusing can easily cause your program to deadlock. -cfg_rt_multi_thread! { - /// Returns true if in a runtime context. - pub(crate) fn current_enter_context() -> EnterRuntime { - CONTEXT.with(|c| c.runtime.get()) - } - - pub(crate) fn exit_runtime R, R>(f: F) -> R { - // Reset in case the closure panics - struct Reset(EnterRuntime); - - impl Drop for Reset { - fn drop(&mut self) { - CONTEXT.with(|c| { - assert!(!c.runtime.get().is_entered(), "closure claimed permanent executor"); - c.runtime.set(self.0); - }); - } - } - - let was = CONTEXT.with(|c| { - let e = c.runtime.get(); - assert!(e.is_entered(), "asked to exit when not entered"); - c.runtime.set(EnterRuntime::NotEntered); - e - }); - - let _reset = Reset(was); - // dropping _reset after f() will reset ENTERED - f() - } -} diff --git a/tokio/src/runtime/context/blocking.rs b/tokio/src/runtime/context/blocking.rs new file mode 100644 index 000000000..8ae4f570e --- /dev/null +++ b/tokio/src/runtime/context/blocking.rs @@ -0,0 +1,121 @@ +use super::{EnterRuntime, CONTEXT}; + +use crate::loom::thread::AccessError; +use crate::util::markers::NotSendOrSync; + +use std::marker::PhantomData; +use std::time::Duration; + +/// Guard tracking that a caller has entered a blocking region. +#[must_use] +pub(crate) struct BlockingRegionGuard { + _p: PhantomData, +} + +pub(crate) struct DisallowBlockInPlaceGuard(bool); + +pub(crate) fn try_enter_blocking_region() -> Option { + CONTEXT + .try_with(|c| { + if c.runtime.get().is_entered() { + None + } else { + Some(BlockingRegionGuard::new()) + } + // If accessing the thread-local fails, the thread is terminating + // and thread-locals are being destroyed. Because we don't know if + // we are currently in a runtime or not, we default to being + // permissive. + }) + .unwrap_or_else(|_| Some(BlockingRegionGuard::new())) +} + +/// Disallows blocking in the current runtime context until the guard is dropped. +pub(crate) fn disallow_block_in_place() -> DisallowBlockInPlaceGuard { + let reset = CONTEXT.with(|c| { + if let EnterRuntime::Entered { + allow_block_in_place: true, + } = c.runtime.get() + { + c.runtime.set(EnterRuntime::Entered { + allow_block_in_place: false, + }); + true + } else { + false + } + }); + + DisallowBlockInPlaceGuard(reset) +} + +impl BlockingRegionGuard { + pub(super) fn new() -> BlockingRegionGuard { + BlockingRegionGuard { _p: PhantomData } + } + + /// Blocks the thread on the specified future, returning the value with + /// which that future completes. + pub(crate) fn block_on(&mut self, f: F) -> Result + where + F: std::future::Future, + { + use crate::runtime::park::CachedParkThread; + + let mut park = CachedParkThread::new(); + park.block_on(f) + } + + /// Blocks the thread on the specified future for **at most** `timeout` + /// + /// If the future completes before `timeout`, the result is returned. If + /// `timeout` elapses, then `Err` is returned. + pub(crate) fn block_on_timeout(&mut self, f: F, timeout: Duration) -> Result + where + F: std::future::Future, + { + use crate::runtime::park::CachedParkThread; + use std::task::Context; + use std::task::Poll::Ready; + use std::time::Instant; + + let mut park = CachedParkThread::new(); + let waker = park.waker().map_err(|_| ())?; + let mut cx = Context::from_waker(&waker); + + pin!(f); + let when = Instant::now() + timeout; + + loop { + if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { + return Ok(v); + } + + let now = Instant::now(); + + if now >= when { + return Err(()); + } + + park.park_timeout(when - now); + } + } +} + +impl Drop for DisallowBlockInPlaceGuard { + fn drop(&mut self) { + if self.0 { + // XXX: Do we want some kind of assertion here, or is "best effort" okay? + CONTEXT.with(|c| { + if let EnterRuntime::Entered { + allow_block_in_place: false, + } = c.runtime.get() + { + c.runtime.set(EnterRuntime::Entered { + allow_block_in_place: true, + }); + } + }) + } + } +} diff --git a/tokio/src/runtime/context/current.rs b/tokio/src/runtime/context/current.rs new file mode 100644 index 000000000..a19a73224 --- /dev/null +++ b/tokio/src/runtime/context/current.rs @@ -0,0 +1,59 @@ +use super::{Context, CONTEXT}; + +use crate::runtime::{scheduler, TryCurrentError}; +use crate::util::markers::SyncNotSend; + +use std::marker::PhantomData; + +#[derive(Debug)] +#[must_use] +pub(crate) struct SetCurrentGuard { + old_handle: Option, + _p: PhantomData, +} + +/// Sets this [`Handle`] as the current active [`Handle`]. +/// +/// [`Handle`]: crate::runtime::scheduler::Handle +pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option { + CONTEXT + .try_with(|ctx| { + let old_handle = ctx.handle.borrow_mut().replace(handle.clone()); + + SetCurrentGuard { + old_handle, + _p: PhantomData, + } + }) + .ok() +} + +pub(crate) fn with_current(f: F) -> Result +where + F: FnOnce(&scheduler::Handle) -> R, +{ + match CONTEXT.try_with(|ctx| ctx.handle.borrow().as_ref().map(f)) { + Ok(Some(ret)) => Ok(ret), + Ok(None) => Err(TryCurrentError::new_no_context()), + Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), + } +} + +impl Context { + pub(super) fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard { + let old_handle = self.handle.borrow_mut().replace(handle.clone()); + + SetCurrentGuard { + old_handle, + _p: PhantomData, + } + } +} + +impl Drop for SetCurrentGuard { + fn drop(&mut self) { + CONTEXT.with(|ctx| { + *ctx.handle.borrow_mut() = self.old_handle.take(); + }); + } +} diff --git a/tokio/src/runtime/context/runtime.rs b/tokio/src/runtime/context/runtime.rs new file mode 100644 index 000000000..f2e29899a --- /dev/null +++ b/tokio/src/runtime/context/runtime.rs @@ -0,0 +1,99 @@ +use super::{BlockingRegionGuard, SetCurrentGuard, CONTEXT}; + +use crate::runtime::scheduler; +use crate::util::rand::{FastRand, RngSeed}; + +use std::fmt; + +#[derive(Debug, Clone, Copy)] +#[must_use] +pub(crate) enum EnterRuntime { + /// Currently in a runtime context. + #[cfg_attr(not(feature = "rt"), allow(dead_code))] + Entered { allow_block_in_place: bool }, + + /// Not in a runtime context **or** a blocking region. + NotEntered, +} + +/// Guard tracking that a caller has entered a runtime context. +#[must_use] +pub(crate) struct EnterRuntimeGuard { + /// Tracks that the current thread has entered a blocking function call. + pub(crate) blocking: BlockingRegionGuard, + + #[allow(dead_code)] // Only tracking the guard. + pub(crate) handle: SetCurrentGuard, + + // Tracks the previous random number generator seed + old_seed: RngSeed, +} + +/// Marks the current thread as being within the dynamic extent of an +/// executor. +#[track_caller] +pub(crate) fn enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool, f: F) -> R +where + F: FnOnce(&mut BlockingRegionGuard) -> R, +{ + let maybe_guard = CONTEXT.with(|c| { + if c.runtime.get().is_entered() { + None + } else { + // Set the entered flag + c.runtime.set(EnterRuntime::Entered { + allow_block_in_place, + }); + + // Generate a new seed + let rng_seed = handle.seed_generator().next_seed(); + + // Swap the RNG seed + let mut rng = c.rng.get().unwrap_or_else(FastRand::new); + let old_seed = rng.replace_seed(rng_seed); + c.rng.set(Some(rng)); + + Some(EnterRuntimeGuard { + blocking: BlockingRegionGuard::new(), + handle: c.set_current(handle), + old_seed, + }) + } + }); + + if let Some(mut guard) = maybe_guard { + return f(&mut guard.blocking); + } + + panic!( + "Cannot start a runtime from within a runtime. This happens \ + because a function (like `block_on`) attempted to block the \ + current thread while the thread is being used to drive \ + asynchronous tasks." + ); +} + +impl fmt::Debug for EnterRuntimeGuard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Enter").finish() + } +} + +impl Drop for EnterRuntimeGuard { + fn drop(&mut self) { + CONTEXT.with(|c| { + assert!(c.runtime.get().is_entered()); + c.runtime.set(EnterRuntime::NotEntered); + // Replace the previous RNG seed + let mut rng = c.rng.get().unwrap_or_else(FastRand::new); + rng.replace_seed(self.old_seed.clone()); + c.rng.set(Some(rng)); + }); + } +} + +impl EnterRuntime { + pub(crate) fn is_entered(self) -> bool { + matches!(self, EnterRuntime::Entered { .. }) + } +} diff --git a/tokio/src/runtime/context/runtime_mt.rs b/tokio/src/runtime/context/runtime_mt.rs new file mode 100644 index 000000000..728caeae9 --- /dev/null +++ b/tokio/src/runtime/context/runtime_mt.rs @@ -0,0 +1,36 @@ +use super::{EnterRuntime, CONTEXT}; + +/// Returns true if in a runtime context. +pub(crate) fn current_enter_context() -> EnterRuntime { + CONTEXT.with(|c| c.runtime.get()) +} + +/// Forces the current "entered" state to be cleared while the closure +/// is executed. +pub(crate) fn exit_runtime R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset(EnterRuntime); + + impl Drop for Reset { + fn drop(&mut self) { + CONTEXT.with(|c| { + assert!( + !c.runtime.get().is_entered(), + "closure claimed permanent executor" + ); + c.runtime.set(self.0); + }); + } + } + + let was = CONTEXT.with(|c| { + let e = c.runtime.get(); + assert!(e.is_entered(), "asked to exit when not entered"); + c.runtime.set(EnterRuntime::NotEntered); + e + }); + + let _reset = Reset(was); + // dropping _reset after f() will reset ENTERED + f() +} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index a4dc437db..920c26521 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -269,13 +269,9 @@ impl Handle { // Enter the runtime context. This sets the current driver handles and // prevents blocking an existing runtime. - let mut enter = context::enter_runtime(&self.inner, true); - - // Block on the future - enter - .blocking - .block_on(future) - .expect("failed to park thread") + context::enter_runtime(&self.inner, true, |blocking| { + blocking.block_on(future).expect("failed to park thread") + }) } #[track_caller] diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index ac859de67..ac4a8d6fa 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -164,38 +164,38 @@ impl CurrentThread { pub(crate) fn block_on(&self, handle: &scheduler::Handle, future: F) -> F::Output { pin!(future); - let mut enter = crate::runtime::context::enter_runtime(handle, false); - let handle = handle.as_current_thread(); + crate::runtime::context::enter_runtime(handle, false, |blocking| { + let handle = handle.as_current_thread(); - // Attempt to steal the scheduler core and block_on the future if we can - // there, otherwise, lets select on a notification that the core is - // available or the future is complete. - loop { - if let Some(core) = self.take_core(handle) { - return core.block_on(future); - } else { - let notified = self.notify.notified(); - pin!(notified); + // Attempt to steal the scheduler core and block_on the future if we can + // there, otherwise, lets select on a notification that the core is + // available or the future is complete. + loop { + if let Some(core) = self.take_core(handle) { + return core.block_on(future); + } else { + let notified = self.notify.notified(); + pin!(notified); - if let Some(out) = enter - .blocking - .block_on(poll_fn(|cx| { - if notified.as_mut().poll(cx).is_ready() { - return Ready(None); - } + if let Some(out) = blocking + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } - if let Ready(out) = future.as_mut().poll(cx) { - return Ready(Some(out)); - } + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } - Pending - })) - .expect("Failed to `Enter::block_on`") - { - return out; + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } } } - } + }) } fn take_core(&self, handle: &Arc) -> Option> { diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 306a622b3..04ec2a671 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -71,11 +71,9 @@ impl MultiThread { where F: Future, { - let mut enter = crate::runtime::context::enter_runtime(handle, true); - enter - .blocking - .block_on(future) - .expect("failed to park thread") + crate::runtime::context::enter_runtime(handle, true, |blocking| { + blocking.block_on(future).expect("failed to park thread") + }) } pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 947b6fb7f..c48a726af 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -440,26 +440,27 @@ fn run(worker: Arc) { }; let handle = scheduler::Handle::MultiThread(worker.handle.clone()); - let _enter = crate::runtime::context::enter_runtime(&handle, true); - // Set the worker context. - let cx = scheduler::Context::MultiThread(Context { - worker, - core: RefCell::new(None), - defer: Defer::new(), - }); + crate::runtime::context::enter_runtime(&handle, true, |_| { + // Set the worker context. + let cx = scheduler::Context::MultiThread(Context { + worker, + core: RefCell::new(None), + defer: Defer::new(), + }); - context::set_scheduler(&cx, || { - let cx = cx.expect_multi_thread(); + context::set_scheduler(&cx, || { + let cx = cx.expect_multi_thread(); - // This should always be an error. It only returns a `Result` to support - // using `?` to short circuit. - assert!(cx.run(core).is_err()); + // This should always be an error. It only returns a `Result` to support + // using `?` to short circuit. + assert!(cx.run(core).is_err()); - // Check if there are any deferred tasks to notify. This can happen when - // the worker core is lost due to `block_in_place()` being called from - // within the task. - cx.defer.wake(); + // Check if there are any deferred tasks to notify. This can happen when + // the worker core is lost due to `block_in_place()` being called from + // within the task. + cx.defer.wake(); + }); }); } diff --git a/tokio/src/util/markers.rs b/tokio/src/util/markers.rs index 1da09da94..7031fb6bc 100644 --- a/tokio/src/util/markers.rs +++ b/tokio/src/util/markers.rs @@ -2,3 +2,7 @@ pub(crate) struct SyncNotSend(*mut ()); unsafe impl Sync for SyncNotSend {} + +cfg_rt! { + pub(crate) struct NotSendOrSync(*mut ()); +}