Merge pull request #3622 from bugadani/next

Allow polling exited tasks
This commit is contained in:
Dario Nieuwenhuis 2024-12-17 18:03:07 +00:00 committed by GitHub
commit 0c245892c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 45 additions and 47 deletions

View File

@ -50,33 +50,32 @@ use super::SpawnToken;
/// A task's complete life cycle is as follows: /// A task's complete life cycle is as follows:
/// ///
/// ```text /// ```text
/// ┌────────────┐ ┌────────────────────────┐ /// ┌────────────┐ ┌────────────────────────┐
/// ┌─►│Not spawned │◄─6┤Not spawned|Run enqueued│ /// │Not spawned │◄─5┤Not spawned|Run enqueued│
/// │ │ ├7─►│ │ /// │ ├6─►│ │
/// └─────┬──────┘ └──────▲─────────────────┘ /// └─────┬──────┘ └──────▲─────────────────┘
/// 1 │ /// 1 │
/// │ ┌────────────┘ /// │ ┌────────────┘
/// │ │ 5 /// │ 4
/// ┌─────▼────┴─────────┐ /// ┌─────▼────┴─────────┐
/// │Spawned|Run enqueued│ /// │Spawned|Run enqueued│
/// │ /// │
/// └─────┬▲─────────────┘ /// └─────┬▲─────────────┘
/// 2│ /// 2│
/// │3 /// │3
/// ┌─────▼┴─────┐ /// ┌─────▼┴─────┐
/// └─4┤ Spawned │ /// Spawned │
/// │ │ /// │ │
/// └────────────┘ /// └────────────┘
/// ``` /// ```
/// ///
/// Transitions: /// Transitions:
/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn` /// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue` /// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
/// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue` /// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready` /// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
/// - 5: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` /// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
/// - 6: Task is dequeued and then ignored via `State::run_dequeue` /// - 6: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
/// - 7: A task is waken when it is not spawned - `wake_task -> State::run_enqueue`
pub(crate) struct TaskHeader { pub(crate) struct TaskHeader {
pub(crate) state: State, pub(crate) state: State,
pub(crate) run_queue_item: RunQueueItem, pub(crate) run_queue_item: RunQueueItem,
@ -162,6 +161,10 @@ pub struct TaskStorage<F: Future + 'static> {
future: UninitCell<F>, // Valid if STATE_SPAWNED future: UninitCell<F>, // Valid if STATE_SPAWNED
} }
unsafe fn poll_exited(_p: TaskRef) {
// Nothing to do, the task is already !SPAWNED and dequeued.
}
impl<F: Future + 'static> TaskStorage<F> { impl<F: Future + 'static> TaskStorage<F> {
const NEW: Self = Self::new(); const NEW: Self = Self::new();
@ -203,14 +206,23 @@ impl<F: Future + 'static> TaskStorage<F> {
} }
unsafe fn poll(p: TaskRef) { unsafe fn poll(p: TaskRef) {
let this = &*(p.as_ptr() as *const TaskStorage<F>); let this = &*p.as_ptr().cast::<TaskStorage<F>>();
let future = Pin::new_unchecked(this.future.as_mut()); let future = Pin::new_unchecked(this.future.as_mut());
let waker = waker::from_task(p); let waker = waker::from_task(p);
let mut cx = Context::from_waker(&waker); let mut cx = Context::from_waker(&waker);
match future.poll(&mut cx) { match future.poll(&mut cx) {
Poll::Ready(_) => { Poll::Ready(_) => {
// As the future has finished and this function will not be called
// again, we can safely drop the future here.
this.future.drop_in_place(); this.future.drop_in_place();
// We replace the poll_fn with a despawn function, so that the task is cleaned up
// when the executor polls it next.
this.raw.poll_fn.set(Some(poll_exited));
// Make sure we despawn last, so that other threads can only spawn the task
// after we're done with it.
this.raw.state.despawn(); this.raw.state.despawn();
} }
Poll::Pending => {} Poll::Pending => {}
@ -411,15 +423,6 @@ impl SyncExecutor {
self.run_queue.dequeue_all(|p| { self.run_queue.dequeue_all(|p| {
let task = p.header(); let task = p.header();
if !task.state.run_dequeue() {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}
#[cfg(feature = "trace")] #[cfg(feature = "trace")]
trace::task_exec_begin(self, &p); trace::task_exec_begin(self, &p);

View File

@ -81,6 +81,7 @@ impl RunQueue {
// safety: there are no concurrent accesses to `next` // safety: there are no concurrent accesses to `next`
next = unsafe { task.header().run_queue_item.next.get() }; next = unsafe { task.header().run_queue_item.next.get() };
task.header().state.run_dequeue();
on_task(task); on_task(task);
} }
} }

View File

@ -63,9 +63,10 @@ impl RunQueue {
// If the task re-enqueues itself, the `next` pointer will get overwritten. // If the task re-enqueues itself, the `next` pointer will get overwritten.
// Therefore, first read the next pointer, and only then process the task. // Therefore, first read the next pointer, and only then process the task.
// safety: we know if the task is enqueued, no one else will touch the `next` pointer. critical_section::with(|cs| {
let cs = unsafe { CriticalSection::new() }; next = task.header().run_queue_item.next.borrow(cs).get();
next = task.header().run_queue_item.next.borrow(cs).get(); task.header().state.run_dequeue(cs);
});
on_task(task); on_task(task);
} }

View File

@ -52,8 +52,7 @@ impl State {
/// Unmark the task as run-queued. Return whether the task is spawned. /// Unmark the task as run-queued. Return whether the task is spawned.
#[inline(always)] #[inline(always)]
pub fn run_dequeue(&self) -> bool { pub fn run_dequeue(&self) {
let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
state & STATE_SPAWNED != 0
} }
} }

View File

@ -75,11 +75,9 @@ impl State {
/// Unmark the task as run-queued. Return whether the task is spawned. /// Unmark the task as run-queued. Return whether the task is spawned.
#[inline(always)] #[inline(always)]
pub fn run_dequeue(&self) -> bool { pub fn run_dequeue(&self) {
compiler_fence(Ordering::Release); compiler_fence(Ordering::Release);
let r = self.spawned.load(Ordering::Relaxed);
self.run_queued.store(false, Ordering::Relaxed); self.run_queued.store(false, Ordering::Relaxed);
r
} }
} }

View File

@ -67,11 +67,7 @@ impl State {
/// Unmark the task as run-queued. Return whether the task is spawned. /// Unmark the task as run-queued. Return whether the task is spawned.
#[inline(always)] #[inline(always)]
pub fn run_dequeue(&self) -> bool { pub fn run_dequeue(&self, cs: CriticalSection<'_>) {
self.update(|s| { self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED)
let ok = *s & STATE_SPAWNED != 0;
*s &= !STATE_RUN_QUEUED;
ok
})
} }
} }