Support nested block_in_place (#2409)

This commit is contained in:
Jon Gjengset 2020-04-16 16:40:11 -04:00 committed by GitHub
parent 8381dff39b
commit 67c4cc0391
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 19 deletions

View File

@ -51,26 +51,20 @@ pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
impl Drop for Reset {
fn drop(&mut self) {
ENTERED.with(|c| {
assert!(!c.get(), "closure claimed permanent executor");
c.set(true);
});
}
}
ENTERED.with(|c| {
debug_assert!(c.get());
assert!(c.get(), "asked to exit when not entered");
c.set(false);
});
let reset = Reset;
let ret = f();
std::mem::forget(reset);
ENTERED.with(|c| {
assert!(!c.get(), "closure claimed permanent executor");
c.set(true);
});
ret
let _reset = Reset;
// dropping reset after f() will do c.set(true)
f()
}
cfg_blocking_impl! {

View File

@ -177,21 +177,33 @@ cfg_blocking! {
F: FnOnce() -> R,
{
// Try to steal the worker core back
struct Reset;
struct Reset(bool);
impl Drop for Reset {
fn drop(&mut self) {
CURRENT.with(|maybe_cx| {
if !self.0 {
// We were not the ones to give away the core,
// so we do not get to restore it either.
// This is necessary so that with a nested
// block_in_place, the inner block_in_place
// does not restore the core.
return;
}
if let Some(cx) = maybe_cx {
let core = cx.worker.core.take();
*cx.core.borrow_mut() = core;
let mut cx_core = cx.core.borrow_mut();
assert!(cx_core.is_none());
*cx_core = core;
}
});
}
}
let mut had_core = false;
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("can call blocking only when running in a spawned task");
let cx = maybe_cx.expect("can call blocking only when running in a spawned task on the multi-threaded runtime");
// Get the worker core. If none is set, then blocking is fine!
let core = match cx.core.borrow_mut().take() {
@ -212,6 +224,7 @@ cfg_blocking! {
//
// First, move the core back into the worker's shared core slot.
cx.worker.core.set(core);
had_core = true;
// Next, clone the worker handle and send it to a new thread for
// processing.
@ -222,9 +235,13 @@ cfg_blocking! {
runtime::spawn_blocking(move || run(worker));
});
let _reset = Reset;
let _reset = Reset(had_core);
f()
if had_core {
crate::runtime::enter::exit(f)
} else {
f()
}
}
}

View File

@ -32,9 +32,7 @@ cfg_rt_threaded! {
where
F: FnOnce() -> R,
{
use crate::runtime::{enter, thread_pool};
enter::exit(|| thread_pool::block_in_place(f))
crate::runtime::thread_pool::block_in_place(f)
}
}

View File

@ -27,3 +27,23 @@ async fn basic_blocking() {
assert_eq!(out, "hello");
}
}
#[tokio::test(threaded_scheduler)]
async fn block_in_block() {
// Run a few times
for _ in 0..100 {
let out = assert_ok!(
tokio::spawn(async {
task::block_in_place(|| {
task::block_in_place(|| {
thread::sleep(Duration::from_millis(5));
});
"hello"
})
})
.await
);
assert_eq!(out, "hello");
}
}