From dfdfd61372cca02087c0e2773dc978d03235bc51 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 25 Sep 2020 16:34:40 -0700 Subject: [PATCH] Fix readiness future eagerly consuming entire socket readiness (#2887) In the `readiness` future, before inserting a waiter into the list, the current socket readiness is eagerly checked. However, it would return as a `ReadyEvent` the entire socket readiness, instead of just the interest desired from `readiness(interest)`. This would result in the later call to `clear_readiness(event)` removing all of it. Closes #2886 --- tokio/src/io/driver/scheduled_io.rs | 2 +- tokio/tests/udp.rs | 32 +++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 88daeb2d2..f63fd7ab3 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -371,7 +371,7 @@ cfg_io_readiness! { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { readiness, tick }); + return Poll::Ready(ReadyEvent { readiness: interest, tick }); } // Wasn't ready, take the lock (and check again while locked). diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index 9da672674..0bea83aa5 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -56,6 +56,38 @@ async fn split() -> std::io::Result<()> { Ok(()) } +#[tokio::test] +async fn split_chan() -> std::io::Result<()> { + // setup UdpSocket that will echo all sent items + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let addr = socket.local_addr().unwrap(); + let s = Arc::new(socket); + let r = s.clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec, std::net::SocketAddr)>(1_000); + tokio::spawn(async move { + while let Some((bytes, addr)) = rx.recv().await { + s.send_to(&bytes, &addr).await.unwrap(); + } + }); + + tokio::spawn(async move { + let mut buf = [0u8; 32]; + loop { + let (len, addr) = r.recv_from(&mut buf).await.unwrap(); + tx.send((buf[..len].to_vec(), addr)).await.unwrap(); + } + }); + + // test that we can send a value and get back some response + let sender = UdpSocket::bind("127.0.0.1:0").await?; + sender.send_to(MSG, addr).await?; + let mut recv_buf = [0u8; 32]; + let (len, _) = sender.recv_from(&mut recv_buf).await?; + assert_eq!(&recv_buf[..len], MSG); + Ok(()) +} + // # Note // // This test is purposely written such that each time `sender` sends data on