mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
rt: Allow concurrent Shell:block_on
calls (#2868)
This commit is contained in:
parent
c29f13b7a5
commit
4dfbdbff7e
@ -1,4 +1,3 @@
|
||||
use crate::loom::sync::Mutex;
|
||||
use crate::runtime::handle::Handle;
|
||||
use crate::runtime::shell::Shell;
|
||||
use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};
|
||||
@ -377,7 +376,7 @@ impl Builder {
|
||||
let blocking_spawner = blocking_pool.spawner().clone();
|
||||
|
||||
Ok(Runtime {
|
||||
kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
|
||||
kind: Kind::Shell(Shell::new(driver)),
|
||||
handle: Handle {
|
||||
spawner,
|
||||
io_handle: resources.io_handle,
|
||||
|
@ -243,7 +243,6 @@ cfg_rt_core! {
|
||||
use crate::task::JoinHandle;
|
||||
}
|
||||
|
||||
use crate::loom::sync::Mutex;
|
||||
use std::future::Future;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -292,7 +291,7 @@ pub struct Runtime {
|
||||
enum Kind {
|
||||
/// Not able to execute concurrent tasks. This variant is mostly used to get
|
||||
/// access to the driver handles.
|
||||
Shell(Mutex<Option<Shell>>),
|
||||
Shell(Shell),
|
||||
|
||||
/// Execute all tasks on the current-thread.
|
||||
#[cfg(feature = "rt-core")]
|
||||
@ -442,24 +441,7 @@ impl Runtime {
|
||||
/// [handle]: fn@Handle::block_on
|
||||
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
|
||||
self.handle.enter(|| match &self.kind {
|
||||
Kind::Shell(exec) => {
|
||||
// TODO(lucio): clean this up and move this impl into
|
||||
// `shell.rs`, this is hacky and bad but will work for
|
||||
// now.
|
||||
let exec_temp = {
|
||||
let mut lock = exec.lock().unwrap();
|
||||
lock.take()
|
||||
};
|
||||
|
||||
if let Some(mut exec_temp) = exec_temp {
|
||||
let res = exec_temp.block_on(future);
|
||||
exec.lock().unwrap().replace(exec_temp);
|
||||
res
|
||||
} else {
|
||||
let mut enter = crate::runtime::enter(true);
|
||||
enter.block_on(future).unwrap()
|
||||
}
|
||||
}
|
||||
Kind::Shell(exec) => exec.block_on(future),
|
||||
#[cfg(feature = "rt-core")]
|
||||
Kind::Basic(exec) => exec.block_on(future),
|
||||
#[cfg(feature = "rt-threaded")]
|
||||
|
@ -1,18 +1,21 @@
|
||||
#![allow(clippy::redundant_clone)]
|
||||
|
||||
use crate::future::poll_fn;
|
||||
use crate::park::{Park, Unpark};
|
||||
use crate::runtime::driver::Driver;
|
||||
use crate::runtime::enter;
|
||||
use crate::sync::Notify;
|
||||
use crate::util::{waker_ref, Wake};
|
||||
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::Context;
|
||||
use std::task::Poll::Ready;
|
||||
use std::task::Poll::{Pending, Ready};
|
||||
use std::{future::Future, sync::PoisonError};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct Shell {
|
||||
driver: Driver,
|
||||
driver: Mutex<Option<Driver>>,
|
||||
|
||||
notify: Notify,
|
||||
|
||||
/// TODO: don't store this
|
||||
unpark: Arc<Handle>,
|
||||
@ -25,28 +28,57 @@ impl Shell {
|
||||
pub(super) fn new(driver: Driver) -> Shell {
|
||||
let unpark = Arc::new(Handle(driver.unpark()));
|
||||
|
||||
Shell { driver, unpark }
|
||||
Shell {
|
||||
driver: Mutex::new(Some(driver)),
|
||||
notify: Notify::new(),
|
||||
unpark,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn block_on<F>(&mut self, f: F) -> F::Output
|
||||
pub(super) fn block_on<F>(&self, f: F) -> F::Output
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
let _e = enter(true);
|
||||
let mut enter = crate::runtime::enter(true);
|
||||
|
||||
pin!(f);
|
||||
|
||||
let waker = waker_ref(&self.unpark);
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
|
||||
loop {
|
||||
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
|
||||
return v;
|
||||
}
|
||||
if let Some(driver) = &mut self.take_driver() {
|
||||
return driver.block_on(f);
|
||||
} else {
|
||||
let notified = self.notify.notified();
|
||||
pin!(notified);
|
||||
|
||||
self.driver.park().unwrap();
|
||||
if let Some(out) = enter
|
||||
.block_on(poll_fn(|cx| {
|
||||
if notified.as_mut().poll(cx).is_ready() {
|
||||
return Ready(None);
|
||||
}
|
||||
|
||||
if let Ready(out) = f.as_mut().poll(cx) {
|
||||
return Ready(Some(out));
|
||||
}
|
||||
|
||||
Pending
|
||||
}))
|
||||
.expect("Failed to `Enter::block_on`")
|
||||
{
|
||||
return out;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn take_driver(&self) -> Option<DriverGuard<'_>> {
|
||||
let mut lock = self.driver.lock().unwrap();
|
||||
let driver = lock.take()?;
|
||||
|
||||
Some(DriverGuard {
|
||||
inner: Some(driver),
|
||||
shell: &self,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Wake for Handle {
|
||||
@ -60,3 +92,41 @@ impl Wake for Handle {
|
||||
arc_self.0.unpark();
|
||||
}
|
||||
}
|
||||
|
||||
struct DriverGuard<'a> {
|
||||
inner: Option<Driver>,
|
||||
shell: &'a Shell,
|
||||
}
|
||||
|
||||
impl DriverGuard<'_> {
|
||||
fn block_on<F: Future>(&mut self, f: F) -> F::Output {
|
||||
let driver = self.inner.as_mut().unwrap();
|
||||
|
||||
pin!(f);
|
||||
|
||||
let waker = waker_ref(&self.shell.unpark);
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
|
||||
loop {
|
||||
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
|
||||
return v;
|
||||
}
|
||||
|
||||
driver.park().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DriverGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
self.shell
|
||||
.driver
|
||||
.lock()
|
||||
.unwrap_or_else(PoisonError::into_inner)
|
||||
.replace(inner);
|
||||
|
||||
self.shell.notify.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -457,13 +457,9 @@ cfg_sync! {
|
||||
}
|
||||
|
||||
cfg_not_sync! {
|
||||
cfg_rt_core! {
|
||||
mod notify;
|
||||
pub(crate) use notify::Notify;
|
||||
}
|
||||
}
|
||||
mod notify;
|
||||
pub(crate) use notify::Notify;
|
||||
|
||||
cfg_not_sync! {
|
||||
cfg_atomic_waker_impl! {
|
||||
mod task;
|
||||
pub(crate) use task::AtomicWaker;
|
||||
|
@ -3,7 +3,6 @@ cfg_io_driver! {
|
||||
pub(crate) mod slab;
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))]
|
||||
pub(crate) mod linked_list;
|
||||
|
||||
#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]
|
||||
|
Loading…
x
Reference in New Issue
Block a user