mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
coop: expose an unconstrained() opt-out (#3547)
This commit is contained in:
parent
f70b9b84f7
commit
05eeea570e
@ -1,55 +1,33 @@
|
||||
#![cfg_attr(not(feature = "full"), allow(dead_code))]
|
||||
|
||||
//! Opt-in yield points for improved cooperative scheduling.
|
||||
//! Yield points for improved cooperative scheduling.
|
||||
//!
|
||||
//! A single call to [`poll`] on a top-level task may potentially do a lot of
|
||||
//! work before it returns `Poll::Pending`. If a task runs for a long period of
|
||||
//! time without yielding back to the executor, it can starve other tasks
|
||||
//! waiting on that executor to execute them, or drive underlying resources.
|
||||
//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
|
||||
//! long-running task. Instead, this module provides an opt-in mechanism for
|
||||
//! futures to collaborate with the executor to avoid starvation.
|
||||
//! Documentation for this can be found in the [`tokio::task`] module.
|
||||
//!
|
||||
//! Consider a future like this one:
|
||||
//!
|
||||
//! ```
|
||||
//! # use tokio_stream::{Stream, StreamExt};
|
||||
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
|
||||
//! while let Some(_) = input.next().await {}
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! It may look harmless, but consider what happens under heavy load if the
|
||||
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
|
||||
//! yield, and will starve other tasks and resources on the same executor. With
|
||||
//! opt-in yield points, this problem is alleviated:
|
||||
//!
|
||||
//! ```ignore
|
||||
//! # use tokio_stream::{Stream, StreamExt};
|
||||
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
|
||||
//! while let Some(_) = input.next().await {
|
||||
//! tokio::coop::proceed().await;
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! The `proceed` future will coordinate with the executor to make sure that
|
||||
//! every so often control is yielded back to the executor so it can run other
|
||||
//! tasks.
|
||||
//!
|
||||
//! # Placing yield points
|
||||
//!
|
||||
//! Voluntary yield points should be placed _after_ at least some work has been
|
||||
//! done. If they are not, a future sufficiently deep in the task hierarchy may
|
||||
//! end up _never_ getting to run because of the number of yield points that
|
||||
//! inevitably appear before it is reached. In general, you will want yield
|
||||
//! points to only appear in "leaf" futures -- those that do not themselves poll
|
||||
//! other futures. By doing this, you avoid double-counting each iteration of
|
||||
//! the outer future against the cooperating budget.
|
||||
//!
|
||||
//! [`poll`]: method@std::future::Future::poll
|
||||
//! [`tokio::task`]: crate::task.
|
||||
|
||||
// NOTE: The doctests in this module are ignored since the whole module is (currently) private.
|
||||
// ```ignore
|
||||
// # use tokio_stream::{Stream, StreamExt};
|
||||
// async fn drop_all<I: Stream + Unpin>(mut input: I) {
|
||||
// while let Some(_) = input.next().await {
|
||||
// tokio::coop::proceed().await;
|
||||
// }
|
||||
// }
|
||||
// ```
|
||||
//
|
||||
// The `proceed` future will coordinate with the executor to make sure that
|
||||
// every so often control is yielded back to the executor so it can run other
|
||||
// tasks.
|
||||
//
|
||||
// # Placing yield points
|
||||
//
|
||||
// Voluntary yield points should be placed _after_ at least some work has been
|
||||
// done. If they are not, a future sufficiently deep in the task hierarchy may
|
||||
// end up _never_ getting to run because of the number of yield points that
|
||||
// inevitably appear before it is reached. In general, you will want yield
|
||||
// points to only appear in "leaf" futures -- those that do not themselves poll
|
||||
// other futures. By doing this, you avoid double-counting each iteration of
|
||||
// the outer future against the cooperating budget.
|
||||
|
||||
use std::cell::Cell;
|
||||
|
||||
@ -98,6 +76,13 @@ pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
|
||||
with_budget(Budget::initial(), f)
|
||||
}
|
||||
|
||||
/// Run the given closure with an unconstrained task budget. When the function returns, the budget
|
||||
/// is reset to the value prior to calling the function.
|
||||
#[inline(always)]
|
||||
pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
|
||||
with_budget(Budget::unconstrained(), f)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
|
||||
struct ResetGuard<'a> {
|
||||
|
@ -357,3 +357,21 @@ macro_rules! cfg_coop {
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! cfg_not_coop {
|
||||
($($item:item)*) => {
|
||||
$(
|
||||
#[cfg(not(any(
|
||||
feature = "fs",
|
||||
feature = "io-std",
|
||||
feature = "net",
|
||||
feature = "process",
|
||||
feature = "rt",
|
||||
feature = "signal",
|
||||
feature = "sync",
|
||||
feature = "time",
|
||||
)))]
|
||||
$item
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
@ -209,11 +209,66 @@
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ### Cooperative scheduling
|
||||
//!
|
||||
//! A single call to [`poll`] on a top-level task may potentially do a lot of
|
||||
//! work before it returns `Poll::Pending`. If a task runs for a long period of
|
||||
//! time without yielding back to the executor, it can starve other tasks
|
||||
//! waiting on that executor to execute them, or drive underlying resources.
|
||||
//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
|
||||
//! long-running task. Instead, this module provides an opt-in mechanism for
|
||||
//! futures to collaborate with the executor to avoid starvation.
|
||||
//!
|
||||
//! Consider a future like this one:
|
||||
//!
|
||||
//! ```
|
||||
//! # use tokio_stream::{Stream, StreamExt};
|
||||
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
|
||||
//! while let Some(_) = input.next().await {}
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! It may look harmless, but consider what happens under heavy load if the
|
||||
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
|
||||
//! yield, and will starve other tasks and resources on the same executor.
|
||||
//!
|
||||
//! To account for this, Tokio has explicit yield points in a number of library
|
||||
//! functions, which force tasks to return to the executor periodically.
|
||||
//!
|
||||
//!
|
||||
//! #### unconstrained
|
||||
//!
|
||||
//! If necessary, [`task::unconstrained`] lets you opt out a future of Tokio's cooperative
|
||||
//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to
|
||||
//! Tokio. For example:
|
||||
//!
|
||||
//! ```
|
||||
//! # #[tokio::main]
|
||||
//! # async fn main() {
|
||||
//! use tokio::{task, sync::mpsc};
|
||||
//!
|
||||
//! let fut = async {
|
||||
//! let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
//!
|
||||
//! for i in 0..1000 {
|
||||
//! let _ = tx.send(());
|
||||
//! // This will always be ready. If coop was in effect, this code would be forced to yield
|
||||
//! // periodically. However, if left unconstrained, then this code will never yield.
|
||||
//! rx.recv().await;
|
||||
//! }
|
||||
//! };
|
||||
//!
|
||||
//! task::unconstrained(fut).await;
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! [`task::spawn_blocking`]: crate::task::spawn_blocking
|
||||
//! [`task::block_in_place`]: crate::task::block_in_place
|
||||
//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler
|
||||
//! [`task::yield_now`]: crate::task::yield_now()
|
||||
//! [`thread::yield_now`]: std::thread::yield_now
|
||||
//! [`task::unconstrained`]: crate::task::unconstrained()
|
||||
//! [`poll`]: method@std::future::Future::poll
|
||||
|
||||
cfg_rt! {
|
||||
pub use crate::runtime::task::{JoinError, JoinHandle};
|
||||
@ -236,4 +291,7 @@ cfg_rt! {
|
||||
|
||||
mod task_local;
|
||||
pub use task_local::LocalKey;
|
||||
|
||||
mod unconstrained;
|
||||
pub use unconstrained::{unconstrained, Unconstrained};
|
||||
}
|
||||
|
43
tokio/src/task/unconstrained.rs
Normal file
43
tokio/src/task/unconstrained.rs
Normal file
@ -0,0 +1,43 @@
|
||||
use pin_project_lite::pin_project;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
/// Future for the [`unconstrained`](unconstrained) method.
|
||||
#[must_use = "Unconstrained does nothing unless polled"]
|
||||
pub struct Unconstrained<F> {
|
||||
#[pin]
|
||||
inner: F,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Future for Unconstrained<F>
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
type Output = <F as Future>::Output;
|
||||
|
||||
cfg_coop! {
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let inner = self.project().inner;
|
||||
crate::coop::with_unconstrained(|| inner.poll(cx))
|
||||
}
|
||||
}
|
||||
|
||||
cfg_not_coop! {
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let inner = self.project().inner;
|
||||
inner.poll(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Turn off cooperative scheduling for a future. The future will never be forced to yield by
|
||||
/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields
|
||||
/// otherwise.
|
||||
///
|
||||
/// See also the usage example in the [task module](index.html#unconstrained).
|
||||
pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
|
||||
Unconstrained { inner }
|
||||
}
|
@ -1017,6 +1017,32 @@ rt_test! {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn coop_unconstrained() {
|
||||
use std::task::Poll::Ready;
|
||||
|
||||
let rt = rt();
|
||||
|
||||
rt.block_on(async {
|
||||
// Create a bunch of tasks
|
||||
let mut tasks = (0..1_000).map(|_| {
|
||||
tokio::spawn(async { })
|
||||
}).collect::<Vec<_>>();
|
||||
|
||||
// Hope that all the tasks complete...
|
||||
time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
tokio::task::unconstrained(poll_fn(|cx| {
|
||||
// All the tasks should be ready
|
||||
for task in &mut tasks {
|
||||
assert!(Pin::new(task).poll(cx).is_ready());
|
||||
}
|
||||
|
||||
Ready(())
|
||||
})).await;
|
||||
});
|
||||
}
|
||||
|
||||
// Tests that the "next task" scheduler optimization is not able to starve
|
||||
// other tasks.
|
||||
#[test]
|
||||
|
Loading…
x
Reference in New Issue
Block a user