rt: Refactor Runtime::block_on to take &self (#2782)

Co-authored-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Lucio Franco 2020-08-27 20:05:48 -04:00 committed by GitHub
parent d9d909cb4c
commit d600ab9a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 345 additions and 707 deletions

View File

@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) {
}
fn contention_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) {
}
fn contention_bounded_full(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) {
}
fn contention_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) {
}
fn uncontented_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) {
}
fn uncontented_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()

View File

@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc};
fn spawn_many(b: &mut Bencher) {
const NUM_SPAWN: usize = 10_000;
let mut rt = rt();
let rt = rt();
let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) {
fn ping_pong(b: &mut Bencher) {
const NUM_PINGS: usize = 1_000;
let mut rt = rt();
let rt = rt();
let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) {
fn chained_spawn(b: &mut Bencher) {
const ITER: usize = 1_000;
let mut rt = rt();
let rt = rt();
fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {

View File

@ -10,7 +10,7 @@ async fn work() -> usize {
}
fn basic_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) {
}
fn threaded_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
@ -40,9 +40,9 @@ fn basic_scheduler_remote_spawn(bench: &mut Bencher) {
.basic_scheduler()
.build()
.unwrap();
let handle = runtime.handle();
bench.iter(|| {
let h = handle.spawn(work());
let h = runtime.spawn(work());
black_box(h);
});
}
@ -52,9 +52,9 @@ fn threaded_scheduler_remote_spawn(bench: &mut Bencher) {
.threaded_scheduler()
.build()
.unwrap();
let handle = runtime.handle();
bench.iter(|| {
let h = handle.spawn(work());
let h = runtime.spawn(work());
black_box(h);
});
}

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::RwLock, task};
fn read_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) {
}
fn read_concurrent_uncontended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) {
}
fn read_concurrent_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) {
}
fn read_concurrent_contended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) {
}
fn read_concurrent_contended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::Semaphore, task};
fn uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -27,7 +27,7 @@ async fn task(s: Arc<Semaphore>) {
}
fn uncontended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) {
}
fn uncontended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) {
}
fn contended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) {
}
fn contended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();

View File

@ -18,7 +18,7 @@ fn basic_shell_rt() {
});
for _ in 0..1_000 {
let mut rt = runtime::Builder::new().build().unwrap();
let rt = runtime::Builder::new().build().unwrap();
let (tx, rx) = oneshot::channel();

View File

@ -28,7 +28,7 @@ pub mod task;
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime;
let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()

View File

@ -12,21 +12,21 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;
use tokio::runtime::Runtime;
pin_project! {
/// `TokioContext` allows connecting a custom executor with the tokio runtime.
///
/// It contains a `Handle` to the runtime. A handle to the runtime can be
/// obtain by calling the `Runtime::handle()` method.
pub struct TokioContext<F> {
pub struct TokioContext<'a, F> {
#[pin]
inner: F,
handle: Handle,
handle: &'a Runtime,
}
}
impl<F: Future> Future for TokioContext<F> {
impl<F: Future> Future for TokioContext<'_, F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -39,16 +39,16 @@ impl<F: Future> Future for TokioContext<F> {
}
/// Trait extension that simplifies bundling a `Handle` with a `Future`.
pub trait HandleExt {
pub trait RuntimeExt {
/// Convenience method that takes a Future and returns a `TokioContext`.
///
/// # Example: calling Tokio Runtime from a custom ThreadPool
///
/// ```no_run
/// use tokio_util::context::HandleExt;
/// use tokio_util::context::RuntimeExt;
/// use tokio::time::{delay_for, Duration};
///
/// let mut rt = tokio::runtime::Builder::new()
/// let rt = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .enable_all()
/// .build().unwrap();
@ -61,18 +61,17 @@ pub trait HandleExt {
///
/// rt.block_on(
/// rt2
/// .handle()
/// .wrap(async { delay_for(Duration::from_millis(2)).await }),
/// );
///```
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F>;
}
impl HandleExt for Handle {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
impl RuntimeExt for Runtime {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F> {
TokioContext {
inner: fut,
handle: self.clone(),
handle: self,
}
}
}

View File

@ -2,11 +2,11 @@
use tokio::runtime::Builder;
use tokio::time::*;
use tokio_util::context::HandleExt;
use tokio_util::context::RuntimeExt;
#[test]
fn tokio_context_with_another_runtime() {
let mut rt1 = Builder::new()
let rt1 = Builder::new()
.threaded_scheduler()
.core_threads(1)
// no timer!
@ -21,8 +21,5 @@ fn tokio_context_with_another_runtime() {
// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(
rt2.handle()
.wrap(async move { delay_for(Duration::from_millis(2)).await }),
);
let _ = rt1.block_on(rt2.wrap(async move { delay_for(Duration::from_millis(2)).await }));
}

View File

@ -173,7 +173,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all())
}
@ -201,7 +201,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
let registration = Registration::new_with_ready(&io, ready)?;
Ok(Self {

View File

@ -67,7 +67,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
@ -104,7 +104,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,

View File

@ -262,7 +262,7 @@ impl TcpListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener)?;
let io = PollEvented::new(io)?;

View File

@ -187,7 +187,7 @@ impl TcpStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_stream(stream)?;
let io = PollEvented::new(io)?;

View File

@ -64,7 +64,7 @@ impl UdpSocket {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented::new(io)?;

View File

@ -164,7 +164,7 @@ impl UnixDatagram {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a Tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
/// # Examples
/// ```
/// # use std::error::Error;

View File

@ -60,7 +60,7 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
@ -82,7 +82,7 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
let listener = mio_uds::UnixListener::from_listener(listener)?;
let io = PollEvented::new(listener)?;

View File

@ -54,7 +54,7 @@ impl UnixStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
let stream = mio_uds::UnixStream::from_stream(stream)?;
let io = PollEvented::new(stream)?;

View File

@ -42,9 +42,7 @@ cfg_resource_drivers! {
mod thread;
pub(crate) use self::thread::ParkThread;
cfg_block_on! {
pub(crate) use self::thread::{CachedParkThread, ParkError};
}
pub(crate) use self::thread::{CachedParkThread, ParkError};
use std::sync::Arc;
use std::time::Duration;

View File

@ -212,118 +212,114 @@ impl Unpark for UnparkThread {
}
}
cfg_block_on! {
use std::marker::PhantomData;
use std::rc::Rc;
use std::marker::PhantomData;
use std::rc::Rc;
use std::mem;
use std::task::{RawWaker, RawWakerVTable, Waker};
use std::mem;
use std::task::{RawWaker, RawWakerVTable, Waker};
/// Blocks the current thread using a condition variable.
#[derive(Debug)]
pub(crate) struct CachedParkThread {
_anchor: PhantomData<Rc<()>>,
}
/// Blocks the current thread using a condition variable.
#[derive(Debug)]
pub(crate) struct CachedParkThread {
_anchor: PhantomData<Rc<()>>,
}
impl CachedParkThread {
/// Create a new `ParkThread` handle for the current thread.
///
/// This type cannot be moved to other threads, so it should be created on
/// the thread that the caller intends to park.
pub(crate) fn new() -> CachedParkThread {
CachedParkThread {
_anchor: PhantomData,
}
}
pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
self.with_current(|park_thread| park_thread.unpark())
}
/// Get a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
where
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner))
.map_err(|_| ())
impl CachedParkThread {
/// Create a new `ParkThread` handle for the current thread.
///
/// This type cannot be moved to other threads, so it should be created on
/// the thread that the caller intends to park.
pub(crate) fn new() -> CachedParkThread {
CachedParkThread {
_anchor: PhantomData,
}
}
impl Park for CachedParkThread {
type Unpark = UnparkThread;
type Error = ParkError;
fn unpark(&self) -> Self::Unpark {
self.get_unpark().unwrap()
}
fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park())?;
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}
fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
self.with_current(|park_thread| park_thread.unpark())
}
impl UnparkThread {
pub(crate) fn into_waker(self) -> Waker {
unsafe {
let raw = unparker_to_raw_waker(self.inner);
Waker::from_raw(raw)
}
}
}
impl Inner {
#[allow(clippy::wrong_self_convention)]
fn into_raw(this: Arc<Inner>) -> *const () {
Arc::into_raw(this) as *const ()
}
unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
Arc::from_raw(ptr as *const Inner)
}
}
unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
RawWaker::new(
Inner::into_raw(unparker),
&RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
)
}
unsafe fn clone(raw: *const ()) -> RawWaker {
let unparker = Inner::from_raw(raw);
// Increment the ref count
mem::forget(unparker.clone());
unparker_to_raw_waker(unparker)
}
unsafe fn drop_waker(raw: *const ()) {
let _ = Inner::from_raw(raw);
}
unsafe fn wake(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
}
unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
// We don't actually own a reference to the unparker
mem::forget(unparker);
/// Get a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
where
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ())
}
}
impl Park for CachedParkThread {
type Unpark = UnparkThread;
type Error = ParkError;
fn unpark(&self) -> Self::Unpark {
self.get_unpark().unwrap()
}
fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park())?;
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}
fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
}
impl UnparkThread {
pub(crate) fn into_waker(self) -> Waker {
unsafe {
let raw = unparker_to_raw_waker(self.inner);
Waker::from_raw(raw)
}
}
}
impl Inner {
#[allow(clippy::wrong_self_convention)]
fn into_raw(this: Arc<Inner>) -> *const () {
Arc::into_raw(this) as *const ()
}
unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
Arc::from_raw(ptr as *const Inner)
}
}
unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
RawWaker::new(
Inner::into_raw(unparker),
&RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
)
}
unsafe fn clone(raw: *const ()) -> RawWaker {
let unparker = Inner::from_raw(raw);
// Increment the ref count
mem::forget(unparker.clone());
unparker_to_raw_waker(unparker)
}
unsafe fn drop_waker(raw: *const ()) {
let _ = Inner::from_raw(raw);
}
unsafe fn wake(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
}
unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
// We don't actually own a reference to the unparker
mem::forget(unparker);
}

View File

@ -108,6 +108,7 @@ where
}
/// Spawns a future onto the thread pool
#[allow(dead_code)]
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,

View File

@ -15,7 +15,6 @@ cfg_blocking_impl! {
pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
BlockingPool::new(builder, thread_cap)
}
}

View File

@ -6,6 +6,7 @@ use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
@ -67,7 +68,7 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
let rt = Handle::current();
let rt = context::current().expect("not currently running on the Tokio runtime.");
let (task, handle) = task::joinable(BlockingTask::new(func));
let _ = rt.blocking_spawner.spawn(task, &rt);
@ -79,7 +80,7 @@ pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
where
F: FnOnce() -> R + Send + 'static,
{
let rt = Handle::current();
let rt = context::current().expect("not currently running on the Tokio runtime.");
let (task, _handle) = task::joinable(BlockingTask::new(func));
rt.blocking_spawner.spawn(task, &rt)

View File

@ -1,9 +1,9 @@
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
use std::fmt;
use std::sync::Arc;
/// Builds Tokio Runtime with custom configuration values.
///
@ -67,7 +67,7 @@ pub struct Builder {
pub(super) before_stop: Option<Callback>,
}
pub(crate) type ThreadNameFn = Arc<dyn Fn() -> String + Send + Sync + 'static>;
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
#[derive(Debug, Clone, Copy)]
enum Kind {
@ -100,7 +100,7 @@ impl Builder {
max_threads: 512,
// Default thread name
thread_name: Arc::new(|| "tokio-runtime-worker".into()),
thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
// Do not set a stack size by default
thread_stack_size: None,
@ -212,7 +212,7 @@ impl Builder {
/// ```
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
let val = val.into();
self.thread_name = Arc::new(move || val.clone());
self.thread_name = std::sync::Arc::new(move || val.clone());
self
}
@ -240,7 +240,7 @@ impl Builder {
where
F: Fn() -> String + Send + Sync + 'static,
{
self.thread_name = Arc::new(f);
self.thread_name = std::sync::Arc::new(f);
self
}
@ -293,7 +293,7 @@ impl Builder {
where
F: Fn() + Send + Sync + 'static,
{
self.after_start = Some(Arc::new(f));
self.after_start = Some(std::sync::Arc::new(f));
self
}
@ -333,7 +333,7 @@ impl Builder {
/// ```
/// use tokio::runtime::Builder;
///
/// let mut rt = Builder::new().build().unwrap();
/// let rt = Builder::new().build().unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
@ -364,7 +364,7 @@ impl Builder {
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
kind: Kind::Shell(Shell::new(driver)),
kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
handle: Handle {
spawner,
io_handle,
@ -463,7 +463,7 @@ cfg_rt_core! {
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
kind: Kind::Basic(scheduler),
kind: Kind::Basic(Mutex::new(Some(scheduler))),
handle: Handle {
spawner,
io_handle,

View File

@ -7,8 +7,10 @@ thread_local! {
static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None)
}
pub(crate) fn current() -> Option<Handle> {
CONTEXT.with(|ctx| ctx.borrow().clone())
cfg_blocking_impl! {
pub(crate) fn current() -> Option<Handle> {
CONTEXT.with(|ctx| ctx.borrow().clone())
}
}
cfg_io_driver! {

View File

@ -138,31 +138,29 @@ cfg_rt_threaded! {
}
}
cfg_block_on! {
impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError>
where
F: std::future::Future,
{
use crate::park::{CachedParkThread, Park};
use std::task::Context;
use std::task::Poll::Ready;
impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError>
where
F: std::future::Future,
{
use crate::park::{CachedParkThread, Park};
use std::task::Context;
use std::task::Poll::Ready;
let mut park = CachedParkThread::new();
let waker = park.get_unpark()?.into_waker();
let mut cx = Context::from_waker(&waker);
let mut park = CachedParkThread::new();
let waker = park.get_unpark()?.into_waker();
let mut cx = Context::from_waker(&waker);
pin!(f);
pin!(f);
loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
park.park()?;
loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
park.park()?;
}
}
}

View File

@ -1,16 +1,4 @@
use crate::runtime::{blocking, context, io, time, Spawner};
use std::{error, fmt};
cfg_blocking! {
use crate::runtime::task;
use crate::runtime::blocking::task::BlockingTask;
}
cfg_rt_core! {
use crate::task::JoinHandle;
use std::future::Future;
}
/// Handle to the runtime.
///
@ -19,7 +7,7 @@ cfg_rt_core! {
///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
pub struct Handle {
pub(crate) struct Handle {
pub(super) spawner: Spawner,
/// Handles to the I/O drivers
@ -39,333 +27,10 @@ impl Handle {
/// Enter the runtime context. This allows you to construct types that must
/// have an executor available on creation such as [`Delay`] or [`TcpStream`].
/// It will also allow you to call methods such as [`tokio::spawn`].
///
/// This function is also available as [`Runtime::enter`].
///
/// [`Delay`]: struct@crate::time::Delay
/// [`TcpStream`]: struct@crate::net::TcpStream
/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
/// [`tokio::spawn`]: fn@crate::spawn
///
/// # Example
///
/// ```
/// use tokio::runtime::Runtime;
///
/// fn function_that_spawns(msg: String) {
/// // Had we not used `handle.enter` below, this would panic.
/// tokio::spawn(async move {
/// println!("{}", msg);
/// });
/// }
///
/// fn main() {
/// let rt = Runtime::new().unwrap();
/// let handle = rt.handle().clone();
///
/// let s = "Hello World!".to_string();
///
/// // By entering the context, we tie `tokio::spawn` to this executor.
/// handle.enter(|| function_that_spawns(s));
/// }
/// ```
pub fn enter<F, R>(&self, f: F) -> R
pub(crate) fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
context::enter(self.clone(), f)
}
/// Returns a `Handle` view over the currently running `Runtime`
///
/// # Panic
///
/// This will panic if called outside the context of a Tokio runtime. That means that you must
/// call this on one of the threads **being run by the runtime**. Calling this from within a
/// thread created by `std::thread::spawn` (for example) will cause a panic.
///
/// # Examples
///
/// This can be used to obtain the handle of the surrounding runtime from an async
/// block or function running on that runtime.
///
/// ```
/// # use std::thread;
/// # use tokio::runtime::Runtime;
/// # fn dox() {
/// # let rt = Runtime::new().unwrap();
/// # rt.spawn(async {
/// use tokio::runtime::Handle;
///
/// // Inside an async block or function.
/// let handle = Handle::current();
/// handle.spawn(async {
/// println!("now running in the existing Runtime");
/// });
///
/// # let handle =
/// thread::spawn(move || {
/// // Notice that the handle is created outside of this thread and then moved in
/// handle.block_on(async { /* ... */ })
/// // This next line would cause a panic
/// // let handle2 = Handle::current();
/// });
/// # handle.join().unwrap();
/// # });
/// # }
/// ```
pub fn current() -> Self {
context::current().expect("not currently running on the Tokio runtime.")
}
/// Returns a Handle view over the currently running Runtime
///
/// Returns an error if no Runtime has been started
///
/// Contrary to `current`, this never panics
pub fn try_current() -> Result<Self, TryCurrentError> {
context::current().ok_or(TryCurrentError(()))
}
}
cfg_rt_core! {
impl Handle {
/// Spawns a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
/// let handle = rt.handle();
///
/// // Spawn a future onto the runtime
/// handle.spawn(async {
/// println!("now running on a worker thread");
/// });
/// # }
/// ```
///
/// # Panics
///
/// This function will not panic unless task execution is disabled on the
/// executor. This can only happen if the runtime was built using
/// [`Builder`] without picking either [`basic_scheduler`] or
/// [`threaded_scheduler`].
///
/// [`Builder`]: struct@crate::runtime::Builder
/// [`threaded_scheduler`]: fn@crate::runtime::Builder::threaded_scheduler
/// [`basic_scheduler`]: fn@crate::runtime::Builder::basic_scheduler
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.spawner.spawn(future)
}
/// Run a future to completion on the Tokio runtime from a synchronous
/// context.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// If the provided executor currently has no active core thread, this
/// function might hang until a core thread is added. This is not a
/// concern when using the [threaded scheduler], as it always has active
/// core threads, but if you use the [basic scheduler], some other
/// thread must currently be inside a call to [`Runtime::block_on`].
/// See also [the module level documentation][1], which has a section on
/// scheduler types.
///
/// This method may not be called from an asynchronous context.
///
/// [threaded scheduler]: fn@crate::runtime::Builder::threaded_scheduler
/// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler
/// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
/// [1]: index.html#runtime-configurations
///
/// # Panics
///
/// This function panics if the provided future panics, or if called
/// within an asynchronous execution context.
///
/// # Examples
///
/// Using `block_on` with the [threaded scheduler].
///
/// ```
/// use tokio::runtime::Runtime;
/// use std::thread;
///
/// // Create the runtime.
/// //
/// // If the rt-threaded feature is enabled, this creates a threaded
/// // scheduler by default.
/// let rt = Runtime::new().unwrap();
/// let handle = rt.handle().clone();
///
/// // Use the runtime from another thread.
/// let th = thread::spawn(move || {
/// // Execute the future, blocking the current thread until completion.
/// //
/// // This example uses the threaded scheduler, so no concurrent call to
/// // `rt.block_on` is required.
/// handle.block_on(async {
/// println!("hello");
/// });
/// });
///
/// th.join().unwrap();
/// ```
///
/// Using the [basic scheduler] requires a concurrent call to
/// [`Runtime::block_on`]:
///
/// [threaded scheduler]: fn@crate::runtime::Builder::threaded_scheduler
/// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler
/// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
///
/// ```
/// use tokio::runtime::Builder;
/// use tokio::sync::oneshot;
/// use std::thread;
///
/// // Create the runtime.
/// let mut rt = Builder::new()
/// .enable_all()
/// .basic_scheduler()
/// .build()
/// .unwrap();
///
/// let handle = rt.handle().clone();
///
/// // Signal main thread when task has finished.
/// let (send, recv) = oneshot::channel();
///
/// // Use the runtime from another thread.
/// let th = thread::spawn(move || {
/// // Execute the future, blocking the current thread until completion.
/// handle.block_on(async {
/// send.send("done").unwrap();
/// });
/// });
///
/// // The basic scheduler is used, so the thread above might hang if we
/// // didn't call block_on on the rt too.
/// rt.block_on(async {
/// assert_eq!(recv.await.unwrap(), "done");
/// });
/// # th.join().unwrap();
/// ```
///
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.enter(|| {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).expect("failed to park thread")
})
}
}
}
cfg_blocking! {
impl Handle {
/// Runs the provided closure on a thread where blocking is acceptable.
///
/// In general, issuing a blocking call or performing a lot of compute in a
/// future without yielding is not okay, as it may prevent the executor from
/// driving other futures forward. This function runs the provided closure
/// on a thread dedicated to blocking operations. See the [CPU-bound tasks
/// and blocking code][blocking] section for more information.
///
/// Tokio will spawn more blocking threads when they are requested through
/// this function until the upper limit configured on the [`Builder`] is
/// reached. This limit is very large by default, because `spawn_blocking` is
/// often used for various kinds of IO operations that cannot be performed
/// asynchronously. When you run CPU-bound code using `spawn_blocking`, you
/// should keep this large upper limit in mind; to run your CPU-bound
/// computations on only a few threads, you should use a separate thread
/// pool such as [rayon] rather than configuring the number of blocking
/// threads.
///
/// This function is intended for non-async operations that eventually
/// finish on their own. If you want to spawn an ordinary thread, you should
/// use [`thread::spawn`] instead.
///
/// Closures spawned using `spawn_blocking` cannot be cancelled. When you
/// shut down the executor, it will wait indefinitely for all blocking
/// operations to finish. You can use [`shutdown_timeout`] to stop waiting
/// for them after a certain timeout. Be aware that this will still not
/// cancel the tasks — they are simply allowed to keep running after the
/// method returns.
///
/// Note that if you are using the [basic scheduler], this function will
/// still spawn additional threads for blocking operations. The basic
/// scheduler's single thread is only used for asynchronous code.
///
/// [`Builder`]: struct@crate::runtime::Builder
/// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
/// [rayon]: https://docs.rs/rayon
/// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler
/// [`thread::spawn`]: fn@std::thread::spawn
/// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
/// let handle = rt.handle();
///
/// let res = handle.spawn_blocking(move || {
/// // do some compute-heavy work or call synchronous code
/// "done computing"
/// }).await?;
///
/// assert_eq!(res, "done computing");
/// # Ok(())
/// # }
/// ```
pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (task, handle) = task::joinable(BlockingTask::new(f));
let _ = self.blocking_spawner.spawn(task, self);
handle
}
}
}
/// Error returned by `try_current` when no Runtime has been started
pub struct TryCurrentError(());
impl fmt::Debug for TryCurrentError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TryCurrentError").finish()
}
}
impl fmt::Display for TryCurrentError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("no tokio Runtime has been initialized")
}
}
impl error::Error for TryCurrentError {}

View File

@ -69,7 +69,7 @@
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Create the runtime
//! let mut rt = Runtime::new()?;
//! let rt = Runtime::new()?;
//!
//! // Spawn the root task
//! rt.block_on(async {
@ -212,7 +212,7 @@ pub(crate) mod enter;
use self::enter::enter;
mod handle;
pub use self::handle::{Handle, TryCurrentError};
use handle::Handle;
mod io;
@ -240,6 +240,7 @@ cfg_rt_core! {
use crate::task::JoinHandle;
}
use crate::loom::sync::Mutex;
use std::future::Future;
use std::time::Duration;
@ -288,11 +289,11 @@ pub struct Runtime {
enum Kind {
/// Not able to execute concurrent tasks. This variant is mostly used to get
/// access to the driver handles.
Shell(Shell),
Shell(Mutex<Option<Shell>>),
/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
Basic(BasicScheduler<time::Driver>),
Basic(Mutex<Option<BasicScheduler<time::Driver>>>),
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-threaded")]
@ -397,7 +398,7 @@ impl Runtime {
Kind::Shell(_) => panic!("task execution disabled"),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.spawn(future),
Kind::Basic(exec) => exec.spawn(future),
Kind::Basic(_exec) => self.handle.spawner.spawn(future),
}
}
@ -408,10 +409,11 @@ impl Runtime {
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// `&mut` is required as calling `block_on` **may** result in advancing the
/// state of the runtime. The details depend on how the runtime is
/// configured. [`runtime::Handle::block_on`][handle] provides a version
/// that takes `&self`.
/// When this runtime is configured with `core_threads = 0`, only the first call
/// to `block_on` will run the IO and timer drivers. Calls to other methods _before_ the first
/// `block_on` completes will just hook into the driver running on the thread
/// that first called `block_on`. This means that the driver may be passed
/// from thread to thread by the user between calls to `block_on`.
///
/// This method may not be called from an asynchronous context.
///
@ -426,7 +428,7 @@ impl Runtime {
/// use tokio::runtime::Runtime;
///
/// // Create the runtime
/// let mut rt = Runtime::new().unwrap();
/// let rt = Runtime::new().unwrap();
///
/// // Execute the future, blocking the current thread until completion
/// rt.block_on(async {
@ -435,13 +437,45 @@ impl Runtime {
/// ```
///
/// [handle]: fn@Handle::block_on
pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
let kind = &mut self.kind;
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.enter(|| match &self.kind {
Kind::Shell(exec) => {
// TODO(lucio): clean this up and move this impl into
// `shell.rs`, this is hacky and bad but will work for
// now.
let exec_temp = {
let mut lock = exec.lock().unwrap();
lock.take()
};
self.handle.enter(|| match kind {
Kind::Shell(exec) => exec.block_on(future),
if let Some(mut exec_temp) = exec_temp {
let res = exec_temp.block_on(future);
exec.lock().unwrap().replace(exec_temp);
res
} else {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).unwrap()
}
}
#[cfg(feature = "rt-core")]
Kind::Basic(exec) => exec.block_on(future),
Kind::Basic(exec) => {
// TODO(lucio): clean this up and move this impl into
// `basic_scheduler.rs`, this is hacky and bad but will work for
// now.
let exec_temp = {
let mut lock = exec.lock().unwrap();
lock.take()
};
if let Some(mut exec_temp) = exec_temp {
let res = exec_temp.block_on(future);
exec.lock().unwrap().replace(exec_temp);
res
} else {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).unwrap()
}
}
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.block_on(future),
})
@ -451,11 +485,8 @@ impl Runtime {
/// have an executor available on creation such as [`Delay`] or [`TcpStream`].
/// It will also allow you to call methods such as [`tokio::spawn`].
///
/// This function is also available as [`Handle::enter`].
///
/// [`Delay`]: struct@crate::time::Delay
/// [`TcpStream`]: struct@crate::net::TcpStream
/// [`Handle::enter`]: fn@crate::runtime::Handle::enter
/// [`tokio::spawn`]: fn@crate::spawn
///
/// # Example
@ -486,27 +517,6 @@ impl Runtime {
self.handle.enter(f)
}
/// Return a handle to the runtime's spawner.
///
/// The returned handle can be used to spawn tasks that run on this runtime, and can
/// be cloned to allow moving the `Handle` to other threads.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// let handle = rt.handle();
///
/// handle.spawn(async { println!("hello"); });
/// ```
pub fn handle(&self) -> &Handle {
&self.handle
}
/// Shutdown the runtime, waiting for at most `duration` for all spawned
/// task to shutdown.
///
@ -531,7 +541,7 @@ impl Runtime {
/// use std::time::Duration;
///
/// fn main() {
/// let mut runtime = Runtime::new().unwrap();
/// let runtime = Runtime::new().unwrap();
///
/// runtime.block_on(async move {
/// task::spawn_blocking(move || {
@ -565,7 +575,7 @@ impl Runtime {
/// use tokio::runtime::Runtime;
///
/// fn main() {
/// let mut runtime = Runtime::new().unwrap();
/// let runtime = Runtime::new().unwrap();
///
/// runtime.block_on(async move {
/// let inner_runtime = Runtime::new().unwrap();

View File

@ -178,7 +178,7 @@ mod group_b {
#[test]
fn join_output() {
loom::model(|| {
let mut rt = mk_pool(1);
let rt = mk_pool(1);
rt.block_on(async {
let t = crate::spawn(track(async { "hello" }));
@ -192,7 +192,7 @@ mod group_b {
#[test]
fn poll_drop_handle_then_drop() {
loom::model(|| {
let mut rt = mk_pool(1);
let rt = mk_pool(1);
rt.block_on(async move {
let mut t = crate::spawn(track(async { "hello" }));

View File

@ -185,7 +185,7 @@ mod tests {
#[test]
fn smoke() {
let mut rt = rt();
let rt = rt();
rt.block_on(async move {
let registry = Registry::new(vec![
EventInfo::default(),
@ -247,7 +247,7 @@ mod tests {
#[test]
fn broadcast_cleans_up_disconnected_listeners() {
let mut rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(async {
let registry = Registry::new(vec![EventInfo::default()]);

View File

@ -272,7 +272,7 @@ mod tests {
#[test]
fn ctrl_break() {
let mut rt = rt();
let rt = rt();
rt.block_on(async {
let mut ctrl_break = assert_ok!(super::ctrl_break());

View File

@ -312,9 +312,9 @@ impl LocalSet {
/// use tokio::runtime::Runtime;
/// use tokio::task;
///
/// let mut rt = Runtime::new().unwrap();
/// let rt = Runtime::new().unwrap();
/// let local = task::LocalSet::new();
/// local.block_on(&mut rt, async {
/// local.block_on(&rt, async {
/// let join = task::spawn_local(async {
/// let blocking_result = task::block_in_place(|| {
/// // ...
@ -329,9 +329,9 @@ impl LocalSet {
/// use tokio::runtime::Runtime;
/// use tokio::task;
///
/// let mut rt = Runtime::new().unwrap();
/// let rt = Runtime::new().unwrap();
/// let local = task::LocalSet::new();
/// local.block_on(&mut rt, async {
/// local.block_on(&rt, async {
/// let join = task::spawn_local(async {
/// let blocking_result = task::spawn_blocking(|| {
/// // ...
@ -346,7 +346,7 @@ impl LocalSet {
/// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
/// [in-place blocking]: fn@crate::task::block_in_place
/// [`spawn_blocking`]: fn@crate::task::spawn_blocking
pub fn block_on<F>(&self, rt: &mut crate::runtime::Runtime, future: F) -> F::Output
pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
where
F: Future,
{

View File

@ -18,7 +18,7 @@ doc_rt_core! {
///
/// This function must be called from the context of a Tokio runtime. Tasks running on
/// the Tokio runtime are always inside its context, but you can also enter the context
/// using the [`Handle::enter`](crate::runtime::Handle::enter()) method.
/// using the [`Runtime::enter`](crate::runtime::Runtime::enter()) method.
///
/// # Examples
///

View File

@ -45,7 +45,7 @@ fn test_drop_on_notify() {
// shutting down. Then, when the task handle is dropped, the task itself is
// dropped.
let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()

View File

@ -12,7 +12,7 @@ use std::time::Duration;
fn spawned_task_does_not_progress_without_block_on() {
let (tx, mut rx) = oneshot::channel();
let mut rt = rt();
let rt = rt();
rt.spawn(async move {
assert_ok!(tx.send("hello"));
@ -65,7 +65,7 @@ fn no_extra_poll() {
};
let npolls = Arc::clone(&rx.npolls);
let mut rt = rt();
let rt = rt();
rt.spawn(async move { while rx.next().await.is_some() {} });
rt.block_on(async {
@ -100,7 +100,7 @@ fn acquire_mutex_in_drop() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut rt = rt();
let rt = rt();
rt.spawn(async move {
let _ = rx2.await;

View File

@ -9,38 +9,41 @@ macro_rules! rt_test {
mod basic_scheduler {
$($t)*
fn rt() -> Runtime {
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap()
.into()
}
}
mod threaded_scheduler_4_threads {
$($t)*
fn rt() -> Runtime {
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(4)
.enable_all()
.build()
.unwrap()
.into()
}
}
mod threaded_scheduler_1_thread {
$($t)*
fn rt() -> Runtime {
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(1)
.enable_all()
.build()
.unwrap()
.into()
}
}
}
@ -72,7 +75,7 @@ rt_test! {
#[test]
fn block_on_sync() {
let mut rt = rt();
let rt = rt();
let mut win = false;
rt.block_on(async {
@ -82,21 +85,10 @@ rt_test! {
assert!(win);
}
#[test]
fn block_on_handle_sync() {
let rt = rt();
let mut win = false;
rt.handle().block_on(async {
win = true;
});
assert!(win);
}
#[test]
fn block_on_async() {
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
@ -112,27 +104,9 @@ rt_test! {
assert_eq!(out, "ZOMG");
}
#[test]
fn block_on_handle_async() {
let rt = rt();
let out = rt.handle().block_on(async {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_one_bg() {
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
@ -149,7 +123,7 @@ rt_test! {
#[test]
fn spawn_one_join() {
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
@ -172,7 +146,7 @@ rt_test! {
#[test]
fn spawn_two() {
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
@ -199,7 +173,7 @@ rt_test! {
const ITER: usize = 200;
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
@ -249,7 +223,7 @@ rt_test! {
const ITER: usize = 500;
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async {
tokio::spawn(async move {
@ -305,7 +279,7 @@ rt_test! {
#[test]
fn spawn_await_chain() {
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async {
assert_ok!(tokio::spawn(async {
@ -320,7 +294,7 @@ rt_test! {
#[test]
fn outstanding_tasks_dropped() {
let mut rt = rt();
let rt = rt();
let cnt = Arc::new(());
@ -343,16 +317,16 @@ rt_test! {
#[test]
#[should_panic]
fn nested_rt() {
let mut rt1 = rt();
let mut rt2 = rt();
let rt1 = rt();
let rt2 = rt();
rt1.block_on(async { rt2.block_on(async { "hello" }) });
}
#[test]
fn create_rt_in_block_on() {
let mut rt1 = rt();
let mut rt2 = rt1.block_on(async { rt() });
let rt1 = rt();
let rt2 = rt1.block_on(async { rt() });
let out = rt2.block_on(async { "ZOMG" });
assert_eq!(out, "ZOMG");
@ -360,7 +334,7 @@ rt_test! {
#[test]
fn complete_block_on_under_load() {
let mut rt = rt();
let rt = rt();
rt.block_on(async {
let (tx, rx) = oneshot::channel();
@ -383,7 +357,7 @@ rt_test! {
#[test]
fn complete_task_under_load() {
let mut rt = rt();
let rt = rt();
rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
@ -412,8 +386,8 @@ rt_test! {
#[test]
fn spawn_from_other_thread_idle() {
let mut rt = rt();
let handle = rt.handle().clone();
let rt = rt();
let handle = rt.clone();
let (tx, rx) = oneshot::channel();
@ -432,8 +406,8 @@ rt_test! {
#[test]
fn spawn_from_other_thread_under_load() {
let mut rt = rt();
let handle = rt.handle().clone();
let rt = rt();
let handle = rt.clone();
let (tx, rx) = oneshot::channel();
@ -457,7 +431,7 @@ rt_test! {
#[test]
fn delay_at_root() {
let mut rt = rt();
let rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
@ -471,7 +445,7 @@ rt_test! {
#[test]
fn delay_in_spawn() {
let mut rt = rt();
let rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
@ -492,7 +466,7 @@ rt_test! {
#[test]
fn block_on_socket() {
let mut rt = rt();
let rt = rt();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
@ -512,7 +486,7 @@ rt_test! {
#[test]
fn spawn_from_blocking() {
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async move {
let inner = assert_ok!(tokio::task::spawn_blocking(|| {
@ -527,7 +501,7 @@ rt_test! {
#[test]
fn spawn_blocking_from_blocking() {
let mut rt = rt();
let rt = rt();
let out = rt.block_on(async move {
let inner = assert_ok!(tokio::task::spawn_blocking(|| {
@ -542,7 +516,7 @@ rt_test! {
#[test]
fn delay_from_blocking() {
let mut rt = rt();
let rt = rt();
rt.block_on(async move {
assert_ok!(tokio::task::spawn_blocking(|| {
@ -562,7 +536,7 @@ rt_test! {
#[test]
fn socket_from_blocking() {
let mut rt = rt();
let rt = rt();
rt.block_on(async move {
let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@ -586,7 +560,7 @@ rt_test! {
#[test]
fn spawn_blocking_after_shutdown() {
let rt = rt();
let handle = rt.handle().clone();
let handle = rt.clone();
// Shutdown
drop(rt);
@ -615,7 +589,7 @@ rt_test! {
// test is disabled.
#[cfg(not(windows))]
fn io_driver_called_when_under_load() {
let mut rt = rt();
let rt = rt();
// Create a lot of constant load. The scheduler will always be busy.
for _ in 0..100 {
@ -651,7 +625,7 @@ rt_test! {
#[test]
fn client_server_block_on() {
let mut rt = rt();
let rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async move { client_server(tx).await });
@ -662,7 +636,7 @@ rt_test! {
#[test]
fn panic_in_task() {
let mut rt = rt();
let rt = rt();
let (tx, rx) = oneshot::channel();
struct Boom(Option<oneshot::Sender<()>>);
@ -689,7 +663,7 @@ rt_test! {
#[test]
#[should_panic]
fn panic_in_block_on() {
let mut rt = rt();
let rt = rt();
rt.block_on(async { panic!() });
}
@ -709,7 +683,7 @@ rt_test! {
#[test]
fn enter_and_spawn() {
let mut rt = rt();
let rt = rt();
let handle = rt.enter(|| {
tokio::spawn(async {})
});
@ -739,7 +713,7 @@ rt_test! {
}
}
let mut rt = rt();
let rt = rt();
let (drop_tx, drop_rx) = mpsc::channel();
let (run_tx, run_rx) = oneshot::channel();
@ -775,17 +749,17 @@ rt_test! {
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
let mut rt = rt();
let rt = rt();
let h1 = rt.handle().clone();
let h1 = rt.clone();
rt.handle().spawn(async move {
rt.spawn(async move {
// Ensure a waker gets stored in oneshot 1.
let _ = rx1.await;
tx3.send(()).unwrap();
});
rt.handle().spawn(async move {
rt.spawn(async move {
// When this task is dropped, we'll be "closing remotes".
// We spawn a new task that owns the `tx1`, to move its Drop
// out of here.
@ -802,7 +776,7 @@ rt_test! {
let _ = rx2.await;
});
rt.handle().spawn(async move {
rt.spawn(async move {
let _ = rx3.await;
// We'll never get here, but once task 3 drops, this will
// force task 2 to re-schedule since it's waiting on oneshot 2.
@ -823,7 +797,7 @@ rt_test! {
use std::net::Ipv6Addr;
for _ in 1..10 {
let mut runtime = rt();
let runtime = rt();
runtime.block_on(async {
let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
@ -854,7 +828,7 @@ rt_test! {
#[test]
fn shutdown_timeout() {
let (tx, rx) = oneshot::channel();
let mut runtime = rt();
let runtime = rt();
runtime.block_on(async move {
task::spawn_blocking(move || {
@ -865,18 +839,18 @@ rt_test! {
rx.await.unwrap();
});
runtime.shutdown_timeout(Duration::from_millis(100));
Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
}
#[test]
fn shutdown_wakeup_time() {
let mut runtime = rt();
let runtime = rt();
runtime.block_on(async move {
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
});
runtime.shutdown_timeout(Duration::from_secs(10_000));
Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
}
// This test is currently ignored on Windows because of a
@ -894,7 +868,9 @@ rt_test! {
thread::spawn(|| {
R.with(|cell| {
*cell.borrow_mut() = Some(rt());
let rt = rt();
let rt = Arc::try_unwrap(rt).unwrap();
*cell.borrow_mut() = Some(rt);
});
let _rt = rt();
@ -927,10 +903,10 @@ rt_test! {
#[test]
fn local_set_block_on_socket() {
let mut rt = rt();
let rt = rt();
let local = task::LocalSet::new();
local.block_on(&mut rt, async move {
local.block_on(&rt, async move {
let (tx, rx) = oneshot::channel();
let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
@ -948,12 +924,12 @@ rt_test! {
#[test]
fn local_set_client_server_block_on() {
let mut rt = rt();
let rt = rt();
let (tx, rx) = mpsc::channel();
let local = task::LocalSet::new();
local.block_on(&mut rt, async move { client_server_local(tx).await });
local.block_on(&rt, async move { client_server_local(tx).await });
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv());
@ -987,7 +963,7 @@ rt_test! {
fn coop() {
use std::task::Poll::Ready;
let mut rt = rt();
let rt = rt();
rt.block_on(async {
// Create a bunch of tasks
@ -1019,7 +995,7 @@ rt_test! {
const NUM: usize = 100;
let mut rt = rt();
let rt = rt();
rt.block_on(async {
let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();

View File

@ -57,22 +57,20 @@ fn many_oneshot_futures() {
}
#[test]
fn many_multishot_futures() {
use tokio::sync::mpsc;
const CHAIN: usize = 200;
const CYCLES: usize = 5;
const TRACKS: usize = 50;
for _ in 0..50 {
let mut rt = rt();
let rt = rt();
let mut start_txs = Vec::with_capacity(TRACKS);
let mut final_rxs = Vec::with_capacity(TRACKS);
for _ in 0..TRACKS {
let (start_tx, mut chain_rx) = mpsc::channel(10);
let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10);
for _ in 0..CHAIN {
let (mut next_tx, next_rx) = mpsc::channel(10);
let (mut next_tx, next_rx) = tokio::sync::mpsc::channel(10);
// Forward all the messages
rt.spawn(async move {
@ -85,7 +83,7 @@ fn many_multishot_futures() {
}
// This final task cycles if needed
let (mut final_tx, final_rx) = mpsc::channel(10);
let (mut final_tx, final_rx) = tokio::sync::mpsc::channel(10);
let mut cycle_tx = start_tx.clone();
let mut rem = CYCLES;
@ -123,7 +121,7 @@ fn many_multishot_futures() {
#[test]
fn spawn_shutdown() {
let mut rt = rt();
let rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async {
@ -230,7 +228,7 @@ fn start_stop_callbacks_called() {
let after_inner = after_start.clone();
let before_inner = before_stop.clone();
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.on_thread_start(move || {
@ -331,9 +329,7 @@ fn multi_threadpool() {
// channel yields occasionally even if there are values ready to receive.
#[test]
fn coop_and_block_in_place() {
use tokio::sync::mpsc;
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.threaded_scheduler()
// Setting max threads to 1 prevents another thread from claiming the
// runtime worker yielded as part of `block_in_place` and guarantees the
@ -344,7 +340,7 @@ fn coop_and_block_in_place() {
.unwrap();
rt.block_on(async move {
let (mut tx, mut rx) = mpsc::channel(1024);
let (mut tx, mut rx) = tokio::sync::mpsc::channel(1024);
// Fill the channel
for _ in 0..1024 {

View File

@ -14,11 +14,11 @@ use tokio::signal::unix::{signal, SignalKind};
fn dropping_loops_does_not_cause_starvation() {
let kind = SignalKind::user_defined1();
let mut first_rt = rt();
let first_rt = rt();
let mut first_signal =
first_rt.block_on(async { signal(kind).expect("failed to register first signal") });
let mut second_rt = rt();
let second_rt = rt();
let mut second_signal =
second_rt.block_on(async { signal(kind).expect("failed to register second signal") });

View File

@ -24,7 +24,7 @@ fn multi_loop() {
.map(|_| {
let sender = sender.clone();
thread::spawn(move || {
let mut rt = rt();
let rt = rt();
let _ = rt.block_on(async {
let mut signal = signal(SignalKind::hangup()).unwrap();
sender.send(()).unwrap();

View File

@ -79,7 +79,7 @@ async fn no_block_in_basic_scheduler() {
#[test]
fn yes_block_in_threaded_block_on() {
let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
@ -91,7 +91,7 @@ fn yes_block_in_threaded_block_on() {
#[test]
#[should_panic]
fn no_block_in_basic_block_on() {
let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap();
let rt = runtime::Builder::new().basic_scheduler().build().unwrap();
rt.block_on(async {
task::block_in_place(|| {});
});
@ -99,14 +99,14 @@ fn no_block_in_basic_block_on() {
#[test]
fn can_enter_basic_rt_from_within_block_in_place() {
let mut outer = tokio::runtime::Builder::new()
let outer = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
outer.block_on(async {
tokio::task::block_in_place(|| {
let mut inner = tokio::runtime::Builder::new()
let inner = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
@ -120,7 +120,7 @@ fn can_enter_basic_rt_from_within_block_in_place() {
fn useful_panic_message_when_dropping_rt_in_rt() {
use std::panic::{catch_unwind, AssertUnwindSafe};
let mut outer = tokio::runtime::Builder::new()
let outer = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
@ -147,7 +147,7 @@ fn useful_panic_message_when_dropping_rt_in_rt() {
#[test]
fn can_shutdown_with_zero_timeout_in_runtime() {
let mut outer = tokio::runtime::Builder::new()
let outer = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
@ -163,7 +163,7 @@ fn can_shutdown_with_zero_timeout_in_runtime() {
#[test]
fn can_shutdown_now_in_runtime() {
let mut outer = tokio::runtime::Builder::new()
let outer = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
@ -179,7 +179,7 @@ fn can_shutdown_now_in_runtime() {
#[test]
fn coop_disabled_in_block_in_place() {
let mut outer = tokio::runtime::Builder::new()
let outer = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_time()
.build()
@ -213,7 +213,7 @@ fn coop_disabled_in_block_in_place_in_block_on() {
let (done_tx, done_rx) = std::sync::mpsc::channel();
let done = done_tx.clone();
thread::spawn(move || {
let mut outer = tokio::runtime::Builder::new()
let outer = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();

View File

@ -133,12 +133,12 @@ fn local_threadpool_blocking_in_place() {
ON_RT_THREAD.with(|cell| cell.set(true));
let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
LocalSet::new().block_on(&mut rt, async {
LocalSet::new().block_on(&rt, async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let join = task::spawn_local(async move {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
@ -246,12 +246,12 @@ fn join_local_future_elsewhere() {
ON_RT_THREAD.with(|cell| cell.set(true));
let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
let local = LocalSet::new();
local.block_on(&mut rt, async move {
local.block_on(&rt, async move {
let (tx, rx) = oneshot::channel();
let join = task::spawn_local(async move {
println!("hello world running...");
@ -286,7 +286,7 @@ fn drop_cancels_tasks() {
use std::rc::Rc;
// This test reproduces issue #1842
let mut rt = rt();
let rt = rt();
let rc1 = Rc::new(());
let rc2 = rc1.clone();
@ -303,7 +303,7 @@ fn drop_cancels_tasks() {
}
});
local.block_on(&mut rt, async {
local.block_on(&rt, async {
started_rx.await.unwrap();
});
drop(local);
@ -362,11 +362,11 @@ fn drop_cancels_remote_tasks() {
with_timeout(Duration::from_secs(60), || {
let (tx, mut rx) = mpsc::channel::<()>(1024);
let mut rt = rt();
let rt = rt();
let local = LocalSet::new();
local.spawn_local(async move { while rx.recv().await.is_some() {} });
local.block_on(&mut rt, async {
local.block_on(&rt, async {
time::delay_for(Duration::from_millis(1)).await;
});
@ -385,7 +385,7 @@ fn local_tasks_wake_join_all() {
use futures::future::join_all;
use tokio::task::LocalSet;
let mut rt = rt();
let rt = rt();
let set = LocalSet::new();
let mut handles = Vec::new();

View File

@ -28,7 +28,7 @@ fn timer_with_threaded_runtime() {
fn timer_with_basic_scheduler() {
use tokio::runtime::Builder;
let mut rt = Builder::new()
let rt = Builder::new()
.basic_scheduler()
.enable_all()
.build()