task: fix LocalSet failing to poll all local futures (#1905)

Currently, a `LocalSet` does not notify the `LocalFuture` again at the
end of a tick. This means that if we didn't poll every task in the run
queue during that tick (e.g. there are more than 61 tasks enqueued),
those tasks will not be polled.

This commit fixes this issue by changing `local::Scheduler::tick` to
return whether or not the local future needs to be notified again, and
waking the task if so.

Fixes #1899
Fixes #1900

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2019-12-05 12:00:10 -08:00 committed by Carl Lerche
parent dbcd1f9a09
commit b7ecd35036

View File

@ -308,7 +308,12 @@ impl<F: Future> Future for LocalFuture<F> {
return Poll::Ready(output);
}
scheduler.tick();
if scheduler.tick() {
// If `tick` returns true, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
}
Poll::Pending
}
}
@ -388,7 +393,9 @@ impl Scheduler {
.unwrap_or(false)
}
fn tick(&self) {
/// Tick the scheduler, returning whether the local future needs to be
/// notified again.
fn tick(&self) -> bool {
assert!(self.is_current());
for _ in 0..MAX_TASKS_PER_TICK {
let tick = self.tick.get().wrapping_add(1);
@ -400,7 +407,10 @@ impl Scheduler {
self.queues.next_task(tick)
} {
Some(task) => task,
None => return,
// We have fully drained the queue of notified tasks, so the
// local future doesn't need to be notified again — it can wait
// until something else wakes a task in the local set.
None => return false,
};
if let Some(task) = task.run(&mut || Some(self.into())) {
@ -411,6 +421,8 @@ impl Scheduler {
}
}
}
true
}
}
@ -435,7 +447,11 @@ impl Drop for Scheduler {
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crate::{runtime, task};
use crate::{
runtime,
sync::{mpsc, oneshot},
task, time,
};
use std::time::Duration;
#[test]
@ -469,7 +485,6 @@ mod tests {
fn local_threadpool_timer() {
// This test ensures that runtime services like the timer are properly
// set for the local task set.
use std::time::Duration;
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
@ -659,8 +674,6 @@ mod tests {
#[test]
fn drop_cancels_tasks() {
// This test reproduces issue #1842
use crate::sync::oneshot;
let mut rt = runtime::Builder::new()
.enable_time()
.basic_scheduler()
@ -673,7 +686,7 @@ mod tests {
local.spawn_local(async move {
started_tx.send(()).unwrap();
loop {
crate::time::delay_for(Duration::from_secs(3600)).await;
time::delay_for(Duration::from_secs(3600)).await;
}
});
@ -741,4 +754,64 @@ mod tests {
thread.join().expect("test thread should not panic!")
}
#[test]
fn local_tasks_are_polled_after_tick() {
// Reproduces issues #1899 and #1900
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
static RX1: AtomicUsize = AtomicUsize::new(0);
static RX2: AtomicUsize = AtomicUsize::new(0);
static EXPECTED: usize = 500;
let (tx, mut rx) = mpsc::unbounded_channel();
let mut rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let local = LocalSet::new();
local.block_on(&mut rt, async {
let task2 = task::spawn(async move {
// Wait a bit
time::delay_for(Duration::from_millis(100)).await;
let mut oneshots = Vec::with_capacity(EXPECTED);
// Send values
for _ in 0..EXPECTED {
let (oneshot_tx, oneshot_rx) = oneshot::channel();
oneshots.push(oneshot_tx);
tx.send(oneshot_rx).unwrap();
}
time::delay_for(Duration::from_millis(100)).await;
for tx in oneshots.drain(..) {
tx.send(()).unwrap();
}
time::delay_for(Duration::from_millis(300)).await;
let rx1 = RX1.load(SeqCst);
let rx2 = RX2.load(SeqCst);
println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2);
assert_eq!(EXPECTED, rx1);
assert_eq!(EXPECTED, rx2);
});
while let Some(oneshot) = rx.recv().await {
RX1.fetch_add(1, SeqCst);
task::spawn_local(async move {
oneshot.await.unwrap();
RX2.fetch_add(1, SeqCst);
});
}
task2.await.unwrap();
});
}
}