sync: rewrite CancellationToken (#4652)

This commit is contained in:
Finomnis 2022-05-13 23:26:15 +02:00 committed by GitHub
parent 4ec6ba8b76
commit addf5b5749
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 603 additions and 1494 deletions

View File

@ -1,18 +1,15 @@
//! An asynchronously awaitable `CancellationToken`.
//! The token allows to signal a cancellation request to one or more tasks.
pub(crate) mod guard;
mod tree_node;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::sync::intrusive_double_linked_list::{LinkedList, ListNode};
use crate::loom::sync::Arc;
use core::future::Future;
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll, Waker};
use core::task::{Context, Poll};
use guard::DropGuard;
use pin_project_lite::pin_project;
/// A token which can be used to signal a cancellation request to one or more
/// tasks.
@ -55,31 +52,20 @@ use guard::DropGuard;
/// }
/// ```
pub struct CancellationToken {
inner: NonNull<CancellationTokenState>,
inner: Arc<tree_node::TreeNode>,
}
// Safety: The CancellationToken is thread-safe and can be moved between threads,
// since all methods are internally synchronized.
unsafe impl Send for CancellationToken {}
unsafe impl Sync for CancellationToken {}
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// was cancelled
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFuture<'a> {
/// The CancellationToken that is associated with this WaitForCancellationFuture
cancellation_token: Option<&'a CancellationToken>,
/// Node for waiting at the cancellation_token
wait_node: ListNode<WaitQueueEntry>,
/// Whether this future was registered at the token yet as a waiter
is_registered: bool,
pin_project! {
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// is cancelled.
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFuture<'a> {
cancellation_token: &'a CancellationToken,
#[pin]
future: tokio::sync::futures::Notified<'a>,
}
}
// Safety: Futures can be sent between threads as long as the underlying
// cancellation_token is thread-safe (Sync),
// which allows to poll/register/unregister from a different thread.
unsafe impl<'a> Send for WaitForCancellationFuture<'a> {}
// ===== impl CancellationToken =====
impl core::fmt::Debug for CancellationToken {
@ -92,43 +78,16 @@ impl core::fmt::Debug for CancellationToken {
impl Clone for CancellationToken {
fn clone(&self) -> Self {
// Safety: The state inside a `CancellationToken` is always valid, since
// is reference counted
let inner = self.state();
// Tokens are cloned by increasing their refcount
let current_state = inner.snapshot();
inner.increment_refcount(current_state);
CancellationToken { inner: self.inner }
tree_node::increase_handle_refcount(&self.inner);
CancellationToken {
inner: self.inner.clone(),
}
}
}
impl Drop for CancellationToken {
fn drop(&mut self) {
let token_state_pointer = self.inner;
// Safety: The state inside a `CancellationToken` is always valid, since
// is reference counted
let inner = unsafe { &mut *self.inner.as_ptr() };
let mut current_state = inner.snapshot();
// We need to safe the parent, since the state might be released by the
// next call
let parent = inner.parent;
// Drop our own refcount
current_state = inner.decrement_refcount(current_state);
// If this was the last reference, unregister from the parent
if current_state.refcount == 0 {
if let Some(mut parent) = parent {
// Safety: Since we still retain a reference on the parent, it must be valid.
let parent = unsafe { parent.as_mut() };
parent.unregister_child(token_state_pointer, current_state);
}
}
tree_node::decrease_handle_refcount(&self.inner);
}
}
@ -141,29 +100,11 @@ impl Default for CancellationToken {
impl CancellationToken {
/// Creates a new CancellationToken in the non-cancelled state.
pub fn new() -> CancellationToken {
let state = Box::new(CancellationTokenState::new(
None,
StateSnapshot {
cancel_state: CancellationState::NotCancelled,
has_parent_ref: false,
refcount: 1,
},
));
// Safety: We just created the Box. The pointer is guaranteed to be
// not null
CancellationToken {
inner: unsafe { NonNull::new_unchecked(Box::into_raw(state)) },
inner: Arc::new(tree_node::TreeNode::new()),
}
}
/// Returns a reference to the utilized `CancellationTokenState`.
fn state(&self) -> &CancellationTokenState {
// Safety: The state inside a `CancellationToken` is always valid, since
// is reference counted
unsafe { &*self.inner.as_ptr() }
}
/// Creates a `CancellationToken` which will get cancelled whenever the
/// current token gets cancelled.
///
@ -203,56 +144,8 @@ impl CancellationToken {
/// }
/// ```
pub fn child_token(&self) -> CancellationToken {
let inner = self.state();
// Increment the refcount of this token. It will be referenced by the
// child, independent of whether the child is immediately cancelled or
// not.
let _current_state = inner.increment_refcount(inner.snapshot());
let mut unpacked_child_state = StateSnapshot {
has_parent_ref: true,
refcount: 1,
cancel_state: CancellationState::NotCancelled,
};
let mut child_token_state = Box::new(CancellationTokenState::new(
Some(self.inner),
unpacked_child_state,
));
{
let mut guard = inner.synchronized.lock().unwrap();
if guard.is_cancelled {
// This task was already cancelled. In this case we should not
// insert the child into the list, since it would never get removed
// from the list.
(*child_token_state.synchronized.lock().unwrap()).is_cancelled = true;
unpacked_child_state.cancel_state = CancellationState::Cancelled;
// Since it's not in the list, the parent doesn't need to retain
// a reference to it.
unpacked_child_state.has_parent_ref = false;
child_token_state
.state
.store(unpacked_child_state.pack(), Ordering::SeqCst);
} else {
if let Some(mut first_child) = guard.first_child {
child_token_state.from_parent.next_peer = Some(first_child);
// Safety: We manipulate other child task inside the Mutex
// and retain a parent reference on it. The child token can't
// get invalidated while the Mutex is held.
unsafe {
first_child.as_mut().from_parent.prev_peer =
Some((&mut *child_token_state).into())
};
}
guard.first_child = Some((&mut *child_token_state).into());
}
};
let child_token_ptr = Box::into_raw(child_token_state);
// Safety: We just created the pointer from a `Box`
CancellationToken {
inner: unsafe { NonNull::new_unchecked(child_token_ptr) },
inner: tree_node::child_node(&self.inner),
}
}
@ -260,21 +153,33 @@ impl CancellationToken {
/// derived from it.
///
/// This will wake up all tasks which are waiting for cancellation.
///
/// Be aware that cancellation is not an atomic operation. It is possible
/// for another thread running in parallel with a call to `cancel` to first
/// receive `true` from `is_cancelled` on one child node, and then receive
/// `false` from `is_cancelled` on another child node. However, once the
/// call to `cancel` returns, all child nodes have been fully cancelled.
pub fn cancel(&self) {
self.state().cancel();
tree_node::cancel(&self.inner);
}
/// Returns `true` if the `CancellationToken` had been cancelled
/// Returns `true` if the `CancellationToken` is cancelled.
pub fn is_cancelled(&self) -> bool {
self.state().is_cancelled()
tree_node::is_cancelled(&self.inner)
}
/// Returns a `Future` that gets fulfilled when cancellation is requested.
///
/// The future will complete immediately if the token is already cancelled
/// when this method is called.
///
/// # Cancel safety
///
/// This method is cancel safe.
pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
WaitForCancellationFuture {
cancellation_token: Some(self),
wait_node: ListNode::new(WaitQueueEntry::new()),
is_registered: false,
cancellation_token: self,
future: self.inner.notified(),
}
}
@ -285,26 +190,6 @@ impl CancellationToken {
pub fn drop_guard(self) -> DropGuard {
DropGuard { inner: Some(self) }
}
unsafe fn register(
&self,
wait_node: &mut ListNode<WaitQueueEntry>,
cx: &mut Context<'_>,
) -> Poll<()> {
self.state().register(wait_node, cx)
}
fn check_for_cancellation(
&self,
wait_node: &mut ListNode<WaitQueueEntry>,
cx: &mut Context<'_>,
) -> Poll<()> {
self.state().check_for_cancellation(wait_node, cx)
}
fn unregister(&self, wait_node: &mut ListNode<WaitQueueEntry>) {
self.state().unregister(wait_node)
}
}
// ===== impl WaitForCancellationFuture =====
@ -319,560 +204,21 @@ impl<'a> Future for WaitForCancellationFuture<'a> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// Safety: We do not move anything out of `WaitForCancellationFuture`
let mut_self: &mut WaitForCancellationFuture<'_> = unsafe { Pin::get_unchecked_mut(self) };
let cancellation_token = mut_self
.cancellation_token
.expect("polled WaitForCancellationFuture after completion");
let poll_res = if !mut_self.is_registered {
// Safety: The `ListNode` is pinned through the Future,
// and we will unregister it in `WaitForCancellationFuture::drop`
// before the Future is dropped and the memory reference is invalidated.
unsafe { cancellation_token.register(&mut mut_self.wait_node, cx) }
} else {
cancellation_token.check_for_cancellation(&mut mut_self.wait_node, cx)
};
if let Poll::Ready(()) = poll_res {
// The cancellation_token was signalled
mut_self.cancellation_token = None;
// A signalled Token means the Waker won't be enqueued anymore
mut_self.is_registered = false;
mut_self.wait_node.task = None;
} else {
// This `Future` and its stored `Waker` stay registered at the
// `CancellationToken`
mut_self.is_registered = true;
}
poll_res
}
}
impl<'a> Drop for WaitForCancellationFuture<'a> {
fn drop(&mut self) {
// If this WaitForCancellationFuture has been polled and it was added to the
// wait queue at the cancellation_token, it must be removed before dropping.
// Otherwise the cancellation_token would access invalid memory.
if let Some(token) = self.cancellation_token {
if self.is_registered {
token.unregister(&mut self.wait_node);
}
}
}
}
/// Tracks how the future had interacted with the [`CancellationToken`]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PollState {
/// The task has never interacted with the [`CancellationToken`].
New,
/// The task was added to the wait queue at the [`CancellationToken`].
Waiting,
/// The task has been polled to completion.
Done,
}
/// Tracks the WaitForCancellationFuture waiting state.
/// Access to this struct is synchronized through the mutex in the CancellationToken.
struct WaitQueueEntry {
/// The task handle of the waiting task
task: Option<Waker>,
// Current polling state. This state is only updated inside the Mutex of
// the CancellationToken.
state: PollState,
}
impl WaitQueueEntry {
/// Creates a new WaitQueueEntry
fn new() -> WaitQueueEntry {
WaitQueueEntry {
task: None,
state: PollState::New,
}
}
}
struct SynchronizedState {
waiters: LinkedList<WaitQueueEntry>,
first_child: Option<NonNull<CancellationTokenState>>,
is_cancelled: bool,
}
impl SynchronizedState {
fn new() -> Self {
Self {
waiters: LinkedList::new(),
first_child: None,
is_cancelled: false,
}
}
}
/// Information embedded in child tokens which is synchronized through the Mutex
/// in their parent.
struct SynchronizedThroughParent {
next_peer: Option<NonNull<CancellationTokenState>>,
prev_peer: Option<NonNull<CancellationTokenState>>,
}
/// Possible states of a `CancellationToken`
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum CancellationState {
NotCancelled = 0,
Cancelling = 1,
Cancelled = 2,
}
impl CancellationState {
fn pack(self) -> usize {
self as usize
}
fn unpack(value: usize) -> Self {
match value {
0 => CancellationState::NotCancelled,
1 => CancellationState::Cancelling,
2 => CancellationState::Cancelled,
_ => unreachable!("Invalid value"),
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct StateSnapshot {
/// The amount of references to this particular CancellationToken.
/// `CancellationToken` structs hold these references to a `CancellationTokenState`.
/// Also the state is referenced by the state of each child.
refcount: usize,
/// Whether the state is still referenced by it's parent and can therefore
/// not be freed.
has_parent_ref: bool,
/// Whether the token is cancelled
cancel_state: CancellationState,
}
impl StateSnapshot {
/// Packs the snapshot into a `usize`
fn pack(self) -> usize {
self.refcount << 3 | if self.has_parent_ref { 4 } else { 0 } | self.cancel_state.pack()
}
/// Unpacks the snapshot from a `usize`
fn unpack(value: usize) -> Self {
let refcount = value >> 3;
let has_parent_ref = value & 4 != 0;
let cancel_state = CancellationState::unpack(value & 0x03);
StateSnapshot {
refcount,
has_parent_ref,
cancel_state,
}
}
/// Whether this `CancellationTokenState` is still referenced by any
/// `CancellationToken`.
fn has_refs(&self) -> bool {
self.refcount != 0 || self.has_parent_ref
}
}
/// The maximum permitted amount of references to a CancellationToken. This
/// is derived from the intent to never use more than 32bit in the `Snapshot`.
const MAX_REFS: u32 = (std::u32::MAX - 7) >> 3;
/// Internal state of the `CancellationToken` pair above
struct CancellationTokenState {
state: AtomicUsize,
parent: Option<NonNull<CancellationTokenState>>,
from_parent: SynchronizedThroughParent,
synchronized: Mutex<SynchronizedState>,
}
impl CancellationTokenState {
fn new(
parent: Option<NonNull<CancellationTokenState>>,
state: StateSnapshot,
) -> CancellationTokenState {
CancellationTokenState {
parent,
from_parent: SynchronizedThroughParent {
prev_peer: None,
next_peer: None,
},
state: AtomicUsize::new(state.pack()),
synchronized: Mutex::new(SynchronizedState::new()),
}
}
/// Returns a snapshot of the current atomic state of the token
fn snapshot(&self) -> StateSnapshot {
StateSnapshot::unpack(self.state.load(Ordering::SeqCst))
}
fn atomic_update_state<F>(&self, mut current_state: StateSnapshot, func: F) -> StateSnapshot
where
F: Fn(StateSnapshot) -> StateSnapshot,
{
let mut current_packed_state = current_state.pack();
let mut this = self.project();
loop {
let next_state = func(current_state);
match self.state.compare_exchange(
current_packed_state,
next_state.pack(),
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
return next_state;
}
Err(actual) => {
current_packed_state = actual;
current_state = StateSnapshot::unpack(actual);
}
}
}
}
fn increment_refcount(&self, current_state: StateSnapshot) -> StateSnapshot {
self.atomic_update_state(current_state, |mut state: StateSnapshot| {
if state.refcount >= MAX_REFS as usize {
eprintln!("[ERROR] Maximum reference count for CancellationToken was exceeded");
std::process::abort();
}
state.refcount += 1;
state
})
}
fn decrement_refcount(&self, current_state: StateSnapshot) -> StateSnapshot {
let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| {
state.refcount -= 1;
state
});
// Drop the State if it is not referenced anymore
if !current_state.has_refs() {
// Safety: `CancellationTokenState` is always stored in refcounted
// Boxes
let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) };
}
current_state
}
fn remove_parent_ref(&self, current_state: StateSnapshot) -> StateSnapshot {
let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| {
state.has_parent_ref = false;
state
});
// Drop the State if it is not referenced anymore
if !current_state.has_refs() {
// Safety: `CancellationTokenState` is always stored in refcounted
// Boxes
let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) };
}
current_state
}
/// Unregisters a child from the parent token.
/// The child tokens state is not exactly known at this point in time.
/// If the parent token is cancelled, the child token gets removed from the
/// parents list, and might therefore already have been freed. If the parent
/// token is not cancelled, the child token is still valid.
fn unregister_child(
&mut self,
mut child_state: NonNull<CancellationTokenState>,
current_child_state: StateSnapshot,
) {
let removed_child = {
// Remove the child toke from the parents linked list
let mut guard = self.synchronized.lock().unwrap();
if !guard.is_cancelled {
// Safety: Since the token was not cancelled, the child must
// still be in the list and valid.
let mut child_state = unsafe { child_state.as_mut() };
debug_assert!(child_state.snapshot().has_parent_ref);
if guard.first_child == Some(child_state.into()) {
guard.first_child = child_state.from_parent.next_peer;
}
// Safety: If peers wouldn't be valid anymore, they would try
// to remove themselves from the list. This would require locking
// the Mutex that we currently own.
unsafe {
if let Some(mut prev_peer) = child_state.from_parent.prev_peer {
prev_peer.as_mut().from_parent.next_peer =
child_state.from_parent.next_peer;
}
if let Some(mut next_peer) = child_state.from_parent.next_peer {
next_peer.as_mut().from_parent.prev_peer =
child_state.from_parent.prev_peer;
}
}
child_state.from_parent.prev_peer = None;
child_state.from_parent.next_peer = None;
// The child is no longer referenced by the parent, since we were able
// to remove its reference from the parents list.
true
} else {
// Do not touch the linked list anymore. If the parent is cancelled
// it will move all childs outside of the Mutex and manipulate
// the pointers there. Manipulating the pointers here too could
// lead to races. Therefore leave them just as as and let the
// parent deal with it. The parent will make sure to retain a
// reference to this state as long as it manipulates the list
// pointers. Therefore the pointers are not dangling.
false
}
};
if removed_child {
// If the token removed itself from the parents list, it can reset
// the parent ref status. If it is isn't able to do so, because the
// parent removed it from the list, there is no need to do this.
// The parent ref acts as as another reference count. Therefore
// removing this reference can free the object.
// Safety: The token was in the list. This means the parent wasn't
// cancelled before, and the token must still be alive.
unsafe { child_state.as_mut().remove_parent_ref(current_child_state) };
}
// Decrement the refcount on the parent and free it if necessary
self.decrement_refcount(self.snapshot());
}
fn cancel(&self) {
// Move the state of the CancellationToken from `NotCancelled` to `Cancelling`
let mut current_state = self.snapshot();
let state_after_cancellation = loop {
if current_state.cancel_state != CancellationState::NotCancelled {
// Another task already initiated the cancellation
return;
if this.cancellation_token.is_cancelled() {
return Poll::Ready(());
}
let mut next_state = current_state;
next_state.cancel_state = CancellationState::Cancelling;
match self.state.compare_exchange(
current_state.pack(),
next_state.pack(),
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break next_state,
Err(actual) => current_state = StateSnapshot::unpack(actual),
// No wakeups can be lost here because there is always a call to
// `is_cancelled` between the creation of the future and the call to
// `poll`, and the code that sets the cancelled flag does so before
// waking the `Notified`.
if this.future.as_mut().poll(cx).is_pending() {
return Poll::Pending;
}
};
// This task cancelled the token
// Take the task list out of the Token
// We do not want to cancel child token inside this lock. If one of the
// child tasks would have additional child tokens, we would recursively
// take locks.
// Doing this action has an impact if the child token is dropped concurrently:
// It will try to deregister itself from the parent task, but can not find
// itself in the task list anymore. Therefore it needs to assume the parent
// has extracted the list and will process it. It may not modify the list.
// This is OK from a memory safety perspective, since the parent still
// retains a reference to the child task until it finished iterating over
// it.
let mut first_child = {
let mut guard = self.synchronized.lock().unwrap();
// Save the cancellation also inside the Mutex
// This allows child tokens which want to detach themselves to detect
// that this is no longer required since the parent cleared the list.
guard.is_cancelled = true;
// Wakeup all waiters
// This happens inside the lock to make cancellation reliable
// If we would access waiters outside of the lock, the pointers
// may no longer be valid.
// Typically this shouldn't be an issue, since waking a task should
// only move it from the blocked into the ready state and not have
// further side effects.
// Use a reverse iterator, so that the oldest waiter gets
// scheduled first
guard.waiters.reverse_drain(|waiter| {
// We are not allowed to move the `Waker` out of the list node.
// The `Future` relies on the fact that the old `Waker` stays there
// as long as the `Future` has not completed in order to perform
// the `will_wake()` check.
// Therefore `wake_by_ref` is used instead of `wake()`
if let Some(handle) = &mut waiter.task {
handle.wake_by_ref();
}
// Mark the waiter to have been removed from the list.
waiter.state = PollState::Done;
});
guard.first_child.take()
};
while let Some(mut child) = first_child {
// Safety: We know this is a valid pointer since it is in our child pointer
// list. It can't have been freed in between, since we retain a a reference
// to each child.
let mut_child = unsafe { child.as_mut() };
// Get the next child and clean up list pointers
first_child = mut_child.from_parent.next_peer;
mut_child.from_parent.prev_peer = None;
mut_child.from_parent.next_peer = None;
// Cancel the child task
mut_child.cancel();
// Drop the parent reference. This `CancellationToken` is not interested
// in interacting with the child anymore.
// This is ONLY allowed once we promised not to touch the state anymore
// after this interaction.
mut_child.remove_parent_ref(mut_child.snapshot());
this.future.set(this.cancellation_token.inner.notified());
}
// The cancellation has completed
// At this point in time tasks which registered a wait node can be sure
// that this wait node already had been dequeued from the list without
// needing to inspect the list.
self.atomic_update_state(state_after_cancellation, |mut state| {
state.cancel_state = CancellationState::Cancelled;
state
});
}
/// Returns `true` if the `CancellationToken` had been cancelled
fn is_cancelled(&self) -> bool {
let current_state = self.snapshot();
current_state.cancel_state != CancellationState::NotCancelled
}
/// Registers a waiting task at the `CancellationToken`.
/// Safety: This method is only safe as long as the waiting waiting task
/// will properly unregister the wait node before it gets moved.
unsafe fn register(
&self,
wait_node: &mut ListNode<WaitQueueEntry>,
cx: &mut Context<'_>,
) -> Poll<()> {
debug_assert_eq!(PollState::New, wait_node.state);
let current_state = self.snapshot();
// Perform an optimistic cancellation check before. This is not strictly
// necessary since we also check for cancellation in the Mutex, but
// reduces the necessary work to be performed for tasks which already
// had been cancelled.
if current_state.cancel_state != CancellationState::NotCancelled {
return Poll::Ready(());
}
// So far the token is not cancelled. However it could be cancelled before
// we get the chance to store the `Waker`. Therefore we need to check
// for cancellation again inside the mutex.
let mut guard = self.synchronized.lock().unwrap();
if guard.is_cancelled {
// Cancellation was signalled
wait_node.state = PollState::Done;
Poll::Ready(())
} else {
// Added the task to the wait queue
wait_node.task = Some(cx.waker().clone());
wait_node.state = PollState::Waiting;
guard.waiters.add_front(wait_node);
Poll::Pending
}
}
fn check_for_cancellation(
&self,
wait_node: &mut ListNode<WaitQueueEntry>,
cx: &mut Context<'_>,
) -> Poll<()> {
debug_assert!(
wait_node.task.is_some(),
"Method can only be called after task had been registered"
);
let current_state = self.snapshot();
if current_state.cancel_state != CancellationState::NotCancelled {
// If the cancellation had been fully completed we know that our `Waker`
// is no longer registered at the `CancellationToken`.
// Otherwise the cancel call may or may not yet have iterated
// through the waiters list and removed the wait nodes.
// If it hasn't yet, we need to remove it. Otherwise an attempt to
// reuse the `wait_node´ might get freed due to the `WaitForCancellationFuture`
// getting dropped before the cancellation had interacted with it.
if current_state.cancel_state != CancellationState::Cancelled {
self.unregister(wait_node);
}
Poll::Ready(())
} else {
// Check if we need to swap the `Waker`. This will make the check more
// expensive, since the `Waker` is synchronized through the Mutex.
// If we don't need to perform a `Waker` update, an atomic check for
// cancellation is sufficient.
let need_waker_update = wait_node
.task
.as_ref()
.map(|waker| !waker.will_wake(cx.waker()))
.unwrap_or(true);
if need_waker_update {
let guard = self.synchronized.lock().unwrap();
if guard.is_cancelled {
// Cancellation was signalled. Since this cancellation signal
// is set inside the Mutex, the old waiter must already have
// been removed from the waiting list
debug_assert_eq!(PollState::Done, wait_node.state);
wait_node.task = None;
Poll::Ready(())
} else {
// The WaitForCancellationFuture is already in the queue.
// The CancellationToken can't have been cancelled,
// since this would change the is_cancelled flag inside the mutex.
// Therefore we just have to update the Waker. A follow-up
// cancellation will always use the new waker.
wait_node.task = Some(cx.waker().clone());
Poll::Pending
}
} else {
// Do nothing. If the token gets cancelled, this task will get
// woken again and can fetch the cancellation.
Poll::Pending
}
}
}
fn unregister(&self, wait_node: &mut ListNode<WaitQueueEntry>) {
debug_assert!(
wait_node.task.is_some(),
"waiter can not be active without task"
);
let mut guard = self.synchronized.lock().unwrap();
// WaitForCancellationFuture only needs to get removed if it has been added to
// the wait queue of the CancellationToken.
// This has happened in the PollState::Waiting case.
if let PollState::Waiting = wait_node.state {
// Safety: Due to the state, we know that the node must be part
// of the waiter list
if !unsafe { guard.waiters.remove(wait_node) } {
// Panic if the address isn't found. This can only happen if the contract was
// violated, e.g. the WaitQueueEntry got moved after the initial poll.
panic!("Future could not be removed from wait queue");
}
wait_node.state = PollState::Done;
}
wait_node.task = None;
}
}

View File

@ -0,0 +1,373 @@
//! This mod provides the logic for the inner tree structure of the CancellationToken.
//!
//! CancellationTokens are only light handles with references to TreeNode.
//! All the logic is actually implemented in the TreeNode.
//!
//! A TreeNode is part of the cancellation tree and may have one parent and an arbitrary number of
//! children.
//!
//! A TreeNode can receive the request to perform a cancellation through a CancellationToken.
//! This cancellation request will cancel the node and all of its descendants.
//!
//! As soon as a node cannot get cancelled any more (because it was already cancelled or it has no
//! more CancellationTokens pointing to it any more), it gets removed from the tree, to keep the
//! tree as small as possible.
//!
//! # Invariants
//!
//! Those invariants shall be true at any time.
//!
//! 1. A node that has no parents and no handles can no longer be cancelled.
//! This is important during both cancellation and refcounting.
//!
//! 2. If node B *is* or *was* a child of node A, then node B was created *after* node A.
//! This is important for deadlock safety, as it is used for lock order.
//! Node B can only become the child of node A in two ways:
//! - being created with `child_node()`, in which case it is trivially true that
//! node A already existed when node B was created
//! - being moved A->C->B to A->B because node C was removed in `decrease_handle_refcount()`
//! or `cancel()`. In this case the invariant still holds, as B was younger than C, and C
//! was younger than A, therefore B is also younger than A.
//!
//! 3. If two nodes are both unlocked and node A is the parent of node B, then node B is a child of
//! node A. It is important to always restore that invariant before dropping the lock of a node.
//!
//! # Deadlock safety
//!
//! We always lock in the order of creation time. We can prove this through invariant #2.
//! Specifically, through invariant #2, we know that we always have to lock a parent
//! before its child.
//!
use crate::loom::sync::{Arc, Mutex, MutexGuard};
/// A node of the cancellation tree structure
///
/// The actual data it holds is wrapped inside a mutex for synchronization.
pub(crate) struct TreeNode {
inner: Mutex<Inner>,
waker: tokio::sync::Notify,
}
impl TreeNode {
pub(crate) fn new() -> Self {
Self {
inner: Mutex::new(Inner {
parent: None,
parent_idx: 0,
children: vec![],
is_cancelled: false,
num_handles: 1,
}),
waker: tokio::sync::Notify::new(),
}
}
pub(crate) fn notified(&self) -> tokio::sync::futures::Notified<'_> {
self.waker.notified()
}
}
/// The data contained inside a TreeNode.
///
/// This struct exists so that the data of the node can be wrapped
/// in a Mutex.
struct Inner {
parent: Option<Arc<TreeNode>>,
parent_idx: usize,
children: Vec<Arc<TreeNode>>,
is_cancelled: bool,
num_handles: usize,
}
/// Returns whether or not the node is cancelled
pub(crate) fn is_cancelled(node: &Arc<TreeNode>) -> bool {
node.inner.lock().unwrap().is_cancelled
}
/// Creates a child node
pub(crate) fn child_node(parent: &Arc<TreeNode>) -> Arc<TreeNode> {
let mut locked_parent = parent.inner.lock().unwrap();
// Do not register as child if we are already cancelled.
// Cancelled trees can never be uncancelled and therefore
// need no connection to parents or children any more.
if locked_parent.is_cancelled {
return Arc::new(TreeNode {
inner: Mutex::new(Inner {
parent: None,
parent_idx: 0,
children: vec![],
is_cancelled: true,
num_handles: 1,
}),
waker: tokio::sync::Notify::new(),
});
}
let child = Arc::new(TreeNode {
inner: Mutex::new(Inner {
parent: Some(parent.clone()),
parent_idx: locked_parent.children.len(),
children: vec![],
is_cancelled: false,
num_handles: 1,
}),
waker: tokio::sync::Notify::new(),
});
locked_parent.children.push(child.clone());
child
}
/// Disconnects the given parent from all of its children.
///
/// Takes a reference to [Inner] to make sure the parent is already locked.
fn disconnect_children(node: &mut Inner) {
for child in std::mem::take(&mut node.children) {
let mut locked_child = child.inner.lock().unwrap();
locked_child.parent_idx = 0;
locked_child.parent = None;
}
}
/// Figures out the parent of the node and locks the node and its parent atomically.
///
/// The basic principle of preventing deadlocks in the tree is
/// that we always lock the parent first, and then the child.
/// For more info look at *deadlock safety* and *invariant #2*.
///
/// Sadly, it's impossible to figure out the parent of a node without
/// locking it. To then achieve locking order consistency, the node
/// has to be unlocked before the parent gets locked.
/// This leaves a small window where we already assume that we know the parent,
/// but neither the parent nor the node is locked. Therefore, the parent could change.
///
/// To prevent that this problem leaks into the rest of the code, it is abstracted
/// in this function.
///
/// The locked child and optionally its locked parent, if a parent exists, get passed
/// to the `func` argument via (node, None) or (node, Some(parent)).
fn with_locked_node_and_parent<F, Ret>(node: &Arc<TreeNode>, func: F) -> Ret
where
F: FnOnce(MutexGuard<'_, Inner>, Option<MutexGuard<'_, Inner>>) -> Ret,
{
let mut potential_parent = {
let locked_node = node.inner.lock().unwrap();
match locked_node.parent.clone() {
Some(parent) => parent,
// If we locked the node and its parent is `None`, we are in a valid state
// and can return.
None => return func(locked_node, None),
}
};
loop {
// Deadlock safety:
//
// Due to invariant #2, we know that we have to lock the parent first, and then the child.
// This is true even if the potential_parent is no longer the current parent or even its
// sibling, as the invariant still holds.
let locked_parent = potential_parent.inner.lock().unwrap();
let locked_node = node.inner.lock().unwrap();
let actual_parent = match locked_node.parent.clone() {
Some(parent) => parent,
// If we locked the node and its parent is `None`, we are in a valid state
// and can return.
None => {
// Was the wrong parent, so unlock it before calling `func`
drop(locked_parent);
return func(locked_node, None);
}
};
// Loop until we managed to lock both the node and its parent
if Arc::ptr_eq(&actual_parent, &potential_parent) {
return func(locked_node, Some(locked_parent));
}
// Drop locked_parent before reassigning to potential_parent,
// as potential_parent is borrowed in it
drop(locked_node);
drop(locked_parent);
potential_parent = actual_parent;
}
}
/// Moves all children from `node` to `parent`.
///
/// `parent` MUST have been a parent of the node when they both got locked,
/// otherwise there is a potential for a deadlock as invariant #2 would be violated.
///
/// To aquire the locks for node and parent, use [with_locked_node_and_parent].
fn move_children_to_parent(node: &mut Inner, parent: &mut Inner) {
// Pre-allocate in the parent, for performance
parent.children.reserve(node.children.len());
for child in std::mem::take(&mut node.children) {
{
let mut child_locked = child.inner.lock().unwrap();
child_locked.parent = node.parent.clone();
child_locked.parent_idx = parent.children.len();
}
parent.children.push(child);
}
}
/// Removes a child from the parent.
///
/// `parent` MUST be the parent of `node`.
/// To aquire the locks for node and parent, use [with_locked_node_and_parent].
fn remove_child(parent: &mut Inner, mut node: MutexGuard<'_, Inner>) {
// Query the position from where to remove a node
let pos = node.parent_idx;
node.parent = None;
node.parent_idx = 0;
// Unlock node, so that only one child at a time is locked.
// Otherwise we would violate the lock order (see 'deadlock safety') as we
// don't know the creation order of the child nodes
drop(node);
// If `node` is the last element in the list, we don't need any swapping
if parent.children.len() == pos + 1 {
parent.children.pop().unwrap();
} else {
// If `node` is not the last element in the list, we need to
// replace it with the last element
let replacement_child = parent.children.pop().unwrap();
replacement_child.inner.lock().unwrap().parent_idx = pos;
parent.children[pos] = replacement_child;
}
let len = parent.children.len();
if 4 * len <= parent.children.capacity() {
// equal to:
// parent.children.shrink_to(2 * len);
// but shrink_to was not yet stabilized in our minimal compatible version
let old_children = std::mem::replace(&mut parent.children, Vec::with_capacity(2 * len));
parent.children.extend(old_children);
}
}
/// Increases the reference count of handles.
pub(crate) fn increase_handle_refcount(node: &Arc<TreeNode>) {
let mut locked_node = node.inner.lock().unwrap();
// Once no handles are left over, the node gets detached from the tree.
// There should never be a new handle once all handles are dropped.
assert!(locked_node.num_handles > 0);
locked_node.num_handles += 1;
}
/// Decreases the reference count of handles.
///
/// Once no handle is left, we can remove the node from the
/// tree and connect its parent directly to its children.
pub(crate) fn decrease_handle_refcount(node: &Arc<TreeNode>) {
let num_handles = {
let mut locked_node = node.inner.lock().unwrap();
locked_node.num_handles -= 1;
locked_node.num_handles
};
if num_handles == 0 {
with_locked_node_and_parent(node, |mut node, parent| {
// Remove the node from the tree
match parent {
Some(mut parent) => {
// As we want to remove ourselves from the tree,
// we have to move the children to the parent, so that
// they still receive the cancellation event without us.
// Moving them does not violate invariant #1.
move_children_to_parent(&mut node, &mut parent);
// Remove the node from the parent
remove_child(&mut parent, node);
}
None => {
// Due to invariant #1, we can assume that our
// children can no longer be cancelled through us.
// (as we now have neither a parent nor handles)
// Therefore we can disconnect them.
disconnect_children(&mut node);
}
}
});
}
}
/// Cancels a node and its children.
pub(crate) fn cancel(node: &Arc<TreeNode>) {
let mut locked_node = node.inner.lock().unwrap();
if locked_node.is_cancelled {
return;
}
// One by one, adopt grandchildren and then cancel and detach the child
while let Some(child) = locked_node.children.pop() {
// This can't deadlock because the mutex we are already
// holding is the parent of child.
let mut locked_child = child.inner.lock().unwrap();
// Detach the child from node
// No need to modify node.children, as the child already got removed with `.pop`
locked_child.parent = None;
locked_child.parent_idx = 0;
// If child is already cancelled, detaching is enough
if locked_child.is_cancelled {
continue;
}
// Cancel or adopt grandchildren
while let Some(grandchild) = locked_child.children.pop() {
// This can't deadlock because the two mutexes we are already
// holding is the parent and grandparent of grandchild.
let mut locked_grandchild = grandchild.inner.lock().unwrap();
// Detach the grandchild
locked_grandchild.parent = None;
locked_grandchild.parent_idx = 0;
// If grandchild is already cancelled, detaching is enough
if locked_grandchild.is_cancelled {
continue;
}
// For performance reasons, only adopt grandchildren that have children.
// Otherwise, just cancel them right away, no need for another iteration.
if locked_grandchild.children.is_empty() {
// Cancel the grandchild
locked_grandchild.is_cancelled = true;
locked_grandchild.children = Vec::new();
drop(locked_grandchild);
grandchild.waker.notify_waiters();
} else {
// Otherwise, adopt grandchild
locked_grandchild.parent = Some(node.clone());
locked_grandchild.parent_idx = locked_node.children.len();
drop(locked_grandchild);
locked_node.children.push(grandchild);
}
}
// Cancel the child
locked_child.is_cancelled = true;
locked_child.children = Vec::new();
drop(locked_child);
child.waker.notify_waiters();
// Now the child is cancelled and detached and all its children are adopted.
// Just continue until all (including adopted) children are cancelled and detached.
}
// Cancel the node itself.
locked_node.is_cancelled = true;
locked_node.children = Vec::new();
drop(locked_node);
node.waker.notify_waiters();
}

View File

@ -1,788 +0,0 @@
//! An intrusive double linked list of data
#![allow(dead_code, unreachable_pub)]
use core::{
marker::PhantomPinned,
ops::{Deref, DerefMut},
ptr::NonNull,
};
/// A node which carries data of type `T` and is stored in an intrusive list
#[derive(Debug)]
pub struct ListNode<T> {
/// The previous node in the list. `None` if there is no previous node.
prev: Option<NonNull<ListNode<T>>>,
/// The next node in the list. `None` if there is no previous node.
next: Option<NonNull<ListNode<T>>>,
/// The data which is associated to this list item
data: T,
/// Prevents `ListNode`s from being `Unpin`. They may never be moved, since
/// the list semantics require addresses to be stable.
_pin: PhantomPinned,
}
impl<T> ListNode<T> {
/// Creates a new node with the associated data
pub fn new(data: T) -> ListNode<T> {
Self {
prev: None,
next: None,
data,
_pin: PhantomPinned,
}
}
}
impl<T> Deref for ListNode<T> {
type Target = T;
fn deref(&self) -> &T {
&self.data
}
}
impl<T> DerefMut for ListNode<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.data
}
}
/// An intrusive linked list of nodes, where each node carries associated data
/// of type `T`.
#[derive(Debug)]
pub struct LinkedList<T> {
head: Option<NonNull<ListNode<T>>>,
tail: Option<NonNull<ListNode<T>>>,
}
impl<T> LinkedList<T> {
/// Creates an empty linked list
pub fn new() -> Self {
LinkedList::<T> {
head: None,
tail: None,
}
}
/// Adds a node at the front of the linked list.
/// Safety: This function is only safe as long as `node` is guaranteed to
/// get removed from the list before it gets moved or dropped.
/// In addition to this `node` may not be added to another other list before
/// it is removed from the current one.
pub unsafe fn add_front(&mut self, node: &mut ListNode<T>) {
node.next = self.head;
node.prev = None;
if let Some(mut head) = self.head {
head.as_mut().prev = Some(node.into())
};
self.head = Some(node.into());
if self.tail.is_none() {
self.tail = Some(node.into());
}
}
/// Inserts a node into the list in a way that the list keeps being sorted.
/// Safety: This function is only safe as long as `node` is guaranteed to
/// get removed from the list before it gets moved or dropped.
/// In addition to this `node` may not be added to another other list before
/// it is removed from the current one.
pub unsafe fn add_sorted(&mut self, node: &mut ListNode<T>)
where
T: PartialOrd,
{
if self.head.is_none() {
// First node in the list
self.head = Some(node.into());
self.tail = Some(node.into());
return;
}
let mut prev: Option<NonNull<ListNode<T>>> = None;
let mut current = self.head;
while let Some(mut current_node) = current {
if node.data < current_node.as_ref().data {
// Need to insert before the current node
current_node.as_mut().prev = Some(node.into());
match prev {
Some(mut prev) => {
prev.as_mut().next = Some(node.into());
}
None => {
// We are inserting at the beginning of the list
self.head = Some(node.into());
}
}
node.next = current;
node.prev = prev;
return;
}
prev = current;
current = current_node.as_ref().next;
}
// We looped through the whole list and the nodes data is bigger or equal
// than everything we found up to now.
// Insert at the end. Since we checked before that the list isn't empty,
// tail always has a value.
node.prev = self.tail;
node.next = None;
self.tail.as_mut().unwrap().as_mut().next = Some(node.into());
self.tail = Some(node.into());
}
/// Returns the first node in the linked list without removing it from the list
/// The function is only safe as long as valid pointers are stored inside
/// the linked list.
/// The returned pointer is only guaranteed to be valid as long as the list
/// is not mutated
pub fn peek_first(&self) -> Option<&mut ListNode<T>> {
// Safety: When the node was inserted it was promised that it is alive
// until it gets removed from the list.
// The returned node has a pointer which constrains it to the lifetime
// of the list. This is ok, since the Node is supposed to outlive
// its insertion in the list.
unsafe {
self.head
.map(|mut node| &mut *(node.as_mut() as *mut ListNode<T>))
}
}
/// Returns the last node in the linked list without removing it from the list
/// The function is only safe as long as valid pointers are stored inside
/// the linked list.
/// The returned pointer is only guaranteed to be valid as long as the list
/// is not mutated
pub fn peek_last(&self) -> Option<&mut ListNode<T>> {
// Safety: When the node was inserted it was promised that it is alive
// until it gets removed from the list.
// The returned node has a pointer which constrains it to the lifetime
// of the list. This is ok, since the Node is supposed to outlive
// its insertion in the list.
unsafe {
self.tail
.map(|mut node| &mut *(node.as_mut() as *mut ListNode<T>))
}
}
/// Removes the first node from the linked list
pub fn remove_first(&mut self) -> Option<&mut ListNode<T>> {
#![allow(clippy::debug_assert_with_mut_call)]
// Safety: When the node was inserted it was promised that it is alive
// until it gets removed from the list
unsafe {
let mut head = self.head?;
self.head = head.as_mut().next;
let first_ref = head.as_mut();
match first_ref.next {
None => {
// This was the only node in the list
debug_assert_eq!(Some(first_ref.into()), self.tail);
self.tail = None;
}
Some(mut next) => {
next.as_mut().prev = None;
}
}
first_ref.prev = None;
first_ref.next = None;
Some(&mut *(first_ref as *mut ListNode<T>))
}
}
/// Removes the last node from the linked list and returns it
pub fn remove_last(&mut self) -> Option<&mut ListNode<T>> {
#![allow(clippy::debug_assert_with_mut_call)]
// Safety: When the node was inserted it was promised that it is alive
// until it gets removed from the list
unsafe {
let mut tail = self.tail?;
self.tail = tail.as_mut().prev;
let last_ref = tail.as_mut();
match last_ref.prev {
None => {
// This was the last node in the list
debug_assert_eq!(Some(last_ref.into()), self.head);
self.head = None;
}
Some(mut prev) => {
prev.as_mut().next = None;
}
}
last_ref.prev = None;
last_ref.next = None;
Some(&mut *(last_ref as *mut ListNode<T>))
}
}
/// Returns whether the linked list does not contain any node
pub fn is_empty(&self) -> bool {
if self.head.is_some() {
return false;
}
debug_assert!(self.tail.is_none());
true
}
/// Removes the given `node` from the linked list.
/// Returns whether the `node` was removed.
/// It is also only safe if it is known that the `node` is either part of this
/// list, or of no list at all. If `node` is part of another list, the
/// behavior is undefined.
pub unsafe fn remove(&mut self, node: &mut ListNode<T>) -> bool {
#![allow(clippy::debug_assert_with_mut_call)]
match node.prev {
None => {
// This might be the first node in the list. If it is not, the
// node is not in the list at all. Since our precondition is that
// the node must either be in this list or in no list, we check that
// the node is really in no list.
if self.head != Some(node.into()) {
debug_assert!(node.next.is_none());
return false;
}
self.head = node.next;
}
Some(mut prev) => {
debug_assert_eq!(prev.as_ref().next, Some(node.into()));
prev.as_mut().next = node.next;
}
}
match node.next {
None => {
// This must be the last node in our list. Otherwise the list
// is inconsistent.
debug_assert_eq!(self.tail, Some(node.into()));
self.tail = node.prev;
}
Some(mut next) => {
debug_assert_eq!(next.as_mut().prev, Some(node.into()));
next.as_mut().prev = node.prev;
}
}
node.next = None;
node.prev = None;
true
}
/// Drains the list iby calling a callback on each list node
///
/// The method does not return an iterator since stopping or deferring
/// draining the list is not permitted. If the method would push nodes to
/// an iterator we could not guarantee that the nodes do not get utilized
/// after having been removed from the list anymore.
pub fn drain<F>(&mut self, mut func: F)
where
F: FnMut(&mut ListNode<T>),
{
let mut current = self.head;
self.head = None;
self.tail = None;
while let Some(mut node) = current {
// Safety: The nodes have not been removed from the list yet and must
// therefore contain valid data. The nodes can also not be added to
// the list again during iteration, since the list is mutably borrowed.
unsafe {
let node_ref = node.as_mut();
current = node_ref.next;
node_ref.next = None;
node_ref.prev = None;
// Note: We do not reset the pointers from the next element in the
// list to the current one since we will iterate over the whole
// list anyway, and therefore clean up all pointers.
func(node_ref);
}
}
}
/// Drains the list in reverse order by calling a callback on each list node
///
/// The method does not return an iterator since stopping or deferring
/// draining the list is not permitted. If the method would push nodes to
/// an iterator we could not guarantee that the nodes do not get utilized
/// after having been removed from the list anymore.
pub fn reverse_drain<F>(&mut self, mut func: F)
where
F: FnMut(&mut ListNode<T>),
{
let mut current = self.tail;
self.head = None;
self.tail = None;
while let Some(mut node) = current {
// Safety: The nodes have not been removed from the list yet and must
// therefore contain valid data. The nodes can also not be added to
// the list again during iteration, since the list is mutably borrowed.
unsafe {
let node_ref = node.as_mut();
current = node_ref.prev;
node_ref.next = None;
node_ref.prev = None;
// Note: We do not reset the pointers from the next element in the
// list to the current one since we will iterate over the whole
// list anyway, and therefore clean up all pointers.
func(node_ref);
}
}
}
}
#[cfg(all(test, feature = "std"))] // Tests make use of Vec at the moment
mod tests {
use super::*;
fn collect_list<T: Copy>(mut list: LinkedList<T>) -> Vec<T> {
let mut result = Vec::new();
list.drain(|node| {
result.push(**node);
});
result
}
fn collect_reverse_list<T: Copy>(mut list: LinkedList<T>) -> Vec<T> {
let mut result = Vec::new();
list.reverse_drain(|node| {
result.push(**node);
});
result
}
unsafe fn add_nodes(list: &mut LinkedList<i32>, nodes: &mut [&mut ListNode<i32>]) {
for node in nodes.iter_mut() {
list.add_front(node);
}
}
unsafe fn assert_clean<T>(node: &mut ListNode<T>) {
assert!(node.next.is_none());
assert!(node.prev.is_none());
}
#[test]
fn insert_and_iterate() {
unsafe {
let mut a = ListNode::new(5);
let mut b = ListNode::new(7);
let mut c = ListNode::new(31);
let mut setup = |list: &mut LinkedList<i32>| {
assert_eq!(true, list.is_empty());
list.add_front(&mut c);
assert_eq!(31, **list.peek_first().unwrap());
assert_eq!(false, list.is_empty());
list.add_front(&mut b);
assert_eq!(7, **list.peek_first().unwrap());
list.add_front(&mut a);
assert_eq!(5, **list.peek_first().unwrap());
};
let mut list = LinkedList::new();
setup(&mut list);
let items: Vec<i32> = collect_list(list);
assert_eq!([5, 7, 31].to_vec(), items);
let mut list = LinkedList::new();
setup(&mut list);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([31, 7, 5].to_vec(), items);
}
}
#[test]
fn add_sorted() {
unsafe {
let mut a = ListNode::new(5);
let mut b = ListNode::new(7);
let mut c = ListNode::new(31);
let mut d = ListNode::new(99);
let mut list = LinkedList::new();
list.add_sorted(&mut a);
let items: Vec<i32> = collect_list(list);
assert_eq!([5].to_vec(), items);
let mut list = LinkedList::new();
list.add_sorted(&mut a);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([5].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut d, &mut c, &mut b]);
list.add_sorted(&mut a);
let items: Vec<i32> = collect_list(list);
assert_eq!([5, 7, 31, 99].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut d, &mut c, &mut b]);
list.add_sorted(&mut a);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([99, 31, 7, 5].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut d, &mut c, &mut a]);
list.add_sorted(&mut b);
let items: Vec<i32> = collect_list(list);
assert_eq!([5, 7, 31, 99].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut d, &mut c, &mut a]);
list.add_sorted(&mut b);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([99, 31, 7, 5].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut d, &mut b, &mut a]);
list.add_sorted(&mut c);
let items: Vec<i32> = collect_list(list);
assert_eq!([5, 7, 31, 99].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut d, &mut b, &mut a]);
list.add_sorted(&mut c);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([99, 31, 7, 5].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
list.add_sorted(&mut d);
let items: Vec<i32> = collect_list(list);
assert_eq!([5, 7, 31, 99].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
list.add_sorted(&mut d);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([99, 31, 7, 5].to_vec(), items);
}
}
#[test]
fn drain_and_collect() {
unsafe {
let mut a = ListNode::new(5);
let mut b = ListNode::new(7);
let mut c = ListNode::new(31);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
let taken_items: Vec<i32> = collect_list(list);
assert_eq!([5, 7, 31].to_vec(), taken_items);
}
}
#[test]
fn peek_last() {
unsafe {
let mut a = ListNode::new(5);
let mut b = ListNode::new(7);
let mut c = ListNode::new(31);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
let last = list.peek_last();
assert_eq!(31, **last.unwrap());
list.remove_last();
let last = list.peek_last();
assert_eq!(7, **last.unwrap());
list.remove_last();
let last = list.peek_last();
assert_eq!(5, **last.unwrap());
list.remove_last();
let last = list.peek_last();
assert!(last.is_none());
}
}
#[test]
fn remove_first() {
unsafe {
// We iterate forward and backwards through the manipulated lists
// to make sure pointers in both directions are still ok.
let mut a = ListNode::new(5);
let mut b = ListNode::new(7);
let mut c = ListNode::new(31);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
let removed = list.remove_first().unwrap();
assert_clean(removed);
assert!(!list.is_empty());
let items: Vec<i32> = collect_list(list);
assert_eq!([7, 31].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
let removed = list.remove_first().unwrap();
assert_clean(removed);
assert!(!list.is_empty());
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([31, 7].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut b, &mut a]);
let removed = list.remove_first().unwrap();
assert_clean(removed);
assert!(!list.is_empty());
let items: Vec<i32> = collect_list(list);
assert_eq!([7].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut b, &mut a]);
let removed = list.remove_first().unwrap();
assert_clean(removed);
assert!(!list.is_empty());
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([7].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut a]);
let removed = list.remove_first().unwrap();
assert_clean(removed);
assert!(list.is_empty());
let items: Vec<i32> = collect_list(list);
assert!(items.is_empty());
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut a]);
let removed = list.remove_first().unwrap();
assert_clean(removed);
assert!(list.is_empty());
let items: Vec<i32> = collect_reverse_list(list);
assert!(items.is_empty());
}
}
#[test]
fn remove_last() {
unsafe {
// We iterate forward and backwards through the manipulated lists
// to make sure pointers in both directions are still ok.
let mut a = ListNode::new(5);
let mut b = ListNode::new(7);
let mut c = ListNode::new(31);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
let removed = list.remove_last().unwrap();
assert_clean(removed);
assert!(!list.is_empty());
let items: Vec<i32> = collect_list(list);
assert_eq!([5, 7].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
let removed = list.remove_last().unwrap();
assert_clean(removed);
assert!(!list.is_empty());
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([7, 5].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut b, &mut a]);
let removed = list.remove_last().unwrap();
assert_clean(removed);
assert!(!list.is_empty());
let items: Vec<i32> = collect_list(list);
assert_eq!([5].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut b, &mut a]);
let removed = list.remove_last().unwrap();
assert_clean(removed);
assert!(!list.is_empty());
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([5].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut a]);
let removed = list.remove_last().unwrap();
assert_clean(removed);
assert!(list.is_empty());
let items: Vec<i32> = collect_list(list);
assert!(items.is_empty());
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut a]);
let removed = list.remove_last().unwrap();
assert_clean(removed);
assert!(list.is_empty());
let items: Vec<i32> = collect_reverse_list(list);
assert!(items.is_empty());
}
}
#[test]
fn remove_by_address() {
unsafe {
let mut a = ListNode::new(5);
let mut b = ListNode::new(7);
let mut c = ListNode::new(31);
{
// Remove first
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
assert_eq!(true, list.remove(&mut a));
assert_clean((&mut a).into());
// a should be no longer there and can't be removed twice
assert_eq!(false, list.remove(&mut a));
assert_eq!(Some((&mut b).into()), list.head);
assert_eq!(Some((&mut c).into()), b.next);
assert_eq!(Some((&mut b).into()), c.prev);
let items: Vec<i32> = collect_list(list);
assert_eq!([7, 31].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
assert_eq!(true, list.remove(&mut a));
assert_clean((&mut a).into());
// a should be no longer there and can't be removed twice
assert_eq!(false, list.remove(&mut a));
assert_eq!(Some((&mut c).into()), b.next);
assert_eq!(Some((&mut b).into()), c.prev);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([31, 7].to_vec(), items);
}
{
// Remove middle
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
assert_eq!(true, list.remove(&mut b));
assert_clean((&mut b).into());
assert_eq!(Some((&mut c).into()), a.next);
assert_eq!(Some((&mut a).into()), c.prev);
let items: Vec<i32> = collect_list(list);
assert_eq!([5, 31].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
assert_eq!(true, list.remove(&mut b));
assert_clean((&mut b).into());
assert_eq!(Some((&mut c).into()), a.next);
assert_eq!(Some((&mut a).into()), c.prev);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([31, 5].to_vec(), items);
}
{
// Remove last
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
assert_eq!(true, list.remove(&mut c));
assert_clean((&mut c).into());
assert!(b.next.is_none());
assert_eq!(Some((&mut b).into()), list.tail);
let items: Vec<i32> = collect_list(list);
assert_eq!([5, 7].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]);
assert_eq!(true, list.remove(&mut c));
assert_clean((&mut c).into());
assert!(b.next.is_none());
assert_eq!(Some((&mut b).into()), list.tail);
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([7, 5].to_vec(), items);
}
{
// Remove first of two
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut b, &mut a]);
assert_eq!(true, list.remove(&mut a));
assert_clean((&mut a).into());
// a should be no longer there and can't be removed twice
assert_eq!(false, list.remove(&mut a));
assert_eq!(Some((&mut b).into()), list.head);
assert_eq!(Some((&mut b).into()), list.tail);
assert!(b.next.is_none());
assert!(b.prev.is_none());
let items: Vec<i32> = collect_list(list);
assert_eq!([7].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut b, &mut a]);
assert_eq!(true, list.remove(&mut a));
assert_clean((&mut a).into());
// a should be no longer there and can't be removed twice
assert_eq!(false, list.remove(&mut a));
assert_eq!(Some((&mut b).into()), list.head);
assert_eq!(Some((&mut b).into()), list.tail);
assert!(b.next.is_none());
assert!(b.prev.is_none());
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([7].to_vec(), items);
}
{
// Remove last of two
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut b, &mut a]);
assert_eq!(true, list.remove(&mut b));
assert_clean((&mut b).into());
assert_eq!(Some((&mut a).into()), list.head);
assert_eq!(Some((&mut a).into()), list.tail);
assert!(a.next.is_none());
assert!(a.prev.is_none());
let items: Vec<i32> = collect_list(list);
assert_eq!([5].to_vec(), items);
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut b, &mut a]);
assert_eq!(true, list.remove(&mut b));
assert_clean((&mut b).into());
assert_eq!(Some((&mut a).into()), list.head);
assert_eq!(Some((&mut a).into()), list.tail);
assert!(a.next.is_none());
assert!(a.prev.is_none());
let items: Vec<i32> = collect_reverse_list(list);
assert_eq!([5].to_vec(), items);
}
{
// Remove last item
let mut list = LinkedList::new();
add_nodes(&mut list, &mut [&mut a]);
assert_eq!(true, list.remove(&mut a));
assert_clean((&mut a).into());
assert!(list.head.is_none());
assert!(list.tail.is_none());
let items: Vec<i32> = collect_list(list);
assert!(items.is_empty());
}
{
// Remove missing
let mut list = LinkedList::new();
list.add_front(&mut b);
list.add_front(&mut a);
assert_eq!(false, list.remove(&mut c));
}
}
}
}

View File

@ -3,8 +3,6 @@
mod cancellation_token;
pub use cancellation_token::{guard::DropGuard, CancellationToken, WaitForCancellationFuture};
mod intrusive_double_linked_list;
mod mpsc;
pub use mpsc::{PollSendError, PollSender};

View File

@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio::pin;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
use core::future::Future;
use core::task::{Context, Poll};
@ -77,6 +77,46 @@ fn cancel_child_token_through_parent() {
);
}
#[test]
fn cancel_grandchild_token_through_parent_if_child_was_dropped() {
let (waker, wake_counter) = new_count_waker();
let token = CancellationToken::new();
let intermediate_token = token.child_token();
let child_token = intermediate_token.child_token();
drop(intermediate_token);
assert!(!child_token.is_cancelled());
let child_fut = child_token.cancelled();
pin!(child_fut);
let parent_fut = token.cancelled();
pin!(parent_fut);
assert_eq!(
Poll::Pending,
child_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Pending,
parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(wake_counter, 0);
token.cancel();
assert_eq!(wake_counter, 2);
assert!(token.is_cancelled());
assert!(child_token.is_cancelled());
assert_eq!(
Poll::Ready(()),
child_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Ready(()),
parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
}
#[test]
fn cancel_child_token_without_parent() {
let (waker, wake_counter) = new_count_waker();
@ -206,6 +246,134 @@ fn drop_multiple_child_tokens() {
}
}
#[test]
fn cancel_only_all_descendants() {
// ARRANGE
let (waker, wake_counter) = new_count_waker();
let parent_token = CancellationToken::new();
let token = parent_token.child_token();
let sibling_token = parent_token.child_token();
let child1_token = token.child_token();
let child2_token = token.child_token();
let grandchild_token = child1_token.child_token();
let grandchild2_token = child1_token.child_token();
let grandgrandchild_token = grandchild_token.child_token();
assert!(!parent_token.is_cancelled());
assert!(!token.is_cancelled());
assert!(!sibling_token.is_cancelled());
assert!(!child1_token.is_cancelled());
assert!(!child2_token.is_cancelled());
assert!(!grandchild_token.is_cancelled());
assert!(!grandchild2_token.is_cancelled());
assert!(!grandgrandchild_token.is_cancelled());
let parent_fut = parent_token.cancelled();
let fut = token.cancelled();
let sibling_fut = sibling_token.cancelled();
let child1_fut = child1_token.cancelled();
let child2_fut = child2_token.cancelled();
let grandchild_fut = grandchild_token.cancelled();
let grandchild2_fut = grandchild2_token.cancelled();
let grandgrandchild_fut = grandgrandchild_token.cancelled();
pin!(parent_fut);
pin!(fut);
pin!(sibling_fut);
pin!(child1_fut);
pin!(child2_fut);
pin!(grandchild_fut);
pin!(grandchild2_fut);
pin!(grandgrandchild_fut);
assert_eq!(
Poll::Pending,
parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Pending,
fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Pending,
sibling_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Pending,
child1_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Pending,
child2_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Pending,
grandchild_fut
.as_mut()
.poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Pending,
grandchild2_fut
.as_mut()
.poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Pending,
grandgrandchild_fut
.as_mut()
.poll(&mut Context::from_waker(&waker))
);
assert_eq!(wake_counter, 0);
// ACT
token.cancel();
// ASSERT
assert_eq!(wake_counter, 6);
assert!(!parent_token.is_cancelled());
assert!(token.is_cancelled());
assert!(!sibling_token.is_cancelled());
assert!(child1_token.is_cancelled());
assert!(child2_token.is_cancelled());
assert!(grandchild_token.is_cancelled());
assert!(grandchild2_token.is_cancelled());
assert!(grandgrandchild_token.is_cancelled());
assert_eq!(
Poll::Ready(()),
fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Ready(()),
child1_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Ready(()),
child2_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Ready(()),
grandchild_fut
.as_mut()
.poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Ready(()),
grandchild2_fut
.as_mut()
.poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Ready(()),
grandgrandchild_fut
.as_mut()
.poll(&mut Context::from_waker(&waker))
);
assert_eq!(wake_counter, 6);
}
#[test]
fn drop_parent_before_child_tokens() {
let token = CancellationToken::new();
@ -218,3 +386,15 @@ fn drop_parent_before_child_tokens() {
drop(child1);
drop(child2);
}
#[test]
fn derives_send_sync() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<CancellationToken>();
assert_sync::<CancellationToken>();
assert_send::<WaitForCancellationFuture<'static>>();
assert_sync::<WaitForCancellationFuture<'static>>();
}