mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
sync: Add watch, a single value broadcast channel (#922)
A single-producer, multi-consumer channel that only retains the _last_ sent value. Values are broadcasted out. This channel is useful for watching for changes to a value from multiple points in the code base (for example, changes to a configuration value).
This commit is contained in:
parent
7039f02bb2
commit
70f4fc481c
@ -9,5 +9,7 @@
|
|||||||
//! from one task to another.
|
//! from one task to another.
|
||||||
//! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for
|
//! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for
|
||||||
//! sending values between tasks.
|
//! sending values between tasks.
|
||||||
|
//! - [watch](watch/index.html), a single-producer, multi-consumer channel that
|
||||||
|
//! only stores the **most recently** sent value.
|
||||||
|
|
||||||
pub use tokio_sync::{mpsc, oneshot};
|
pub use tokio_sync::{mpsc, oneshot, watch};
|
||||||
|
@ -19,6 +19,7 @@ Synchronization utilities.
|
|||||||
categories = ["asynchronous"]
|
categories = ["asynchronous"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
fnv = "1.0.6"
|
||||||
futures = "0.1.19"
|
futures = "0.1.19"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@ -6,6 +6,8 @@
|
|||||||
//!
|
//!
|
||||||
//! This crate provides primitives for synchronizing asynchronous tasks.
|
//! This crate provides primitives for synchronizing asynchronous tasks.
|
||||||
|
|
||||||
|
extern crate fnv;
|
||||||
|
#[macro_use]
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
|
|
||||||
macro_rules! debug {
|
macro_rules! debug {
|
||||||
@ -27,3 +29,4 @@ pub mod mpsc;
|
|||||||
pub mod oneshot;
|
pub mod oneshot;
|
||||||
pub mod semaphore;
|
pub mod semaphore;
|
||||||
pub mod task;
|
pub mod task;
|
||||||
|
pub mod watch;
|
||||||
|
404
tokio-sync/src/watch.rs
Normal file
404
tokio-sync/src/watch.rs
Normal file
@ -0,0 +1,404 @@
|
|||||||
|
//! A single-producer, multi-consumer channel that only retains the *last* sent
|
||||||
|
//! value.
|
||||||
|
//!
|
||||||
|
//! This channel is useful for watching for changes to a value from multiple
|
||||||
|
//! points in the code base, for example, changes to configuration values.
|
||||||
|
//!
|
||||||
|
//! # Usage
|
||||||
|
//!
|
||||||
|
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
|
||||||
|
//! and sender halves of the channel. The channel is created with an initial
|
||||||
|
//! value. `Receiver::poll` will always be ready upon creation and will yield
|
||||||
|
//! either this initial value or the latest value that has been sent by
|
||||||
|
//! `Sender`.
|
||||||
|
//!
|
||||||
|
//! Calls to [`Receiver::poll`] and [`Receiver::poll_ref`] will always yield
|
||||||
|
//! the latest value.
|
||||||
|
//!
|
||||||
|
//! # Examples
|
||||||
|
//!
|
||||||
|
//! ```
|
||||||
|
//! # extern crate futures;
|
||||||
|
//! extern crate tokio;
|
||||||
|
//!
|
||||||
|
//! use tokio::prelude::*;
|
||||||
|
//! use tokio::sync::watch;
|
||||||
|
//!
|
||||||
|
//! # tokio::run(futures::future::lazy(|| {
|
||||||
|
//! let (mut tx, rx) = watch::channel("hello");
|
||||||
|
//!
|
||||||
|
//! tokio::spawn(rx.for_each(|value| {
|
||||||
|
//! println!("received = {:?}", value);
|
||||||
|
//! Ok(())
|
||||||
|
//! }).map_err(|_| ()));
|
||||||
|
//!
|
||||||
|
//! tx.broadcast("world").unwrap();
|
||||||
|
//! # Ok(())
|
||||||
|
//! # }));
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! # Closing
|
||||||
|
//!
|
||||||
|
//! [`Sender::poll_close`] allows the producer to detect when all [`Sender`]
|
||||||
|
//! handles have been dropped. This indicates that there is no further interest
|
||||||
|
//! in the values being produced and work can be stopped.
|
||||||
|
//!
|
||||||
|
//! # Thread safety
|
||||||
|
//!
|
||||||
|
//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
|
||||||
|
//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
|
||||||
|
//! handles may be moved to separate threads and also used concurrently.
|
||||||
|
//!
|
||||||
|
//! [`Sender`]: struct.Sender.html
|
||||||
|
//! [`Receiver`]: struct.Receiver.html
|
||||||
|
//! [`channel`]: fn.channel.html
|
||||||
|
//! [`Sender::poll_close`]: struct.Sender.html#method.poll_close
|
||||||
|
//! [`Receiver::poll`]: struct.Receiver.html#method.poll
|
||||||
|
//! [`Receiver::poll_ref`]: struct.Receiver.html#method.poll_ref
|
||||||
|
|
||||||
|
use fnv::FnvHashMap;
|
||||||
|
use futures::task::AtomicTask;
|
||||||
|
use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
|
||||||
|
|
||||||
|
use std::ops;
|
||||||
|
use std::sync::atomic::AtomicUsize;
|
||||||
|
use std::sync::atomic::Ordering::SeqCst;
|
||||||
|
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
|
||||||
|
|
||||||
|
/// Receives values from the associated `Sender`.
|
||||||
|
///
|
||||||
|
/// Instances are created by the [`channel`](fn.channel.html) function.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Receiver<T> {
|
||||||
|
/// Pointer to the shared state
|
||||||
|
shared: Arc<Shared<T>>,
|
||||||
|
|
||||||
|
/// Pointer to the watcher's internal state
|
||||||
|
inner: Arc<WatchInner>,
|
||||||
|
|
||||||
|
/// Watcher ID.
|
||||||
|
id: u64,
|
||||||
|
|
||||||
|
/// Last observed version
|
||||||
|
ver: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends values to the associated `Receiver`.
|
||||||
|
///
|
||||||
|
/// Instances are created by the [`channel`](fn.channel.html) function.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Sender<T> {
|
||||||
|
shared: Weak<Shared<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a reference to the inner value
|
||||||
|
///
|
||||||
|
/// Outstanding borrows hold a read lock on the inner value. This means that
|
||||||
|
/// long lived borrows could cause the produce half to block. It is recommended
|
||||||
|
/// to keep the borrow as short lived as possible.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Ref<'a, T: 'a> {
|
||||||
|
inner: RwLockReadGuard<'a, T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod error {
|
||||||
|
//! Watch error types
|
||||||
|
|
||||||
|
/// Error produced when receiving a value fails.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RecvError {
|
||||||
|
pub(crate) _p: (),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Error produced when sending a value fails.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SendError<T> {
|
||||||
|
pub(crate) inner: T,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Shared<T> {
|
||||||
|
/// The most recent value
|
||||||
|
value: RwLock<T>,
|
||||||
|
|
||||||
|
/// The current version
|
||||||
|
///
|
||||||
|
/// The lowest bit represents a "closed" state. The rest of the bits
|
||||||
|
/// represent the current version.
|
||||||
|
version: AtomicUsize,
|
||||||
|
|
||||||
|
/// All watchers
|
||||||
|
watchers: Mutex<Watchers>,
|
||||||
|
|
||||||
|
/// Task to notify when all watchers drop
|
||||||
|
cancel: AtomicTask,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Watchers {
|
||||||
|
next_id: u64,
|
||||||
|
watchers: FnvHashMap<u64, Arc<WatchInner>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct WatchInner {
|
||||||
|
task: AtomicTask,
|
||||||
|
}
|
||||||
|
|
||||||
|
const CLOSED: usize = 1;
|
||||||
|
|
||||||
|
/// Create a new watch channel, returning the "send" and "receive" handles.
|
||||||
|
///
|
||||||
|
/// All values sent by `Sender` will become visible to the `Receiver` handles.
|
||||||
|
/// Only the last value sent is made available to the `Receiver` half. All
|
||||||
|
/// intermediate values are dropped.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # extern crate futures;
|
||||||
|
/// extern crate tokio;
|
||||||
|
///
|
||||||
|
/// use tokio::prelude::*;
|
||||||
|
/// use tokio::sync::watch;
|
||||||
|
///
|
||||||
|
/// # tokio::run(futures::future::lazy(|| {
|
||||||
|
/// let (mut tx, rx) = watch::channel("hello");
|
||||||
|
///
|
||||||
|
/// tokio::spawn(rx.for_each(|value| {
|
||||||
|
/// println!("received = {:?}", value);
|
||||||
|
/// Ok(())
|
||||||
|
/// }).map_err(|_| ()));
|
||||||
|
///
|
||||||
|
/// tx.broadcast("world").unwrap();
|
||||||
|
/// # Ok(())
|
||||||
|
/// # }));
|
||||||
|
/// ```
|
||||||
|
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
|
||||||
|
const INIT_ID: u64 = 0;
|
||||||
|
|
||||||
|
let inner = Arc::new(WatchInner::new());
|
||||||
|
|
||||||
|
// Insert the watcher
|
||||||
|
let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default());
|
||||||
|
watchers.insert(INIT_ID, inner.clone());
|
||||||
|
|
||||||
|
let shared = Arc::new(Shared {
|
||||||
|
value: RwLock::new(init),
|
||||||
|
version: AtomicUsize::new(2),
|
||||||
|
watchers: Mutex::new(Watchers {
|
||||||
|
next_id: INIT_ID + 1,
|
||||||
|
watchers,
|
||||||
|
}),
|
||||||
|
cancel: AtomicTask::new(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let tx = Sender {
|
||||||
|
shared: Arc::downgrade(&shared),
|
||||||
|
};
|
||||||
|
|
||||||
|
let rx = Receiver {
|
||||||
|
shared,
|
||||||
|
inner,
|
||||||
|
id: INIT_ID,
|
||||||
|
ver: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
(tx, rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Receiver<T> {
|
||||||
|
/// Returns a reference to the most recently sent value
|
||||||
|
///
|
||||||
|
/// Outstanding borrows hold a read lock. This means that long lived borrows
|
||||||
|
/// could cause the send half to block. It is recommended to keep the borrow
|
||||||
|
/// as short lived as possible.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # extern crate tokio;
|
||||||
|
/// # use tokio::sync::watch;
|
||||||
|
/// let (_, rx) = watch::channel("hello");
|
||||||
|
/// assert_eq!(*rx.get_ref(), "hello");
|
||||||
|
/// ```
|
||||||
|
pub fn get_ref(&self) -> Ref<T> {
|
||||||
|
let inner = self.shared.value.read().unwrap();
|
||||||
|
Ref { inner }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempts to receive the latest value sent via the channel.
|
||||||
|
///
|
||||||
|
/// If a new, unobserved, value has been sent, a reference to it is
|
||||||
|
/// returned. If no new value has been sent, then `NotReady` is returned and
|
||||||
|
/// the current task is notified once a new value is sent.
|
||||||
|
///
|
||||||
|
/// Only the **most recent** value is returned. If the receiver is falling
|
||||||
|
/// behind the sender, intermediate values are dropped.
|
||||||
|
pub fn poll_ref(&mut self) -> Poll<Option<Ref<T>>, error::RecvError> {
|
||||||
|
// Make sure the task is up to date
|
||||||
|
self.inner.task.register();
|
||||||
|
|
||||||
|
let state = self.shared.version.load(SeqCst);
|
||||||
|
let version = state & !CLOSED;
|
||||||
|
|
||||||
|
if version != self.ver {
|
||||||
|
// Track the latest version
|
||||||
|
self.ver = version;
|
||||||
|
|
||||||
|
let inner = self.shared.value.read().unwrap();
|
||||||
|
|
||||||
|
return Ok(Some(Ref { inner }).into());
|
||||||
|
}
|
||||||
|
|
||||||
|
if CLOSED == state & CLOSED {
|
||||||
|
// The `Store` handle has been dropped.
|
||||||
|
return Ok(None.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Clone> Stream for Receiver<T> {
|
||||||
|
type Item = T;
|
||||||
|
type Error = error::RecvError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Option<T>, error::RecvError> {
|
||||||
|
let item = try_ready!(self.poll_ref());
|
||||||
|
Ok(Async::Ready(item.map(|v_ref| v_ref.clone())))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Clone for Receiver<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
let inner = Arc::new(WatchInner::new());
|
||||||
|
let shared = self.shared.clone();
|
||||||
|
|
||||||
|
let id = {
|
||||||
|
let mut watchers = shared.watchers.lock().unwrap();
|
||||||
|
let id = watchers.next_id;
|
||||||
|
|
||||||
|
watchers.next_id += 1;
|
||||||
|
watchers.watchers.insert(id, inner.clone());
|
||||||
|
|
||||||
|
id
|
||||||
|
};
|
||||||
|
|
||||||
|
let ver = self.ver;
|
||||||
|
|
||||||
|
Receiver {
|
||||||
|
shared: shared,
|
||||||
|
inner,
|
||||||
|
id,
|
||||||
|
ver,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Drop for Receiver<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let mut watchers = self.shared.watchers.lock().unwrap();
|
||||||
|
watchers.watchers.remove(&self.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WatchInner {
|
||||||
|
fn new() -> Self {
|
||||||
|
WatchInner {
|
||||||
|
task: AtomicTask::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Sender<T> {
|
||||||
|
/// Broadcast a new value via the channel, notifying all receivers.
|
||||||
|
pub fn broadcast(&mut self, value: T) -> Result<(), error::SendError<T>> {
|
||||||
|
let shared = match self.shared.upgrade() {
|
||||||
|
Some(shared) => shared,
|
||||||
|
// All `Watch` handles have been canceled
|
||||||
|
None => return Err(error::SendError { inner: value }),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Replace the value
|
||||||
|
{
|
||||||
|
let mut lock = shared.value.write().unwrap();
|
||||||
|
*lock = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the version. 2 is used so that the CLOSED bit is not set.
|
||||||
|
shared.version.fetch_add(2, SeqCst);
|
||||||
|
|
||||||
|
// Notify all watchers
|
||||||
|
notify_all(&*shared);
|
||||||
|
|
||||||
|
// Return the old value
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `Ready` when all receivers have dropped.
|
||||||
|
///
|
||||||
|
/// This allows the producer to get notified when interest in the produced
|
||||||
|
/// values is canceled and immediately stop doing work.
|
||||||
|
pub fn poll_close(&mut self) -> Poll<(), ()> {
|
||||||
|
match self.shared.upgrade() {
|
||||||
|
Some(shared) => {
|
||||||
|
shared.cancel.register();
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
None => Ok(Async::Ready(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Sink for Sender<T> {
|
||||||
|
type SinkItem = T;
|
||||||
|
type SinkError = error::SendError<T>;
|
||||||
|
|
||||||
|
fn start_send(&mut self, item: T) -> StartSend<T, error::SendError<T>> {
|
||||||
|
let _ = self.broadcast(item)?;
|
||||||
|
Ok(AsyncSink::Ready)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_complete(&mut self) -> Poll<(), error::SendError<T>> {
|
||||||
|
Ok(().into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Notify all watchers of a change
|
||||||
|
fn notify_all<T>(shared: &Shared<T>) {
|
||||||
|
let watchers = shared.watchers.lock().unwrap();
|
||||||
|
|
||||||
|
for watcher in watchers.watchers.values() {
|
||||||
|
// Notify the task
|
||||||
|
watcher.task.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Drop for Sender<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(shared) = self.shared.upgrade() {
|
||||||
|
shared.version.fetch_or(CLOSED, SeqCst);
|
||||||
|
notify_all(&*shared);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== impl Ref =====
|
||||||
|
|
||||||
|
impl<'a, T: 'a> ops::Deref for Ref<'a, T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &T {
|
||||||
|
self.inner.deref()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== impl Shared =====
|
||||||
|
|
||||||
|
impl<T> Drop for Shared<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.cancel.notify();
|
||||||
|
}
|
||||||
|
}
|
234
tokio-sync/tests/watch.rs
Normal file
234
tokio-sync/tests/watch.rs
Normal file
@ -0,0 +1,234 @@
|
|||||||
|
extern crate futures;
|
||||||
|
extern crate tokio_mock_task;
|
||||||
|
extern crate tokio_sync;
|
||||||
|
|
||||||
|
use tokio_mock_task::*;
|
||||||
|
use tokio_sync::watch;
|
||||||
|
|
||||||
|
macro_rules! assert_ready {
|
||||||
|
($e:expr) => {{
|
||||||
|
match $e {
|
||||||
|
Ok(futures::Async::Ready(v)) => v,
|
||||||
|
Ok(_) => panic!("not ready"),
|
||||||
|
Err(e) => panic!("error = {:?}", e),
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! assert_not_ready {
|
||||||
|
($e:expr) => {{
|
||||||
|
match $e {
|
||||||
|
Ok(futures::Async::NotReady) => {}
|
||||||
|
Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v),
|
||||||
|
Err(e) => panic!("error = {:?}", e),
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn single_rx() {
|
||||||
|
let (mut tx, mut rx) = watch::channel("one");
|
||||||
|
let mut task = MockTask::new();
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let v = assert_ready!(rx.poll_ref()).unwrap();
|
||||||
|
assert_eq!(*v, "one");
|
||||||
|
});
|
||||||
|
|
||||||
|
task.enter(|| assert_not_ready!(rx.poll_ref()));
|
||||||
|
|
||||||
|
assert!(!task.is_notified());
|
||||||
|
|
||||||
|
tx.broadcast("two").unwrap();
|
||||||
|
|
||||||
|
assert!(task.is_notified());
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let v = assert_ready!(rx.poll_ref()).unwrap();
|
||||||
|
assert_eq!(*v, "two");
|
||||||
|
});
|
||||||
|
|
||||||
|
task.enter(|| assert_not_ready!(rx.poll_ref()));
|
||||||
|
|
||||||
|
drop(tx);
|
||||||
|
|
||||||
|
assert!(task.is_notified());
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let res = assert_ready!(rx.poll_ref());
|
||||||
|
assert!(res.is_none());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stream_impl() {
|
||||||
|
use futures::Stream;
|
||||||
|
|
||||||
|
let (mut tx, mut rx) = watch::channel("one");
|
||||||
|
let mut task = MockTask::new();
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let v = assert_ready!(rx.poll()).unwrap();
|
||||||
|
assert_eq!(v, "one");
|
||||||
|
});
|
||||||
|
|
||||||
|
task.enter(|| assert_not_ready!(rx.poll()));
|
||||||
|
|
||||||
|
assert!(!task.is_notified());
|
||||||
|
|
||||||
|
tx.broadcast("two").unwrap();
|
||||||
|
|
||||||
|
assert!(task.is_notified());
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let v = assert_ready!(rx.poll()).unwrap();
|
||||||
|
assert_eq!(v, "two");
|
||||||
|
});
|
||||||
|
|
||||||
|
task.enter(|| assert_not_ready!(rx.poll()));
|
||||||
|
|
||||||
|
drop(tx);
|
||||||
|
|
||||||
|
assert!(task.is_notified());
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let res = assert_ready!(rx.poll());
|
||||||
|
assert!(res.is_none());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn multi_rx() {
|
||||||
|
let (mut tx, mut rx1) = watch::channel("one");
|
||||||
|
let mut rx2 = rx1.clone();
|
||||||
|
|
||||||
|
let mut task1 = MockTask::new();
|
||||||
|
let mut task2 = MockTask::new();
|
||||||
|
|
||||||
|
task1.enter(|| {
|
||||||
|
let res = assert_ready!(rx1.poll_ref());
|
||||||
|
assert_eq!(*res.unwrap(), "one");
|
||||||
|
});
|
||||||
|
|
||||||
|
task2.enter(|| {
|
||||||
|
let res = assert_ready!(rx2.poll_ref());
|
||||||
|
assert_eq!(*res.unwrap(), "one");
|
||||||
|
});
|
||||||
|
|
||||||
|
tx.broadcast("two").unwrap();
|
||||||
|
|
||||||
|
assert!(task1.is_notified());
|
||||||
|
assert!(task2.is_notified());
|
||||||
|
|
||||||
|
task1.enter(|| {
|
||||||
|
let res = assert_ready!(rx1.poll_ref());
|
||||||
|
assert_eq!(*res.unwrap(), "two");
|
||||||
|
});
|
||||||
|
|
||||||
|
tx.broadcast("three").unwrap();
|
||||||
|
|
||||||
|
assert!(task1.is_notified());
|
||||||
|
assert!(task2.is_notified());
|
||||||
|
|
||||||
|
task1.enter(|| {
|
||||||
|
let res = assert_ready!(rx1.poll_ref());
|
||||||
|
assert_eq!(*res.unwrap(), "three");
|
||||||
|
});
|
||||||
|
|
||||||
|
task2.enter(|| {
|
||||||
|
let res = assert_ready!(rx2.poll_ref());
|
||||||
|
assert_eq!(*res.unwrap(), "three");
|
||||||
|
});
|
||||||
|
|
||||||
|
tx.broadcast("four").unwrap();
|
||||||
|
|
||||||
|
task1.enter(|| {
|
||||||
|
let res = assert_ready!(rx1.poll_ref());
|
||||||
|
assert_eq!(*res.unwrap(), "four");
|
||||||
|
});
|
||||||
|
|
||||||
|
drop(tx);
|
||||||
|
|
||||||
|
task1.enter(|| {
|
||||||
|
let res = assert_ready!(rx1.poll_ref());
|
||||||
|
assert!(res.is_none());
|
||||||
|
});
|
||||||
|
|
||||||
|
task2.enter(|| {
|
||||||
|
let res = assert_ready!(rx2.poll_ref());
|
||||||
|
assert_eq!(*res.unwrap(), "four");
|
||||||
|
});
|
||||||
|
|
||||||
|
task2.enter(|| {
|
||||||
|
let res = assert_ready!(rx2.poll_ref());
|
||||||
|
assert!(res.is_none());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rx_observes_final_value() {
|
||||||
|
// Initial value
|
||||||
|
|
||||||
|
let (tx, mut rx) = watch::channel("one");
|
||||||
|
let mut task = MockTask::new();
|
||||||
|
|
||||||
|
drop(tx);
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let res = assert_ready!(rx.poll_ref());
|
||||||
|
assert!(res.is_some());
|
||||||
|
assert_eq!(*res.unwrap(), "one");
|
||||||
|
});
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let res = assert_ready!(rx.poll_ref());
|
||||||
|
assert!(res.is_none());
|
||||||
|
});
|
||||||
|
|
||||||
|
// Sending a value
|
||||||
|
|
||||||
|
let (mut tx, mut rx) = watch::channel("one");
|
||||||
|
let mut task = MockTask::new();
|
||||||
|
|
||||||
|
tx.broadcast("two").unwrap();
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let res = assert_ready!(rx.poll_ref());
|
||||||
|
assert!(res.is_some());
|
||||||
|
assert_eq!(*res.unwrap(), "two");
|
||||||
|
});
|
||||||
|
|
||||||
|
task.enter(|| assert_not_ready!(rx.poll_ref()));
|
||||||
|
|
||||||
|
tx.broadcast("three").unwrap();
|
||||||
|
drop(tx);
|
||||||
|
|
||||||
|
assert!(task.is_notified());
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let res = assert_ready!(rx.poll_ref());
|
||||||
|
assert!(res.is_some());
|
||||||
|
assert_eq!(*res.unwrap(), "three");
|
||||||
|
});
|
||||||
|
|
||||||
|
task.enter(|| {
|
||||||
|
let res = assert_ready!(rx.poll_ref());
|
||||||
|
assert!(res.is_none());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_close() {
|
||||||
|
let (mut tx, rx) = watch::channel("one");
|
||||||
|
let mut task = MockTask::new();
|
||||||
|
|
||||||
|
task.enter(|| assert_not_ready!(tx.poll_close()));
|
||||||
|
|
||||||
|
drop(rx);
|
||||||
|
|
||||||
|
assert!(task.is_notified());
|
||||||
|
|
||||||
|
task.enter(|| assert_ready!(tx.poll_close()));
|
||||||
|
|
||||||
|
assert!(tx.broadcast("two").is_err());
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user