rt: add tokio::task::Builder (#3881)

Adds a builder API for spawning tasks. Initially, this enables the caller to name the spawned
task in order to provide better visibility into all tasks in the system.
This commit is contained in:
Jacob Rothstein 2021-06-29 10:47:30 -07:00 committed by GitHub
parent b521cc2689
commit 8fa29cb00a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 219 additions and 12 deletions

View File

@ -145,7 +145,7 @@ impl Handle {
F::Output: Send + 'static,
{
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task");
let future = crate::util::trace::task(future, "task", None);
self.spawner.spawn(future)
}
@ -170,6 +170,15 @@ impl Handle {
/// # }
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.spawn_blocking_inner(func, None)
}
#[cfg_attr(tokio_track_caller, track_caller)]
pub(crate) fn spawn_blocking_inner<F, R>(&self, func: F, name: Option<&str>) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@ -187,6 +196,7 @@ impl Handle {
"task",
kind = %"blocking",
function = %std::any::type_name::<F>(),
task.name = %name.unwrap_or_default(),
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
);
#[cfg(not(tokio_track_caller))]
@ -194,10 +204,15 @@ impl Handle {
target: "tokio::task",
"task",
kind = %"blocking",
task.name = %name.unwrap_or_default(),
function = %std::any::type_name::<F>(),
);
fut.instrument(span)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;
let (task, handle) = task::joinable(fut);
let _ = self.blocking_spawner.spawn(task, &self);
handle

105
tokio/src/task/builder.rs Normal file
View File

@ -0,0 +1,105 @@
#![allow(unreachable_pub)]
use crate::util::error::CONTEXT_MISSING_ERROR;
use crate::{runtime::context, task::JoinHandle};
use std::future::Future;
/// Factory which is used to configure the properties of a new task.
///
/// Methods can be chained in order to configure it.
///
/// Currently, there is only one configuration option:
///
/// - [`name`], which specifies an associated name for
/// the task
///
/// There are three types of task that can be spawned from a Builder:
/// - [`spawn_local`] for executing futures on the current thread
/// - [`spawn`] for executing [`Send`] futures on the runtime
/// - [`spawn_blocking`] for executing blocking code in the
/// blocking thread pool.
///
/// ## Example
///
/// ```no_run
/// use tokio::net::{TcpListener, TcpStream};
///
/// use std::io;
///
/// async fn process(socket: TcpStream) {
/// // ...
/// # drop(socket);
/// }
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
///
/// tokio::task::Builder::new()
/// .name("tcp connection handler")
/// .spawn(async move {
/// // Process each socket concurrently.
/// process(socket).await
/// });
/// }
/// }
/// ```
#[derive(Default, Debug)]
pub struct Builder<'a> {
name: Option<&'a str>,
}
impl<'a> Builder<'a> {
/// Creates a new task builder.
pub fn new() -> Self {
Self::default()
}
/// Assigns a name to the task which will be spawned.
pub fn name(&self, name: &'a str) -> Self {
Self { name: Some(name) }
}
/// Spawns a task on the executor.
///
/// See [`task::spawn`](crate::task::spawn) for
/// more details.
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
super::spawn::spawn_inner(future, self.name)
}
/// Spawns a task on the current thread.
///
/// See [`task::spawn_local`](crate::task::spawn_local)
/// for more details.
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_local<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + 'static,
Fut::Output: 'static,
{
super::local::spawn_local_inner(future, self.name)
}
/// Spawns blocking code on the blocking threadpool.
///
/// See [`task::spawn_blocking`](crate::task::spawn_blocking)
/// for more details.
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<Function, Output>(self, function: Function) -> JoinHandle<Output>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
context::current()
.expect(CONTEXT_MISSING_ERROR)
.spawn_blocking_inner(function, self.name)
}
}

View File

@ -297,7 +297,14 @@ cfg_rt! {
F: Future + 'static,
F::Output: 'static,
{
let future = crate::util::trace::task(future, "local");
spawn_local_inner(future, None)
}
pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output>
where F: Future + 'static,
F::Output: 'static
{
let future = crate::util::trace::task(future, "local", name);
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");
@ -381,7 +388,7 @@ impl LocalSet {
F: Future + 'static,
F::Output: 'static,
{
let future = crate::util::trace::task(future, "local");
let future = crate::util::trace::task(future, "local", None);
let (task, handle) = unsafe { task::joinable_local(future) };
self.context.tasks.borrow_mut().queue.push_back(task);
self.context.shared.waker.wake();

View File

@ -299,4 +299,9 @@ cfg_rt! {
mod unconstrained;
pub use unconstrained::{unconstrained, Unconstrained};
cfg_trace! {
mod builder;
pub use builder::Builder;
}
}

View File

@ -1,6 +1,4 @@
use crate::runtime;
use crate::task::JoinHandle;
use crate::util::error::CONTEXT_MISSING_ERROR;
use crate::{task::JoinHandle, util::error::CONTEXT_MISSING_ERROR};
use std::future::Future;
@ -124,14 +122,22 @@ cfg_rt! {
/// error[E0391]: cycle detected when processing `main`
/// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let spawn_handle = runtime::context::spawn_handle()
.expect(CONTEXT_MISSING_ERROR);
let task = crate::util::trace::task(task, "task");
spawn_inner(future, None)
}
#[cfg_attr(tokio_track_caller, track_caller)]
pub(super) fn spawn_inner<T>(future: T, name: Option<&str>) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let spawn_handle = crate::runtime::context::spawn_handle().expect(CONTEXT_MISSING_ERROR);
let task = crate::util::trace::task(future, "task", name);
spawn_handle.spawn(task)
}
}

View File

@ -4,7 +4,7 @@ cfg_trace! {
#[inline]
#[cfg_attr(tokio_track_caller, track_caller)]
pub(crate) fn task<F>(task: F, kind: &'static str) -> Instrumented<F> {
pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>) -> Instrumented<F> {
use tracing::instrument::Instrument;
#[cfg(tokio_track_caller)]
let location = std::panic::Location::caller();
@ -14,12 +14,14 @@ cfg_trace! {
"task",
%kind,
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
task.name = %name.unwrap_or_default()
);
#[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
target: "tokio::task",
"task",
%kind,
task.name = %name.unwrap_or_default()
);
task.instrument(span)
}
@ -29,7 +31,7 @@ cfg_trace! {
cfg_not_trace! {
cfg_rt! {
#[inline]
pub(crate) fn task<F>(task: F, _: &'static str) -> F {
pub(crate) fn task<F>(task: F, _: &'static str, _name: Option<&str>) -> F {
// nop
task
}

View File

@ -0,0 +1,67 @@
#[cfg(all(tokio_unstable, feature = "tracing"))]
mod tests {
use std::rc::Rc;
use tokio::{
task::{Builder, LocalSet},
test,
};
#[test]
async fn spawn_with_name() {
let result = Builder::new()
.name("name")
.spawn(async { "task executed" })
.await;
assert_eq!(result.unwrap(), "task executed");
}
#[test]
async fn spawn_blocking_with_name() {
let result = Builder::new()
.name("name")
.spawn_blocking(|| "task executed")
.await;
assert_eq!(result.unwrap(), "task executed");
}
#[test]
async fn spawn_local_with_name() {
let unsend_data = Rc::new("task executed");
let result = LocalSet::new()
.run_until(async move {
Builder::new()
.name("name")
.spawn_local(async move { unsend_data })
.await
})
.await;
assert_eq!(*result.unwrap(), "task executed");
}
#[test]
async fn spawn_without_name() {
let result = Builder::new().spawn(async { "task executed" }).await;
assert_eq!(result.unwrap(), "task executed");
}
#[test]
async fn spawn_blocking_without_name() {
let result = Builder::new().spawn_blocking(|| "task executed").await;
assert_eq!(result.unwrap(), "task executed");
}
#[test]
async fn spawn_local_without_name() {
let unsend_data = Rc::new("task executed");
let result = LocalSet::new()
.run_until(async move { Builder::new().spawn_local(async move { unsend_data }).await })
.await;
assert_eq!(*result.unwrap(), "task executed");
}
}