mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
sync: add a has_changed
method to watch::Receiver
(#4342)
This commit is contained in:
parent
c301f6d83a
commit
12dd06336d
@ -318,6 +318,48 @@ impl<T> Receiver<T> {
|
||||
Ref { inner }
|
||||
}
|
||||
|
||||
/// Checks if this channel contains a message that this receiver has not yet
|
||||
/// seen. The new value is not marked as seen.
|
||||
///
|
||||
/// Although this method is called `has_changed`, it does not check new
|
||||
/// messages for equality, so this call will return true even if the new
|
||||
/// message is equal to the old message.
|
||||
///
|
||||
/// Returns an error if the channel has been closed.
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::sync::watch;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let (tx, mut rx) = watch::channel("hello");
|
||||
///
|
||||
/// tx.send("goodbye").unwrap();
|
||||
///
|
||||
/// assert!(rx.has_changed().unwrap());
|
||||
/// assert_eq!(*rx.borrow_and_update(), "goodbye");
|
||||
///
|
||||
/// // The value has been marked as seen
|
||||
/// assert!(!rx.has_changed().unwrap());
|
||||
///
|
||||
/// drop(tx);
|
||||
/// // The `tx` handle has been dropped
|
||||
/// assert!(rx.has_changed().is_err());
|
||||
/// }
|
||||
/// ```
|
||||
pub fn has_changed(&self) -> Result<bool, error::RecvError> {
|
||||
// Load the version from the state
|
||||
let state = self.shared.state.load();
|
||||
if state.is_closed() {
|
||||
// The sender has dropped.
|
||||
return Err(error::RecvError(()));
|
||||
}
|
||||
let new_version = state.version();
|
||||
|
||||
Ok(self.version != new_version)
|
||||
}
|
||||
|
||||
/// Waits for a change notification, then marks the newest value as seen.
|
||||
///
|
||||
/// If the newest value in the channel has not yet been marked seen when
|
||||
|
@ -174,17 +174,24 @@ fn poll_close() {
|
||||
fn borrow_and_update() {
|
||||
let (tx, mut rx) = watch::channel("one");
|
||||
|
||||
assert!(!rx.has_changed().unwrap());
|
||||
|
||||
tx.send("two").unwrap();
|
||||
assert!(rx.has_changed().unwrap());
|
||||
assert_ready!(spawn(rx.changed()).poll()).unwrap();
|
||||
assert_pending!(spawn(rx.changed()).poll());
|
||||
assert!(!rx.has_changed().unwrap());
|
||||
|
||||
tx.send("three").unwrap();
|
||||
assert!(rx.has_changed().unwrap());
|
||||
assert_eq!(*rx.borrow_and_update(), "three");
|
||||
assert_pending!(spawn(rx.changed()).poll());
|
||||
assert!(!rx.has_changed().unwrap());
|
||||
|
||||
drop(tx);
|
||||
assert_eq!(*rx.borrow_and_update(), "three");
|
||||
assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
|
||||
assert!(rx.has_changed().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Loading…
x
Reference in New Issue
Block a user