diff --git a/tokio/src/runtime/thread_pool/queue/global.rs b/tokio/src/runtime/thread_pool/queue/global.rs index a6f49c016..36dcc729e 100644 --- a/tokio/src/runtime/thread_pool/queue/global.rs +++ b/tokio/src/runtime/thread_pool/queue/global.rs @@ -92,6 +92,7 @@ impl Queue { // Check if the queue is closed. This must happen in the lock. let len = self.len.unsync_load(); if len & CLOSED == CLOSED { + drop(p); f(Err(task)); return; } diff --git a/tokio/src/runtime/thread_pool/tests/loom_pool.rs b/tokio/src/runtime/thread_pool/tests/loom_pool.rs index aee66e7f6..0394db166 100644 --- a/tokio/src/runtime/thread_pool/tests/loom_pool.rs +++ b/tokio/src/runtime/thread_pool/tests/loom_pool.rs @@ -168,6 +168,33 @@ fn complete_block_on_under_load() { }); } +#[test] +fn shutdown_with_notification() { + use crate::stream::StreamExt; + use crate::sync::{mpsc, oneshot}; + + loom::model(|| { + let rt = mk_pool(2); + let (done_tx, done_rx) = oneshot::channel::<()>(); + + rt.spawn(async move { + let (mut tx, mut rx) = mpsc::channel::<()>(10); + + crate::spawn(async move { + crate::task::spawn_blocking(move || { + let _ = tx.try_send(()); + }); + + let _ = done_rx.await; + }); + + while let Some(_) = rx.next().await {} + + let _ = done_tx.send(()); + }); + }); +} + fn mk_pool(num_threads: usize) -> Runtime { runtime::Builder::new() .threaded_scheduler()