From 6e990eb2c814d7f327b773714d6ea6982fdd9b9c Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 29 Jun 2023 13:47:14 -0700 Subject: [PATCH] rt: fix nesting block_in_place with block_on (#5837) This patch fixes a bug where nesting `block_in_place` with a `block_on` between could lead to a panic. This happened because the nested `block_in_place` would try to acquire a core on return when it should not attempt to do so. The `block_on` between the two nested `block_in_place` altered the thread-local state to lead to the incorrect behavior. The fix is for each call to `block_in_place` to track if it needs to try to steal a core back. Fixes #5239 --- .../runtime/scheduler/multi_thread/worker.rs | 27 +++++++++++++----- tokio/tests/rt_threaded.rs | 28 +++++++++++++++++++ 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 7fc335f51..6ae114633 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -323,26 +323,32 @@ where F: FnOnce() -> R, { // Try to steal the worker core back - struct Reset(coop::Budget); + struct Reset { + take_core: bool, + budget: coop::Budget, + } impl Drop for Reset { fn drop(&mut self) { with_current(|maybe_cx| { if let Some(cx) = maybe_cx { - let core = cx.worker.core.take(); - let mut cx_core = cx.core.borrow_mut(); - assert!(cx_core.is_none()); - *cx_core = core; + if self.take_core { + let core = cx.worker.core.take(); + let mut cx_core = cx.core.borrow_mut(); + assert!(cx_core.is_none()); + *cx_core = core; + } // Reset the task budget as we are re-entering the // runtime. - coop::set(self.0); + coop::set(self.budget); } }); } } let mut had_entered = false; + let mut take_core = false; let setup_result = with_current(|maybe_cx| { match ( @@ -394,6 +400,10 @@ where None => return Ok(()), }; + // We are taking the core from the context and sending it to another + // thread. + take_core = true; + // The parker should be set here assert!(core.park.is_some()); @@ -420,7 +430,10 @@ where if had_entered { // Unset the current task's budget. Blocking sections are not // constrained by task budgets. - let _reset = Reset(coop::stop()); + let _reset = Reset { + take_core, + budget: coop::stop(), + }; crate::runtime::context::exit_runtime(f) } else { diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 6631768c3..69b186947 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -588,6 +588,34 @@ async fn test_block_in_place4() { tokio::task::block_in_place(|| {}); } +// Repro for tokio-rs/tokio#5239 +#[test] +fn test_nested_block_in_place_with_block_on_between() { + let rt = runtime::Builder::new_multi_thread() + .worker_threads(1) + // Needs to be more than 0 + .max_blocking_threads(1) + .build() + .unwrap(); + + // Triggered by a race condition, so run a few times to make sure it is OK. + for _ in 0..100 { + let h = rt.handle().clone(); + + rt.block_on(async move { + tokio::spawn(async move { + tokio::task::block_in_place(|| { + h.block_on(async { + tokio::task::block_in_place(|| {}); + }); + }) + }) + .await + .unwrap() + }); + } +} + // Testing the tuning logic is tricky as it is inherently timing based, and more // of a heuristic than an exact behavior. This test checks that the interval // changes over time based on load factors. There are no assertions, completion