diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index c674b961d..0bce72ac6 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,13 +1,12 @@ use crate::park::{Park, Unpark}; -use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task}; +use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, ScheduleSendOnly, Task}; -use std::cell::{Cell, UnsafeCell}; -use std::collections::VecDeque; +use std::cell::Cell; use std::fmt; use std::future::Future; use std::mem::ManuallyDrop; use std::ptr; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{RawWaker, RawWakerVTable, Waker}; use std::time::Duration; @@ -31,31 +30,7 @@ pub(crate) struct Spawner { /// The scheduler component. pub(super) struct SchedulerPriv { - /// List of all active tasks spawned onto this executor. - /// - /// # Safety - /// - /// Must only be accessed from the primary thread - owned_tasks: UnsafeCell>, - - /// Local run queue. - /// - /// Tasks notified from the current thread are pushed into this queue. - /// - /// # Safety - /// - /// References should not be handed out. Only call `push` / `pop` functions. - /// Only call from the owning thread. - local_queue: UnsafeCell>>, - - /// Remote run queue. - /// - /// Tasks notified from another thread are pushed into this queue. - remote_queue: Mutex, - - /// Tasks pending drop - pending_drop: task::TransferStack, - + queues: MpscQueues, /// Unpark the blocked thread unpark: Box, } @@ -73,21 +48,9 @@ struct LocalState

{ park: P, } -#[derive(Debug)] -struct RemoteQueue { - /// FIFO list of tasks - queue: VecDeque>, - - /// `true` when a task can be pushed into the queue, false otherwise. - open: bool, -} - /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -/// How often to check the remote queue first -const CHECK_REMOTE_INTERVAL: u8 = 13; - thread_local! { static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null()) } @@ -101,13 +64,7 @@ where BasicScheduler { scheduler: Arc::new(SchedulerPriv { - owned_tasks: UnsafeCell::new(task::OwnedList::new()), - local_queue: UnsafeCell::new(VecDeque::with_capacity(64)), - remote_queue: Mutex::new(RemoteQueue { - queue: VecDeque::with_capacity(64), - open: true, - }), - pending_drop: task::TransferStack::new(), + queues: MpscQueues::new(), unpark: Box::new(unpark), }), local: LocalState { tick: 0, park }, @@ -155,9 +112,7 @@ where // Track the current scheduler let _guard = ACTIVE.with(|cell| { - let guard = Guard { - old: cell.get(), - }; + let guard = Guard { old: cell.get() }; cell.set(scheduler as *const SchedulerPriv); @@ -188,7 +143,11 @@ where scheduler.tick(local); // Maintenance work - scheduler.drain_pending_drop(); + unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on (which we are). + scheduler.queues.drain_pending_drop(); + } } }) } @@ -216,6 +175,8 @@ impl Spawner { } } +// === impl SchedulerPriv === + impl SchedulerPriv { fn tick(&self, local: &mut LocalState) { for _ in 0..MAX_TASKS_PER_TICK { @@ -224,8 +185,14 @@ impl SchedulerPriv { // Increment the tick local.tick = tick.wrapping_add(1); + let next = unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on. The `LocalState` + // parameter to this method implies that we are on that thread. + self.queues.next_task(tick) + }; - let task = match self.next_task(tick) { + let task = match next { Some(task) => task, None => { local.park.park().ok().expect("failed to park"); @@ -235,7 +202,10 @@ impl SchedulerPriv { if let Some(task) = task.run(&mut || Some(self.into())) { unsafe { - self.schedule_local(task); + // safety: this function is safe to call only from the + // thread the basic scheduler is running on. The `LocalState` + // parameter to this method implies that we are on that thread. + self.queues.push_local(task); } } } @@ -247,15 +217,6 @@ impl SchedulerPriv { .expect("failed to park"); } - fn drain_pending_drop(&self) { - for task in self.pending_drop.drain() { - unsafe { - (*self.owned_tasks.get()).remove(&task); - } - drop(task); - } - } - /// # Safety /// /// Must be called from the same thread that holds the `BasicScheduler` @@ -266,63 +227,51 @@ impl SchedulerPriv { F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.schedule_local(task); + self.queues.push_local(task); handle } - - unsafe fn schedule_local(&self, task: Task) { - (*self.local_queue.get()).push_back(task); - } - - fn next_task(&self, tick: u8) -> Option> { - if 0 == tick % CHECK_REMOTE_INTERVAL { - self.next_remote_task().or_else(|| self.next_local_task()) - } else { - self.next_local_task().or_else(|| self.next_remote_task()) - } - } - - fn next_local_task(&self) -> Option> { - unsafe { (*self.local_queue.get()).pop_front() } - } - - fn next_remote_task(&self) -> Option> { - self.remote_queue.lock().unwrap().queue.pop_front() - } } impl Schedule for SchedulerPriv { fn bind(&self, task: &Task) { unsafe { - (*self.owned_tasks.get()).insert(task); + // safety: `Queues::add_task` is only safe to call from the thread + // that owns the queues (the thread the scheduler is running on). + // `Scheduler::bind` is called when polling a task that + // doesn't have a scheduler set. We will only poll new tasks from + // the thread that the scheduler is running on. Therefore, this is + // safe to call. + self.queues.add_task(task); } } fn release(&self, task: Task) { - self.pending_drop.push(task); + self.queues.release_remote(task); } fn release_local(&self, task: &Task) { unsafe { - (*self.owned_tasks.get()).remove(task); + // safety: `Scheduler::release_local` is only called from the + // thread that the scheduler is running on. The `Schedule` trait's + // contract is that releasing a task from another thread should call + // `release` rather than `release_local`. + self.queues.release_local(task); } } fn schedule(&self, task: Task) { - let is_current = ACTIVE.with(|cell| { - cell.get() == self as *const SchedulerPriv - }); + let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); if is_current { - unsafe { self.schedule_local(task) }; + unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on. If `is_current` is + // then we are on that thread. + self.queues.push_local(task) + }; } else { - let mut lock = self.remote_queue.lock().unwrap(); - - if lock.open { - lock.queue.push_back(task); - } else { - task.shutdown(); - } + let mut lock = self.queues.remote(); + lock.schedule(task); // while locked, call unpark self.unpark.unpark(); @@ -339,39 +288,30 @@ where P: Park, { fn drop(&mut self) { - // Close the remote queue - let mut lock = self.scheduler.remote_queue.lock().unwrap(); - lock.open = false; - - while let Some(task) = lock.queue.pop_front() { - task.shutdown(); - } - - drop(lock); - - // Drain all local tasks - while let Some(task) = self.scheduler.next_local_task() { - task.shutdown(); - } - - // Release owned tasks unsafe { - (*self.scheduler.owned_tasks.get()).shutdown(); - } + // safety: the `Drop` impl owns the scheduler's queues. these fields + // will only be accessed when running the scheduler, and it can no + // longer be run, since we are in the process of dropping it. - self.scheduler.drain_pending_drop(); + // Shut down the task queues. + self.scheduler.queues.shutdown(); + } // Wait until all tasks have been released. - while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } { + while unsafe { self.scheduler.queues.has_tasks_remaining() } { self.local.park.park().ok().expect("park failed"); - self.scheduler.drain_pending_drop(); + unsafe { + self.scheduler.queues.drain_pending_drop(); + } } } } impl fmt::Debug for SchedulerPriv { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Scheduler").finish() + fmt.debug_struct("Scheduler") + .field("queues", &self.queues) + .finish() } } diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index f81522c9d..aed5105d4 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1,15 +1,12 @@ //! Runs `!Send` futures on the current thread. use crate::sync::AtomicWaker; -use crate::task::{self, JoinHandle, Schedule, Task, TransferStack}; +use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, Task}; -use std::cell::{Cell, UnsafeCell}; -use std::collections::VecDeque; -use std::fmt; +use std::cell::Cell; use std::future::Future; use std::pin::Pin; use std::ptr::{self, NonNull}; use std::rc::Rc; -use std::sync::Mutex; use std::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -82,33 +79,12 @@ cfg_rt_util! { scheduler: Rc, } } + +#[derive(Debug)] struct Scheduler { - /// List of all active tasks spawned onto this executor. - /// - /// # Safety - /// - /// Must only be accessed from the primary thread - tasks: UnsafeCell>, - - /// Local run local_queue. - /// - /// Tasks notified from the current thread are pushed into this queue. - /// - /// # Safety - /// - /// References should not be handed out. Only call `push` / `pop` functions. - /// Only call from the owning thread. - local_queue: UnsafeCell>>, - tick: Cell, - /// Remote run queue. - /// - /// Tasks notified from another thread are pushed into this queue. - remote_queue: Mutex>>, - - /// Tasks pending drop - pending_drop: TransferStack, + queues: MpscQueues, /// Used to notify the `LocalFuture` when a task in the local task set is /// notified. @@ -166,12 +142,17 @@ cfg_rt_util! { CURRENT_TASK_SET.with(|current| { let current = current .get() - .expect("`spawn_local` called from outside of a local::LocalSet!"); + .expect("`spawn_local` called from outside of a task::LocalSet!"); + let (task, handle) = task::joinable_local(future); unsafe { - let (task, handle) = task::joinable_local(future); - current.as_ref().schedule_local(task); - handle + // safety: this function is unsafe to call outside of the local + // thread. Since the call above to get the current task set + // would not succeed if we were outside of a local set, this is + // safe. + current.as_ref().queues.push_local(task); } + + handle }) } } @@ -179,9 +160,6 @@ cfg_rt_util! { /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -/// How often to check the remote queue first -const CHECK_REMOTE_INTERVAL: u8 = 13; - impl LocalSet { /// Returns a new local task set. pub fn new() -> Self { @@ -232,9 +210,9 @@ impl LocalSet { { let (task, handle) = task::joinable_local(future); unsafe { - // This is safe: since `LocalSet` is not Send or Sync, this is + // safety: since `LocalSet` is not Send or Sync, this is // always being called from the local thread. - self.scheduler.schedule_local(task); + self.scheduler.queues.push_local(task); } handle } @@ -341,31 +319,32 @@ impl Schedule for Scheduler { fn bind(&self, task: &Task) { assert!(self.is_current()); unsafe { - (*self.tasks.get()).insert(task); + self.queues.add_task(task); } } fn release(&self, task: Task) { // This will be called when dropping the local runtime. - self.pending_drop.push(task); + self.queues.release_remote(task); } fn release_local(&self, task: &Task) { debug_assert!(self.is_current()); unsafe { - (*self.tasks.get()).remove(task); + self.queues.release_local(task); } } fn schedule(&self, task: Task) { if self.is_current() { - unsafe { - self.schedule_local(task); - } + unsafe { self.queues.push_local(task) }; } else { - self.remote_queue.lock().unwrap().push_back(task); + let mut lock = self.queues.remote(); + lock.schedule(task); self.waker.wake(); + + drop(lock); } } } @@ -373,11 +352,8 @@ impl Schedule for Scheduler { impl Scheduler { fn new() -> Self { Self { - tasks: UnsafeCell::new(task::OwnedList::new()), - local_queue: UnsafeCell::new(VecDeque::with_capacity(64)), tick: Cell::new(0), - pending_drop: TransferStack::new(), - remote_queue: Mutex::new(VecDeque::with_capacity(64)), + queues: MpscQueues::new(), waker: AtomicWaker::new(), } } @@ -401,10 +377,6 @@ impl Scheduler { }) } - unsafe fn schedule_local(&self, task: Task) { - (*self.local_queue.get()).push_back(task); - } - fn is_current(&self) -> bool { CURRENT_TASK_SET .try_with(|current| { @@ -416,88 +388,46 @@ impl Scheduler { .unwrap_or(false) } - fn next_task(&self, tick: u8) -> Option> { - if 0 == tick % CHECK_REMOTE_INTERVAL { - self.next_remote_task().or_else(|| self.next_local_task()) - } else { - self.next_local_task().or_else(|| self.next_remote_task()) - } - } - - fn next_local_task(&self) -> Option> { - unsafe { (*self.local_queue.get()).pop_front() } - } - - fn next_remote_task(&self) -> Option> { - // there is no semantic information in the `PoisonError`, and it - // doesn't implement `Debug`, but clippy thinks that it's bad to - // match all errors here... - #[allow(clippy::match_wild_err_arm)] - let mut lock = match self.remote_queue.lock() { - // If the lock is poisoned, but the thread is already panicking, - // avoid a double panic. This is necessary since `next_task` (which - // calls `next_remote_task`) can be called in the `Drop` impl. - Err(_) if std::thread::panicking() => return None, - Err(_) => panic!("mutex poisoned"), - Ok(lock) => lock, - }; - lock.pop_front() - } - fn tick(&self) { assert!(self.is_current()); for _ in 0..MAX_TASKS_PER_TICK { let tick = self.tick.get().wrapping_add(1); self.tick.set(tick); - let task = match self.next_task(tick) { + + let task = match unsafe { + // safety: we must be on the local thread to call this. The assertion + // the top of this method ensures that `tick` is only called locally. + self.queues.next_task(tick) + } { Some(task) => task, None => return, }; if let Some(task) = task.run(&mut || Some(self.into())) { unsafe { - // we are on the local thread, so this is okay. - self.schedule_local(task); + // safety: we must be on the local thread to call this. The + // the top of this method ensures that `tick` is only called locally. + self.queues.push_local(task); } } } } - - fn drain_pending_drop(&self) { - for task in self.pending_drop.drain() { - unsafe { - (*self.tasks.get()).remove(&task); - } - drop(task); - } - } -} - -impl fmt::Debug for Scheduler { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Scheduler { .. }").finish() - } } impl Drop for Scheduler { fn drop(&mut self) { - // Drain all local tasks - while let Some(task) = self.next_local_task() { - task.shutdown(); - } - - // Release owned tasks unsafe { - (*self.tasks.get()).shutdown(); - } + // safety: these functions are unsafe to call outside of the local + // thread. Since the `Scheduler` type is not `Send` or `Sync`, we + // know it will be dropped only from the local thread. + self.queues.shutdown(); - self.drain_pending_drop(); - - // Wait until all tasks have been released. - // XXX: this is a busy loop, but we don't really have any way to park - // the thread here? - while unsafe { !(*self.tasks.get()).is_empty() } { - self.drain_pending_drop(); + // Wait until all tasks have been released. + // XXX: this is a busy loop, but we don't really have any way to park + // the thread here? + while self.queues.has_tasks_remaining() { + self.queues.drain_pending_drop(); + } } } } @@ -506,6 +436,7 @@ impl Drop for Scheduler { mod tests { use super::*; use crate::{runtime, task}; + use std::time::Duration; #[test] fn local_current_thread() { @@ -729,7 +660,6 @@ mod tests { fn drop_cancels_tasks() { // This test reproduces issue #1842 use crate::sync::oneshot; - use std::time::Duration; let mut rt = runtime::Builder::new() .enable_time() @@ -753,4 +683,62 @@ mod tests { drop(local); drop(rt); } + + #[test] + fn drop_cancels_remote_tasks() { + // This test reproduces issue #1885. + use std::sync::mpsc::RecvTimeoutError; + + let (done_tx, done_rx) = std::sync::mpsc::channel(); + let thread = std::thread::spawn(move || { + let (tx, mut rx) = crate::sync::mpsc::channel::<()>(1024); + + let mut rt = runtime::Builder::new() + .enable_time() + .basic_scheduler() + .build() + .expect("building runtime should succeed"); + + let local = LocalSet::new(); + local.spawn_local(async move { while let Some(_) = rx.recv().await {} }); + local.block_on(&mut rt, async { + crate::time::delay_for(Duration::from_millis(1)).await; + }); + + drop(tx); + + // This enters an infinite loop if the remote notified tasks are not + // properly cancelled. + drop(local); + + // Send a message on the channel so that the test thread can + // determine if we have entered an infinite loop: + done_tx.send(()).unwrap(); + }); + + // Since the failure mode of this test is an infinite loop, rather than + // something we can easily make assertions about, we'll run it in a + // thread. When the test thread finishes, it will send a message on a + // channel to this thread. We'll wait for that message with a fairly + // generous timeout, and if we don't recieve it, we assume the test + // thread has hung. + // + // Note that it should definitely complete in under a minute, but just + // in case CI is slow, we'll give it a long timeout. + match done_rx.recv_timeout(Duration::from_secs(60)) { + Err(RecvTimeoutError::Timeout) => panic!( + "test did not complete within 60 seconds, \ + we have (probably) entered an infinite loop!" + ), + // Did the test thread panic? We'll find out for sure when we `join` + // with it. + Err(RecvTimeoutError::Disconnected) => { + println!("done_rx dropped, did the test thread panic?"); + } + // Test completed successfully! + Ok(()) => {} + } + + thread.join().expect("test thread should not panic!") + } } diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index e336ec9cf..f762a561b 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -234,6 +234,8 @@ cfg_rt_core! { mod list; pub(crate) use self::list::OwnedList; + pub(crate) mod queue; + mod raw; use self::raw::RawTask; diff --git a/tokio/src/task/queue.rs b/tokio/src/task/queue.rs new file mode 100644 index 000000000..6a004fc7e --- /dev/null +++ b/tokio/src/task/queue.rs @@ -0,0 +1,319 @@ +use super::{OwnedList, Schedule, Task, TransferStack}; +use std::{ + cell::UnsafeCell, + collections::VecDeque, + fmt, + sync::{Mutex, MutexGuard}, +}; + +/// A set of multi-producer, single consumer task queues, suitable for use by a +/// single-threaded scheduler. +/// +/// This consists of a list of _all_ tasks bound to the scheduler, a run queue +/// of tasks notified from the thread the scheduler is running on (the "local +/// queue"), a run queue of tasks notified from another thread (the "remote +/// queue"), and a stack of tasks released from other threads which will +/// eventually need to be dropped by the scheduler on its own thread ("pending +/// drop"). +/// +/// Submitting tasks to or popping tasks from the local queue is unsafe, as it +/// must only be performed on the same thread as the scheduler. +pub(crate) struct MpscQueues { + /// List of all active tasks spawned onto this executor. + /// + /// # Safety + /// + /// Must only be accessed from the primary thread + owned_tasks: UnsafeCell>, + + /// Local run queue. + /// + /// Tasks notified from the current thread are pushed into this queue. + /// + /// # Safety + /// + /// References should not be handed out. Only call `push` / `pop` functions. + /// Only call from the owning thread. + local_queue: UnsafeCell>>, + + /// Remote run queue. + /// + /// Tasks notified from another thread are pushed into this queue. + remote_queue: Mutex>, + + /// Tasks pending drop + pending_drop: TransferStack, +} + +pub(crate) struct RemoteQueue { + /// FIFO list of tasks + queue: VecDeque>, + + /// `true` when a task can be pushed into the queue, false otherwise. + open: bool, +} + +// === impl Queues === + +impl MpscQueues +where + S: Schedule + 'static, +{ + pub(crate) const INITIAL_CAPACITY: usize = 64; + + /// How often to check the remote queue first + pub(crate) const CHECK_REMOTE_INTERVAL: u8 = 13; + + pub(crate) fn new() -> Self { + Self { + owned_tasks: UnsafeCell::new(OwnedList::new()), + local_queue: UnsafeCell::new(VecDeque::with_capacity(Self::INITIAL_CAPACITY)), + pending_drop: TransferStack::new(), + remote_queue: Mutex::new(RemoteQueue { + queue: VecDeque::with_capacity(Self::INITIAL_CAPACITY), + open: true, + }), + } + } + + /// Add a new task to the scheduler. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn add_task(&self, task: &Task) { + (*self.owned_tasks.get()).insert(task); + } + + /// Push a task to the local queue. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn push_local(&self, task: Task) { + (*self.local_queue.get()).push_back(task); + } + + /// Remove a task from the local queue. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn release_local(&self, task: &Task) { + (*self.owned_tasks.get()).remove(task); + } + + /// Lock the remote queue, returning a `MutexGuard`. + /// + /// This can be used to push to the remote queue and perform other + /// operations while holding the lock. + /// + /// # Panics + /// + /// If the remote queue mutex is poisoned. + pub(crate) fn remote(&self) -> MutexGuard<'_, RemoteQueue> { + self.remote_queue + .lock() + .expect("failed to lock remote queue") + } + + /// Release a task from outside of the thread that owns the scheduler. + /// + /// This simply pushes the task to the pending drop queue. + pub(crate) fn release_remote(&self, task: Task) { + self.pending_drop.push(task); + } + + /// Returns the next task from the remote *or* local queue. + /// + /// Typically, this checks the local queue before the remote queue, and only + /// checks the remote queue if the local queue is empty. However, to avoid + /// starving the remote queue, it is checked first every + /// `CHECK_REMOTE_INTERVAL` ticks. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn next_task(&self, tick: u8) -> Option> { + if 0 == tick % Self::CHECK_REMOTE_INTERVAL { + self.next_remote_task().or_else(|| self.next_local_task()) + } else { + self.next_local_task().or_else(|| self.next_remote_task()) + } + } + + /// Returns the next task from the local queue. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn next_local_task(&self) -> Option> { + (*self.local_queue.get()).pop_front() + } + + /// Returns the next task from the remote queue. + /// + /// # Panics + /// + /// If the mutex around the remote queue is poisoned _and_ the current + /// thread is not already panicking. This is safe to call in a `Drop` impl. + pub(crate) fn next_remote_task(&self) -> Option> { + // there is no semantic information in the `PoisonError`, and it + // doesn't implement `Debug`, but clippy thinks that it's bad to + // match all errors here... + #[allow(clippy::match_wild_err_arm)] + let mut lock = match self.remote_queue.lock() { + // If the lock is poisoned, but the thread is already panicking, + // avoid a double panic. This is necessary since `next_task` (which + // calls `next_remote_task`) can be called in the `Drop` impl. + Err(_) if std::thread::panicking() => return None, + Err(_) => panic!("mutex poisoned"), + Ok(lock) => lock, + }; + lock.queue.pop_front() + } + + /// Returns true if any owned tasks are still bound to this scheduler. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn has_tasks_remaining(&self) -> bool { + !(*self.owned_tasks.get()).is_empty() + } + + /// Drain any tasks that have previously been released from other threads. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn drain_pending_drop(&self) { + for task in self.pending_drop.drain() { + (*self.owned_tasks.get()).remove(&task); + drop(task); + } + } + + /// Shut down the queues. + /// + /// This performs the following operations: + /// + /// 1. Close the remote queue (so that it will no longer accept new tasks). + /// 2. Drain the remote queue and shut down all tasks. + /// 3. Drain the local queue and shut down all tasks. + /// 4. Shut down the owned task list. + /// 5. Drain the list of tasks dropped externally and remove them from the + /// owned task list. + /// + /// This method should be called before dropping a `Queues`. It is provided + /// as a method rather than a `Drop` impl because types that own a `Queues` + /// wish to perform other work in their `Drop` implementations _after_ + /// shutting down the task queues. + /// + /// # Safety + /// + /// This method accesses the local task queue, and therefore *must* be + /// called only from the thread that owns the scheduler. + /// + /// # Panics + /// + /// If the mutex around the remote queue is poisoned _and_ the current + /// thread is not already panicking. This is safe to call in a `Drop` impl. + pub(crate) unsafe fn shutdown(&self) { + // Close and drain the remote queue. + self.close_remote(); + + // Drain the local queue. + self.close_local(); + + // Release owned tasks + self.shutdown_owned_tasks(); + + // Drain tasks pending drop. + self.drain_pending_drop(); + } + + /// Shut down the scheduler's owned task list. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + unsafe fn shutdown_owned_tasks(&self) { + (*self.owned_tasks.get()).shutdown(); + } + + /// Drain the remote queue, and shut down its tasks. + /// + /// This closes the remote queue. Any additional tasks added to it will be + /// shut down instead. + /// + /// # Panics + /// If the mutex around the remote queue is poisoned _and_ the current + /// thread is not already panicking. This is safe to call in a `Drop` impl. + fn close_remote(&self) { + #[allow(clippy::match_wild_err_arm)] + let mut lock = match self.remote_queue.lock() { + // If the lock is poisoned, but the thread is already panicking, + // avoid a double panic. This is necessary since this fn can be + // called in a drop impl. + Err(_) if std::thread::panicking() => return, + Err(_) => panic!("mutex poisoned"), + Ok(lock) => lock, + }; + lock.open = false; + + while let Some(task) = lock.queue.pop_front() { + task.shutdown(); + } + } + + /// Drain the local queue, and shut down its tasks. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + unsafe fn close_local(&self) { + while let Some(task) = self.next_local_task() { + task.shutdown(); + } + } +} + +impl fmt::Debug for MpscQueues { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("MpscQueues") + .field("owned_tasks", &self.owned_tasks) + .field("remote_queue", &self.remote_queue) + .field("local_queue", &self.local_queue) + .finish() + } +} + +// === impl RemoteQueue === + +impl RemoteQueue +where + S: Schedule, +{ + /// Schedule a remote task. + /// + /// If the queue is open to accept new tasks, the task is pushed to the back + /// of the queue. Otherwise, if the queue is closed (the scheduler is + /// shutting down), the new task will be shut down immediately. + pub(crate) fn schedule(&mut self, task: Task) { + if self.open { + self.queue.push_back(task); + } else { + task.shutdown(); + } + } +} + +impl fmt::Debug for RemoteQueue { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("RemoteQueue") + .field("queue", &self.queue) + .field("open", &self.open) + .finish() + } +}