Mirror of https://github.com/tokio-rs/tokio.git, synced 2025-09-25 12:00:35 +00:00
runtime: reduce the lock contention in task spawn (#6001)
This commit is contained in:
parent a0a58d7edd
commit 3a4aef17b2
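Note on the change: the commit replaces the single mutex that guarded the runtime's list of owned tasks with a sharded list, so threads spawning tasks concurrently lock different shards instead of serializing on one lock. Below is a minimal, self-contained sketch of that idea only; `ShardedQueue` and its fields are invented for illustration and are not tokio's actual types (tokio uses an intrusive linked list and loom-aware primitives).

use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Illustrative only: one lock per shard instead of one global lock.
struct ShardedQueue<T> {
    shards: Box<[Mutex<VecDeque<T>>]>,
    len: AtomicUsize,
    mask: usize, // shard count is a power of two, so `id & mask` picks a shard
}

impl<T> ShardedQueue<T> {
    fn new(shards: usize) -> Self {
        assert!(shards.is_power_of_two());
        Self {
            shards: (0..shards).map(|_| Mutex::new(VecDeque::new())).collect(),
            len: AtomicUsize::new(0),
            mask: shards - 1,
        }
    }

    // Only the shard that `id` maps to is locked; threads pushing to other
    // shards do not contend on this lock.
    fn push(&self, id: usize, value: T) {
        self.shards[id & self.mask].lock().unwrap().push_back(value);
        self.len.fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    let q = Arc::new(ShardedQueue::new(8));
    let handles: Vec<_> = (0..8)
        .map(|t| {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                for i in 0..1_000 {
                    // A per-item id spreads inserts across the shards.
                    q.push(t * 1_000 + i, i);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(q.len.load(Ordering::Relaxed), 8_000);
}

With N shards, roughly N spawning threads can insert in parallel; the diff below applies the same structure to `OwnedTasks`.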
@@ -12,6 +12,7 @@ tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
 criterion = "0.5.1"
 rand = "0.8"
 rand_chacha = "0.3"
+num_cpus = "1.16.0"
 
 [dev-dependencies]
 tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
@@ -1279,7 +1279,6 @@ cfg_rt_multi_thread! {
             use crate::runtime::scheduler::MultiThreadAlt;
 
             let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
-
             let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
 
             // Create the blocking pool
@@ -1,5 +1,5 @@
 use std::fmt;
-use std::num::NonZeroU64;
+use std::num::{NonZeroU32, NonZeroU64};
 
 /// An opaque ID that uniquely identifies a runtime relative to all other currently
 /// running runtimes.
@@ -39,6 +39,12 @@ impl From<NonZeroU64> for Id {
     }
 }
 
+impl From<NonZeroU32> for Id {
+    fn from(value: NonZeroU32) -> Self {
+        Id(value.into())
+    }
+}
+
 impl fmt::Display for Id {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         self.0.fmt(f)
@@ -132,7 +132,7 @@ impl CurrentThread {
         let handle = Arc::new(Handle {
             shared: Shared {
                 inject: Inject::new(),
-                owned: OwnedTasks::new(),
+                owned: OwnedTasks::new(1),
                 woken: AtomicBool::new(false),
                 config,
                 scheduler_metrics: SchedulerMetrics::new(),
@@ -248,7 +248,7 @@ fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
     // Drain the OwnedTasks collection. This call also closes the
     // collection, ensuring that no tasks are ever pushed after this
     // call returns.
-    handle.shared.owned.close_and_shutdown_all();
+    handle.shared.owned.close_and_shutdown_all(0);
 
     // Drain local queue
     // We already shut down every task, so we just need to drop the task.
@@ -614,7 +614,7 @@ impl Schedule for Arc<Handle> {
                     // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                     if let Some(core) = core.as_mut() {
                         core.unhandled_panic = true;
-                        self.shared.owned.close_and_shutdown_all();
+                        self.shared.owned.close_and_shutdown_all(0);
                     }
                 }
                 _ => unreachable!("runtime core not set in CURRENT thread-local"),
@@ -287,7 +287,7 @@ pub(super) fn create(
         remotes: remotes.into_boxed_slice(),
         inject,
         idle,
-        owned: OwnedTasks::new(),
+        owned: OwnedTasks::new(size),
         synced: Mutex::new(Synced {
             idle: idle_synced,
             inject: inject_synced,
@@ -548,7 +548,6 @@ impl Context {
         }
 
         core.pre_shutdown(&self.worker);
-
         // Signal shutdown
         self.worker.handle.shutdown_core(core);
         Err(())
@@ -955,8 +954,16 @@ impl Core {
     /// Signals all tasks to shut down, and waits for them to complete. Must run
     /// before we enter the single-threaded phase of shutdown processing.
     fn pre_shutdown(&mut self, worker: &Worker) {
+        // Start from a random inner list
+        let start = self
+            .rand
+            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
         // Signal to all tasks to shut down.
-        worker.handle.shared.owned.close_and_shutdown_all();
+        worker
+            .handle
+            .shared
+            .owned
+            .close_and_shutdown_all(start as usize);
 
         self.stats
             .submit(&worker.handle.shared.worker_metrics[worker.index]);
@@ -307,7 +307,7 @@ pub(super) fn create(
         remotes: remotes.into_boxed_slice(),
         inject,
         idle,
-        owned: OwnedTasks::new(),
+        owned: OwnedTasks::new(num_cores),
         synced: Mutex::new(Synced {
             assigned_cores: (0..num_workers).map(|_| None).collect(),
             shutdown_cores: Vec::with_capacity(num_cores),
@@ -1460,7 +1460,9 @@ impl Shared {
     }
 
     pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
-        self.owned.close_and_shutdown_all();
+        // Start from a random inner list
+        let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
+        self.owned.close_and_shutdown_all(start as usize);
 
         core.stats.submit(&self.worker_metrics[core.index]);
 
@@ -24,7 +24,7 @@ use std::fmt;
 #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
 #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
 #[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
-pub struct Id(u64);
+pub struct Id(pub(crate) u64);
 
 /// Returns the [`Id`] of the currently running task.
 ///
@@ -74,11 +74,22 @@ impl fmt::Display for Id {
 
 impl Id {
     pub(crate) fn next() -> Self {
-        use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
+        use crate::loom::sync::atomic::Ordering::Relaxed;
+        use crate::loom::sync::atomic::StaticAtomicU64;
 
-        static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+        #[cfg(all(test, loom))]
+        {
+            crate::loom::lazy_static! {
+                static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+            }
+            Self(NEXT_ID.fetch_add(1, Relaxed))
+        }
 
-        Self(NEXT_ID.fetch_add(1, Relaxed))
+        #[cfg(not(all(test, loom)))]
+        {
+            static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+            Self(NEXT_ID.fetch_add(1, Relaxed))
+        }
     }
 
     pub(crate) fn as_u64(&self) -> u64 {
@@ -8,10 +8,11 @@
 
 use crate::future::Future;
 use crate::loom::cell::UnsafeCell;
-use crate::loom::sync::Mutex;
 use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
-use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};
+use crate::util::linked_list::{Link, LinkedList};
+use crate::util::sharded_list;
 
+use crate::loom::sync::atomic::{AtomicBool, Ordering};
 use std::marker::PhantomData;
 use std::num::NonZeroU64;
 
@@ -25,7 +26,7 @@ use std::num::NonZeroU64;
 // mixed up runtimes happen to have the same id.
 
 cfg_has_atomic_u64! {
-    use std::sync::atomic::{AtomicU64, Ordering};
+    use std::sync::atomic::AtomicU64;
 
     static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);
 
@@ -40,7 +41,7 @@ cfg_has_atomic_u64! {
 }
 
 cfg_not_has_atomic_u64! {
-    use std::sync::atomic::{AtomicU32, Ordering};
+    use std::sync::atomic::AtomicU32;
 
     static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);
 
@@ -55,30 +56,30 @@ cfg_not_has_atomic_u64! {
 }
 
 pub(crate) struct OwnedTasks<S: 'static> {
-    inner: Mutex<CountedOwnedTasksInner<S>>,
+    list: List<S>,
     pub(crate) id: NonZeroU64,
+    closed: AtomicBool,
 }
-struct CountedOwnedTasksInner<S: 'static> {
-    list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
-    closed: bool,
-}
+
+type List<S> = sharded_list::ShardedList<Task<S>, <Task<S> as Link>::Target>;
+
 pub(crate) struct LocalOwnedTasks<S: 'static> {
     inner: UnsafeCell<OwnedTasksInner<S>>,
     pub(crate) id: NonZeroU64,
     _not_send_or_sync: PhantomData<*const ()>,
 }
 
 struct OwnedTasksInner<S: 'static> {
     list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
     closed: bool,
 }
 
 impl<S: 'static> OwnedTasks<S> {
-    pub(crate) fn new() -> Self {
+    pub(crate) fn new(num_cores: usize) -> Self {
+        let shard_size = Self::gen_shared_list_size(num_cores);
         Self {
-            inner: Mutex::new(CountedOwnedTasksInner {
-                list: CountedLinkedList::new(),
-                closed: false,
-            }),
+            list: List::new(shard_size),
+            closed: AtomicBool::new(false),
             id: get_next_id(),
         }
     }
@@ -112,16 +113,16 @@ impl<S: 'static> OwnedTasks<S> {
             task.header().set_owner_id(self.id);
         }
 
-        let mut lock = self.inner.lock();
-        if lock.closed {
-            drop(lock);
-            drop(notified);
+        let shard = self.list.lock_shard(&task);
+        // Check the closed flag in the lock for ensuring all that tasks
+        // will shut down after the OwnedTasks has been closed.
+        if self.closed.load(Ordering::Acquire) {
+            drop(shard);
             task.shutdown();
-            None
-        } else {
-            lock.list.push_front(task);
-            Some(notified)
+            return None;
         }
+        shard.push(task);
+        Some(notified)
     }
 
     /// Asserts that the given task is owned by this OwnedTasks and convert it to
@@ -129,7 +130,6 @@ impl<S: 'static> OwnedTasks<S> {
     #[inline]
     pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
         debug_assert_eq!(task.header().get_owner_id(), Some(self.id));
-
         // safety: All tasks bound to this OwnedTasks are Send, so it is safe
         // to poll it on this thread no matter what thread we are on.
         LocalNotified {
@@ -140,34 +140,34 @@ impl<S: 'static> OwnedTasks<S> {
 
     /// Shuts down all tasks in the collection. This call also closes the
     /// collection, preventing new items from being added.
-    pub(crate) fn close_and_shutdown_all(&self)
+    ///
+    /// The parameter start determines which shard this method will start at.
+    /// Using different values for each worker thread reduces contention.
+    pub(crate) fn close_and_shutdown_all(&self, start: usize)
     where
         S: Schedule,
     {
-        // The first iteration of the loop was unrolled so it can set the
-        // closed bool.
-        let first_task = {
-            let mut lock = self.inner.lock();
-            lock.closed = true;
-            lock.list.pop_back()
-        };
-        match first_task {
-            Some(task) => task.shutdown(),
-            None => return,
-        }
-
-        loop {
-            let task = match self.inner.lock().list.pop_back() {
-                Some(task) => task,
-                None => return,
-            };
-
-            task.shutdown();
+        self.closed.store(true, Ordering::Release);
+        for i in start..self.get_shard_size() + start {
+            loop {
+                let task = self.list.pop_back(i);
+                match task {
+                    Some(task) => {
+                        task.shutdown();
+                    }
+                    None => break,
+                }
+            }
         }
     }
 
+    #[inline]
+    pub(crate) fn get_shard_size(&self) -> usize {
+        self.list.shard_size()
+    }
+
     pub(crate) fn active_tasks_count(&self) -> usize {
-        self.inner.lock().list.count()
+        self.list.len()
     }
 
     pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
|
|||||||
|
|
||||||
// safety: We just checked that the provided task is not in some other
|
// safety: We just checked that the provided task is not in some other
|
||||||
// linked list.
|
// linked list.
|
||||||
unsafe { self.inner.lock().list.remove(task.header_ptr()) }
|
unsafe { self.list.remove(task.header_ptr()) }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn is_empty(&self) -> bool {
|
pub(crate) fn is_empty(&self) -> bool {
|
||||||
self.inner.lock().list.is_empty()
|
self.list.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generates the size of the sharded list based on the number of worker threads.
|
||||||
|
///
|
||||||
|
/// The sharded lock design can effectively alleviate
|
||||||
|
/// lock contention performance problems caused by high concurrency.
|
||||||
|
///
|
||||||
|
/// However, as the number of shards increases, the memory continuity between
|
||||||
|
/// nodes in the intrusive linked list will diminish. Furthermore,
|
||||||
|
/// the construction time of the sharded list will also increase with a higher number of shards.
|
||||||
|
///
|
||||||
|
/// Due to the above reasons, we set a maximum value for the shared list size,
|
||||||
|
/// denoted as `MAX_SHARED_LIST_SIZE`.
|
||||||
|
fn gen_shared_list_size(num_cores: usize) -> usize {
|
||||||
|
const MAX_SHARED_LIST_SIZE: usize = 1 << 16;
|
||||||
|
usize::min(MAX_SHARED_LIST_SIZE, num_cores.next_power_of_two() * 4)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,9 +208,9 @@ cfg_taskdump! {
|
|||||||
/// Locks the tasks, and calls `f` on an iterator over them.
|
/// Locks the tasks, and calls `f` on an iterator over them.
|
||||||
pub(crate) fn for_each<F>(&self, f: F)
|
pub(crate) fn for_each<F>(&self, f: F)
|
||||||
where
|
where
|
||||||
F: FnMut(&Task<S>)
|
F: FnMut(&Task<S>),
|
||||||
{
|
{
|
||||||
self.inner.lock().list.for_each(f)
|
self.list.for_each(f);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -208,6 +208,7 @@ cfg_taskdump! {
 
 use crate::future::Future;
 use crate::util::linked_list;
+use crate::util::sharded_list;
 
 use std::marker::PhantomData;
 use std::ptr::NonNull;
@@ -503,3 +504,16 @@ unsafe impl<S> linked_list::Link for Task<S> {
         self::core::Trailer::addr_of_owned(Header::get_trailer(target))
     }
 }
+
+/// # Safety
+///
+/// The id of a task is never changed after creation of the task, so the return value of
+/// `get_shard_id` will not change. (The cast may throw away the upper 32 bits of the task id, but
+/// the shard id still won't change from call to call.)
+unsafe impl<S> sharded_list::ShardedListItem for Task<S> {
+    unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize {
+        // SAFETY: The caller guarantees that `target` points at a valid task.
+        let task_id = unsafe { Header::get_id(target) };
+        task_id.0 as usize
+    }
+}
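Note: task ids are handed out sequentially (see the `Id::next` change above), so using the id itself as the shard id spreads consecutively spawned tasks evenly across shards once the list applies its shard mask. An illustrative sketch assuming 4 shards:

fn main() {
    let shard_mask = 4usize - 1; // 4 shards, power of two
    // Task ids start at 1 and increase by one per spawned task.
    let shards: Vec<usize> = (1u64..=8).map(|task_id| task_id as usize & shard_mask).collect();
    assert_eq!(shards, vec![1, 2, 3, 0, 1, 2, 3, 0]);
}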
@@ -241,7 +241,7 @@ fn with(f: impl FnOnce(Runtime)) {
     let _reset = Reset;
 
     let rt = Runtime(Arc::new(Inner {
-        owned: OwnedTasks::new(),
+        owned: OwnedTasks::new(16),
         core: Mutex::new(Core {
             queue: VecDeque::new(),
         }),
|
|||||||
fn shutdown(&self) {
|
fn shutdown(&self) {
|
||||||
let mut core = self.0.core.try_lock().unwrap();
|
let mut core = self.0.core.try_lock().unwrap();
|
||||||
|
|
||||||
self.0.owned.close_and_shutdown_all();
|
self.0.owned.close_and_shutdown_all(0);
|
||||||
|
|
||||||
while let Some(task) = core.queue.pop_back() {
|
while let Some(task) = core.queue.pop_back() {
|
||||||
drop(task);
|
drop(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(core);
|
drop(core);
|
||||||
|
|
||||||
assert!(self.0.owned.is_empty());
|
assert!(self.0.owned.is_empty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -228,53 +228,6 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> {
     }
 }
 
-// ===== impl CountedLinkedList ====
-
-// Delegates operations to the base LinkedList implementation, and adds a counter to the elements
-// in the list.
-pub(crate) struct CountedLinkedList<L: Link, T> {
-    list: LinkedList<L, T>,
-    count: usize,
-}
-
-impl<L: Link> CountedLinkedList<L, L::Target> {
-    pub(crate) fn new() -> CountedLinkedList<L, L::Target> {
-        CountedLinkedList {
-            list: LinkedList::new(),
-            count: 0,
-        }
-    }
-
-    pub(crate) fn push_front(&mut self, val: L::Handle) {
-        self.list.push_front(val);
-        self.count += 1;
-    }
-
-    pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
-        let val = self.list.pop_back();
-        if val.is_some() {
-            self.count -= 1;
-        }
-        val
-    }
-
-    pub(crate) fn is_empty(&self) -> bool {
-        self.list.is_empty()
-    }
-
-    pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> {
-        let val = self.list.remove(node);
-        if val.is_some() {
-            self.count -= 1;
-        }
-        val
-    }
-
-    pub(crate) fn count(&self) -> usize {
-        self.count
-    }
-}
-
 #[cfg(any(
     feature = "fs",
     feature = "rt",
@@ -342,22 +295,11 @@ cfg_io_driver_impl! {
 }
 
 cfg_taskdump! {
-    impl<T: Link> CountedLinkedList<T, T::Target> {
-        pub(crate) fn for_each<F>(&mut self, f: F)
-        where
-            F: FnMut(&T::Handle),
-        {
-            self.list.for_each(f)
-        }
-    }
-
     impl<T: Link> LinkedList<T, T::Target> {
         pub(crate) fn for_each<F>(&mut self, mut f: F)
         where
             F: FnMut(&T::Handle),
         {
-            use std::mem::ManuallyDrop;
-
             let mut next = self.head;
 
             while let Some(curr) = next {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn count() {
|
|
||||||
let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new();
|
|
||||||
assert_eq!(0, list.count());
|
|
||||||
|
|
||||||
let a = entry(5);
|
|
||||||
let b = entry(7);
|
|
||||||
list.push_front(a.as_ref());
|
|
||||||
list.push_front(b.as_ref());
|
|
||||||
assert_eq!(2, list.count());
|
|
||||||
|
|
||||||
list.pop_back();
|
|
||||||
assert_eq!(1, list.count());
|
|
||||||
|
|
||||||
unsafe {
|
|
||||||
list.remove(ptr(&b));
|
|
||||||
}
|
|
||||||
assert_eq!(0, list.count());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module.
|
/// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module.
|
||||||
#[cfg(fuzzing)]
|
#[cfg(fuzzing)]
|
||||||
pub fn fuzz_linked_list(ops: &[u8]) {
|
pub fn fuzz_linked_list(ops: &[u8]) {
|
||||||
|
@@ -42,6 +42,10 @@ pub(crate) use wake_list::WakeList;
 ))]
 pub(crate) mod linked_list;
 
+cfg_rt! {
+    pub(crate) mod sharded_list;
+}
+
 #[cfg(any(feature = "rt", feature = "macros"))]
 pub(crate) mod rand;
 
tokio/src/util/sharded_list.rs (new file, 149 lines)
@@ -0,0 +1,149 @@
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering;
+
+use crate::loom::sync::{Mutex, MutexGuard};
+use std::sync::atomic::AtomicUsize;
+
+use super::linked_list::{Link, LinkedList};
+
+/// An intrusive linked list supporting highly concurrent updates.
+///
+/// It currently relies on `LinkedList`, so it is the caller's
+/// responsibility to ensure the list is empty before dropping it.
+///
+/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
+pub(crate) struct ShardedList<L, T> {
+    lists: Box<[Mutex<LinkedList<L, T>>]>,
+    count: AtomicUsize,
+    shard_mask: usize,
+}
+
+/// Determines which linked list an item should be stored in.
+///
+/// # Safety
+///
+/// Implementations must guarantee that the id of an item does not change from
+/// call to call.
+pub(crate) unsafe trait ShardedListItem: Link {
+    /// # Safety
+    /// The provided pointer must point at a valid list item.
+    unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize;
+}
+
+impl<L, T> ShardedList<L, T> {
+    /// Creates a new and empty sharded linked list with the specified size.
+    pub(crate) fn new(sharded_size: usize) -> Self {
+        assert!(sharded_size.is_power_of_two());
+
+        let shard_mask = sharded_size - 1;
+        let mut lists = Vec::with_capacity(sharded_size);
+        for _ in 0..sharded_size {
+            lists.push(Mutex::new(LinkedList::<L, T>::new()))
+        }
+        Self {
+            lists: lists.into_boxed_slice(),
+            count: AtomicUsize::new(0),
+            shard_mask,
+        }
+    }
+}
+
+/// Used to get the lock of shard.
+pub(crate) struct ShardGuard<'a, L, T> {
+    lock: MutexGuard<'a, LinkedList<L, T>>,
+    count: &'a AtomicUsize,
+    id: usize,
+}
+
+impl<L: ShardedListItem> ShardedList<L, L::Target> {
+    /// Removes the last element from a list specified by shard_id and returns it, or None if it is
+    /// empty.
+    pub(crate) fn pop_back(&self, shard_id: usize) -> Option<L::Handle> {
+        let mut lock = self.shard_inner(shard_id);
+        let node = lock.pop_back();
+        if node.is_some() {
+            self.count.fetch_sub(1, Ordering::Relaxed);
+        }
+        node
+    }
+
+    /// Removes the specified node from the list.
+    ///
+    /// # Safety
+    ///
+    /// The caller **must** ensure that exactly one of the following is true:
+    /// - `node` is currently contained by `self`,
+    /// - `node` is not contained by any list,
+    /// - `node` is currently contained by some other `GuardedLinkedList`.
+    pub(crate) unsafe fn remove(&self, node: NonNull<L::Target>) -> Option<L::Handle> {
+        let id = L::get_shard_id(node);
+        let mut lock = self.shard_inner(id);
+        // SAFETY: Since the shard id cannot change, it's not possible for this node
+        // to be in any other list of the same sharded list.
+        let node = unsafe { lock.remove(node) };
+        if node.is_some() {
+            self.count.fetch_sub(1, Ordering::Relaxed);
+        }
+        node
+    }
+
+    /// Gets the lock of ShardedList, makes us have the write permission.
+    pub(crate) fn lock_shard(&self, val: &L::Handle) -> ShardGuard<'_, L, L::Target> {
+        let id = unsafe { L::get_shard_id(L::as_raw(val)) };
+        ShardGuard {
+            lock: self.shard_inner(id),
+            count: &self.count,
+            id,
+        }
+    }
+
+    /// Gets the count of elements in this list.
+    pub(crate) fn len(&self) -> usize {
+        self.count.load(Ordering::Relaxed)
+    }
+
+    /// Returns whether the linked list does not contain any node.
+    pub(crate) fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Gets the shard size of this SharedList.
+    ///
+    /// Used to help us to decide the parameter `shard_id` of the `pop_back` method.
+    pub(crate) fn shard_size(&self) -> usize {
+        self.shard_mask + 1
+    }
+
+    #[inline]
+    fn shard_inner(&self, id: usize) -> MutexGuard<'_, LinkedList<L, <L as Link>::Target>> {
+        // Safety: This modulo operation ensures that the index is not out of bounds.
+        unsafe { self.lists.get_unchecked(id & self.shard_mask).lock() }
+    }
+}
+
+impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
+    /// Push a value to this shard.
+    pub(crate) fn push(mut self, val: L::Handle) {
+        let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
+        assert_eq!(id, self.id);
+        self.lock.push_front(val);
+        self.count.fetch_add(1, Ordering::Relaxed);
+    }
+}
+
+cfg_taskdump! {
+    impl<L: ShardedListItem> ShardedList<L, L::Target> {
+        pub(crate) fn for_each<F>(&self, mut f: F)
+        where
+            F: FnMut(&L::Handle),
+        {
+            let mut guards = Vec::with_capacity(self.lists.len());
+            for list in self.lists.iter() {
+                guards.push(list.lock());
+            }
+            for g in &mut guards {
+                g.for_each(&mut f);
+            }
+        }
+    }
+}