mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
chore: stabilize JoinSet and AbortHandle (#4920)
Closes #4535. This leaves the ID-related APIs unstable.
This commit is contained in:
parent
de81985762
commit
b67b8c1398
@ -416,7 +416,6 @@ where
|
||||
/// * `None` if the `JoinMap` is empty.
|
||||
///
|
||||
/// [`tokio::select!`]: tokio::select
|
||||
#[doc(alias = "join_one")]
|
||||
pub async fn join_next(&mut self) -> Option<(K, Result<V, JoinError>)> {
|
||||
let (res, id) = match self.tasks.join_next_with_id().await {
|
||||
Some(Ok((id, output))) => (Ok(output), id),
|
||||
@ -430,12 +429,6 @@ where
|
||||
Some((key, res))
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[deprecated(since = "0.7.4", note = "renamed to `JoinMap::join_next`.")]
|
||||
pub async fn join_one(&mut self) -> Option<(K, Result<V, JoinError>)> {
|
||||
self.join_next().await
|
||||
}
|
||||
|
||||
/// Aborts all tasks and waits for them to finish shutting down.
|
||||
///
|
||||
/// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
|
||||
|
@ -347,7 +347,6 @@
|
||||
//!
|
||||
//! Likewise, some parts of the API are only available with the same flag:
|
||||
//!
|
||||
//! - [`task::JoinSet`]
|
||||
//! - [`task::Builder`]
|
||||
//!
|
||||
//! This flag enables **unstable** features. The public API of these features
|
||||
|
@ -406,6 +406,16 @@ macro_rules! cfg_unstable {
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! cfg_not_unstable {
|
||||
($($item:item)*) => {
|
||||
$(
|
||||
#[cfg(not(tokio_unstable))]
|
||||
#[cfg_attr(docsrs, doc(cfg(not(tokio_unstable))))]
|
||||
$item
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! cfg_not_trace {
|
||||
($($item:item)*) => {
|
||||
$(
|
||||
|
@ -11,14 +11,8 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
|
||||
/// Dropping an `AbortHandle` releases the permission to terminate the task
|
||||
/// --- it does *not* abort the task.
|
||||
///
|
||||
/// **Note**: This is an [unstable API][unstable]. The public API of this type
|
||||
/// may break in 1.x releases. See [the documentation on unstable
|
||||
/// features][unstable] for details.
|
||||
///
|
||||
/// [unstable]: crate#unstable-features
|
||||
/// [`JoinHandle`]: crate::task::JoinHandle
|
||||
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
|
||||
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
|
||||
pub struct AbortHandle {
|
||||
raw: Option<RawTask>,
|
||||
id: Id,
|
||||
@ -40,9 +34,6 @@ impl AbortHandle {
|
||||
///
|
||||
/// [cancelled]: method@super::error::JoinError::is_cancelled
|
||||
/// [`JoinHandle::abort`]: method@super::JoinHandle::abort
|
||||
// the `AbortHandle` type is only publicly exposed when `tokio_unstable` is
|
||||
// enabled, but it is still defined for testing purposes.
|
||||
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
|
||||
pub fn abort(&self) {
|
||||
if let Some(ref raw) = self.raw {
|
||||
raw.remote_abort();
|
||||
@ -55,7 +46,6 @@ impl AbortHandle {
|
||||
/// called on the task. This is because the cancellation process may take
|
||||
/// some time, and this method does not return `true` until it has
|
||||
/// completed.
|
||||
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
|
||||
pub fn is_finished(&self) -> bool {
|
||||
if let Some(raw) = self.raw {
|
||||
let state = raw.header().state.load();
|
||||
|
@ -260,7 +260,6 @@ impl<T> JoinHandle<T> {
|
||||
}
|
||||
|
||||
/// Returns a new `AbortHandle` that can be used to remotely abort this task.
|
||||
#[cfg(any(tokio_unstable, test))]
|
||||
pub(crate) fn abort_handle(&self) -> super::AbortHandle {
|
||||
let raw = self.raw.map(|raw| {
|
||||
raw.ref_inc();
|
||||
|
@ -155,11 +155,11 @@ cfg_rt_multi_thread! {
|
||||
pub(super) use self::inject::Inject;
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "rt", any(tokio_unstable, test)))]
|
||||
#[cfg(feature = "rt")]
|
||||
mod abort;
|
||||
mod join;
|
||||
|
||||
#[cfg(all(feature = "rt", any(tokio_unstable, test)))]
|
||||
#[cfg(feature = "rt")]
|
||||
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
|
||||
pub use self::abort::AbortHandle;
|
||||
|
||||
|
@ -10,7 +10,9 @@ use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use crate::runtime::Handle;
|
||||
use crate::task::{AbortHandle, Id, JoinError, JoinHandle, LocalSet};
|
||||
#[cfg(tokio_unstable)]
|
||||
use crate::task::Id;
|
||||
use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet};
|
||||
use crate::util::IdleNotifiedSet;
|
||||
|
||||
/// A collection of tasks spawned on a Tokio runtime.
|
||||
@ -23,10 +25,6 @@ use crate::util::IdleNotifiedSet;
|
||||
///
|
||||
/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
|
||||
///
|
||||
/// **Note**: This is an [unstable API][unstable]. The public API of this type
|
||||
/// may break in 1.x releases. See [the documentation on unstable
|
||||
/// features][unstable] for details.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// Spawn multiple tasks and wait for them.
|
||||
@ -53,9 +51,7 @@ use crate::util::IdleNotifiedSet;
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// [unstable]: crate#unstable-features
|
||||
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
|
||||
pub struct JoinSet<T> {
|
||||
inner: IdleNotifiedSet<JoinHandle<T>>,
|
||||
}
|
||||
@ -197,12 +193,6 @@ impl<T: 'static> JoinSet<T> {
|
||||
abort
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[deprecated(since = "1.20.0", note = "renamed to `JoinSet::join_next`.")]
|
||||
pub async fn join_one(&mut self) -> Option<Result<T, JoinError>> {
|
||||
self.join_next().await
|
||||
}
|
||||
|
||||
/// Waits until one of the tasks in the set completes and returns its output.
|
||||
///
|
||||
/// Returns `None` if the set is empty.
|
||||
@ -212,11 +202,8 @@ impl<T: 'static> JoinSet<T> {
|
||||
/// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
|
||||
/// statement and some other branch completes first, it is guaranteed that no tasks were
|
||||
/// removed from this `JoinSet`.
|
||||
#[doc(alias = "join_one")]
|
||||
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
|
||||
crate::future::poll_fn(|cx| self.poll_join_next(cx))
|
||||
.await
|
||||
.map(|opt| opt.map(|(_, res)| res))
|
||||
crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
|
||||
}
|
||||
|
||||
/// Waits until one of the tasks in the set completes and returns its
|
||||
@ -235,15 +222,10 @@ impl<T: 'static> JoinSet<T> {
|
||||
///
|
||||
/// [task ID]: crate::task::Id
|
||||
/// [`JoinError::id`]: fn@crate::task::JoinError::id
|
||||
#[doc(alias = "join_one_with_id")]
|
||||
#[cfg(tokio_unstable)]
|
||||
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
|
||||
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
|
||||
crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[deprecated(since = "1.20.0", note = "renamed to `JoinSet::join_next_with_id`")]
|
||||
pub async fn join_one_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
|
||||
self.join_next_with_id().await
|
||||
crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
|
||||
}
|
||||
|
||||
/// Aborts all tasks and waits for them to finish shutting down.
|
||||
@ -277,6 +259,60 @@ impl<T: 'static> JoinSet<T> {
|
||||
self.inner.drain(drop);
|
||||
}
|
||||
|
||||
/// Polls for one of the tasks in the set to complete.
|
||||
///
|
||||
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
|
||||
///
|
||||
/// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
|
||||
/// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
|
||||
/// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
|
||||
/// scheduled to receive a wakeup.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// This function returns:
|
||||
///
|
||||
/// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
|
||||
/// available right now.
|
||||
/// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
|
||||
/// The `value` is the return value of one of the tasks that completed.
|
||||
/// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
|
||||
/// aborted. The `err` is the `JoinError` from the panicked/aborted task.
|
||||
/// * `Poll::Ready(None)` if the `JoinSet` is empty.
|
||||
///
|
||||
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
|
||||
/// This can happen if the [coop budget] is reached.
|
||||
///
|
||||
/// [coop budget]: crate::task#cooperative-scheduling
|
||||
fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
|
||||
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
|
||||
// the `notified` list if the waker is notified in the `poll` call below.
|
||||
let mut entry = match self.inner.pop_notified(cx.waker()) {
|
||||
Some(entry) => entry,
|
||||
None => {
|
||||
if self.is_empty() {
|
||||
return Poll::Ready(None);
|
||||
} else {
|
||||
// The waker was set by `pop_notified`.
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
|
||||
|
||||
if let Poll::Ready(res) = res {
|
||||
let _entry = entry.remove();
|
||||
Poll::Ready(Some(res))
|
||||
} else {
|
||||
// A JoinHandle generally won't emit a wakeup without being ready unless
|
||||
// the coop limit has been reached. We yield to the executor in this
|
||||
// case.
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// Polls for one of the tasks in the set to complete.
|
||||
///
|
||||
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
|
||||
@ -304,7 +340,11 @@ impl<T: 'static> JoinSet<T> {
|
||||
///
|
||||
/// [coop budget]: crate::task#cooperative-scheduling
|
||||
/// [task ID]: crate::task::Id
|
||||
fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<(Id, T), JoinError>>> {
|
||||
#[cfg(tokio_unstable)]
|
||||
fn poll_join_next_with_id(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<(Id, T), JoinError>>> {
|
||||
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
|
||||
// the `notified` list if the waker is notified in the `poll` call below.
|
||||
let mut entry = match self.inner.pop_notified(cx.waker()) {
|
||||
|
@ -307,11 +307,18 @@ cfg_rt! {
|
||||
mod unconstrained;
|
||||
pub use unconstrained::{unconstrained, Unconstrained};
|
||||
|
||||
#[doc(inline)]
|
||||
pub use join_set::JoinSet;
|
||||
pub use crate::runtime::task::AbortHandle;
|
||||
|
||||
cfg_not_unstable! {
|
||||
mod join_set;
|
||||
}
|
||||
|
||||
cfg_unstable! {
|
||||
pub use crate::runtime::task::Id;
|
||||
|
||||
pub mod join_set;
|
||||
#[doc(inline)]
|
||||
pub use join_set::JoinSet;
|
||||
pub use crate::runtime::task::{Id, AbortHandle};
|
||||
}
|
||||
|
||||
cfg_trace! {
|
||||
|
@ -44,10 +44,8 @@ pub(crate) mod linked_list;
|
||||
mod rand;
|
||||
|
||||
cfg_rt! {
|
||||
cfg_unstable! {
|
||||
mod idle_notified_set;
|
||||
pub(crate) use idle_notified_set::IdleNotifiedSet;
|
||||
}
|
||||
mod idle_notified_set;
|
||||
pub(crate) use idle_notified_set::IdleNotifiedSet;
|
||||
|
||||
mod wake;
|
||||
pub(crate) use wake::WakerRef;
|
||||
|
Loading…
x
Reference in New Issue
Block a user