mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
parent
827077409c
commit
5a1a6dc90c
@ -96,7 +96,7 @@ impl Barrier {
|
|||||||
// wake everyone, increment the generation, and return
|
// wake everyone, increment the generation, and return
|
||||||
state
|
state
|
||||||
.waker
|
.waker
|
||||||
.broadcast(state.generation)
|
.send(state.generation)
|
||||||
.expect("there is at least one receiver");
|
.expect("there is at least one receiver");
|
||||||
state.arrived = 0;
|
state.arrived = 0;
|
||||||
state.generation += 1;
|
state.generation += 1;
|
||||||
@ -112,7 +112,7 @@ impl Barrier {
|
|||||||
loop {
|
loop {
|
||||||
// note that the first time through the loop, this _will_ yield a generation
|
// note that the first time through the loop, this _will_ yield a generation
|
||||||
// immediately, since we cloned a receiver that has never seen any values.
|
// immediately, since we cloned a receiver that has never seen any values.
|
||||||
if wait.recv().await.expect("sender hasn't been closed") >= generation {
|
if wait.recv().await >= generation {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -330,7 +330,7 @@
|
|||||||
//! // If the configuration changed, send the new config value
|
//! // If the configuration changed, send the new config value
|
||||||
//! // on the watch channel.
|
//! // on the watch channel.
|
||||||
//! if new_config != config {
|
//! if new_config != config {
|
||||||
//! tx.broadcast(new_config.clone()).unwrap();
|
//! tx.send(new_config.clone()).unwrap();
|
||||||
//! config = new_config;
|
//! config = new_config;
|
||||||
//! }
|
//! }
|
||||||
//! }
|
//! }
|
||||||
@ -358,7 +358,7 @@
|
|||||||
//! // Receive the **initial** configuration value. As this is the
|
//! // Receive the **initial** configuration value. As this is the
|
||||||
//! // first time the config is received from the watch, it will
|
//! // first time the config is received from the watch, it will
|
||||||
//! // always complete immediatedly.
|
//! // always complete immediatedly.
|
||||||
//! let mut conf = rx.recv().await.unwrap();
|
//! let mut conf = rx.recv().await;
|
||||||
//!
|
//!
|
||||||
//! let mut op_start = Instant::now();
|
//! let mut op_start = Instant::now();
|
||||||
//! let mut delay = time::delay_until(op_start + conf.timeout);
|
//! let mut delay = time::delay_until(op_start + conf.timeout);
|
||||||
@ -376,7 +376,7 @@
|
|||||||
//! delay = time::delay_until(op_start + conf.timeout);
|
//! delay = time::delay_until(op_start + conf.timeout);
|
||||||
//! }
|
//! }
|
||||||
//! new_conf = rx.recv() => {
|
//! new_conf = rx.recv() => {
|
||||||
//! conf = new_conf.unwrap();
|
//! conf = new_conf;
|
||||||
//!
|
//!
|
||||||
//! // The configuration has been updated. Update the
|
//! // The configuration has been updated. Update the
|
||||||
//! // `delay` using the new `timeout` value.
|
//! // `delay` using the new `timeout` value.
|
||||||
|
@ -23,12 +23,12 @@
|
|||||||
//! let (tx, mut rx) = watch::channel("hello");
|
//! let (tx, mut rx) = watch::channel("hello");
|
||||||
//!
|
//!
|
||||||
//! tokio::spawn(async move {
|
//! tokio::spawn(async move {
|
||||||
//! while let Some(value) = rx.recv().await {
|
//! while let Some(value) = Some(rx.recv().await) {
|
||||||
//! println!("received = {:?}", value);
|
//! println!("received = {:?}", value);
|
||||||
//! }
|
//! }
|
||||||
//! });
|
//! });
|
||||||
//!
|
//!
|
||||||
//! tx.broadcast("world")?;
|
//! tx.send("world")?;
|
||||||
//! # Ok(())
|
//! # Ok(())
|
||||||
//! # }
|
//! # }
|
||||||
//! ```
|
//! ```
|
||||||
@ -162,12 +162,12 @@ const CLOSED: usize = 1;
|
|||||||
/// let (tx, mut rx) = watch::channel("hello");
|
/// let (tx, mut rx) = watch::channel("hello");
|
||||||
///
|
///
|
||||||
/// tokio::spawn(async move {
|
/// tokio::spawn(async move {
|
||||||
/// while let Some(value) = rx.recv().await {
|
/// while let Some(value) = Some(rx.recv().await) {
|
||||||
/// println!("received = {:?}", value);
|
/// println!("received = {:?}", value);
|
||||||
/// }
|
/// }
|
||||||
/// });
|
/// });
|
||||||
///
|
///
|
||||||
/// tx.broadcast("world")?;
|
/// tx.send("world")?;
|
||||||
/// # Ok(())
|
/// # Ok(())
|
||||||
/// # }
|
/// # }
|
||||||
/// ```
|
/// ```
|
||||||
@ -223,7 +223,7 @@ impl<T> Receiver<T> {
|
|||||||
|
|
||||||
// TODO: document
|
// TODO: document
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
|
pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Ref<'a, T>> {
|
||||||
// Make sure the task is up to date
|
// Make sure the task is up to date
|
||||||
self.inner.waker.register_by_ref(cx.waker());
|
self.inner.waker.register_by_ref(cx.waker());
|
||||||
|
|
||||||
@ -233,12 +233,14 @@ impl<T> Receiver<T> {
|
|||||||
if self.inner.version.swap(version, Relaxed) != version {
|
if self.inner.version.swap(version, Relaxed) != version {
|
||||||
let inner = self.shared.value.read().unwrap();
|
let inner = self.shared.value.read().unwrap();
|
||||||
|
|
||||||
return Ready(Some(Ref { inner }));
|
return Ready(Ref { inner });
|
||||||
}
|
}
|
||||||
|
|
||||||
if CLOSED == state & CLOSED {
|
if CLOSED == state & CLOSED {
|
||||||
// The `Store` handle has been dropped.
|
// The `Store` handle has been dropped.
|
||||||
return Ready(None);
|
let inner = self.shared.value.read().unwrap();
|
||||||
|
|
||||||
|
return Ready(Ref { inner });
|
||||||
}
|
}
|
||||||
|
|
||||||
Pending
|
Pending
|
||||||
@ -264,25 +266,25 @@ impl<T: Clone> Receiver<T> {
|
|||||||
/// async fn main() {
|
/// async fn main() {
|
||||||
/// let (tx, mut rx) = watch::channel("hello");
|
/// let (tx, mut rx) = watch::channel("hello");
|
||||||
///
|
///
|
||||||
/// let v = rx.recv().await.unwrap();
|
/// let v = rx.recv().await;
|
||||||
/// assert_eq!(v, "hello");
|
/// assert_eq!(v, "hello");
|
||||||
///
|
///
|
||||||
/// tokio::spawn(async move {
|
/// tokio::spawn(async move {
|
||||||
/// tx.broadcast("goodbye").unwrap();
|
/// tx.send("goodbye").unwrap();
|
||||||
/// });
|
/// });
|
||||||
///
|
///
|
||||||
/// // Waits for the new task to spawn and send the value.
|
/// // Waits for the new task to spawn and send the value.
|
||||||
/// let v = rx.recv().await.unwrap();
|
/// let v = rx.recv().await;
|
||||||
/// assert_eq!(v, "goodbye");
|
/// assert_eq!(v, "goodbye");
|
||||||
///
|
///
|
||||||
/// let v = rx.recv().await;
|
/// let v = rx.recv().await;
|
||||||
/// assert!(v.is_none());
|
/// assert_eq!(v, "goodbye");
|
||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn recv(&mut self) -> Option<T> {
|
pub async fn recv(&mut self) -> T {
|
||||||
poll_fn(|cx| {
|
poll_fn(|cx| {
|
||||||
let v_ref = ready!(self.poll_recv_ref(cx));
|
let v_ref = ready!(self.poll_recv_ref(cx));
|
||||||
Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
|
Poll::Ready((*v_ref).clone())
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@ -295,7 +297,7 @@ impl<T: Clone> crate::stream::Stream for Receiver<T> {
|
|||||||
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
||||||
let v_ref = ready!(self.poll_recv_ref(cx));
|
let v_ref = ready!(self.poll_recv_ref(cx));
|
||||||
|
|
||||||
Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
|
Poll::Ready(Some((*v_ref).clone()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -318,8 +320,8 @@ impl<T> Drop for Receiver<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Sender<T> {
|
impl<T> Sender<T> {
|
||||||
/// Broadcasts a new value via the channel, notifying all receivers.
|
/// Sends a new value via the channel, notifying all receivers.
|
||||||
pub fn broadcast(&self, value: T) -> Result<(), error::SendError<T>> {
|
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
|
||||||
let shared = match self.shared.upgrade() {
|
let shared = match self.shared.upgrade() {
|
||||||
Some(shared) => shared,
|
Some(shared) => shared,
|
||||||
// All `Watch` handles have been canceled
|
// All `Watch` handles have been canceled
|
||||||
|
@ -12,7 +12,7 @@ fn single_rx_recv() {
|
|||||||
|
|
||||||
{
|
{
|
||||||
let mut t = spawn(rx.recv());
|
let mut t = spawn(rx.recv());
|
||||||
let v = assert_ready!(t.poll()).unwrap();
|
let v = assert_ready!(t.poll());
|
||||||
assert_eq!(v, "one");
|
assert_eq!(v, "one");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -21,11 +21,11 @@ fn single_rx_recv() {
|
|||||||
|
|
||||||
assert_pending!(t.poll());
|
assert_pending!(t.poll());
|
||||||
|
|
||||||
tx.broadcast("two").unwrap();
|
tx.send("two").unwrap();
|
||||||
|
|
||||||
assert!(t.is_woken());
|
assert!(t.is_woken());
|
||||||
|
|
||||||
let v = assert_ready!(t.poll()).unwrap();
|
let v = assert_ready!(t.poll());
|
||||||
assert_eq!(v, "two");
|
assert_eq!(v, "two");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -37,7 +37,7 @@ fn single_rx_recv() {
|
|||||||
drop(tx);
|
drop(tx);
|
||||||
|
|
||||||
let res = assert_ready!(t.poll());
|
let res = assert_ready!(t.poll());
|
||||||
assert!(res.is_none());
|
assert_eq!(res, "two");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,10 +51,10 @@ fn multi_rx() {
|
|||||||
let mut t2 = spawn(rx2.recv());
|
let mut t2 = spawn(rx2.recv());
|
||||||
|
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert_eq!(res.unwrap(), "one");
|
assert_eq!(res, "one");
|
||||||
|
|
||||||
let res = assert_ready!(t2.poll());
|
let res = assert_ready!(t2.poll());
|
||||||
assert_eq!(res.unwrap(), "one");
|
assert_eq!(res, "one");
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut t2 = spawn(rx2.recv());
|
let mut t2 = spawn(rx2.recv());
|
||||||
@ -65,13 +65,13 @@ fn multi_rx() {
|
|||||||
assert_pending!(t1.poll());
|
assert_pending!(t1.poll());
|
||||||
assert_pending!(t2.poll());
|
assert_pending!(t2.poll());
|
||||||
|
|
||||||
tx.broadcast("two").unwrap();
|
tx.send("two").unwrap();
|
||||||
|
|
||||||
assert!(t1.is_woken());
|
assert!(t1.is_woken());
|
||||||
assert!(t2.is_woken());
|
assert!(t2.is_woken());
|
||||||
|
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert_eq!(res.unwrap(), "two");
|
assert_eq!(res, "two");
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -79,16 +79,16 @@ fn multi_rx() {
|
|||||||
|
|
||||||
assert_pending!(t1.poll());
|
assert_pending!(t1.poll());
|
||||||
|
|
||||||
tx.broadcast("three").unwrap();
|
tx.send("three").unwrap();
|
||||||
|
|
||||||
assert!(t1.is_woken());
|
assert!(t1.is_woken());
|
||||||
assert!(t2.is_woken());
|
assert!(t2.is_woken());
|
||||||
|
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert_eq!(res.unwrap(), "three");
|
assert_eq!(res, "three");
|
||||||
|
|
||||||
let res = assert_ready!(t2.poll());
|
let res = assert_ready!(t2.poll());
|
||||||
assert_eq!(res.unwrap(), "three");
|
assert_eq!(res, "three");
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(t2);
|
drop(t2);
|
||||||
@ -100,10 +100,10 @@ fn multi_rx() {
|
|||||||
assert_pending!(t1.poll());
|
assert_pending!(t1.poll());
|
||||||
assert_pending!(t2.poll());
|
assert_pending!(t2.poll());
|
||||||
|
|
||||||
tx.broadcast("four").unwrap();
|
tx.send("four").unwrap();
|
||||||
|
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert_eq!(res.unwrap(), "four");
|
assert_eq!(res, "four");
|
||||||
drop(t1);
|
drop(t1);
|
||||||
|
|
||||||
let mut t1 = spawn(rx1.recv());
|
let mut t1 = spawn(rx1.recv());
|
||||||
@ -113,15 +113,15 @@ fn multi_rx() {
|
|||||||
|
|
||||||
assert!(t1.is_woken());
|
assert!(t1.is_woken());
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert!(res.is_none());
|
assert_eq!(res, "four");
|
||||||
|
|
||||||
let res = assert_ready!(t2.poll());
|
let res = assert_ready!(t2.poll());
|
||||||
assert_eq!(res.unwrap(), "four");
|
assert_eq!(res, "four");
|
||||||
|
|
||||||
drop(t2);
|
drop(t2);
|
||||||
let mut t2 = spawn(rx2.recv());
|
let mut t2 = spawn(rx2.recv());
|
||||||
let res = assert_ready!(t2.poll());
|
let res = assert_ready!(t2.poll());
|
||||||
assert!(res.is_none());
|
assert_eq!(res, "four");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -135,44 +135,44 @@ fn rx_observes_final_value() {
|
|||||||
{
|
{
|
||||||
let mut t1 = spawn(rx.recv());
|
let mut t1 = spawn(rx.recv());
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert_eq!(res.unwrap(), "one");
|
assert_eq!(res, "one");
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut t1 = spawn(rx.recv());
|
let mut t1 = spawn(rx.recv());
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert!(res.is_none());
|
assert_eq!(res, "one");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sending a value
|
// Sending a value
|
||||||
|
|
||||||
let (tx, mut rx) = watch::channel("one");
|
let (tx, mut rx) = watch::channel("one");
|
||||||
|
|
||||||
tx.broadcast("two").unwrap();
|
tx.send("two").unwrap();
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut t1 = spawn(rx.recv());
|
let mut t1 = spawn(rx.recv());
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert_eq!(res.unwrap(), "two");
|
assert_eq!(res, "two");
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut t1 = spawn(rx.recv());
|
let mut t1 = spawn(rx.recv());
|
||||||
assert_pending!(t1.poll());
|
assert_pending!(t1.poll());
|
||||||
|
|
||||||
tx.broadcast("three").unwrap();
|
tx.send("three").unwrap();
|
||||||
drop(tx);
|
drop(tx);
|
||||||
|
|
||||||
assert!(t1.is_woken());
|
assert!(t1.is_woken());
|
||||||
|
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert_eq!(res.unwrap(), "three");
|
assert_eq!(res, "three");
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut t1 = spawn(rx.recv());
|
let mut t1 = spawn(rx.recv());
|
||||||
let res = assert_ready!(t1.poll());
|
let res = assert_ready!(t1.poll());
|
||||||
assert!(res.is_none());
|
assert_eq!(res, "three");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,7 +190,7 @@ fn poll_close() {
|
|||||||
assert_ready!(t.poll());
|
assert_ready!(t.poll());
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(tx.broadcast("two").is_err());
|
assert!(tx.send("two").is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -210,7 +210,7 @@ fn stream_impl() {
|
|||||||
|
|
||||||
assert_pending!(t.poll());
|
assert_pending!(t.poll());
|
||||||
|
|
||||||
tx.broadcast("two").unwrap();
|
tx.send("two").unwrap();
|
||||||
|
|
||||||
assert!(t.is_woken());
|
assert!(t.is_woken());
|
||||||
|
|
||||||
@ -226,6 +226,6 @@ fn stream_impl() {
|
|||||||
drop(tx);
|
drop(tx);
|
||||||
|
|
||||||
let res = assert_ready!(t.poll());
|
let res = assert_ready!(t.poll());
|
||||||
assert!(res.is_none());
|
assert_eq!(res, Some("two"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user