mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
task: introduce RcCell
helper (#4977)
This commit is contained in:
parent
1fd7f82468
commit
116fa7c614
@ -2,7 +2,7 @@
|
|||||||
use crate::loom::sync::{Arc, Mutex};
|
use crate::loom::sync::{Arc, Mutex};
|
||||||
use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
|
use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
|
||||||
use crate::sync::AtomicWaker;
|
use crate::sync::AtomicWaker;
|
||||||
use crate::util::VecDequeCell;
|
use crate::util::{RcCell, VecDequeCell};
|
||||||
|
|
||||||
use std::cell::Cell;
|
use std::cell::Cell;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
@ -261,7 +261,11 @@ pin_project! {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
thread_local!(static CURRENT: Cell<Option<Rc<Context>>> = Cell::new(None));
|
#[cfg(any(loom, tokio_no_const_thread_local))]
|
||||||
|
thread_local!(static CURRENT: RcCell<Context> = RcCell::new());
|
||||||
|
|
||||||
|
#[cfg(not(any(loom, tokio_no_const_thread_local)))]
|
||||||
|
thread_local!(static CURRENT: RcCell<Context> = const { RcCell::new() });
|
||||||
|
|
||||||
cfg_rt! {
|
cfg_rt! {
|
||||||
/// Spawns a `!Send` future on the local task set.
|
/// Spawns a `!Send` future on the local task set.
|
||||||
@ -311,12 +315,10 @@ cfg_rt! {
|
|||||||
F::Output: 'static
|
F::Output: 'static
|
||||||
{
|
{
|
||||||
CURRENT.with(|maybe_cx| {
|
CURRENT.with(|maybe_cx| {
|
||||||
let ctx = clone_rc(maybe_cx);
|
match maybe_cx.get() {
|
||||||
match ctx {
|
|
||||||
None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
|
None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
|
||||||
Some(cx) => cx.spawn(future, name)
|
Some(cx) => cx.spawn(future, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -336,7 +338,7 @@ pub struct LocalEnterGuard(Option<Rc<Context>>);
|
|||||||
impl Drop for LocalEnterGuard {
|
impl Drop for LocalEnterGuard {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
CURRENT.with(|ctx| {
|
CURRENT.with(|ctx| {
|
||||||
ctx.replace(self.0.take());
|
ctx.set(self.0.take());
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -615,12 +617,12 @@ impl LocalSet {
|
|||||||
fn with<T>(&self, f: impl FnOnce() -> T) -> T {
|
fn with<T>(&self, f: impl FnOnce() -> T) -> T {
|
||||||
CURRENT.with(|ctx| {
|
CURRENT.with(|ctx| {
|
||||||
struct Reset<'a> {
|
struct Reset<'a> {
|
||||||
ctx_ref: &'a Cell<Option<Rc<Context>>>,
|
ctx_ref: &'a RcCell<Context>,
|
||||||
val: Option<Rc<Context>>,
|
val: Option<Rc<Context>>,
|
||||||
}
|
}
|
||||||
impl<'a> Drop for Reset<'a> {
|
impl<'a> Drop for Reset<'a> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.ctx_ref.replace(self.val.take());
|
self.ctx_ref.set(self.val.take());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let old = ctx.replace(Some(self.context.clone()));
|
let old = ctx.replace(Some(self.context.clone()));
|
||||||
@ -822,19 +824,11 @@ impl<T: Future> Future for RunUntil<'_, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clone_rc<T>(rc: &Cell<Option<Rc<T>>>) -> Option<Rc<T>> {
|
|
||||||
let value = rc.take();
|
|
||||||
let cloned = value.clone();
|
|
||||||
rc.set(value);
|
|
||||||
cloned
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Shared {
|
impl Shared {
|
||||||
/// Schedule the provided task on the scheduler.
|
/// Schedule the provided task on the scheduler.
|
||||||
fn schedule(&self, task: task::Notified<Arc<Self>>) {
|
fn schedule(&self, task: task::Notified<Arc<Self>>) {
|
||||||
CURRENT.with(|maybe_cx| {
|
CURRENT.with(|maybe_cx| {
|
||||||
let ctx = clone_rc(maybe_cx);
|
match maybe_cx.get() {
|
||||||
match ctx {
|
|
||||||
Some(cx) if cx.shared.ptr_eq(self) => {
|
Some(cx) if cx.shared.ptr_eq(self) => {
|
||||||
cx.queue.push_back(task);
|
cx.queue.push_back(task);
|
||||||
}
|
}
|
||||||
@ -861,14 +855,11 @@ impl Shared {
|
|||||||
|
|
||||||
impl task::Schedule for Arc<Shared> {
|
impl task::Schedule for Arc<Shared> {
|
||||||
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
|
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
|
||||||
CURRENT.with(|maybe_cx| {
|
CURRENT.with(|maybe_cx| match maybe_cx.get() {
|
||||||
let ctx = clone_rc(maybe_cx);
|
None => panic!("scheduler context missing"),
|
||||||
match ctx {
|
Some(cx) => {
|
||||||
None => panic!("scheduler context missing"),
|
assert!(cx.shared.ptr_eq(self));
|
||||||
Some(cx) => {
|
cx.owned.remove(task)
|
||||||
assert!(cx.shared.ptr_eq(self));
|
|
||||||
cx.owned.remove(task)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -889,15 +880,13 @@ impl task::Schedule for Arc<Shared> {
|
|||||||
// This hook is only called from within the runtime, so
|
// This hook is only called from within the runtime, so
|
||||||
// `CURRENT` should match with `&self`, i.e. there is no
|
// `CURRENT` should match with `&self`, i.e. there is no
|
||||||
// opportunity for a nested scheduler to be called.
|
// opportunity for a nested scheduler to be called.
|
||||||
CURRENT.with(|maybe_cx| {
|
CURRENT.with(|maybe_cx| match maybe_cx.get() {
|
||||||
let ctx = clone_rc(maybe_cx);
|
|
||||||
match ctx {
|
|
||||||
Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
|
Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
|
||||||
cx.unhandled_panic.set(true);
|
cx.unhandled_panic.set(true);
|
||||||
cx.owned.close_and_shutdown_all();
|
cx.owned.close_and_shutdown_all();
|
||||||
}
|
}
|
||||||
_ => unreachable!("runtime core not set in CURRENT thread-local"),
|
_ => unreachable!("runtime core not set in CURRENT thread-local"),
|
||||||
}})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,6 +56,9 @@ cfg_rt! {
|
|||||||
|
|
||||||
mod vec_deque_cell;
|
mod vec_deque_cell;
|
||||||
pub(crate) use vec_deque_cell::VecDequeCell;
|
pub(crate) use vec_deque_cell::VecDequeCell;
|
||||||
|
|
||||||
|
mod rc_cell;
|
||||||
|
pub(crate) use rc_cell::RcCell;
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg_rt_multi_thread! {
|
cfg_rt_multi_thread! {
|
||||||
|
57
tokio/src/util/rc_cell.rs
Normal file
57
tokio/src/util/rc_cell.rs
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
use crate::loom::cell::UnsafeCell;
|
||||||
|
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
/// This is exactly like `Cell<Option<Rc<T>>>`, except that it provides a `get`
|
||||||
|
/// method even though `Rc` is not `Copy`.
|
||||||
|
pub(crate) struct RcCell<T> {
|
||||||
|
inner: UnsafeCell<Option<Rc<T>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> RcCell<T> {
|
||||||
|
#[cfg(not(loom))]
|
||||||
|
pub(crate) const fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: UnsafeCell::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The UnsafeCell in loom does not have a const `new` fn.
|
||||||
|
#[cfg(loom)]
|
||||||
|
pub(crate) fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: UnsafeCell::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Safety: This method may not be called recursively.
|
||||||
|
#[inline]
|
||||||
|
unsafe fn with_inner<F, R>(&self, f: F) -> R
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut Option<Rc<T>>) -> R,
|
||||||
|
{
|
||||||
|
// safety: This type is not Sync, so concurrent calls of this method
|
||||||
|
// cannot happen. Furthermore, the caller guarantees that the method is
|
||||||
|
// not called recursively. Finally, this is the only place that can
|
||||||
|
// create mutable references to the inner Rc. This ensures that any
|
||||||
|
// mutable references created here are exclusive.
|
||||||
|
self.inner.with_mut(|ptr| f(&mut *ptr))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get(&self) -> Option<Rc<T>> {
|
||||||
|
// safety: The `Rc::clone` method will not call any unknown user-code,
|
||||||
|
// so it will not result in a recursive call to `with_inner`.
|
||||||
|
unsafe { self.with_inner(|rc| rc.clone()) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn replace(&self, val: Option<Rc<T>>) -> Option<Rc<T>> {
|
||||||
|
// safety: No destructors or other unknown user-code will run inside the
|
||||||
|
// `with_inner` call, so no recursive call to `with_inner` can happen.
|
||||||
|
unsafe { self.with_inner(|rc| std::mem::replace(rc, val)) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set(&self, val: Option<Rc<T>>) {
|
||||||
|
let old = self.replace(val);
|
||||||
|
drop(old);
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user