rt: fix nesting block_in_place with block_on (#5837)

This patch fixes a bug where nesting `block_in_place` with a `block_on`
between could lead to a panic. This happened because the nested
`block_in_place` would try to acquire a core on return when it should
not attempt to do so. The `block_on` between the two nested
`block_in_place` altered the thread-local state to lead to the incorrect
behavior.

The fix is for each call to `block_in_place` to track if it needs to try
to steal a core back.

Fixes #5239
This commit is contained in:
Carl Lerche 2023-06-29 13:47:14 -07:00
parent 48c55768fd
commit 012c848401
No known key found for this signature in database
GPG Key ID: FC5ADF3A4B2E5977
2 changed files with 48 additions and 7 deletions

View File

@ -323,26 +323,32 @@ where
F: FnOnce() -> R, F: FnOnce() -> R,
{ {
// Try to steal the worker core back // Try to steal the worker core back
struct Reset(coop::Budget); struct Reset {
take_core: bool,
budget: coop::Budget,
}
impl Drop for Reset { impl Drop for Reset {
fn drop(&mut self) { fn drop(&mut self) {
with_current(|maybe_cx| { with_current(|maybe_cx| {
if let Some(cx) = maybe_cx { if let Some(cx) = maybe_cx {
let core = cx.worker.core.take(); if self.take_core {
let mut cx_core = cx.core.borrow_mut(); let core = cx.worker.core.take();
assert!(cx_core.is_none()); let mut cx_core = cx.core.borrow_mut();
*cx_core = core; assert!(cx_core.is_none());
*cx_core = core;
}
// Reset the task budget as we are re-entering the // Reset the task budget as we are re-entering the
// runtime. // runtime.
coop::set(self.0); coop::set(self.budget);
} }
}); });
} }
} }
let mut had_entered = false; let mut had_entered = false;
let mut take_core = false;
let setup_result = with_current(|maybe_cx| { let setup_result = with_current(|maybe_cx| {
match ( match (
@ -394,6 +400,10 @@ where
None => return Ok(()), None => return Ok(()),
}; };
// We are taking the core from the context and sending it to another
// thread.
take_core = true;
// The parker should be set here // The parker should be set here
assert!(core.park.is_some()); assert!(core.park.is_some());
@ -420,7 +430,10 @@ where
if had_entered { if had_entered {
// Unset the current task's budget. Blocking sections are not // Unset the current task's budget. Blocking sections are not
// constrained by task budgets. // constrained by task budgets.
let _reset = Reset(coop::stop()); let _reset = Reset {
take_core,
budget: coop::stop(),
};
crate::runtime::context::exit_runtime(f) crate::runtime::context::exit_runtime(f)
} else { } else {

View File

@ -588,6 +588,34 @@ async fn test_block_in_place4() {
tokio::task::block_in_place(|| {}); tokio::task::block_in_place(|| {});
} }
// Repro for tokio-rs/tokio#5239
#[test]
fn test_nested_block_in_place_with_block_on_between() {
let rt = runtime::Builder::new_multi_thread()
.worker_threads(1)
// Needs to be more than 0
.max_blocking_threads(1)
.build()
.unwrap();
// Triggered by a race condition, so run a few times to make sure it is OK.
for _ in 0..100 {
let h = rt.handle().clone();
rt.block_on(async move {
tokio::spawn(async move {
tokio::task::block_in_place(|| {
h.block_on(async {
tokio::task::block_in_place(|| {});
});
})
})
.await
.unwrap()
});
}
}
// Testing the tuning logic is tricky as it is inherently timing based, and more // Testing the tuning logic is tricky as it is inherently timing based, and more
// of a heuristic than an exact behavior. This test checks that the interval // of a heuristic than an exact behavior. This test checks that the interval
// changes over time based on load factors. There are no assertions, completion // changes over time based on load factors. There are no assertions, completion