diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index aed5105d4..203a2f396 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -308,7 +308,12 @@ impl Future for LocalFuture { return Poll::Ready(output); } - scheduler.tick(); + if scheduler.tick() { + // If `tick` returns true, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + } + Poll::Pending } } @@ -388,7 +393,9 @@ impl Scheduler { .unwrap_or(false) } - fn tick(&self) { + /// Tick the scheduler, returning whether the local future needs to be + /// notified again. + fn tick(&self) -> bool { assert!(self.is_current()); for _ in 0..MAX_TASKS_PER_TICK { let tick = self.tick.get().wrapping_add(1); @@ -400,7 +407,10 @@ impl Scheduler { self.queues.next_task(tick) } { Some(task) => task, - None => return, + // We have fully drained the queue of notified tasks, so the + // local future doesn't need to be notified again — it can wait + // until something else wakes a task in the local set. + None => return false, }; if let Some(task) = task.run(&mut || Some(self.into())) { @@ -411,6 +421,8 @@ impl Scheduler { } } } + + true } } @@ -435,7 +447,11 @@ impl Drop for Scheduler { #[cfg(all(test, not(loom)))] mod tests { use super::*; - use crate::{runtime, task}; + use crate::{ + runtime, + sync::{mpsc, oneshot}, + task, time, + }; use std::time::Duration; #[test] @@ -469,7 +485,6 @@ mod tests { fn local_threadpool_timer() { // This test ensures that runtime services like the timer are properly // set for the local task set. - use std::time::Duration; thread_local! { static ON_RT_THREAD: Cell = Cell::new(false); } @@ -659,8 +674,6 @@ mod tests { #[test] fn drop_cancels_tasks() { // This test reproduces issue #1842 - use crate::sync::oneshot; - let mut rt = runtime::Builder::new() .enable_time() .basic_scheduler() @@ -673,7 +686,7 @@ mod tests { local.spawn_local(async move { started_tx.send(()).unwrap(); loop { - crate::time::delay_for(Duration::from_secs(3600)).await; + time::delay_for(Duration::from_secs(3600)).await; } }); @@ -741,4 +754,64 @@ mod tests { thread.join().expect("test thread should not panic!") } + + #[test] + fn local_tasks_are_polled_after_tick() { + // Reproduces issues #1899 and #1900 + use std::sync::atomic::{AtomicUsize, Ordering::SeqCst}; + + static RX1: AtomicUsize = AtomicUsize::new(0); + static RX2: AtomicUsize = AtomicUsize::new(0); + static EXPECTED: usize = 500; + + let (tx, mut rx) = mpsc::unbounded_channel(); + + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + + let local = LocalSet::new(); + + local.block_on(&mut rt, async { + let task2 = task::spawn(async move { + // Wait a bit + time::delay_for(Duration::from_millis(100)).await; + + let mut oneshots = Vec::with_capacity(EXPECTED); + + // Send values + for _ in 0..EXPECTED { + let (oneshot_tx, oneshot_rx) = oneshot::channel(); + oneshots.push(oneshot_tx); + tx.send(oneshot_rx).unwrap(); + } + + time::delay_for(Duration::from_millis(100)).await; + + for tx in oneshots.drain(..) { + tx.send(()).unwrap(); + } + + time::delay_for(Duration::from_millis(300)).await; + let rx1 = RX1.load(SeqCst); + let rx2 = RX2.load(SeqCst); + println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2); + assert_eq!(EXPECTED, rx1); + assert_eq!(EXPECTED, rx2); + }); + + while let Some(oneshot) = rx.recv().await { + RX1.fetch_add(1, SeqCst); + + task::spawn_local(async move { + oneshot.await.unwrap(); + RX2.fetch_add(1, SeqCst); + }); + } + + task2.await.unwrap(); + }); + } }