sync: fix watch wrapper (#3914)

This commit is contained in:
Alice Ryhl 2021-07-02 23:24:02 +02:00 committed by GitHub
parent 677107d8d9
commit 8170e2787c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 3 deletions

View File

@ -30,7 +30,7 @@ signal = ["tokio/signal"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }
[dev-dependencies]

View File

@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (result, rx) = ready!(self.inner.poll(cx));
let (result, mut rx) = ready!(self.inner.poll(cx));
match result {
Ok(_) => {
let received = (*rx.borrow()).clone();
let received = (*rx.borrow_and_update()).clone();
self.inner.set(make_future(rx));
Poll::Ready(Some(received))
}

View File

@ -0,0 +1,27 @@
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;
#[tokio::test]
async fn message_not_twice() {
let (tx, rx) = watch::channel("hello");
let mut counter = 0;
let mut stream = WatchStream::new(rx).map(move |payload| {
println!("{}", payload);
if payload == "goodbye" {
counter += 1;
}
if counter >= 2 {
panic!("too many goodbyes");
}
});
let task = tokio::spawn(async move { while stream.next().await.is_some() {} });
// Send goodbye just once
tx.send("goodbye").unwrap();
drop(tx);
task.await.unwrap();
}