rt: fix nesting block_in_place with block_on (#5837)

This patch fixes a bug where nesting `block_in_place` with a `block_on`
between could lead to a panic. This happened because the nested
`block_in_place` would try to acquire a core on return when it should
not attempt to do so. The `block_on` between the two nested
`block_in_place` altered the thread-local state to lead to the incorrect
behavior.

The fix is for each call to `block_in_place` to track if it needs to try
to steal a core back.

Fixes #5239
This commit is contained in:
Carl Lerche 2023-06-29 13:47:14 -07:00 committed by GitHub
parent b573adc733
commit 6e990eb2c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 7 deletions

View File

@ -323,26 +323,32 @@ where
F: FnOnce() -> R,
{
// Try to steal the worker core back
struct Reset(coop::Budget);
struct Reset {
take_core: bool,
budget: coop::Budget,
}
impl Drop for Reset {
fn drop(&mut self) {
with_current(|maybe_cx| {
if let Some(cx) = maybe_cx {
let core = cx.worker.core.take();
let mut cx_core = cx.core.borrow_mut();
assert!(cx_core.is_none());
*cx_core = core;
if self.take_core {
let core = cx.worker.core.take();
let mut cx_core = cx.core.borrow_mut();
assert!(cx_core.is_none());
*cx_core = core;
}
// Reset the task budget as we are re-entering the
// runtime.
coop::set(self.0);
coop::set(self.budget);
}
});
}
}
let mut had_entered = false;
let mut take_core = false;
let setup_result = with_current(|maybe_cx| {
match (
@ -394,6 +400,10 @@ where
None => return Ok(()),
};
// We are taking the core from the context and sending it to another
// thread.
take_core = true;
// The parker should be set here
assert!(core.park.is_some());
@ -420,7 +430,10 @@ where
if had_entered {
// Unset the current task's budget. Blocking sections are not
// constrained by task budgets.
let _reset = Reset(coop::stop());
let _reset = Reset {
take_core,
budget: coop::stop(),
};
crate::runtime::context::exit_runtime(f)
} else {

View File

@ -588,6 +588,34 @@ async fn test_block_in_place4() {
tokio::task::block_in_place(|| {});
}
// Repro for tokio-rs/tokio#5239
#[test]
fn test_nested_block_in_place_with_block_on_between() {
let rt = runtime::Builder::new_multi_thread()
.worker_threads(1)
// Needs to be more than 0
.max_blocking_threads(1)
.build()
.unwrap();
// Triggered by a race condition, so run a few times to make sure it is OK.
for _ in 0..100 {
let h = rt.handle().clone();
rt.block_on(async move {
tokio::spawn(async move {
tokio::task::block_in_place(|| {
h.block_on(async {
tokio::task::block_in_place(|| {});
});
})
})
.await
.unwrap()
});
}
}
// Testing the tuning logic is tricky as it is inherently timing based, and more
// of a heuristic than an exact behavior. This test checks that the interval
// changes over time based on load factors. There are no assertions, completion