mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
runtime: add Handle::block_on (#3569)
Add `runtime::Handle::block_on`. The function enters the runtime context and blocks the current thread while the future executes. Refs: #3097 Fixes #2965, #3096
This commit is contained in:
parent
e4f76688a0
commit
c39d9867bb
@ -235,7 +235,10 @@ cfg_io_readiness! {
|
||||
|
||||
crate::future::poll_fn(|cx| {
|
||||
if self.handle.inner().is_none() {
|
||||
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
|
||||
)));
|
||||
}
|
||||
|
||||
Pin::new(&mut fut).poll(cx).map(Ok)
|
||||
|
@ -202,6 +202,93 @@ impl Handle {
|
||||
let _ = self.blocking_spawner.spawn(task, &self);
|
||||
handle
|
||||
}
|
||||
|
||||
/// Run a future to completion on this `Handle`'s associated `Runtime`.
|
||||
///
|
||||
/// This runs the given future on the runtime, blocking until it is
|
||||
/// complete, and yielding its resolved result. Any tasks or timers which
|
||||
/// the future spawns internally will be executed on the runtime.
|
||||
///
|
||||
/// When this is used on a `current_thread` runtime, only the
|
||||
/// [`Runtime::block_on`] method can drive the IO and timer drivers, but the
|
||||
/// `Handle::block_on` method cannot drive them. This means that, when using
|
||||
/// this method on a current_thread runtime, anything that relies on IO or
|
||||
/// timers will not work unless there is another thread currently calling
|
||||
/// [`Runtime::block_on`] on the same runtime.
|
||||
///
|
||||
/// # If the runtime has been shut down
|
||||
///
|
||||
/// If the `Handle`'s associated `Runtime` has been shut down (through
|
||||
/// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
|
||||
/// dropping it) and `Handle::block_on` is used it might return an error or
|
||||
/// panic. Specifically IO resources will return an error and timers will
|
||||
/// panic. Runtime independent futures will run as normal.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function panics if the provided future panics, if called within an
|
||||
/// asynchronous execution context, or if a timer future is executed on a
|
||||
/// runtime that has been shut down.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::runtime::Runtime;
|
||||
///
|
||||
/// // Create the runtime
|
||||
/// let rt = Runtime::new().unwrap();
|
||||
///
|
||||
/// // Get a handle from this runtime
|
||||
/// let handle = rt.handle();
|
||||
///
|
||||
/// // Execute the future, blocking the current thread until completion
|
||||
/// handle.block_on(async {
|
||||
/// println!("hello");
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// Or using `Handle::current`:
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::runtime::Handle;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main () {
|
||||
/// let handle = Handle::current();
|
||||
/// std::thread::spawn(move || {
|
||||
/// // Using Handle::block_on to run async code in the new thread.
|
||||
/// handle.block_on(async {
|
||||
/// println!("hello");
|
||||
/// });
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// [`JoinError`]: struct@crate::task::JoinError
|
||||
/// [`JoinHandle`]: struct@crate::task::JoinHandle
|
||||
/// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
|
||||
/// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background
|
||||
/// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
|
||||
/// [`spawn_blocking`]: crate::task::spawn_blocking
|
||||
/// [`tokio::fs`]: crate::fs
|
||||
/// [`tokio::net`]: crate::net
|
||||
/// [`tokio::time`]: crate::time
|
||||
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
|
||||
// Enter the **runtime** context. This configures spawning, the current I/O driver, ...
|
||||
let _rt_enter = self.enter();
|
||||
|
||||
// Enter a **blocking** context. This prevents blocking from a runtime.
|
||||
let mut blocking_enter = crate::runtime::enter(true);
|
||||
|
||||
// Block on the future
|
||||
blocking_enter
|
||||
.block_on(future)
|
||||
.expect("failed to park thread")
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown(mut self) {
|
||||
self.spawner.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/// Error returned by `try_current` when no Runtime has been started
|
||||
|
@ -526,7 +526,7 @@ cfg_rt! {
|
||||
/// ```
|
||||
pub fn shutdown_timeout(mut self, duration: Duration) {
|
||||
// Wakeup and shutdown all the worker threads
|
||||
self.handle.spawner.shutdown();
|
||||
self.handle.shutdown();
|
||||
self.blocking_pool.shutdown(Some(duration));
|
||||
}
|
||||
|
||||
|
@ -543,6 +543,10 @@ impl TimerEntry {
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), super::Error>> {
|
||||
if self.driver.is_shutdown() {
|
||||
panic!(crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
|
||||
}
|
||||
|
||||
if let Some(deadline) = self.initial_deadline {
|
||||
self.as_mut().reset(deadline);
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::loom::sync::{Arc, Mutex};
|
||||
use crate::loom::sync::Arc;
|
||||
use crate::time::driver::ClockTime;
|
||||
use std::fmt;
|
||||
|
||||
@ -6,13 +6,13 @@ use std::fmt;
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Handle {
|
||||
time_source: ClockTime,
|
||||
inner: Arc<Mutex<super::Inner>>,
|
||||
inner: Arc<super::Inner>,
|
||||
}
|
||||
|
||||
impl Handle {
|
||||
/// Creates a new timer `Handle` from a shared `Inner` timer state.
|
||||
pub(super) fn new(inner: Arc<Mutex<super::Inner>>) -> Self {
|
||||
let time_source = inner.lock().time_source.clone();
|
||||
pub(super) fn new(inner: Arc<super::Inner>) -> Self {
|
||||
let time_source = inner.state.lock().time_source.clone();
|
||||
Handle { time_source, inner }
|
||||
}
|
||||
|
||||
@ -21,9 +21,14 @@ impl Handle {
|
||||
&self.time_source
|
||||
}
|
||||
|
||||
/// Locks the driver's inner structure
|
||||
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, super::Inner> {
|
||||
self.inner.lock()
|
||||
/// Access the driver's inner structure
|
||||
pub(super) fn get(&self) -> &super::Inner {
|
||||
&*self.inner
|
||||
}
|
||||
|
||||
// Check whether the driver has been shutdown
|
||||
pub(super) fn is_shutdown(&self) -> bool {
|
||||
self.inner.is_shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@ mod wheel;
|
||||
|
||||
pub(super) mod sleep;
|
||||
|
||||
use crate::loom::sync::atomic::{AtomicBool, Ordering};
|
||||
use crate::loom::sync::{Arc, Mutex};
|
||||
use crate::park::{Park, Unpark};
|
||||
use crate::time::error::Error;
|
||||
@ -86,7 +87,7 @@ pub(crate) struct Driver<P: Park + 'static> {
|
||||
time_source: ClockTime,
|
||||
|
||||
/// Shared state
|
||||
inner: Handle,
|
||||
handle: Handle,
|
||||
|
||||
/// Parker to delegate to
|
||||
park: P,
|
||||
@ -132,7 +133,16 @@ impl ClockTime {
|
||||
}
|
||||
|
||||
/// Timer state shared between `Driver`, `Handle`, and `Registration`.
|
||||
pub(self) struct Inner {
|
||||
struct Inner {
|
||||
// The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
|
||||
pub(super) state: Mutex<InnerState>,
|
||||
|
||||
/// True if the driver is being shutdown
|
||||
pub(super) is_shutdown: AtomicBool,
|
||||
}
|
||||
|
||||
/// Time state shared which must be protected by a `Mutex`
|
||||
struct InnerState {
|
||||
/// Timing backend in use
|
||||
time_source: ClockTime,
|
||||
|
||||
@ -145,9 +155,6 @@ pub(self) struct Inner {
|
||||
/// Timer wheel
|
||||
wheel: wheel::Wheel,
|
||||
|
||||
/// True if the driver is being shutdown
|
||||
is_shutdown: bool,
|
||||
|
||||
/// Unparker that can be used to wake the time driver
|
||||
unpark: Box<dyn Unpark>,
|
||||
}
|
||||
@ -169,7 +176,7 @@ where
|
||||
|
||||
Driver {
|
||||
time_source,
|
||||
inner: Handle::new(Arc::new(Mutex::new(inner))),
|
||||
handle: Handle::new(Arc::new(inner)),
|
||||
park,
|
||||
}
|
||||
}
|
||||
@ -181,15 +188,15 @@ where
|
||||
/// `with_default`, setting the timer as the default timer for the execution
|
||||
/// context.
|
||||
pub(crate) fn handle(&self) -> Handle {
|
||||
self.inner.clone()
|
||||
self.handle.clone()
|
||||
}
|
||||
|
||||
fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
|
||||
let clock = &self.time_source.clock;
|
||||
|
||||
let mut lock = self.inner.lock();
|
||||
let mut lock = self.handle.get().state.lock();
|
||||
|
||||
assert!(!lock.is_shutdown);
|
||||
assert!(!self.handle.is_shutdown());
|
||||
|
||||
let next_wake = lock.wheel.next_expiration_time();
|
||||
lock.next_wake =
|
||||
@ -237,7 +244,7 @@ where
|
||||
}
|
||||
|
||||
// Process pending timers after waking up
|
||||
self.inner.process();
|
||||
self.handle.process();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -255,7 +262,7 @@ impl Handle {
|
||||
let mut waker_list: [Option<Waker>; 32] = Default::default();
|
||||
let mut waker_idx = 0;
|
||||
|
||||
let mut lock = self.lock();
|
||||
let mut lock = self.get().lock();
|
||||
|
||||
assert!(now >= lock.elapsed);
|
||||
|
||||
@ -278,7 +285,7 @@ impl Handle {
|
||||
|
||||
waker_idx = 0;
|
||||
|
||||
lock = self.lock();
|
||||
lock = self.get().lock();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -309,7 +316,7 @@ impl Handle {
|
||||
/// `add_entry` must not be called concurrently.
|
||||
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
|
||||
unsafe {
|
||||
let mut lock = self.lock();
|
||||
let mut lock = self.get().lock();
|
||||
|
||||
if entry.as_ref().might_be_registered() {
|
||||
lock.wheel.remove(entry);
|
||||
@ -327,7 +334,7 @@ impl Handle {
|
||||
/// the `TimerEntry`)
|
||||
pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
|
||||
let waker = unsafe {
|
||||
let mut lock = self.lock();
|
||||
let mut lock = self.get().lock();
|
||||
|
||||
// We may have raced with a firing/deregistration, so check before
|
||||
// deregistering.
|
||||
@ -338,7 +345,7 @@ impl Handle {
|
||||
// Now that we have exclusive control of this entry, mint a handle to reinsert it.
|
||||
let entry = entry.as_ref().handle();
|
||||
|
||||
if lock.is_shutdown {
|
||||
if self.is_shutdown() {
|
||||
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
|
||||
} else {
|
||||
entry.set_expiration(new_tick);
|
||||
@ -396,19 +403,15 @@ where
|
||||
}
|
||||
|
||||
fn shutdown(&mut self) {
|
||||
let mut lock = self.inner.lock();
|
||||
|
||||
if lock.is_shutdown {
|
||||
if self.handle.is_shutdown() {
|
||||
return;
|
||||
}
|
||||
|
||||
lock.is_shutdown = true;
|
||||
|
||||
drop(lock);
|
||||
self.handle.get().is_shutdown.store(true, Ordering::SeqCst);
|
||||
|
||||
// Advance time forward to the end of time.
|
||||
|
||||
self.inner.process_at_time(u64::MAX);
|
||||
self.handle.process_at_time(u64::MAX);
|
||||
|
||||
self.park.shutdown();
|
||||
}
|
||||
@ -428,14 +431,26 @@ where
|
||||
impl Inner {
|
||||
pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
|
||||
Inner {
|
||||
time_source,
|
||||
elapsed: 0,
|
||||
next_wake: None,
|
||||
unpark,
|
||||
wheel: wheel::Wheel::new(),
|
||||
is_shutdown: false,
|
||||
state: Mutex::new(InnerState {
|
||||
time_source,
|
||||
elapsed: 0,
|
||||
next_wake: None,
|
||||
unpark,
|
||||
wheel: wheel::Wheel::new(),
|
||||
}),
|
||||
is_shutdown: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Locks the driver's inner structure
|
||||
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
|
||||
self.state.lock()
|
||||
}
|
||||
|
||||
// Check whether the driver has been shutdown
|
||||
pub(super) fn is_shutdown(&self) -> bool {
|
||||
self.is_shutdown.load(Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Inner {
|
||||
|
@ -3,7 +3,7 @@ use std::{task::Context, time::Duration};
|
||||
#[cfg(not(loom))]
|
||||
use futures::task::noop_waker_ref;
|
||||
|
||||
use crate::loom::sync::{Arc, Mutex};
|
||||
use crate::loom::sync::Arc;
|
||||
use crate::loom::thread;
|
||||
use crate::{
|
||||
loom::sync::atomic::{AtomicBool, Ordering},
|
||||
@ -45,7 +45,7 @@ fn single_timer() {
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
|
||||
let handle = Handle::new(Arc::new(Mutex::new(inner)));
|
||||
let handle = Handle::new(Arc::new(inner));
|
||||
|
||||
let handle_ = handle.clone();
|
||||
let jh = thread::spawn(move || {
|
||||
@ -76,7 +76,7 @@ fn drop_timer() {
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
|
||||
let handle = Handle::new(Arc::new(Mutex::new(inner)));
|
||||
let handle = Handle::new(Arc::new(inner));
|
||||
|
||||
let handle_ = handle.clone();
|
||||
let jh = thread::spawn(move || {
|
||||
@ -107,7 +107,7 @@ fn change_waker() {
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
|
||||
let handle = Handle::new(Arc::new(Mutex::new(inner)));
|
||||
let handle = Handle::new(Arc::new(inner));
|
||||
|
||||
let handle_ = handle.clone();
|
||||
let jh = thread::spawn(move || {
|
||||
@ -142,7 +142,7 @@ fn reset_future() {
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
|
||||
let handle = Handle::new(Arc::new(Mutex::new(inner)));
|
||||
let handle = Handle::new(Arc::new(inner));
|
||||
|
||||
let handle_ = handle.clone();
|
||||
let finished_early_ = finished_early.clone();
|
||||
@ -191,7 +191,7 @@ fn poll_process_levels() {
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source, MockUnpark::mock());
|
||||
let handle = Handle::new(Arc::new(Mutex::new(inner)));
|
||||
let handle = Handle::new(Arc::new(inner));
|
||||
|
||||
let mut entries = vec![];
|
||||
|
||||
@ -232,7 +232,7 @@ fn poll_process_levels_targeted() {
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source, MockUnpark::mock());
|
||||
let handle = Handle::new(Arc::new(Mutex::new(inner)));
|
||||
let handle = Handle::new(Arc::new(inner));
|
||||
|
||||
let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193));
|
||||
pin!(e1);
|
||||
|
@ -1,3 +1,9 @@
|
||||
/// Error string explaining that the Tokio context hasn't been instantiated.
|
||||
pub(crate) const CONTEXT_MISSING_ERROR: &str =
|
||||
"there is no reactor running, must be called from the context of a Tokio 1.x runtime";
|
||||
|
||||
// some combinations of features might not use this
|
||||
#[allow(dead_code)]
|
||||
/// Error string explaining that the Tokio context is shutting down and cannot drive timers.
|
||||
pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str =
|
||||
"A Tokio 1.x context was found, but it is being shutdown.";
|
||||
|
511
tokio/tests/rt_handle_block_on.rs
Normal file
511
tokio/tests/rt_handle_block_on.rs
Normal file
@ -0,0 +1,511 @@
|
||||
#![warn(rust_2018_idioms)]
|
||||
#![cfg(feature = "full")]
|
||||
|
||||
// All io tests that deal with shutdown is currently ignored because there are known bugs in with
|
||||
// shutting down the io driver while concurrently registering new resources. See
|
||||
// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 fo more details.
|
||||
//
|
||||
// When this has been fixed we want to re-enable these tests.
|
||||
|
||||
use std::time::Duration;
|
||||
use tokio::runtime::{Handle, Runtime};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::{fs, net, time};
|
||||
|
||||
macro_rules! multi_threaded_rt_test {
|
||||
($($t:tt)*) => {
|
||||
mod threaded_scheduler_4_threads_only {
|
||||
use super::*;
|
||||
|
||||
$($t)*
|
||||
|
||||
fn rt() -> Runtime {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(4)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
mod threaded_scheduler_1_thread_only {
|
||||
use super::*;
|
||||
|
||||
$($t)*
|
||||
|
||||
fn rt() -> Runtime {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(1)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! rt_test {
|
||||
($($t:tt)*) => {
|
||||
mod current_thread_scheduler {
|
||||
use super::*;
|
||||
|
||||
$($t)*
|
||||
|
||||
fn rt() -> Runtime {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
mod threaded_scheduler_4_threads {
|
||||
use super::*;
|
||||
|
||||
$($t)*
|
||||
|
||||
fn rt() -> Runtime {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(4)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
mod threaded_scheduler_1_thread {
|
||||
use super::*;
|
||||
|
||||
$($t)*
|
||||
|
||||
fn rt() -> Runtime {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(1)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==== runtime independent futures ======
|
||||
|
||||
#[test]
|
||||
fn basic() {
|
||||
test_with_runtimes(|| {
|
||||
let one = Handle::current().block_on(async { 1 });
|
||||
assert_eq!(1, one);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bounded_mpsc_channel() {
|
||||
test_with_runtimes(|| {
|
||||
let (tx, mut rx) = mpsc::channel(1024);
|
||||
|
||||
Handle::current().block_on(tx.send(42)).unwrap();
|
||||
|
||||
let value = Handle::current().block_on(rx.recv()).unwrap();
|
||||
assert_eq!(value, 42);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unbounded_mpsc_channel() {
|
||||
test_with_runtimes(|| {
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
|
||||
let _ = tx.send(42);
|
||||
|
||||
let value = Handle::current().block_on(rx.recv()).unwrap();
|
||||
assert_eq!(value, 42);
|
||||
})
|
||||
}
|
||||
|
||||
rt_test! {
|
||||
// ==== spawn blocking futures ======
|
||||
|
||||
#[test]
|
||||
fn basic_fs() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let contents = Handle::current()
|
||||
.block_on(fs::read_to_string("Cargo.toml"))
|
||||
.unwrap();
|
||||
assert!(contents.contains("Cargo.toml"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fs_shutdown_before_started() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
let err: std::io::Error = Handle::current()
|
||||
.block_on(fs::read_to_string("Cargo.toml"))
|
||||
.unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::Other);
|
||||
|
||||
let inner_err = err.get_ref().expect("no inner error");
|
||||
assert_eq!(inner_err.to_string(), "background task failed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_spawn_blocking() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let answer = Handle::current()
|
||||
.block_on(spawn_blocking(|| {
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
42
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(answer, 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spawn_blocking_after_shutdown_fails() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
let join_err = Handle::current()
|
||||
.block_on(spawn_blocking(|| {
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
42
|
||||
}))
|
||||
.unwrap_err();
|
||||
|
||||
assert!(join_err.is_cancelled());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spawn_blocking_started_before_shutdown_continues() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let handle = spawn_blocking(|| {
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
42
|
||||
});
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
let answer = Handle::current().block_on(handle).unwrap();
|
||||
|
||||
assert_eq!(answer, 42);
|
||||
}
|
||||
|
||||
// ==== net ======
|
||||
|
||||
#[test]
|
||||
fn tcp_listener_bind() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
Handle::current()
|
||||
.block_on(net::TcpListener::bind("127.0.0.1:0"))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// All io tests are ignored for now. See above why that is.
|
||||
#[ignore]
|
||||
#[test]
|
||||
fn tcp_listener_connect_after_shutdown() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
let err = Handle::current()
|
||||
.block_on(net::TcpListener::bind("127.0.0.1:0"))
|
||||
.unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::Other);
|
||||
assert_eq!(
|
||||
err.get_ref().unwrap().to_string(),
|
||||
"A Tokio 1.x context was found, but it is being shutdown.",
|
||||
);
|
||||
}
|
||||
|
||||
// All io tests are ignored for now. See above why that is.
|
||||
#[ignore]
|
||||
#[test]
|
||||
fn tcp_listener_connect_before_shutdown() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let bind_future = net::TcpListener::bind("127.0.0.1:0");
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
let err = Handle::current().block_on(bind_future).unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::Other);
|
||||
assert_eq!(
|
||||
err.get_ref().unwrap().to_string(),
|
||||
"A Tokio 1.x context was found, but it is being shutdown.",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn udp_socket_bind() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
Handle::current()
|
||||
.block_on(net::UdpSocket::bind("127.0.0.1:0"))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// All io tests are ignored for now. See above why that is.
|
||||
#[ignore]
|
||||
#[test]
|
||||
fn udp_stream_bind_after_shutdown() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
let err = Handle::current()
|
||||
.block_on(net::UdpSocket::bind("127.0.0.1:0"))
|
||||
.unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::Other);
|
||||
assert_eq!(
|
||||
err.get_ref().unwrap().to_string(),
|
||||
"A Tokio 1.x context was found, but it is being shutdown.",
|
||||
);
|
||||
}
|
||||
|
||||
// All io tests are ignored for now. See above why that is.
|
||||
#[ignore]
|
||||
#[test]
|
||||
fn udp_stream_bind_before_shutdown() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let bind_future = net::UdpSocket::bind("127.0.0.1:0");
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
let err = Handle::current().block_on(bind_future).unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::Other);
|
||||
assert_eq!(
|
||||
err.get_ref().unwrap().to_string(),
|
||||
"A Tokio 1.x context was found, but it is being shutdown.",
|
||||
);
|
||||
}
|
||||
|
||||
// All io tests are ignored for now. See above why that is.
|
||||
#[ignore]
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn unix_listener_bind_after_shutdown() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let path = dir.path().join("socket");
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
let err = net::UnixListener::bind(path).unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::Other);
|
||||
assert_eq!(
|
||||
err.get_ref().unwrap().to_string(),
|
||||
"A Tokio 1.x context was found, but it is being shutdown.",
|
||||
);
|
||||
}
|
||||
|
||||
// All io tests are ignored for now. See above why that is.
|
||||
#[ignore]
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn unix_listener_shutdown_after_bind() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let path = dir.path().join("socket");
|
||||
|
||||
let listener = net::UnixListener::bind(path).unwrap();
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
// this should not timeout but fail immediately since the runtime has been shutdown
|
||||
let err = Handle::current().block_on(listener.accept()).unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::Other);
|
||||
assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
|
||||
}
|
||||
|
||||
// All io tests are ignored for now. See above why that is.
|
||||
#[ignore]
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn unix_listener_shutdown_after_accept() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let path = dir.path().join("socket");
|
||||
|
||||
let listener = net::UnixListener::bind(path).unwrap();
|
||||
|
||||
let accept_future = listener.accept();
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
// this should not timeout but fail immediately since the runtime has been shutdown
|
||||
let err = Handle::current().block_on(accept_future).unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::Other);
|
||||
assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
|
||||
}
|
||||
|
||||
// ==== nesting ======
|
||||
|
||||
#[test]
|
||||
#[should_panic(
|
||||
expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
|
||||
)]
|
||||
fn nesting() {
|
||||
fn some_non_async_function() -> i32 {
|
||||
Handle::current().block_on(time::sleep(Duration::from_millis(10)));
|
||||
1
|
||||
}
|
||||
|
||||
let rt = rt();
|
||||
|
||||
rt.block_on(async { some_non_async_function() });
|
||||
}
|
||||
}
|
||||
|
||||
multi_threaded_rt_test! {
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn unix_listener_bind() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let path = dir.path().join("socket");
|
||||
|
||||
let listener = net::UnixListener::bind(path).unwrap();
|
||||
|
||||
// this should timeout and not fail immediately since the runtime has not been shutdown
|
||||
let _: tokio::time::error::Elapsed = Handle::current()
|
||||
.block_on(tokio::time::timeout(
|
||||
Duration::from_millis(10),
|
||||
listener.accept(),
|
||||
))
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
// ==== timers ======
|
||||
|
||||
// `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no
|
||||
// one to drive the timers so they will just hang forever. Therefore they are not tested.
|
||||
|
||||
#[test]
|
||||
fn sleep() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
Handle::current().block_on(time::sleep(Duration::from_millis(100)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
|
||||
fn sleep_before_shutdown_panics() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
let f = time::sleep(Duration::from_millis(100));
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
Handle::current().block_on(f);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
|
||||
fn sleep_after_shutdown_panics() {
|
||||
let rt = rt();
|
||||
let _enter = rt.enter();
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
|
||||
Handle::current().block_on(time::sleep(Duration::from_millis(100)));
|
||||
}
|
||||
}
|
||||
|
||||
// ==== utils ======
|
||||
|
||||
/// Create a new multi threaded runtime
|
||||
fn new_multi_thread(n: usize) -> Runtime {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(n)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Create a new single threaded runtime
|
||||
fn new_current_thread() -> Runtime {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Utility to test things on both kinds of runtimes both before and after shutting it down.
|
||||
fn test_with_runtimes<F>(f: F)
|
||||
where
|
||||
F: Fn(),
|
||||
{
|
||||
{
|
||||
println!("current thread runtime");
|
||||
|
||||
let rt = new_current_thread();
|
||||
let _enter = rt.enter();
|
||||
f();
|
||||
|
||||
println!("current thread runtime after shutdown");
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
f();
|
||||
}
|
||||
|
||||
{
|
||||
println!("multi thread (1 thread) runtime");
|
||||
|
||||
let rt = new_multi_thread(1);
|
||||
let _enter = rt.enter();
|
||||
f();
|
||||
|
||||
println!("multi thread runtime after shutdown");
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
f();
|
||||
}
|
||||
|
||||
{
|
||||
println!("multi thread (4 threads) runtime");
|
||||
|
||||
let rt = new_multi_thread(4);
|
||||
let _enter = rt.enter();
|
||||
f();
|
||||
|
||||
println!("multi thread runtime after shutdown");
|
||||
rt.shutdown_timeout(Duration::from_secs(1000));
|
||||
f();
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user