mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
rt: batch pop from injection queue when idle (#5705)
In the multi-threaded scheduler, when there are no tasks on the local queue, a worker will attempt to pull tasks from the injection queue. Previously, the worker would only attempt to poll one task from the injection queue then continue trying to find work from other sources. This can result in the injection queue backing up when there are many tasks being scheduled from outside of the runtime. This patch updates the worker to try to poll more than one task from the injection queue when it has no more local work. Note that we also don't want a single worker to poll **all** tasks on the injection queue as that would result in work becoming unbalanced.
This commit is contained in:
parent
93bde0870f
commit
3a94eb0893
22
.github/workflows/loom.yml
vendored
22
.github/workflows/loom.yml
vendored
@ -24,13 +24,19 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
scope:
|
||||
- --skip loom_pool
|
||||
- loom_pool::group_a
|
||||
- loom_pool::group_b
|
||||
- loom_pool::group_c
|
||||
- loom_pool::group_d
|
||||
- time::driver
|
||||
include:
|
||||
- scope: --skip loom_pool
|
||||
max_preemptions: 2
|
||||
- scope: loom_pool::group_a
|
||||
max_preemptions: 1
|
||||
- scope: loom_pool::group_b
|
||||
max_preemptions: 2
|
||||
- scope: loom_pool::group_c
|
||||
max_preemptions: 1
|
||||
- scope: loom_pool::group_d
|
||||
max_preemptions: 1
|
||||
- scope: time::driver
|
||||
max_preemptions: 2
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Install Rust ${{ env.rust_stable }}
|
||||
@ -43,6 +49,6 @@ jobs:
|
||||
working-directory: tokio
|
||||
env:
|
||||
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
|
||||
LOOM_MAX_PREEMPTIONS: 2
|
||||
LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
|
||||
LOOM_MAX_BRANCHES: 10000
|
||||
SCOPE: ${{ matrix.scope }}
|
||||
|
@ -10,9 +10,10 @@ use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
use std::sync::{mpsc, Arc};
|
||||
|
||||
fn spawn_many(b: &mut Bencher) {
|
||||
const NUM_SPAWN: usize = 10_000;
|
||||
const NUM_WORKERS: usize = 4;
|
||||
const NUM_SPAWN: usize = 10_000;
|
||||
|
||||
fn spawn_many_local(b: &mut Bencher) {
|
||||
let rt = rt();
|
||||
|
||||
let (tx, rx) = mpsc::sync_channel(1000);
|
||||
@ -38,6 +39,52 @@ fn spawn_many(b: &mut Bencher) {
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_many_remote_idle(b: &mut Bencher) {
|
||||
let rt = rt();
|
||||
|
||||
let mut handles = Vec::with_capacity(NUM_SPAWN);
|
||||
|
||||
b.iter(|| {
|
||||
for _ in 0..NUM_SPAWN {
|
||||
handles.push(rt.spawn(async {}));
|
||||
}
|
||||
|
||||
rt.block_on(async {
|
||||
for handle in handles.drain(..) {
|
||||
handle.await.unwrap();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_many_remote_busy(b: &mut Bencher) {
|
||||
let rt = rt();
|
||||
let rt_handle = rt.handle();
|
||||
let mut handles = Vec::with_capacity(NUM_SPAWN);
|
||||
|
||||
// Spawn some tasks to keep the runtimes busy
|
||||
for _ in 0..(2 * NUM_WORKERS) {
|
||||
rt.spawn(async {
|
||||
loop {
|
||||
tokio::task::yield_now().await;
|
||||
std::thread::sleep(std::time::Duration::from_micros(10));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
b.iter(|| {
|
||||
for _ in 0..NUM_SPAWN {
|
||||
handles.push(rt_handle.spawn(async {}));
|
||||
}
|
||||
|
||||
rt.block_on(async {
|
||||
for handle in handles.drain(..) {
|
||||
handle.await.unwrap();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
fn yield_many(b: &mut Bencher) {
|
||||
const NUM_YIELD: usize = 1_000;
|
||||
const TASKS: usize = 200;
|
||||
@ -140,12 +187,20 @@ fn chained_spawn(b: &mut Bencher) {
|
||||
|
||||
fn rt() -> Runtime {
|
||||
runtime::Builder::new_multi_thread()
|
||||
.worker_threads(4)
|
||||
.worker_threads(NUM_WORKERS)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,);
|
||||
benchmark_group!(
|
||||
scheduler,
|
||||
spawn_many_local,
|
||||
spawn_many_remote_idle,
|
||||
spawn_many_remote_busy,
|
||||
ping_pong,
|
||||
yield_many,
|
||||
chained_spawn,
|
||||
);
|
||||
|
||||
benchmark_main!(scheduler);
|
||||
|
@ -110,6 +110,15 @@ impl<T> Local<T> {
|
||||
!self.inner.is_empty()
|
||||
}
|
||||
|
||||
/// How many tasks can be pushed into the queue
|
||||
pub(crate) fn remaining_slots(&self) -> usize {
|
||||
self.inner.remaining_slots()
|
||||
}
|
||||
|
||||
pub(crate) fn max_capacity(&self) -> usize {
|
||||
LOCAL_QUEUE_CAPACITY
|
||||
}
|
||||
|
||||
/// Returns false if there are any entries in the queue
|
||||
///
|
||||
/// Separate to is_stealable so that refactors of is_stealable to "protect"
|
||||
@ -118,8 +127,62 @@ impl<T> Local<T> {
|
||||
!self.inner.is_empty()
|
||||
}
|
||||
|
||||
/// Pushes a task to the back of the local queue, skipping the LIFO slot.
|
||||
pub(crate) fn push_back(
|
||||
/// Pushes a batch of tasks to the back of the queue. All tasks must fit in
|
||||
/// the local queue.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The method panics if there is not enough capacity to fit in the queue.
|
||||
pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
|
||||
let len = tasks.len();
|
||||
assert!(len <= LOCAL_QUEUE_CAPACITY);
|
||||
|
||||
if len == 0 {
|
||||
// Nothing to do
|
||||
return;
|
||||
}
|
||||
|
||||
let head = self.inner.head.load(Acquire);
|
||||
let (steal, _) = unpack(head);
|
||||
|
||||
// safety: this is the **only** thread that updates this cell.
|
||||
let mut tail = unsafe { self.inner.tail.unsync_load() };
|
||||
|
||||
if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
|
||||
// Yes, this if condition is structured a bit weird (first block
|
||||
// does nothing, second returns an error). It is this way to match
|
||||
// `push_back_or_overflow`.
|
||||
} else {
|
||||
panic!()
|
||||
}
|
||||
|
||||
for task in tasks {
|
||||
let idx = tail as usize & MASK;
|
||||
|
||||
self.inner.buffer[idx].with_mut(|ptr| {
|
||||
// Write the task to the slot
|
||||
//
|
||||
// Safety: There is only one producer and the above `if`
|
||||
// condition ensures we don't touch a cell if there is a
|
||||
// value, thus no consumer.
|
||||
unsafe {
|
||||
ptr::write((*ptr).as_mut_ptr(), task);
|
||||
}
|
||||
});
|
||||
|
||||
tail = tail.wrapping_add(1);
|
||||
}
|
||||
|
||||
self.inner.tail.store(tail, Release);
|
||||
}
|
||||
|
||||
/// Pushes a task to the back of the local queue, if there is not enough
|
||||
/// capacity in the queue, this triggers the overflow operation.
|
||||
///
|
||||
/// When the queue overflows, half of the curent contents of the queue is
|
||||
/// moved to the given Injection queue. This frees up capacity for more
|
||||
/// tasks to be pushed into the local queue.
|
||||
pub(crate) fn push_back_or_overflow(
|
||||
&mut self,
|
||||
mut task: task::Notified<T>,
|
||||
inject: &Inject<T>,
|
||||
@ -153,6 +216,11 @@ impl<T> Local<T> {
|
||||
}
|
||||
};
|
||||
|
||||
self.push_back_finish(task, tail);
|
||||
}
|
||||
|
||||
// Second half of `push_back`
|
||||
fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
|
||||
// Map the position to a slot index.
|
||||
let idx = tail as usize & MASK;
|
||||
|
||||
@ -501,6 +569,13 @@ impl<T> Drop for Local<T> {
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
fn remaining_slots(&self) -> usize {
|
||||
let (steal, _) = unpack(self.head.load(Acquire));
|
||||
let tail = self.tail.load(Acquire);
|
||||
|
||||
LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
|
||||
}
|
||||
|
||||
fn len(&self) -> UnsignedShort {
|
||||
let (_, head) = unpack(self.head.load(Acquire));
|
||||
let tail = self.tail.load(Acquire);
|
||||
|
@ -509,8 +509,11 @@ impl Context {
|
||||
} else {
|
||||
// Not enough budget left to run the LIFO task, push it to
|
||||
// the back of the queue and return.
|
||||
core.run_queue
|
||||
.push_back(task, self.worker.inject(), &mut core.metrics);
|
||||
core.run_queue.push_back_or_overflow(
|
||||
task,
|
||||
self.worker.inject(),
|
||||
&mut core.metrics,
|
||||
);
|
||||
return Ok(core);
|
||||
}
|
||||
}
|
||||
@ -612,7 +615,38 @@ impl Core {
|
||||
if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
|
||||
worker.inject().pop().or_else(|| self.next_local_task())
|
||||
} else {
|
||||
self.next_local_task().or_else(|| worker.inject().pop())
|
||||
let maybe_task = self.next_local_task();
|
||||
|
||||
if maybe_task.is_some() {
|
||||
return maybe_task;
|
||||
}
|
||||
|
||||
// Other threads can only **remove** tasks from the current worker's
|
||||
// `run_queue`. So, we can be confident that by the time we call
|
||||
// `run_queue.push_back` below, there will be *at least* `cap`
|
||||
// available slots in the queue.
|
||||
let cap = usize::min(
|
||||
self.run_queue.remaining_slots(),
|
||||
self.run_queue.max_capacity() / 2,
|
||||
);
|
||||
|
||||
// The worker is currently idle, pull a batch of work from the
|
||||
// injection queue. We don't want to pull *all* the work so other
|
||||
// workers can also get some.
|
||||
let n = usize::min(
|
||||
worker.inject().len() / worker.handle.shared.remotes.len() + 1,
|
||||
cap,
|
||||
);
|
||||
|
||||
let mut tasks = worker.inject().pop_n(n);
|
||||
|
||||
// Pop the first task to return immedietly
|
||||
let ret = tasks.next();
|
||||
|
||||
// Push the rest of the on the run queue
|
||||
self.run_queue.push_back(tasks);
|
||||
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
@ -808,7 +842,7 @@ impl Handle {
|
||||
// flexibility and the task may go to the front of the queue.
|
||||
let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
|
||||
core.run_queue
|
||||
.push_back(task, &self.shared.inject, &mut core.metrics);
|
||||
.push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
|
||||
true
|
||||
} else {
|
||||
// Push to the LIFO slot
|
||||
@ -817,7 +851,7 @@ impl Handle {
|
||||
|
||||
if let Some(prev) = prev {
|
||||
core.run_queue
|
||||
.push_back(prev, &self.shared.inject, &mut core.metrics);
|
||||
.push_back_or_overflow(prev, &self.shared.inject, &mut core.metrics);
|
||||
}
|
||||
|
||||
core.lifo_slot = Some(task);
|
||||
|
@ -1,7 +1,7 @@
|
||||
//! Inject queue used to send wakeups to a work-stealing scheduler
|
||||
|
||||
use crate::loom::sync::atomic::AtomicUsize;
|
||||
use crate::loom::sync::Mutex;
|
||||
use crate::loom::sync::{Mutex, MutexGuard};
|
||||
use crate::runtime::task;
|
||||
|
||||
use std::marker::PhantomData;
|
||||
@ -32,6 +32,12 @@ struct Pointers {
|
||||
tail: Option<NonNull<task::Header>>,
|
||||
}
|
||||
|
||||
pub(crate) struct Pop<'a, T: 'static> {
|
||||
len: usize,
|
||||
pointers: Option<MutexGuard<'a, Pointers>>,
|
||||
_p: PhantomData<T>,
|
||||
}
|
||||
|
||||
unsafe impl<T> Send for Inject<T> {}
|
||||
unsafe impl<T> Sync for Inject<T> {}
|
||||
|
||||
@ -107,34 +113,38 @@ impl<T: 'static> Inject<T> {
|
||||
}
|
||||
|
||||
pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
|
||||
self.pop_n(1).next()
|
||||
}
|
||||
|
||||
pub(crate) fn pop_n(&self, n: usize) -> Pop<'_, T> {
|
||||
use std::cmp;
|
||||
|
||||
// Fast path, if len == 0, then there are no values
|
||||
if self.is_empty() {
|
||||
return None;
|
||||
return Pop {
|
||||
len: 0,
|
||||
pointers: None,
|
||||
_p: PhantomData,
|
||||
};
|
||||
}
|
||||
|
||||
let mut p = self.pointers.lock();
|
||||
// Lock the queue
|
||||
let p = self.pointers.lock();
|
||||
|
||||
// It is possible to hit null here if another thread popped the last
|
||||
// task between us checking `len` and acquiring the lock.
|
||||
let task = p.head?;
|
||||
|
||||
p.head = get_next(task);
|
||||
|
||||
if p.head.is_none() {
|
||||
p.tail = None;
|
||||
}
|
||||
|
||||
set_next(task, None);
|
||||
|
||||
// Decrement the count.
|
||||
//
|
||||
// safety: All updates to the len atomic are guarded by the mutex. As
|
||||
// such, a non-atomic load followed by a store is safe.
|
||||
self.len
|
||||
.store(unsafe { self.len.unsync_load() } - 1, Release);
|
||||
let len = unsafe { self.len.unsync_load() };
|
||||
|
||||
// safety: a `Notified` is pushed into the queue and now it is popped!
|
||||
Some(unsafe { task::Notified::from_raw(task) })
|
||||
let n = cmp::min(n, len);
|
||||
|
||||
// Decrement the count.
|
||||
self.len.store(len - n, Release);
|
||||
|
||||
Pop {
|
||||
len: n,
|
||||
pointers: Some(p),
|
||||
_p: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -215,6 +225,63 @@ impl<T: 'static> Drop for Inject<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: 'static> Iterator for Pop<'a, T> {
|
||||
type Item = task::Notified<T>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.len == 0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
// `pointers` is always `Some` when `len() > 0`
|
||||
let pointers = self.pointers.as_mut().unwrap();
|
||||
let ret = pointers.pop();
|
||||
|
||||
debug_assert!(ret.is_some());
|
||||
|
||||
self.len -= 1;
|
||||
|
||||
if self.len == 0 {
|
||||
self.pointers = None;
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
(self.len, Some(self.len))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: 'static> ExactSizeIterator for Pop<'a, T> {
|
||||
fn len(&self) -> usize {
|
||||
self.len
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: 'static> Drop for Pop<'a, T> {
|
||||
fn drop(&mut self) {
|
||||
for _ in self.by_ref() {}
|
||||
}
|
||||
}
|
||||
|
||||
impl Pointers {
|
||||
fn pop<T: 'static>(&mut self) -> Option<task::Notified<T>> {
|
||||
let task = self.head?;
|
||||
|
||||
self.head = get_next(task);
|
||||
|
||||
if self.head.is_none() {
|
||||
self.tail = None;
|
||||
}
|
||||
|
||||
set_next(task, None);
|
||||
|
||||
// safety: a `Notified` is pushed into the queue and now it is popped!
|
||||
Some(unsafe { task::Notified::from_raw(task) })
|
||||
}
|
||||
}
|
||||
|
||||
fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
|
||||
unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
|
||||
}
|
||||
|
38
tokio/src/runtime/tests/inject.rs
Normal file
38
tokio/src/runtime/tests/inject.rs
Normal file
@ -0,0 +1,38 @@
|
||||
use crate::runtime::task::Inject;
|
||||
|
||||
#[test]
|
||||
fn push_and_pop() {
|
||||
let inject = Inject::new();
|
||||
|
||||
for _ in 0..10 {
|
||||
let (task, _) = super::unowned(async {});
|
||||
inject.push(task);
|
||||
}
|
||||
|
||||
for _ in 0..10 {
|
||||
assert!(inject.pop().is_some());
|
||||
}
|
||||
|
||||
assert!(inject.pop().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn push_batch_and_pop() {
|
||||
let inject = Inject::new();
|
||||
|
||||
inject.push_batch((0..10).map(|_| super::unowned(async {}).0));
|
||||
|
||||
assert_eq!(5, inject.pop_n(5).count());
|
||||
assert_eq!(5, inject.pop_n(5).count());
|
||||
assert_eq!(0, inject.pop_n(5).count());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pop_n_drains_on_drop() {
|
||||
let inject = Inject::new();
|
||||
|
||||
inject.push_batch((0..10).map(|_| super::unowned(async {}).0));
|
||||
let _ = inject.pop_n(10);
|
||||
|
||||
assert_eq!(inject.len(), 0);
|
||||
}
|
@ -39,7 +39,7 @@ fn basic() {
|
||||
for _ in 0..2 {
|
||||
for _ in 0..2 {
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
}
|
||||
|
||||
if local.pop().is_some() {
|
||||
@ -48,7 +48,7 @@ fn basic() {
|
||||
|
||||
// Push another task
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
|
||||
while local.pop().is_some() {
|
||||
n += 1;
|
||||
@ -92,7 +92,7 @@ fn steal_overflow() {
|
||||
|
||||
// push a task, pop a task
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
|
||||
if local.pop().is_some() {
|
||||
n += 1;
|
||||
@ -100,7 +100,7 @@ fn steal_overflow() {
|
||||
|
||||
for _ in 0..6 {
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
}
|
||||
|
||||
n += th.join().unwrap();
|
||||
@ -146,7 +146,7 @@ fn multi_stealer() {
|
||||
// Push work
|
||||
for _ in 0..NUM_TASKS {
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
}
|
||||
|
||||
let th1 = {
|
||||
@ -184,10 +184,10 @@ fn chained_steal() {
|
||||
// Load up some tasks
|
||||
for _ in 0..4 {
|
||||
let (task, _) = super::unowned(async {});
|
||||
l1.push_back(task, &inject, &mut metrics);
|
||||
l1.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
|
||||
let (task, _) = super::unowned(async {});
|
||||
l2.push_back(task, &inject, &mut metrics);
|
||||
l2.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
}
|
||||
|
||||
// Spawn a task to steal from **our** queue
|
||||
|
@ -63,6 +63,7 @@ cfg_loom! {
|
||||
}
|
||||
|
||||
cfg_not_loom! {
|
||||
mod inject;
|
||||
mod queue;
|
||||
|
||||
#[cfg(not(miri))]
|
||||
|
@ -27,14 +27,14 @@ fn metrics_batch() -> MetricsBatch {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fits_256() {
|
||||
fn fits_256_one_at_a_time() {
|
||||
let (_, mut local) = queue::local();
|
||||
let inject = Inject::new();
|
||||
let mut metrics = metrics_batch();
|
||||
|
||||
for _ in 0..256 {
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
}
|
||||
|
||||
cfg_metrics! {
|
||||
@ -46,6 +46,44 @@ fn fits_256() {
|
||||
while local.pop().is_some() {}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fits_256_all_at_once() {
|
||||
let (_, mut local) = queue::local();
|
||||
|
||||
let mut tasks = (0..256)
|
||||
.map(|_| super::unowned(async {}).0)
|
||||
.collect::<Vec<_>>();
|
||||
local.push_back(tasks.drain(..));
|
||||
|
||||
let mut i = 0;
|
||||
while local.pop().is_some() {
|
||||
i += 1;
|
||||
}
|
||||
|
||||
assert_eq!(i, 256);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fits_256_all_in_chunks() {
|
||||
let (_, mut local) = queue::local();
|
||||
|
||||
let mut tasks = (0..256)
|
||||
.map(|_| super::unowned(async {}).0)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
local.push_back(tasks.drain(..10));
|
||||
local.push_back(tasks.drain(..100));
|
||||
local.push_back(tasks.drain(..46));
|
||||
local.push_back(tasks.drain(..100));
|
||||
|
||||
let mut i = 0;
|
||||
while local.pop().is_some() {
|
||||
i += 1;
|
||||
}
|
||||
|
||||
assert_eq!(i, 256);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn overflow() {
|
||||
let (_, mut local) = queue::local();
|
||||
@ -54,7 +92,7 @@ fn overflow() {
|
||||
|
||||
for _ in 0..257 {
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
}
|
||||
|
||||
cfg_metrics! {
|
||||
@ -84,7 +122,7 @@ fn steal_batch() {
|
||||
|
||||
for _ in 0..4 {
|
||||
let (task, _) = super::unowned(async {});
|
||||
local1.push_back(task, &inject, &mut metrics);
|
||||
local1.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
}
|
||||
|
||||
assert!(steal1.steal_into(&mut local2, &mut metrics).is_some());
|
||||
@ -157,7 +195,7 @@ fn stress1() {
|
||||
for _ in 0..NUM_LOCAL {
|
||||
for _ in 0..NUM_PUSH {
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
}
|
||||
|
||||
for _ in 0..NUM_POP {
|
||||
@ -215,7 +253,7 @@ fn stress2() {
|
||||
|
||||
for i in 0..NUM_TASKS {
|
||||
let (task, _) = super::unowned(async {});
|
||||
local.push_back(task, &inject, &mut metrics);
|
||||
local.push_back_or_overflow(task, &inject, &mut metrics);
|
||||
|
||||
if i % 128 == 0 && local.pop().is_some() {
|
||||
num_pop += 1;
|
||||
|
@ -510,10 +510,20 @@ fn injection_queue_depth() {
|
||||
// First we need to block the runtime workers
|
||||
let (tx1, rx1) = std::sync::mpsc::channel();
|
||||
let (tx2, rx2) = std::sync::mpsc::channel();
|
||||
let (tx3, rx3) = std::sync::mpsc::channel();
|
||||
let rx3 = Arc::new(Mutex::new(rx3));
|
||||
|
||||
rt.spawn(async move { rx1.recv().unwrap() });
|
||||
rt.spawn(async move { rx2.recv().unwrap() });
|
||||
|
||||
// Spawn some more to make sure there are items
|
||||
for _ in 0..10 {
|
||||
let rx = rx3.clone();
|
||||
rt.spawn(async move {
|
||||
rx.lock().unwrap().recv().unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
thread::spawn(move || {
|
||||
handle.spawn(async {});
|
||||
})
|
||||
@ -522,7 +532,11 @@ fn injection_queue_depth() {
|
||||
|
||||
let n = metrics.injection_queue_depth();
|
||||
assert!(1 <= n, "{}", n);
|
||||
assert!(3 >= n, "{}", n);
|
||||
assert!(15 >= n, "{}", n);
|
||||
|
||||
for _ in 0..10 {
|
||||
tx3.send(()).unwrap();
|
||||
}
|
||||
|
||||
tx1.send(()).unwrap();
|
||||
tx2.send(()).unwrap();
|
||||
|
Loading…
x
Reference in New Issue
Block a user