mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
rt: simplify coop implementation (#2498)
Simplifies coop implementation. Prunes unused code, create a `Budget` type to track the current budget.
This commit is contained in:
parent
66fef4a9bc
commit
4748b2571f
@ -1,11 +1,12 @@
|
||||
//! Opt-in yield points for improved cooperative scheduling.
|
||||
//!
|
||||
//! A single call to [`poll`] on a top-level task may potentially do a lot of work before it
|
||||
//! returns `Poll::Pending`. If a task runs for a long period of time without yielding back to the
|
||||
//! executor, it can starve other tasks waiting on that executor to execute them, or drive
|
||||
//! underlying resources. Since Rust does not have a runtime, it is difficult to forcibly preempt a
|
||||
//! long-running task. Instead, this module provides an opt-in mechanism for futures to collaborate
|
||||
//! with the executor to avoid starvation.
|
||||
//! A single call to [`poll`] on a top-level task may potentially do a lot of
|
||||
//! work before it returns `Poll::Pending`. If a task runs for a long period of
|
||||
//! time without yielding back to the executor, it can starve other tasks
|
||||
//! waiting on that executor to execute them, or drive underlying resources.
|
||||
//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
|
||||
//! long-running task. Instead, this module provides an opt-in mechanism for
|
||||
//! futures to collaborate with the executor to avoid starvation.
|
||||
//!
|
||||
//! Consider a future like this one:
|
||||
//!
|
||||
@ -16,9 +17,10 @@
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! It may look harmless, but consider what happens under heavy load if the input stream is
|
||||
//! _always_ ready. If we spawn `drop_all`, the task will never yield, and will starve other tasks
|
||||
//! and resources on the same executor. With opt-in yield points, this problem is alleviated:
|
||||
//! It may look harmless, but consider what happens under heavy load if the
|
||||
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
|
||||
//! yield, and will starve other tasks and resources on the same executor. With
|
||||
//! opt-in yield points, this problem is alleviated:
|
||||
//!
|
||||
//! ```ignore
|
||||
//! # use tokio::stream::{Stream, StreamExt};
|
||||
@ -29,67 +31,89 @@
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! The `proceed` future will coordinate with the executor to make sure that every so often control
|
||||
//! is yielded back to the executor so it can run other tasks.
|
||||
//! The `proceed` future will coordinate with the executor to make sure that
|
||||
//! every so often control is yielded back to the executor so it can run other
|
||||
//! tasks.
|
||||
//!
|
||||
//! # Placing yield points
|
||||
//!
|
||||
//! Voluntary yield points should be placed _after_ at least some work has been done. If they are
|
||||
//! not, a future sufficiently deep in the task hierarchy may end up _never_ getting to run because
|
||||
//! of the number of yield points that inevitably appear before it is reached. In general, you will
|
||||
//! want yield points to only appear in "leaf" futures -- those that do not themselves poll other
|
||||
//! futures. By doing this, you avoid double-counting each iteration of the outer future against
|
||||
//! the cooperating budget.
|
||||
//! Voluntary yield points should be placed _after_ at least some work has been
|
||||
//! done. If they are not, a future sufficiently deep in the task hierarchy may
|
||||
//! end up _never_ getting to run because of the number of yield points that
|
||||
//! inevitably appear before it is reached. In general, you will want yield
|
||||
//! points to only appear in "leaf" futures -- those that do not themselves poll
|
||||
//! other futures. By doing this, you avoid double-counting each iteration of
|
||||
//! the outer future against the cooperating budget.
|
||||
//!
|
||||
//! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
|
||||
//! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
|
||||
|
||||
// NOTE: The doctests in this module are ignored since the whole module is (currently) private.
|
||||
|
||||
use std::cell::Cell;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// Constant used to determine how much "work" a task is allowed to do without yielding.
|
||||
///
|
||||
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough to amortize wakeup
|
||||
/// and scheduling costs, but low enough that we do not starve other tasks for too long. The value
|
||||
/// also needs to be high enough that particularly deep tasks are able to do at least some useful
|
||||
/// work at all.
|
||||
///
|
||||
/// Note that as more yield points are added in the ecosystem, this value will probably also have
|
||||
/// to be raised.
|
||||
const BUDGET: usize = 128;
|
||||
|
||||
/// Constant used to determine if budgeting has been disabled.
|
||||
const UNCONSTRAINED: usize = usize::max_value();
|
||||
|
||||
thread_local! {
|
||||
static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
|
||||
static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
|
||||
}
|
||||
|
||||
/// Run the given closure with a cooperative task budget.
|
||||
///
|
||||
/// Enabling budgeting when it is already enabled is a no-op.
|
||||
/// Opaque type tracking the amount of "work" a task may still do before
|
||||
/// yielding back to the scheduler.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub(crate) struct Budget(Option<u8>);
|
||||
|
||||
impl Budget {
|
||||
/// Budget assigned to a task on each poll.
|
||||
///
|
||||
/// The value itself is chosen somewhat arbitrarily. It needs to be high
|
||||
/// enough to amortize wakeup and scheduling costs, but low enough that we
|
||||
/// do not starve other tasks for too long. The value also needs to be high
|
||||
/// enough that particularly deep tasks are able to do at least some useful
|
||||
/// work at all.
|
||||
///
|
||||
/// Note that as more yield points are added in the ecosystem, this value
|
||||
/// will probably also have to be raised.
|
||||
const fn initial() -> Budget {
|
||||
Budget(Some(128))
|
||||
}
|
||||
|
||||
/// Returns an unconstrained budget. Operations will not be limited.
|
||||
const fn unconstrained() -> Budget {
|
||||
Budget(None)
|
||||
}
|
||||
}
|
||||
|
||||
cfg_rt_threaded! {
|
||||
impl Budget {
|
||||
fn has_remaining(self) -> bool {
|
||||
self.0.map(|budget| budget > 0).unwrap_or(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the given closure with a cooperative task budget. When the function
|
||||
/// returns, the budget is reset to the value prior to calling the function.
|
||||
#[inline(always)]
|
||||
pub(crate) fn budget<F, R>(f: F) -> R
|
||||
where
|
||||
F: FnOnce() -> R,
|
||||
{
|
||||
HITS.with(move |hits| {
|
||||
if hits.get() != UNCONSTRAINED {
|
||||
// We are already being budgeted.
|
||||
//
|
||||
// Arguably this should be an error, but it can happen "correctly"
|
||||
// such as with block_on + LocalSet, so we make it a no-op.
|
||||
return f();
|
||||
}
|
||||
struct ResetGuard<'a> {
|
||||
cell: &'a Cell<Budget>,
|
||||
prev: Budget,
|
||||
}
|
||||
|
||||
impl<'a> Drop for ResetGuard<'a> {
|
||||
fn drop(&mut self) {
|
||||
self.cell.set(self.prev);
|
||||
}
|
||||
}
|
||||
|
||||
CURRENT.with(move |cell| {
|
||||
let prev = cell.get();
|
||||
|
||||
cell.set(Budget::initial());
|
||||
|
||||
let _guard = ResetGuard { cell, prev };
|
||||
|
||||
hits.set(BUDGET);
|
||||
let _guard = ResetGuard {
|
||||
hits,
|
||||
prev: UNCONSTRAINED,
|
||||
};
|
||||
f()
|
||||
})
|
||||
}
|
||||
@ -97,266 +121,53 @@ where
|
||||
cfg_rt_threaded! {
|
||||
#[inline(always)]
|
||||
pub(crate) fn has_budget_remaining() -> bool {
|
||||
HITS.with(|hits| hits.get() > 0)
|
||||
CURRENT.with(|cell| cell.get().has_remaining())
|
||||
}
|
||||
}
|
||||
|
||||
cfg_blocking_impl! {
|
||||
/// Forcibly remove the budgeting constraints early.
|
||||
pub(crate) fn stop() {
|
||||
HITS.with(|hits| {
|
||||
hits.set(UNCONSTRAINED);
|
||||
CURRENT.with(|cell| {
|
||||
cell.set(Budget::unconstrained());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
cfg_rt_core! {
|
||||
cfg_rt_util! {
|
||||
/// Run the given closure with a new task budget, resetting the previous
|
||||
/// budget when the closure finishes.
|
||||
///
|
||||
/// This is intended for internal use by `LocalSet` and (potentially) other
|
||||
/// similar schedulers which are themselves futures, and need a fresh budget
|
||||
/// for each of their children.
|
||||
#[inline(always)]
|
||||
pub(crate) fn reset<F, R>(f: F) -> R
|
||||
where
|
||||
F: FnOnce() -> R,
|
||||
{
|
||||
HITS.with(move |hits| {
|
||||
let prev = hits.get();
|
||||
hits.set(UNCONSTRAINED);
|
||||
let _guard = ResetGuard {
|
||||
hits,
|
||||
prev,
|
||||
};
|
||||
f()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
cfg_coop! {
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// Invoke `f` with a subset of the remaining budget.
|
||||
///
|
||||
/// This is useful if you have sub-futures that you need to poll, but that you want to restrict
|
||||
/// from using up your entire budget. For example, imagine the following future:
|
||||
///
|
||||
/// ```rust
|
||||
/// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
|
||||
/// use futures::stream::FuturesUnordered;
|
||||
/// struct MyFuture<F1, F2> {
|
||||
/// big: FuturesUnordered<F1>,
|
||||
/// small: F2,
|
||||
/// }
|
||||
///
|
||||
/// use tokio::stream::Stream;
|
||||
/// impl<F1, F2> Future for MyFuture<F1, F2>
|
||||
/// where F1: Future, F2: Future
|
||||
/// # , F1: Unpin, F2: Unpin
|
||||
/// {
|
||||
/// type Output = F2::Output;
|
||||
///
|
||||
/// // fn poll(...)
|
||||
/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
|
||||
/// # let this = &mut *self;
|
||||
/// let mut big = // something to pin self.big
|
||||
/// # Pin::new(&mut this.big);
|
||||
/// let small = // something to pin self.small
|
||||
/// # Pin::new(&mut this.small);
|
||||
///
|
||||
/// // see if any of the big futures have finished
|
||||
/// while let Some(e) = futures::ready!(big.as_mut().poll_next(cx)) {
|
||||
/// // do something with e
|
||||
/// # let _ = e;
|
||||
/// }
|
||||
///
|
||||
/// // see if the small future has finished
|
||||
/// small.poll(cx)
|
||||
/// }
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// It could be that every time `poll` gets called, `big` ends up spending the entire budget, and
|
||||
/// `small` never gets polled. That would be sad. If you want to stick up for the little future,
|
||||
/// that's what `limit` is for. It lets you portion out a smaller part of the yield budget to a
|
||||
/// particular segment of your code. In the code above, you would write
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
|
||||
/// # use futures::stream::FuturesUnordered;
|
||||
/// # struct MyFuture<F1, F2> {
|
||||
/// # big: FuturesUnordered<F1>,
|
||||
/// # small: F2,
|
||||
/// # }
|
||||
/// #
|
||||
/// # use tokio::stream::Stream;
|
||||
/// # impl<F1, F2> Future for MyFuture<F1, F2>
|
||||
/// # where F1: Future, F2: Future
|
||||
/// # , F1: Unpin, F2: Unpin
|
||||
/// # {
|
||||
/// # type Output = F2::Output;
|
||||
/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
|
||||
/// # let this = &mut *self;
|
||||
/// # let mut big = Pin::new(&mut this.big);
|
||||
/// # let small = Pin::new(&mut this.small);
|
||||
/// #
|
||||
/// // see if any of the big futures have finished
|
||||
/// while let Some(e) = futures::ready!(tokio::coop::limit(64, || big.as_mut().poll_next(cx))) {
|
||||
/// # // do something with e
|
||||
/// # let _ = e;
|
||||
/// # }
|
||||
/// # small.poll(cx)
|
||||
/// # }
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// Now, even if `big` spends its entire budget, `small` will likely be left with some budget left
|
||||
/// to also do useful work. In particular, if the remaining budget was `N` at the start of `poll`,
|
||||
/// `small` will have at least a budget of `N - 64`. It may be more if `big` did not spend its
|
||||
/// entire budget.
|
||||
///
|
||||
/// Note that you cannot _increase_ your budget by calling `limit`. The budget provided to the code
|
||||
/// inside the buget is the _minimum_ of the _current_ budget and the bound.
|
||||
///
|
||||
#[allow(unreachable_pub, dead_code)]
|
||||
pub fn limit<R>(bound: usize, f: impl FnOnce() -> R) -> R {
|
||||
HITS.with(|hits| {
|
||||
let budget = hits.get();
|
||||
// with_bound cannot _increase_ the remaining budget
|
||||
let bound = std::cmp::min(budget, bound);
|
||||
// When f() exits, how much should we add to what is left?
|
||||
let floor = budget.saturating_sub(bound);
|
||||
// Make sure we restore the remaining budget even on panic
|
||||
struct RestoreBudget<'a>(&'a Cell<usize>, usize);
|
||||
impl<'a> Drop for RestoreBudget<'a> {
|
||||
fn drop(&mut self) {
|
||||
let left = self.0.get();
|
||||
self.0.set(self.1 + left);
|
||||
/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
|
||||
#[inline]
|
||||
pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
|
||||
CURRENT.with(|cell| {
|
||||
let mut budget = cell.get();
|
||||
|
||||
if budget.decrement() {
|
||||
cell.set(budget);
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
// Time to restrict!
|
||||
hits.set(bound);
|
||||
let _restore = RestoreBudget(&hits, floor);
|
||||
f()
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
|
||||
#[allow(unreachable_pub, dead_code)]
|
||||
#[inline]
|
||||
pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
|
||||
HITS.with(|hits| {
|
||||
let n = hits.get();
|
||||
if n == UNCONSTRAINED {
|
||||
// opted out of budgeting
|
||||
Poll::Ready(())
|
||||
} else if n == 0 {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
} else {
|
||||
hits.set(n.saturating_sub(1));
|
||||
Poll::Ready(())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Resolves immediately unless the current task has already exceeded its budget.
|
||||
///
|
||||
/// This should be placed after at least some work has been done. Otherwise a future sufficiently
|
||||
/// deep in the task hierarchy may end up never getting to run because of the number of yield
|
||||
/// points that inevitably appear before it is even reached. For example:
|
||||
///
|
||||
/// ```ignore
|
||||
/// # use tokio::stream::{Stream, StreamExt};
|
||||
/// async fn drop_all<I: Stream + Unpin>(mut input: I) {
|
||||
/// while let Some(_) = input.next().await {
|
||||
/// tokio::coop::proceed().await;
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
#[allow(unreachable_pub, dead_code)]
|
||||
#[inline]
|
||||
pub async fn proceed() {
|
||||
use crate::future::poll_fn;
|
||||
poll_fn(|cx| poll_proceed(cx)).await;
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
/// A future that cooperatively yields to the task scheduler when polling,
|
||||
/// if the task's budget is exhausted.
|
||||
///
|
||||
/// Internally, this is simply a future combinator which calls
|
||||
/// [`poll_proceed`] in its `poll` implementation before polling the wrapped
|
||||
/// future.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// use tokio::coop::CoopFutureExt;
|
||||
///
|
||||
/// async { /* ... */ }
|
||||
/// .cooperate()
|
||||
/// .await;
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// [`poll_proceed`]: fn@poll_proceed
|
||||
#[derive(Debug)]
|
||||
#[allow(unreachable_pub, dead_code)]
|
||||
pub struct CoopFuture<F> {
|
||||
#[pin]
|
||||
future: F,
|
||||
}
|
||||
}
|
||||
|
||||
struct ResetGuard<'a> {
|
||||
hits: &'a Cell<usize>,
|
||||
prev: usize,
|
||||
}
|
||||
|
||||
impl<F: Future> Future for CoopFuture<F> {
|
||||
type Output = F::Output;
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
ready!(poll_proceed(cx));
|
||||
self.project().future.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future> CoopFuture<F> {
|
||||
/// Returns a new `CoopFuture` wrapping the given future.
|
||||
///
|
||||
#[allow(unreachable_pub, dead_code)]
|
||||
pub fn new(future: F) -> Self {
|
||||
Self { future }
|
||||
}
|
||||
}
|
||||
|
||||
// Currently only used by `tokio::sync`; and if we make this combinator public,
|
||||
// it should probably be on the `FutureExt` trait instead.
|
||||
cfg_sync! {
|
||||
/// Extension trait providing `Future::cooperate` extension method.
|
||||
///
|
||||
/// Note: if/when the co-op API becomes public, this method should probably be
|
||||
/// provided by `FutureExt`, instead.
|
||||
pub(crate) trait CoopFutureExt: Future {
|
||||
/// Wrap `self` to cooperatively yield to the scheduler when polling, if the
|
||||
/// task's budget is exhausted.
|
||||
fn cooperate(self) -> CoopFuture<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
CoopFuture::new(self)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
impl<F> CoopFutureExt for F where F: Future {}
|
||||
}
|
||||
|
||||
impl<'a> Drop for ResetGuard<'a> {
|
||||
fn drop(&mut self) {
|
||||
self.hits.set(self.prev);
|
||||
impl Budget {
|
||||
/// Decrement the budget. Returns `true` if successful. Decrementing fails
|
||||
/// when there is not enough remaining budget.
|
||||
fn decrement(&mut self) -> bool {
|
||||
if let Some(num) = &mut self.0 {
|
||||
if *num > 0 {
|
||||
*num -= 1;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -364,49 +175,53 @@ impl<'a> Drop for ResetGuard<'a> {
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
fn get() -> usize {
|
||||
HITS.with(|hits| hits.get())
|
||||
fn get() -> Budget {
|
||||
CURRENT.with(|cell| cell.get())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bugeting() {
|
||||
use futures::future::poll_fn;
|
||||
use tokio_test::*;
|
||||
|
||||
assert_eq!(get(), UNCONSTRAINED);
|
||||
assert!(get().0.is_none());
|
||||
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), UNCONSTRAINED);
|
||||
budget(|| {
|
||||
assert_eq!(get(), BUDGET);
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), BUDGET - 1);
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), BUDGET - 2);
|
||||
});
|
||||
assert_eq!(get(), UNCONSTRAINED);
|
||||
|
||||
assert!(get().0.is_none());
|
||||
|
||||
budget(|| {
|
||||
limit(3, || {
|
||||
assert_eq!(get(), 3);
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), 2);
|
||||
limit(4, || {
|
||||
assert_eq!(get(), 2);
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), 1);
|
||||
});
|
||||
assert_eq!(get(), 1);
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), 0);
|
||||
assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), 0);
|
||||
assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), 0);
|
||||
});
|
||||
assert_eq!(get(), BUDGET - 3);
|
||||
assert_eq!(get().0, Budget::initial().0);
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get(), BUDGET - 4);
|
||||
assert_ready!(task::spawn(proceed()).poll());
|
||||
assert_eq!(get(), BUDGET - 5);
|
||||
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
|
||||
|
||||
budget(|| {
|
||||
assert_eq!(get().0, Budget::initial().0);
|
||||
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
|
||||
});
|
||||
|
||||
assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
|
||||
});
|
||||
|
||||
assert!(get().0.is_none());
|
||||
|
||||
budget(|| {
|
||||
let n = get().0.unwrap();
|
||||
|
||||
for _ in 0..n {
|
||||
assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
|
||||
}
|
||||
|
||||
let mut task = task::spawn(poll_fn(|cx| {
|
||||
ready!(poll_proceed(cx));
|
||||
Poll::Ready(())
|
||||
}));
|
||||
|
||||
assert_pending!(task.poll());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -363,3 +363,23 @@ macro_rules! cfg_unstable {
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! cfg_coop {
|
||||
($($item:item)*) => {
|
||||
$(
|
||||
#[cfg(any(
|
||||
feature = "blocking",
|
||||
feature = "dns",
|
||||
feature = "fs",
|
||||
feature = "io-driver",
|
||||
feature = "io-std",
|
||||
feature = "process",
|
||||
feature = "rt-core",
|
||||
feature = "sync",
|
||||
feature = "stream",
|
||||
feature = "time"
|
||||
))]
|
||||
$item
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
@ -386,7 +386,11 @@ impl Future for Acquire<'_> {
|
||||
type Output = Result<(), AcquireError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// First, ensure the current task has enough budget to proceed.
|
||||
ready!(crate::coop::poll_proceed(cx));
|
||||
|
||||
let (node, semaphore, needed, queued) = self.project();
|
||||
|
||||
match semaphore.poll_acquire(cx, needed, node, *queued) {
|
||||
Pending => {
|
||||
*queued = true;
|
||||
|
@ -1,4 +1,3 @@
|
||||
use crate::coop::CoopFutureExt;
|
||||
use crate::sync::batch_semaphore as semaphore;
|
||||
|
||||
use std::cell::UnsafeCell;
|
||||
@ -255,7 +254,7 @@ impl<T> Mutex<T> {
|
||||
}
|
||||
|
||||
async fn acquire(&self) {
|
||||
self.s.acquire(1).cooperate().await.unwrap_or_else(|_| {
|
||||
self.s.acquire(1).await.unwrap_or_else(|_| {
|
||||
// The semaphore was closed. but, we never explicitly close it, and
|
||||
// we own it exclusively, which means that this can never happen.
|
||||
unreachable!()
|
||||
|
@ -1,4 +1,3 @@
|
||||
use crate::coop::CoopFutureExt;
|
||||
use crate::sync::batch_semaphore::{AcquireError, Semaphore};
|
||||
use std::cell::UnsafeCell;
|
||||
use std::ops;
|
||||
@ -116,7 +115,7 @@ impl<'a, T> ReleasingPermit<'a, T> {
|
||||
lock: &'a RwLock<T>,
|
||||
num_permits: u16,
|
||||
) -> Result<ReleasingPermit<'a, T>, AcquireError> {
|
||||
lock.s.acquire(num_permits).cooperate().await?;
|
||||
lock.s.acquire(num_permits).await?;
|
||||
Ok(Self { num_permits, lock })
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,4 @@
|
||||
use super::batch_semaphore as ll; // low level implementation
|
||||
use crate::coop::CoopFutureExt;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Counting semaphore performing asynchronous permit aquisition.
|
||||
@ -87,7 +86,7 @@ impl Semaphore {
|
||||
|
||||
/// Acquires permit from the semaphore.
|
||||
pub async fn acquire(&self) -> SemaphorePermit<'_> {
|
||||
self.ll_sem.acquire(1).cooperate().await.unwrap();
|
||||
self.ll_sem.acquire(1).await.unwrap();
|
||||
SemaphorePermit {
|
||||
sem: &self,
|
||||
permits: 1,
|
||||
@ -111,7 +110,7 @@ impl Semaphore {
|
||||
///
|
||||
/// [`Arc`]: std::sync::Arc
|
||||
pub async fn acquire_owned(self: Arc<Self>) -> OwnedSemaphorePermit {
|
||||
self.ll_sem.acquire(1).cooperate().await.unwrap();
|
||||
self.ll_sem.acquire(1).await.unwrap();
|
||||
OwnedSemaphorePermit {
|
||||
sem: self.clone(),
|
||||
permits: 1,
|
||||
|
@ -454,24 +454,20 @@ impl Future for LocalSet {
|
||||
// Register the waker before starting to work
|
||||
self.context.shared.waker.register_by_ref(cx.waker());
|
||||
|
||||
// Reset any previous task budget while polling tasks spawned on the
|
||||
// `LocalSet`, ensuring that each has its own separate budget.
|
||||
crate::coop::reset(|| {
|
||||
if self.with(|| self.tick()) {
|
||||
// If `tick` returns true, we need to notify the local future again:
|
||||
// there are still tasks remaining in the run queue.
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
} else if self.context.tasks.borrow().owned.is_empty() {
|
||||
// If the scheduler has no remaining futures, we're done!
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
// There are still futures in the local set, but we've polled all the
|
||||
// futures in the run queue. Therefore, we can just return Pending
|
||||
// since the remaining futures will be woken from somewhere else.
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
if self.with(|| self.tick()) {
|
||||
// If `tick` returns true, we need to notify the local future again:
|
||||
// there are still tasks remaining in the run queue.
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
} else if self.context.tasks.borrow().owned.is_empty() {
|
||||
// If the scheduler has no remaining futures, we're done!
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
// There are still futures in the local set, but we've polled all the
|
||||
// futures in the run queue. Therefore, we can just return Pending
|
||||
// since the remaining futures will be woken from somewhere else.
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -525,23 +521,19 @@ impl<T: Future> Future for RunUntil<'_, T> {
|
||||
.register_by_ref(cx.waker());
|
||||
|
||||
let _no_blocking = crate::runtime::enter::disallow_blocking();
|
||||
// Reset any previous task budget so that the future passed to
|
||||
// `run_until` and any tasks spawned on the `LocalSet` have their
|
||||
// own budgets.
|
||||
crate::coop::reset(|| {
|
||||
let f = me.future;
|
||||
if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
|
||||
return Poll::Ready(output);
|
||||
}
|
||||
let f = me.future;
|
||||
|
||||
if me.local_set.tick() {
|
||||
// If `tick` returns `true`, we need to notify the local future again:
|
||||
// there are still tasks remaining in the run queue.
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
|
||||
return Poll::Ready(output);
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
})
|
||||
if me.local_set.tick() {
|
||||
// If `tick` returns `true`, we need to notify the local future again:
|
||||
// there are still tasks remaining in the run queue.
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user