rt: add Handle::spawn_blocking method (#2501)

This follows a similar pattern to `Handle::spawn` to add the
blocking spawn capabilities to `Handle`.
This commit is contained in:
Adam C. Foltzer 2020-05-07 16:24:24 -07:00 committed by GitHub
parent 4748b2571f
commit 07533a5255
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 83 additions and 5 deletions

View File

@ -9,7 +9,7 @@ cfg_blocking_impl! {
mod schedule;
mod shutdown;
mod task;
pub(crate) mod task;
use crate::runtime::Builder;

View File

@ -148,7 +148,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====
impl Spawner {
fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
let mut shared = self.inner.shared.lock().unwrap();

View File

@ -6,7 +6,7 @@ use crate::runtime::task::{self, Task};
///
/// We avoid storing the task by forgetting it in `bind` and re-materializing it
/// in `release.
pub(super) struct NoopSchedule;
pub(crate) struct NoopSchedule;
impl task::Schedule for NoopSchedule {
fn bind(_task: Task<Self>) -> NoopSchedule {

View File

@ -3,13 +3,13 @@ use std::pin::Pin;
use std::task::{Context, Poll};
/// Converts a function to a future that completes on poll
pub(super) struct BlockingTask<T> {
pub(crate) struct BlockingTask<T> {
func: Option<T>,
}
impl<T> BlockingTask<T> {
/// Initializes a new blocking task from the given function
pub(super) fn new(func: T) -> BlockingTask<T> {
pub(crate) fn new(func: T) -> BlockingTask<T> {
BlockingTask { func: Some(func) }
}
}

View File

@ -1,6 +1,11 @@
use crate::runtime::{blocking, context, io, time, Spawner};
use std::{error, fmt};
cfg_blocking! {
use crate::runtime::task;
use crate::runtime::blocking::task::BlockingTask;
}
cfg_rt_core! {
use crate::task::JoinHandle;
@ -263,6 +268,79 @@ cfg_rt_core! {
}
}
cfg_blocking! {
impl Handle {
/// Runs the provided closure on a thread where blocking is acceptable.
///
/// In general, issuing a blocking call or performing a lot of compute in a
/// future without yielding is not okay, as it may prevent the executor from
/// driving other futures forward. This function runs the provided closure
/// on a thread dedicated to blocking operations. See the [CPU-bound tasks
/// and blocking code][blocking] section for more information.
///
/// Tokio will spawn more blocking threads when they are requested through
/// this function until the upper limit configured on the [`Builder`] is
/// reached. This limit is very large by default, because `spawn_blocking` is
/// often used for various kinds of IO operations that cannot be performed
/// asynchronously. When you run CPU-bound code using `spawn_blocking`, you
/// should keep this large upper limit in mind; to run your CPU-bound
/// computations on only a few threads, you should use a separate thread
/// pool such as [rayon] rather than configuring the number of blocking
/// threads.
///
/// This function is intended for non-async operations that eventually
/// finish on their own. If you want to spawn an ordinary thread, you should
/// use [`thread::spawn`] instead.
///
/// Closures spawned using `spawn_blocking` cannot be cancelled. When you
/// shut down the executor, it will wait indefinitely for all blocking
/// operations to finish. You can use [`shutdown_timeout`] to stop waiting
/// for them after a certain timeout. Be aware that this will still not
/// cancel the tasks — they are simply allowed to keep running after the
/// method returns.
///
/// Note that if you are using the [basic scheduler], this function will
/// still spawn additional threads for blocking operations. The basic
/// scheduler's single thread is only used for asynchronous code.
///
/// [`Builder`]: struct@crate::runtime::Builder
/// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
/// [rayon]: https://docs.rs/rayon
/// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler
/// [`thread::spawn`]: fn@std::thread::spawn
/// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
/// let handle = rt.handle();
///
/// let res = handle.spawn_blocking(move || {
/// // do some compute-heavy work or call synchronous code
/// "done computing"
/// }).await?;
///
/// assert_eq!(res, "done computing");
/// # Ok(())
/// # }
/// ```
pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (task, handle) = task::joinable(BlockingTask::new(f));
let _ = self.blocking_spawner.spawn(task, self);
handle
}
}
}
/// Error returned by `try_current` when no Runtime has been started
pub struct TryCurrentError(());