mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
rt: fix new yield_now behavior with block_in_place (#5251)
PR #5223 changed the behavior of `yield_now()` to store yielded tasks and notify them *after* polling the resource drivers. This PR fixes a couple of bugs with this new behavior when combined with `block_in_place()`. First, we need to avoid freeing the deferred task queue when exiting a runtime if it is *not* the root runtime. Because `block_in_place()` allows a user to start a new runtime from within an existing task, this check is necessary. Second, when a worker core is stolen from a thread during a `block_in_place()` call, we need to ensure that deferred tasks are notified anyway.
This commit is contained in:
parent
22862739dd
commit
d1b789f33a
@ -107,9 +107,17 @@ cfg_rt! {
|
|||||||
/// Guard tracking that a caller has entered a runtime context.
|
/// Guard tracking that a caller has entered a runtime context.
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub(crate) struct EnterRuntimeGuard {
|
pub(crate) struct EnterRuntimeGuard {
|
||||||
|
/// Tracks that the current thread has entered a blocking function call.
|
||||||
pub(crate) blocking: BlockingRegionGuard,
|
pub(crate) blocking: BlockingRegionGuard,
|
||||||
|
|
||||||
#[allow(dead_code)] // Only tracking the guard.
|
#[allow(dead_code)] // Only tracking the guard.
|
||||||
pub(crate) handle: SetCurrentGuard,
|
pub(crate) handle: SetCurrentGuard,
|
||||||
|
|
||||||
|
/// If true, then this is the root runtime guard. It is possible to nest
|
||||||
|
/// runtime guards by using `block_in_place` between the calls. We need
|
||||||
|
/// to track the root guard as this is the guard responsible for freeing
|
||||||
|
/// the deferred task queue.
|
||||||
|
is_root: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Guard tracking that a caller has entered a blocking region.
|
/// Guard tracking that a caller has entered a blocking region.
|
||||||
@ -171,11 +179,19 @@ cfg_rt! {
|
|||||||
c.runtime.set(EnterRuntime::Entered { allow_block_in_place });
|
c.runtime.set(EnterRuntime::Entered { allow_block_in_place });
|
||||||
|
|
||||||
// Initialize queue to track yielded tasks
|
// Initialize queue to track yielded tasks
|
||||||
*c.defer.borrow_mut() = Some(Defer::new());
|
let mut defer = c.defer.borrow_mut();
|
||||||
|
|
||||||
|
let is_root = if defer.is_none() {
|
||||||
|
*defer = Some(Defer::new());
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
|
||||||
Some(EnterRuntimeGuard {
|
Some(EnterRuntimeGuard {
|
||||||
blocking: BlockingRegionGuard::new(),
|
blocking: BlockingRegionGuard::new(),
|
||||||
handle: c.set_current(handle),
|
handle: c.set_current(handle),
|
||||||
|
is_root,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -217,7 +233,6 @@ cfg_rt! {
|
|||||||
pub(crate) fn with_defer<R>(f: impl FnOnce(&mut Defer) -> R) -> Option<R> {
|
pub(crate) fn with_defer<R>(f: impl FnOnce(&mut Defer) -> R) -> Option<R> {
|
||||||
CONTEXT.with(|c| {
|
CONTEXT.with(|c| {
|
||||||
let mut defer = c.defer.borrow_mut();
|
let mut defer = c.defer.borrow_mut();
|
||||||
|
|
||||||
defer.as_mut().map(f)
|
defer.as_mut().map(f)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -256,7 +271,10 @@ cfg_rt! {
|
|||||||
CONTEXT.with(|c| {
|
CONTEXT.with(|c| {
|
||||||
assert!(c.runtime.get().is_entered());
|
assert!(c.runtime.get().is_entered());
|
||||||
c.runtime.set(EnterRuntime::NotEntered);
|
c.runtime.set(EnterRuntime::NotEntered);
|
||||||
|
|
||||||
|
if self.is_root {
|
||||||
*c.defer.borrow_mut() = None;
|
*c.defer.borrow_mut() = None;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -368,6 +368,22 @@ impl Launch {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn run(worker: Arc<Worker>) {
|
fn run(worker: Arc<Worker>) {
|
||||||
|
struct AbortOnPanic;
|
||||||
|
|
||||||
|
impl Drop for AbortOnPanic {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if std::thread::panicking() {
|
||||||
|
eprintln!("worker thread panicking; aborting process");
|
||||||
|
std::process::abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Catching panics on worker threads in tests is quite tricky. Instead, when
|
||||||
|
// debug assertions are enabled, we just abort the process.
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
let _abort_on_panic = AbortOnPanic;
|
||||||
|
|
||||||
// Acquire a core. If this fails, then another thread is running this
|
// Acquire a core. If this fails, then another thread is running this
|
||||||
// worker and there is nothing further to do.
|
// worker and there is nothing further to do.
|
||||||
let core = match worker.core.take() {
|
let core = match worker.core.take() {
|
||||||
@ -388,6 +404,11 @@ fn run(worker: Arc<Worker>) {
|
|||||||
// This should always be an error. It only returns a `Result` to support
|
// This should always be an error. It only returns a `Result` to support
|
||||||
// using `?` to short circuit.
|
// using `?` to short circuit.
|
||||||
assert!(cx.run(core).is_err());
|
assert!(cx.run(core).is_err());
|
||||||
|
|
||||||
|
// Check if there are any deferred tasks to notify. This can happen when
|
||||||
|
// the worker core is lost due to `block_in_place()` being called from
|
||||||
|
// within the task.
|
||||||
|
wake_deferred_tasks();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,6 +415,32 @@ fn coop_and_block_in_place() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn yield_after_block_in_place() {
|
||||||
|
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.worker_threads(1)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
rt.block_on(async {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Block in place then enter a new runtime
|
||||||
|
tokio::task::block_in_place(|| {
|
||||||
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
rt.block_on(async {});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Yield, then complete
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Testing this does not panic
|
// Testing this does not panic
|
||||||
#[test]
|
#[test]
|
||||||
fn max_blocking_threads() {
|
fn max_blocking_threads() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user