sync: adds Notify for basic task notification (#2210)

`Notify` provides a synchronization primitive similar to thread park /
unpark, except for tasks.
This commit is contained in:
Carl Lerche 2020-02-26 11:40:10 -08:00 committed by GitHub
parent 7207bf355e
commit 8b7ea0ff5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1211 additions and 0 deletions

View File

@ -64,6 +64,7 @@ pub(crate) mod sync {
pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
pub(crate) use std::sync::atomic::AtomicU8;
pub(crate) use std::sync::atomic::{fence, AtomicPtr};
pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
}

View File

@ -406,9 +406,18 @@
//! * [`Mutex`][Mutex] Mutual Exclusion mechanism, which ensures that at most
//! one thread at a time is able to access some data.
//!
//! * [`Notify`][Notify] Basic task notification. `Notify` supports notifying a
//! receiving task without sending data. In this case, the task wakes up and
//! resumes processing.
//!
//! * [`RwLock`][RwLock] Provides a mutual exclusion mechanism which allows
//! multiple readers at the same time, while allowing only one writer at a
//! time. In some cases, this can be more efficient than a mutex.
//!
//! * [`Semaphore`][Semaphore] Limits the amount of concurrency. A semaphore
//! holds a number of permits, which tasks may request in order to enter a
//! critical section. Semaphores are useful for implementing limiting of
//! bounding of any kind.
cfg_sync! {
mod barrier;
@ -421,6 +430,9 @@ cfg_sync! {
mod mutex;
pub use mutex::{Mutex, MutexGuard};
mod notify;
pub use notify::Notify;
pub mod oneshot;
pub(crate) mod semaphore_ll;

523
tokio/src/sync/notify.rs Normal file
View File

@ -0,0 +1,523 @@
use crate::loom::sync::atomic::AtomicU8;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::SeqCst;
use std::task::{Context, Poll, Waker};
/// Notify a single task to wake up.
///
/// `Notify` provides a basic mechanism to notify a single task of an event.
/// `Notify` itself does not carry any data. Instead, it is to be used to signal
/// another task to perform an operation.
///
/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits.
/// [`notified().await`] waits for a permit to become available, and [`notify()`]
/// sets a permit **if there currently are no available permits**.
///
/// The synchronization details of `Notify` are similar to
/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
/// value contains a single permit. [`notfied().await`] waits for the permit to
/// be made available, consumes the permit, and resumes. [`notify()`] sets the
/// permit, waking a pending task if there is one.
///
/// If `notify()` is called **before** `notfied().await`, then the next call to
/// `notified().await` will complete immediately, consuming the permit. Any
/// subsequent calls to `notified().await` will wait for a new permit.
///
/// If `notify()` is called **multiple** times before `notified().await`, only a
/// **single** permit is stored. The next call to `notified().await` will
/// complete immediately, but the one after will wait for a new permit.
///
/// # Examples
///
/// Basic usage.
///
/// ```
/// use tokio::sync::Notify;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
/// let notify = Arc::new(Notify::new());
/// let notify2 = notify.clone();
///
/// tokio::spawn(async move {
/// notify2.notified().await;
/// println!("received notification");
/// });
///
/// println!("sending notification");
/// notify.notify();
/// }
/// ```
///
/// Unbound mpsc channel.
///
/// ```
/// use tokio::sync::Notify;
///
/// use std::collections::VecDeque;
/// use std::sync::Mutex;
///
/// struct Channel<T> {
/// values: Mutex<VecDeque<T>>,
/// notify: Notify,
/// }
///
/// impl<T> Channel<T> {
/// pub fn send(&self, value: T) {
/// self.values.lock().unwrap()
/// .push_back(value);
///
/// // Notify the consumer a value is available
/// self.notify.notify();
/// }
///
/// pub async fn recv(&self) -> T {
/// loop {
/// // Drain values
/// if let Some(value) = self.values.lock().unwrap().pop_front() {
/// return value;
/// }
///
/// // Wait for values to be available
/// self.notify.notified().await;
/// }
/// }
/// }
/// ```
///
/// [park]: std::thread::park
/// [unpark]: std::thread::Thread::unpark
/// [`notified().await`]: Notify::notified()
/// [`notify()`]: Notify::notify()
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
state: AtomicU8,
waiters: Mutex<LinkedList<Waiter>>,
}
#[derive(Debug)]
struct Waiter {
/// Waiting task's waker
waker: Option<Waker>,
/// `true` if the notification has been assigned to this waiter.
notified: bool,
}
/// Future returned from `notified()`
#[derive(Debug)]
struct Notified<'a> {
/// The `Notify` being received on.
notify: &'a Notify,
/// The current state of the receiving process.
state: State,
/// Entry in the waiter `LinkedList`.
waiter: linked_list::Entry<Waiter>,
}
#[derive(Debug)]
enum State {
Init,
Waiting,
Done,
}
/// Initial "idle" state
const EMPTY: u8 = 0;
/// One or more threads are currently waiting to be notified.
const WAITING: u8 = 1;
/// Pending notification
const NOTIFIED: u8 = 2;
impl Notify {
/// Create a new `Notify`, initialized without a permit.
///
/// # Examples
///
/// ```
/// use tokio::sync::Notify;
///
/// let notify = Notify::new();
/// ```
pub fn new() -> Notify {
Notify {
state: AtomicU8::new(0),
waiters: Mutex::new(LinkedList::new()),
}
}
/// Wait for a notification.
///
/// Each `Notify` value holds a single permit. If a permit is available from
/// an earlier call to [`notify()`], then `notified().await` will complete
/// immediately, consuming that permit. Otherwise, `notified().await` waits
/// for a permit to be made available by the next call to `notify()`.
///
/// [`notify()`]: Notify::notify
///
/// # Examples
///
/// ```
/// use tokio::sync::Notify;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
/// let notify = Arc::new(Notify::new());
/// let notify2 = notify.clone();
///
/// tokio::spawn(async move {
/// notify2.notified().await;
/// println!("received notification");
/// });
///
/// println!("sending notification");
/// notify.notify();
/// }
/// ```
pub async fn notified(&self) {
Notified {
notify: self,
state: State::Init,
waiter: linked_list::Entry::new(Waiter {
waker: None,
notified: false,
}),
}
.await
}
/// Notifies a waiting task
///
/// If a task is currently waiting, that task is notified. Otherwise, a
/// permit is stored in this `Notify` value and the **next** call to
/// [`notified().await`] will complete immediately consuming the permit made
/// available by this call to `notify()`.
///
/// At most one permit may be stored by `Notify`. Many sequential calls to
/// `notify` will result in a single permit being stored. The next call to
/// `notified().await` will complete immediately, but the one after that
/// will wait.
///
/// [`notified().await`]: Notify::notified()
///
/// # Examples
///
/// ```
/// use tokio::sync::Notify;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
/// let notify = Arc::new(Notify::new());
/// let notify2 = notify.clone();
///
/// tokio::spawn(async move {
/// notify2.notified().await;
/// println!("received notification");
/// });
///
/// println!("sending notification");
/// notify.notify();
/// }
/// ```
pub fn notify(&self) {
// Load the current state
let mut curr = self.state.load(SeqCst);
// If the state is `EMPTY`, transition to `NOTIFIED` and return.
while let EMPTY | NOTIFIED = curr {
// The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
// happens-before synchronization must happen between this atomic
// operation and a task calling `notified().await`.
let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst);
match res {
// No waiters, no further work to do
Ok(_) => return,
Err(actual) => {
curr = actual;
}
}
}
// There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock().unwrap();
// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
curr = self.state.load(SeqCst);
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
drop(waiters);
waker.wake();
}
}
}
impl Default for Notify {
fn default() -> Notify {
Notify::new()
}
}
fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -> Option<Waker> {
loop {
match curr {
EMPTY | NOTIFIED => {
let res = state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst);
match res {
Ok(_) => return None,
Err(actual) => {
assert!(actual == EMPTY || actual == NOTIFIED);
state.store(NOTIFIED, SeqCst);
return None;
}
}
}
WAITING => {
// At this point, it is guaranteed that the state will not
// concurrently change as holding the lock is required to
// transition **out** of `WAITING`.
//
// Get a pending waiter
let mut waiter = waiters.pop_back().unwrap();
assert!(!waiter.notified);
waiter.notified = true;
let waker = waiter.waker.take();
if waiters.is_empty() {
// As this the **final** waiter in the list, the state
// must be transitioned to `EMPTY`. As transitioning
// **from** `WAITING` requires the lock to be held, a
// `store` is sufficient.
state.store(EMPTY, SeqCst);
}
return waker;
}
_ => unreachable!(),
}
}
}
// ===== impl Notified =====
impl Notified<'_> {
/// A custom `project` implementation is used in place of `pin-project-lite`
/// as a custom drop implementation is needed.
fn project(
self: Pin<&mut Self>,
) -> (&Notify, &mut State, Pin<&mut linked_list::Entry<Waiter>>) {
unsafe {
// Safety: both `notify` and `state` are `Unpin`.
is_unpin::<&Notify>();
is_unpin::<AtomicU8>();
let me = self.get_unchecked_mut();
(
&me.notify,
&mut me.state,
Pin::new_unchecked(&mut me.waiter),
)
}
}
}
impl Future for Notified<'_> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
use State::*;
let (notify, state, mut waiter) = self.project();
loop {
match *state {
Init => {
// Optimistically try acquiring a pending notification
let res = notify
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst);
if res.is_ok() {
// Acquired the notification
*state = Done;
return Poll::Ready(());
}
// Acquire the lock and attempt to transition to the waiting
// state.
let mut waiters = notify.waiters.lock().unwrap();
// Reload the state with the lock held
let mut curr = notify.state.load(SeqCst);
// Transition the state to WAITING.
loop {
match curr {
EMPTY => {
// Transition to WAITING
let res = notify
.state
.compare_exchange(EMPTY, WAITING, SeqCst, SeqCst);
if let Err(actual) = res {
assert_eq!(actual, NOTIFIED);
curr = actual;
} else {
break;
}
}
WAITING => break,
NOTIFIED => {
// Try consuming the notification
let res = notify
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst);
match res {
Ok(_) => {
// Acquired the notification
*state = Done;
return Poll::Ready(());
}
Err(actual) => {
assert_eq!(actual, EMPTY);
curr = actual;
}
}
}
_ => unreachable!(),
}
}
// Safety: called while locked.
unsafe {
(*waiter.as_mut().get()).waker = Some(cx.waker().clone());
// Insert the waiter into the linked list
waiters.push_front(waiter.as_mut());
}
*state = Waiting;
}
Waiting => {
// Currently in the "Waiting" state, implying the caller has
// a waiter stored in the waiter list (guarded by
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.
let waiters = notify.waiters.lock().unwrap();
// Safety: called while locked
let w = unsafe { &mut *waiter.as_mut().get() };
if w.notified {
// Our waker has been notified. Reset the fields and
// remove it from the list.
w.waker = None;
w.notified = false;
*state = Done;
} else {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
}
return Poll::Pending;
}
// Explicit drop of the lock to indicate the scope that the
// lock is held. Because holding the lock is required to
// ensure safe access to fields not held within the lock, it
// is helpful to visualize the scope of the critical
// section.
drop(waiters);
}
Done => {
return Poll::Ready(());
}
}
}
}
}
impl Drop for Notified<'_> {
fn drop(&mut self) {
use State::*;
// Safety: The type only transitions to a "Waiting" state when pinned.
let (notify, state, mut waiter) = unsafe { Pin::new_unchecked(self).project() };
// This is where we ensure safety. The `Notified` value is being
// dropped, which means we must ensure that the waiter entry is no
// longer stored in the linked list.
if let Waiting = *state {
let mut notify_state = WAITING;
let mut waiters = notify.waiters.lock().unwrap();
// `Notify.state` may be in any of the three states (Empty, Waiting,
// Notified). It doesn't actually matter what the atomic is set to
// at this point. We hold the lock and will ensure the atomic is in
// the correct state once th elock is dropped.
//
// Because the atomic state is not checked, at first glance, it may
// seem like this routine does not handle the case where the
// receiver is notified but has not yet observed the notification.
// If this happens, no matter how many notifications happen between
// this receiver being notified and the receive future dropping, all
// we need to do is ensure that one notification is returned back to
// the `Notify`. This is done by calling `notify_locked` if `self`
// has the `notified` flag set.
// remove the entry from the list
//
// safety: the waiter is only added to `waiters` by virtue of it
// being the only `LinkedList` available to the type.
unsafe { waiters.remove(waiter.as_mut()) };
if waiters.is_empty() {
notify_state = EMPTY;
// If the state *should* be `NOTIFIED`, the call to
// `notify_locked` below will end up doing the
// `store(NOTIFIED)`. If a concurrent receiver races and
// observes the incorrect `EMPTY` state, it will then obtain the
// lock and block until `notify.state` is in the correct final
// state.
notify.state.store(EMPTY, SeqCst);
}
// See if the node was notified but not received. In this case, the
// notification must be sent to another waiter.
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
let notified = unsafe { (*waiter.as_mut().get()).notified };
if notified {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();
}
}
}
}
}
fn is_unpin<T: Unpin>() {}

View File

@ -0,0 +1,90 @@
use crate::sync::Notify;
use loom::future::block_on;
use loom::sync::Arc;
use loom::thread;
#[test]
fn notify_one() {
loom::model(|| {
let tx = Arc::new(Notify::new());
let rx = tx.clone();
let th = thread::spawn(move || {
block_on(async {
rx.notified().await;
});
});
tx.notify();
th.join().unwrap();
});
}
#[test]
fn notify_multi() {
loom::model(|| {
let notify = Arc::new(Notify::new());
let mut ths = vec![];
for _ in 0..2 {
let notify = notify.clone();
ths.push(thread::spawn(move || {
block_on(async {
notify.notified().await;
notify.notify();
})
}));
}
notify.notify();
for th in ths.drain(..) {
th.join().unwrap();
}
block_on(async {
notify.notified().await;
});
});
}
#[test]
fn notify_drop() {
use crate::future::poll_fn;
use std::future::Future;
use std::task::Poll;
loom::model(|| {
let notify = Arc::new(Notify::new());
let rx1 = notify.clone();
let rx2 = notify.clone();
let th1 = thread::spawn(move || {
let mut recv = Box::pin(rx1.notified());
block_on(poll_fn(|cx| {
if recv.as_mut().poll(cx).is_ready() {
rx1.notify();
}
Poll::Ready(())
}));
});
let th2 = thread::spawn(move || {
block_on(async {
rx2.notified().await;
// Trigger second notification
rx2.notify();
rx2.notified().await;
});
});
notify.notify();
th1.join().unwrap();
th2.join().unwrap();
});
}

View File

@ -8,6 +8,7 @@ cfg_loom! {
mod loom_broadcast;
mod loom_list;
mod loom_mpsc;
mod loom_notify;
mod loom_oneshot;
mod loom_semaphore_ll;
}

View File

@ -0,0 +1,478 @@
//! An intrusive double linked list of data
//!
//! The data structure supports tracking pinned nodes. Most of the data
//! structure's APIs are `unsafe` as they require the caller to ensure the
//! specified node is actually contained by the list.
use core::cell::UnsafeCell;
use core::marker::PhantomPinned;
use core::pin::Pin;
use core::ptr::NonNull;
/// An intrusive linked list of nodes, where each node carries associated data
/// of type `T`.
#[derive(Debug)]
pub(crate) struct LinkedList<T> {
head: Option<NonNull<Entry<T>>>,
tail: Option<NonNull<Entry<T>>>,
}
unsafe impl<T: Send> Send for LinkedList<T> {}
unsafe impl<T: Sync> Sync for LinkedList<T> {}
/// A node which carries data of type `T` and is stored in an intrusive list.
#[derive(Debug)]
pub(crate) struct Entry<T> {
/// The previous node in the list. null if there is no previous node.
prev: Option<NonNull<Entry<T>>>,
/// The next node in the list. null if there is no previous node.
next: Option<NonNull<Entry<T>>>,
/// The data which is associated to this list item
data: UnsafeCell<T>,
/// Prevents `Entry`s from being `Unpin`. They may never be moved, since
/// the list semantics require addresses to be stable.
_pin: PhantomPinned,
}
unsafe impl<T: Send> Send for Entry<T> {}
unsafe impl<T: Sync> Sync for Entry<T> {}
impl<T> LinkedList<T> {
/// Creates an empty linked list
pub(crate) fn new() -> Self {
LinkedList {
head: None,
tail: None,
}
}
/// Adds an item to the back of the linked list.
///
/// # Safety
///
/// The function is only safe as long as valid pointers are stored inside
/// the linked list.
pub(crate) unsafe fn push_front(&mut self, entry: Pin<&mut Entry<T>>) {
let mut entry: NonNull<Entry<T>> = entry.get_unchecked_mut().into();
entry.as_mut().next = self.head;
entry.as_mut().prev = None;
if let Some(head) = &mut self.head {
head.as_mut().prev = Some(entry);
}
self.head = Some(entry);
if self.tail.is_none() {
self.tail = Some(entry);
}
}
/// Removes the first element and returns it, or `None` if the list is empty.
///
/// The function is safe as the lifetime of the entry is bound to `&mut
/// self`.
pub(crate) fn pop_back(&mut self) -> Option<Pin<&mut T>> {
unsafe {
let mut last = self.tail?;
self.tail = last.as_ref().prev;
if let Some(mut prev) = last.as_mut().prev {
prev.as_mut().next = None;
} else {
self.head = None
}
last.as_mut().prev = None;
last.as_mut().next = None;
let val = &mut *last.as_mut().data.get();
Some(Pin::new_unchecked(val))
}
}
/// Returns whether the linked list doesn not contain any node
pub(crate) fn is_empty(&self) -> bool {
if self.head.is_some() {
return false;
}
assert!(self.tail.is_none());
true
}
/// Removes the given item from the linked list.
///
/// # Safety
///
/// The caller **must** ensure that `entry` is currently contained by
/// `self`.
pub(crate) unsafe fn remove(&mut self, entry: Pin<&mut Entry<T>>) -> bool {
let mut entry = NonNull::from(entry.get_unchecked_mut());
if let Some(mut prev) = entry.as_mut().prev {
debug_assert_eq!(prev.as_ref().next, Some(entry));
prev.as_mut().next = entry.as_ref().next;
} else {
if self.head != Some(entry) {
return false;
}
self.head = entry.as_ref().next;
}
if let Some(mut next) = entry.as_mut().next {
debug_assert_eq!(next.as_ref().prev, Some(entry));
next.as_mut().prev = entry.as_ref().prev;
} else {
// This might be the last item in the list
if self.tail != Some(entry) {
return false;
}
self.tail = entry.as_ref().prev;
}
entry.as_mut().next = None;
entry.as_mut().prev = None;
true
}
}
impl<T> Entry<T> {
/// Creates a new node with the associated data
pub(crate) fn new(data: T) -> Entry<T> {
Entry {
prev: None,
next: None,
data: UnsafeCell::new(data),
_pin: PhantomPinned,
}
}
/// Get a raw pointer to the inner data
pub(crate) fn get(&self) -> *mut T {
self.data.get()
}
}
#[cfg(test)]
#[cfg(not(loom))]
mod tests {
use super::*;
fn collect_list<T: Copy>(list: &mut LinkedList<T>) -> Vec<T> {
let mut ret = vec![];
while let Some(v) = list.pop_back() {
ret.push(*v);
}
ret
}
unsafe fn push_all(list: &mut LinkedList<i32>, entries: &mut [Pin<&mut Entry<i32>>]) {
for entry in entries.iter_mut() {
list.push_front(entry.as_mut());
}
}
macro_rules! assert_clean {
($e:ident) => {{
assert!($e.next.is_none());
assert!($e.prev.is_none());
}};
}
macro_rules! assert_ptr_eq {
($a:expr, $b:expr) => {{
// Deal with mapping a Pin<&mut T> -> Option<NonNull<T>>
assert_eq!(Some($a.as_mut().get_unchecked_mut().into()), $b)
}};
}
#[test]
fn push_and_drain() {
pin! {
let a = Entry::new(5);
let b = Entry::new(7);
let c = Entry::new(31);
}
let mut list = LinkedList::new();
assert!(list.is_empty());
unsafe {
list.push_front(a);
assert!(!list.is_empty());
list.push_front(b);
list.push_front(c);
}
let items: Vec<i32> = collect_list(&mut list);
assert_eq!([5, 7, 31].to_vec(), items);
assert!(list.is_empty());
}
#[test]
fn push_pop_push_pop() {
pin! {
let a = Entry::new(5);
let b = Entry::new(7);
}
let mut list = LinkedList::new();
unsafe {
list.push_front(a);
}
let v = list.pop_back().unwrap();
assert_eq!(5, *v);
assert!(list.is_empty());
unsafe {
list.push_front(b);
}
let v = list.pop_back().unwrap();
assert_eq!(7, *v);
assert!(list.is_empty());
assert!(list.pop_back().is_none());
}
#[test]
fn remove_by_address() {
pin! {
let a = Entry::new(5);
let b = Entry::new(7);
let c = Entry::new(31);
}
unsafe {
// Remove first
let mut list = LinkedList::new();
push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]);
assert!(list.remove(a.as_mut()));
assert_clean!(a);
// `a` should be no longer there and can't be removed twice
assert!(!list.remove(a.as_mut()));
assert!(!list.is_empty());
assert!(list.remove(b.as_mut()));
assert_clean!(b);
// `b` should be no longer there and can't be removed twice
assert!(!list.remove(b.as_mut()));
assert!(!list.is_empty());
assert!(list.remove(c.as_mut()));
assert_clean!(c);
// `b` should be no longer there and can't be removed twice
assert!(!list.remove(c.as_mut()));
assert!(list.is_empty());
}
unsafe {
// Remove middle
let mut list = LinkedList::new();
push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]);
assert!(list.remove(a.as_mut()));
assert_clean!(a);
assert_ptr_eq!(b, list.head);
assert_ptr_eq!(c, b.next);
assert_ptr_eq!(b, c.prev);
let items = collect_list(&mut list);
assert_eq!([31, 7].to_vec(), items);
}
unsafe {
// Remove middle
let mut list = LinkedList::new();
push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]);
assert!(list.remove(b.as_mut()));
assert_clean!(b);
assert_ptr_eq!(c, a.next);
assert_ptr_eq!(a, c.prev);
let items = collect_list(&mut list);
assert_eq!([31, 5].to_vec(), items);
}
unsafe {
// Remove last
// Remove middle
let mut list = LinkedList::new();
push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]);
assert!(list.remove(c.as_mut()));
assert_clean!(c);
assert!(b.next.is_none());
assert_ptr_eq!(b, list.tail);
let items = collect_list(&mut list);
assert_eq!([7, 5].to_vec(), items);
}
unsafe {
// Remove first of two
let mut list = LinkedList::new();
push_all(&mut list, &mut [b.as_mut(), a.as_mut()]);
assert!(list.remove(a.as_mut()));
assert_clean!(a);
// a should be no longer there and can't be removed twice
assert!(!list.remove(a.as_mut()));
assert_ptr_eq!(b, list.head);
assert_ptr_eq!(b, list.tail);
assert!(b.next.is_none());
assert!(b.prev.is_none());
let items = collect_list(&mut list);
assert_eq!([7].to_vec(), items);
}
unsafe {
// Remove last of two
let mut list = LinkedList::new();
push_all(&mut list, &mut [b.as_mut(), a.as_mut()]);
assert!(list.remove(b.as_mut()));
assert_clean!(b);
assert_ptr_eq!(a, list.head);
assert_ptr_eq!(a, list.tail);
assert!(a.next.is_none());
assert!(a.prev.is_none());
let items = collect_list(&mut list);
assert_eq!([5].to_vec(), items);
}
unsafe {
// Remove last item
let mut list = LinkedList::new();
push_all(&mut list, &mut [a.as_mut()]);
assert!(list.remove(a.as_mut()));
assert_clean!(a);
assert!(list.head.is_none());
assert!(list.tail.is_none());
let items = collect_list(&mut list);
assert!(items.is_empty());
}
unsafe {
// Remove missing
let mut list = LinkedList::new();
list.push_front(b.as_mut());
list.push_front(a.as_mut());
assert!(!list.remove(c.as_mut()));
}
}
proptest::proptest! {
#[test]
fn fuzz_linked_list(ops: Vec<usize>) {
run_fuzz(ops);
}
}
fn run_fuzz(ops: Vec<usize>) {
use std::collections::VecDeque;
#[derive(Debug)]
enum Op {
Push,
Pop,
Remove(usize),
}
let ops = ops
.iter()
.map(|i| match i % 3 {
0 => Op::Push,
1 => Op::Pop,
2 => Op::Remove(i / 3),
_ => unreachable!(),
})
.collect::<Vec<_>>();
let mut next = 0;
let mut ll = LinkedList::new();
let mut entries = VecDeque::new();
let mut reference = VecDeque::new();
for op in ops {
match op {
Op::Push => {
let v = next;
next += 1;
reference.push_front(v);
entries.push_front(Box::pin(Entry::new(v)));
unsafe {
ll.push_front(entries.front_mut().unwrap().as_mut());
}
}
Op::Pop => {
if reference.is_empty() {
assert!(ll.is_empty());
continue;
}
let v = reference.pop_back();
assert_eq!(v, ll.pop_back().map(|v| *v));
entries.pop_back();
}
Op::Remove(n) => {
if reference.is_empty() {
assert!(ll.is_empty());
continue;
}
let idx = n % reference.len();
unsafe {
assert!(ll.remove(entries[idx].as_mut()));
}
let v = reference.remove(idx).unwrap();
assert_eq!(v, unsafe { *entries[idx].get() });
entries.remove(idx);
}
}
}
}
}

View File

@ -3,6 +3,10 @@ cfg_io_driver! {
pub(crate) mod slab;
}
cfg_sync! {
pub(crate) mod linked_list;
}
#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]
mod rand;

102
tokio/tests/sync_notify.rs Normal file
View File

@ -0,0 +1,102 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use tokio::sync::Notify;
use tokio_test::task::spawn;
use tokio_test::*;
trait AssertSend: Send + Sync {}
impl AssertSend for Notify {}
#[test]
fn notify_notified_one() {
let notify = Notify::new();
let mut notified = spawn(async { notify.notified().await });
notify.notify();
assert_ready!(notified.poll());
}
#[test]
fn notified_one_notify() {
let notify = Notify::new();
let mut notified = spawn(async { notify.notified().await });
assert_pending!(notified.poll());
notify.notify();
assert!(notified.is_woken());
assert_ready!(notified.poll());
}
#[test]
fn notified_multi_notify() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
notify.notify();
assert!(notified1.is_woken());
assert!(!notified2.is_woken());
assert_ready!(notified1.poll());
assert_pending!(notified2.poll());
}
#[test]
fn notify_notified_multi() {
let notify = Notify::new();
notify.notify();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
assert_ready!(notified1.poll());
assert_pending!(notified2.poll());
notify.notify();
assert!(notified2.is_woken());
assert_ready!(notified2.poll());
}
#[test]
fn notified_drop_notified_notify() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
assert_pending!(notified1.poll());
drop(notified1);
assert_pending!(notified2.poll());
notify.notify();
assert!(notified2.is_woken());
assert_ready!(notified2.poll());
}
#[test]
fn notified_multi_notify_drop_one() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
notify.notify();
assert!(notified1.is_woken());
assert!(!notified2.is_woken());
drop(notified1);
assert!(notified2.is_woken());
assert_ready!(notified2.poll());
}