From 25dd54d263356a77406036411ec29442305418e2 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 7 Mar 2018 16:24:51 -0800 Subject: [PATCH] Improve `poll_read_ready` implementation (#193) This patch updates `poll_read_ready` to take a `mask` argument, enabling the caller to specify the desired readiness. `need_read` is renamed to `clear_read_ready` and also takes a mask. This enables a caller to listen for HUP events without requiring reading from the I/O resource. --- Cargo.toml | 1 + src/net/tcp/listener.rs | 4 +- src/net/tcp/stream.rs | 54 +++++++- src/net/udp/socket.rs | 12 +- tests/tcp.rs | 59 ++++++++- tokio-reactor/src/lib.rs | 170 +++---------------------- tokio-reactor/src/poll_evented.rs | 198 +++++++++++++++++------------- tokio-reactor/src/registration.rs | 29 +++-- 8 files changed, 262 insertions(+), 265 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0209180c2..9a3b0dc9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,3 +60,4 @@ time = "0.1" [patch.crates-io] tokio-io = { path = "tokio-io" } +mio = { git = "https://github.com/carllerche/mio" } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 8899cde62..de9e38ea9 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -93,12 +93,12 @@ impl TcpListener { /// /// This function will panic if called from outside of a task context. pub fn poll_accept_std(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> { - try_ready!(self.io.poll_read_ready()); + try_ready!(self.io.poll_read_ready(mio::Ready::readable())); match self.io.get_ref().accept_std() { Ok(pair) => Ok(pair.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.need_read()?; + self.io.clear_read_ready(mio::Ready::readable())?; Ok(Async::NotReady) } Err(e) => Err(e), diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 58cf8a6cd..d37a1a435 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -114,6 +114,50 @@ impl TcpStream { ConnectFuture { inner: inner } } + /// Check the TCP stream's read readiness state. + /// + /// The mask argument allows specifying what readiness to notify on. This + /// can be any value, including platform specific readiness, **except** + /// `writable`. HUP is always implicitly included on platforms that support + /// it. + /// + /// If the resource is not ready for a read then `Async::NotReady` is + /// returned and the current task is notified once a new event is received. + /// + /// The stream will remain in a read-ready state until calls to `poll_read` + /// return `NotReady`. + /// + /// # Panics + /// + /// This function panics if: + /// + /// * `ready` includes writable. + /// * called from outside of a task context. + pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll { + self.io.poll_read_ready(mask) + } + + /// Check the TCP stream's write readiness state. + /// + /// This always checks for writable readiness and also checks for HUP + /// readiness on platforms that support it. + /// + /// If the resource is not ready for a write then `Async::NotReady` is + /// returned and the current task is notified once a new event is received. + /// + /// The I/O resource will remain in a write-ready state until calls to + /// `poll_write` return `NotReady`. + /// + /// # Panics + /// + /// This function panics if: + /// + /// * `ready` contains bits besides `writable` and `hup`. + /// * called from outside of a task context. + pub fn poll_write_ready(&self) -> Poll { + self.io.poll_write_ready() + } + /// Returns the local address that this stream is bound to. pub fn local_addr(&self) -> io::Result { self.io.get_ref().local_addr() @@ -152,12 +196,12 @@ impl TcpStream { /// /// This function will panic if called from outside of a task context. pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll { - try_ready!(self.io.poll_read_ready()); + try_ready!(self.io.poll_read_ready(mio::Ready::readable())); match self.io.get_ref().peek(buf) { Ok(ret) => Ok(ret.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.need_read()?; + self.io.clear_read_ready(mio::Ready::readable())?; Ok(Async::NotReady) } Err(e) => Err(e), @@ -357,7 +401,7 @@ impl<'a> AsyncRead for &'a TcpStream { } fn read_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = self.io.poll_read_ready()? { + if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? { return Ok(Async::NotReady) } @@ -397,7 +441,7 @@ impl<'a> AsyncRead for &'a TcpStream { Ok(Async::Ready(n)) } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.need_read()?; + self.io.clear_read_ready(mio::Ready::readable())?; Ok(Async::NotReady) } Err(e) => Err(e), @@ -431,7 +475,7 @@ impl<'a> AsyncWrite for &'a TcpStream { Ok(Async::Ready(n)) } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.need_write()?; + self.io.clear_write_ready()?; Ok(Async::NotReady) } Err(e) => Err(e), diff --git a/src/net/udp/socket.rs b/src/net/udp/socket.rs index 63bdc5e0f..46c3345f4 100644 --- a/src/net/udp/socket.rs +++ b/src/net/udp/socket.rs @@ -88,7 +88,7 @@ impl UdpSocket { match self.io.get_ref().send(buf) { Ok(n) => Ok(n.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.need_write()?; + self.io.clear_write_ready()?; Ok(Async::NotReady) } Err(e) => Err(e), @@ -128,12 +128,12 @@ impl UdpSocket { /// /// This function will panic if called from outside of a task context. pub fn poll_recv(&mut self, buf: &mut [u8]) -> Poll { - try_ready!(self.io.poll_read_ready()); + try_ready!(self.io.poll_read_ready(mio::Ready::readable())); match self.io.get_ref().recv(buf) { Ok(n) => Ok(n.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.need_read()?; + self.io.clear_read_ready(mio::Ready::readable())?; Ok(Async::NotReady) } Err(e) => Err(e), @@ -172,7 +172,7 @@ impl UdpSocket { match self.io.get_ref().send_to(buf, target) { Ok(n) => Ok(n.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.need_write()?; + self.io.clear_write_ready()?; Ok(Async::NotReady) } Err(e) => Err(e), @@ -216,12 +216,12 @@ impl UdpSocket { /// This function will panic if called outside the context of a future's /// task. pub fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> { - try_ready!(self.io.poll_read_ready()); + try_ready!(self.io.poll_read_ready(mio::Ready::readable())); match self.io.get_ref().recv_from(buf) { Ok(n) => Ok(n.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.need_read()?; + self.io.clear_read_ready(mio::Ready::readable())?; Ok(Async::NotReady) } Err(e) => Err(e), diff --git a/tests/tcp.rs b/tests/tcp.rs index 83cc425c1..5694e8a61 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -1,14 +1,14 @@ extern crate env_logger; -extern crate futures; extern crate tokio; +extern crate mio; +extern crate futures; -use std::net; +use std::{net, thread}; use std::sync::mpsc::channel; -use std::thread; -use futures::Future; -use futures::stream::Stream; use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + macro_rules! t { ($e:expr) => (match $e { @@ -79,3 +79,52 @@ fn accept2() { mine.unwrap(); t.join().unwrap(); } + +#[cfg(unix)] +mod unix { + use tokio::net::TcpStream; + use tokio::prelude::*; + + use env_logger; + use futures::future; + use mio::unix::UnixReady; + + use std::{net, thread}; + use std::time::Duration; + + #[test] + fn poll_hup() { + drop(env_logger::init()); + + let srv = t!(net::TcpListener::bind("127.0.0.1:0")); + let addr = t!(srv.local_addr()); + let t = thread::spawn(move || { + let mut client = t!(srv.accept()).0; + client.write(b"hello world").unwrap(); + thread::sleep(Duration::from_millis(200)); + }); + + let mut stream = t!(TcpStream::connect(&addr).wait()); + + // Poll for HUP before reading. + future::poll_fn(|| { + stream.poll_read_ready(UnixReady::hup().into()) + }).wait().unwrap(); + + // Same for write half + future::poll_fn(|| { + stream.poll_write_ready() + }).wait().unwrap(); + + let mut buf = vec![0; 11]; + + // Read the data + future::poll_fn(|| { + stream.poll_read(&mut buf) + }).wait().unwrap(); + + assert_eq!(b"hello world", &buf[..]); + + t.join().unwrap(); + } +} diff --git a/tokio-reactor/src/lib.rs b/tokio-reactor/src/lib.rs index 0519c1001..ae105824a 100644 --- a/tokio-reactor/src/lib.rs +++ b/tokio-reactor/src/lib.rs @@ -352,9 +352,9 @@ impl Reactor { let io_dispatch = self.inner.io_dispatch.read().unwrap(); if let Some(io) = io_dispatch.get(token) { - io.readiness.fetch_or(ready2usize(ready), Relaxed); + io.readiness.fetch_or(ready.as_usize(), Relaxed); - if ready.is_writable() { + if ready.is_writable() || platform::is_hup(&ready) { io.writer.notify(); } @@ -551,9 +551,7 @@ impl Inner { try!(self.io.register(source, mio::Token(TOKEN_START + key), - mio::Ready::readable() | - mio::Ready::writable() | - platform::all(), + mio::Ready::all(), mio::PollOpt::edge())); Ok(key) @@ -582,7 +580,7 @@ impl Inner { task.register_task(t); - if sched.readiness.load(SeqCst) & ready2usize(ready) != 0 { + if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { task.notify(); } } @@ -602,53 +600,15 @@ impl Drop for Inner { } impl Direction { - fn ready(&self) -> mio::Ready { + fn mask(&self) -> mio::Ready { match *self { - Direction::Read => read_ready(), - Direction::Write => write_ready(), + Direction::Read => { + // Everything except writable is signaled through read. + mio::Ready::all() - mio::Ready::writable() + } + Direction::Write => mio::Ready::writable() | platform::hup(), } } - - fn mask(&self) -> usize { - ready2usize(self.ready()) - } -} - -// ===== misc ===== - -const READ: usize = 1 << 0; -const WRITE: usize = 1 << 1; - -fn read_ready() -> mio::Ready { - mio::Ready::readable() | platform::hup() -} - -fn write_ready() -> mio::Ready { - mio::Ready::writable() -} - -// === legacy - -fn ready2usize(ready: mio::Ready) -> usize { - let mut bits = 0; - if ready.is_readable() { - bits |= READ; - } - if ready.is_writable() { - bits |= WRITE; - } - bits | platform::ready2usize(ready) -} - -fn usize2ready(bits: usize) -> mio::Ready { - let mut ready = mio::Ready::empty(); - if bits & READ != 0 { - ready.insert(mio::Ready::readable()); - } - if bits & WRITE != 0 { - ready.insert(mio::Ready::writable()); - } - ready | platform::usize2ready(bits) } #[cfg(all(unix, not(target_os = "fuchsia")))] @@ -656,105 +616,12 @@ mod platform { use mio::Ready; use mio::unix::UnixReady; - #[cfg(target_os = "dragonfly")] - pub fn all() -> Ready { - hup() | UnixReady::aio() - } - - #[cfg(target_os = "freebsd")] - pub fn all() -> Ready { - hup() | UnixReady::aio() | UnixReady::lio() - } - - #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))] - pub fn all() -> Ready { - hup() - } - pub fn hup() -> Ready { UnixReady::hup().into() } - const HUP: usize = 1 << 2; - const ERROR: usize = 1 << 3; - const AIO: usize = 1 << 4; - const LIO: usize = 1 << 5; - - #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] - fn is_aio(ready: &Ready) -> bool { - UnixReady::from(*ready).is_aio() - } - - #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))] - fn is_aio(_ready: &Ready) -> bool { - false - } - - #[cfg(target_os = "freebsd")] - fn is_lio(ready: &Ready) -> bool { - UnixReady::from(*ready).is_lio() - } - - #[cfg(not(target_os = "freebsd"))] - fn is_lio(_ready: &Ready) -> bool { - false - } - - pub fn ready2usize(ready: Ready) -> usize { - let ready = UnixReady::from(ready); - let mut bits = 0; - if is_aio(&ready) { - bits |= AIO; - } - if is_lio(&ready) { - bits |= LIO; - } - if ready.is_error() { - bits |= ERROR; - } - if ready.is_hup() { - bits |= HUP; - } - bits - } - - #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios", - target_os = "macos"))] - fn usize2ready_aio(ready: &mut UnixReady) { - ready.insert(UnixReady::aio()); - } - - #[cfg(not(any(target_os = "dragonfly", - target_os = "freebsd", target_os = "ios", target_os = "macos")))] - fn usize2ready_aio(_ready: &mut UnixReady) { - // aio not available here → empty - } - - #[cfg(target_os = "freebsd")] - fn usize2ready_lio(ready: &mut UnixReady) { - ready.insert(UnixReady::lio()); - } - - #[cfg(not(target_os = "freebsd"))] - fn usize2ready_lio(_ready: &mut UnixReady) { - // lio not available here → empty - } - - pub fn usize2ready(bits: usize) -> Ready { - let mut ready = UnixReady::from(Ready::empty()); - if bits & AIO != 0 { - usize2ready_aio(&mut ready); - } - if bits & LIO != 0 { - usize2ready_lio(&mut ready); - } - if bits & HUP != 0 { - ready.insert(UnixReady::hup()); - } - if bits & ERROR != 0 { - ready.insert(UnixReady::error()); - } - ready.into() + pub fn is_hup(ready: &Ready) -> bool { + UnixReady::from(*ready).is_hup() } } @@ -762,20 +629,11 @@ mod platform { mod platform { use mio::Ready; - pub fn all() -> Ready { - // No platform-specific Readinesses for Windows - Ready::empty() - } - pub fn hup() -> Ready { Ready::empty() } - pub fn ready2usize(_r: Ready) -> usize { - 0 - } - - pub fn usize2ready(_r: usize) -> Ready { - Ready::empty() + pub fn is_hup(_: &Ready) -> bool { + false } } diff --git a/tokio-reactor/src/poll_evented.rs b/tokio-reactor/src/poll_evented.rs index 03e8d7738..bddf94c79 100644 --- a/tokio-reactor/src/poll_evented.rs +++ b/tokio-reactor/src/poll_evented.rs @@ -43,28 +43,27 @@ use std::sync::atomic::Ordering::Relaxed; /// [`poll_read_ready`] again will also indicate read readiness. /// /// When the operation is attempted and is unable to succeed due to the I/O -/// resource not being ready, the caller must call [`need_read`] or -/// [`need_write`]. This clears the readiness state until a new readiness event -/// is received. +/// resource not being ready, the caller must call [`clear_read_ready`] or +/// [`clear_write_ready`]. This clears the readiness state until a new readiness +/// event is received. /// /// This allows the caller to implement additional funcitons. For example, -/// [`TcpListener`] implements accept by using [`poll_read_ready`] and -/// [`need_read`]. +/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and +/// [`clear_write_ready`]. /// /// ```rust,ignore -/// pub fn accept(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> { -/// if let Async::NotReady = self.poll_evented.poll_read_ready()? { -/// return Err(io::ErrorKind::WouldBlock.into()) -/// } +/// pub fn poll_accept(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> { +/// let ready = Ready::readable(); +/// +/// try_ready!(self.poll_evented.poll_read_ready(ready)); /// /// match self.poll_evented.get_ref().accept_std() { -/// Ok(pair) => Ok(pair), -/// Err(e) => { -/// if e.kind() == io::ErrorKind::WouldBlock { -/// self.poll_evented.need_read()?; -/// } -/// Err(e) +/// Ok(pair) => Ok(Async::Ready(pair)), +/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { +/// self.poll_evented.clear_read_ready(ready); +/// Ok(Async::NotReady) /// } +/// Err(e) => Err(e), /// } /// } /// ``` @@ -99,6 +98,47 @@ struct Inner { // ===== impl PollEvented ===== +macro_rules! poll_ready { + ($me:expr, $mask:expr, $cache:ident, $poll:ident, $take:ident) => {{ + $me.register()?; + + // Load cached & encoded readiness. + let mut cached = $me.inner.$cache.load(Relaxed); + let mask = $mask | ::platform::hup(); + + // See if the current readiness matches any bits. + let mut ret = mio::Ready::from_usize(cached) & $mask; + + if ret.is_empty() { + // Readiness does not match, consume the registration's readiness + // stream. This happens in a loop to ensure that the stream gets + // drained. + loop { + let ready = try_ready!($me.inner.registration.$poll()); + cached |= ready.as_usize(); + + // Update the cache store + $me.inner.$cache.store(cached, Relaxed); + + ret |= ready & mask; + + if !ret.is_empty() { + return Ok(ret.into()); + } + } + } else { + // Check what's new with the registration stream. This will not + // request to be notified + if let Some(ready) = $me.inner.registration.$take()? { + cached |= ready.as_usize(); + $me.inner.$cache.store(cached, Relaxed); + } + + Ok(mio::Ready::from_usize(cached).into()) + } + }} +} + impl PollEvented where E: Evented { @@ -149,58 +189,53 @@ where E: Evented /// Check the I/O resource's read readiness state. /// + /// The mask argument allows specifying what readiness to notify on. This + /// can be any value, including platform specific readiness, **except** + /// `writable`. HUP is always implicitly included on platforms that support + /// it. + /// /// If the resource is not ready for a read then `Async::NotReady` is /// returned and the current task is notified once a new event is received. /// /// The I/O resource will remain in a read-ready state until readiness is - /// cleared by calling [`need_read`]. + /// cleared by calling [`clear_read_ready`]. /// - /// [`need_read`]: #method.need_read + /// [`clear_read_ready`]: #method.clear_read_ready /// /// # Panics /// - /// This function will panic if called from outside of a task context. - pub fn poll_read_ready(&self) -> Poll { - self.register()?; - - // Load the cached readiness - match self.inner.read_readiness.load(Relaxed) { - 0 => {} - mut n => { - // Check what's new with the reactor. - if let Some(ready) = self.inner.registration.take_read_ready()? { - n |= super::ready2usize(ready); - self.inner.read_readiness.store(n, Relaxed); - } - - return Ok(super::usize2ready(n).into()); - } - } - - let ready = try_ready!(self.inner.registration.poll_read_ready()); - - // Cache the value - self.inner.read_readiness.store(super::ready2usize(ready), Relaxed); - - Ok(ready.into()) + /// This function panics if: + /// + /// * `ready` includes writable. + /// * called from outside of a task context. + pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll { + assert!(!mask.is_writable(), "cannot poll for write readiness"); + poll_ready!(self, mask, read_readiness, poll_read_ready, take_read_ready) } - /// Resets the I/O resource's read readiness state and registers the current + /// Clears the I/O resource's read readiness state and registers the current /// task to be notified once a read readiness event is received. /// /// After calling this function, `poll_read_ready` will return `NotReady` /// until a new read readiness event has been received. /// - /// This function clears **all** readiness state **except** write readiness. - /// This includes any platform-specific readiness bits. + /// The `mask` argument specifies the readiness bits to clear. This may not + /// include `writable` or `hup`. /// /// # Panics /// - /// This function will panic if called from outside of a task context. - pub fn need_read(&self) -> io::Result<()> { - self.inner.read_readiness.store(0, Relaxed); + /// This function panics if: + /// + /// * `ready` includes writable or HUP + /// * called from outside of a task context. + pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> { + // Cannot clear write readiness + assert!(!ready.is_writable(), "cannot clear write readiness"); + assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness"); - if self.poll_read_ready()?.is_ready() { + self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed); + + if self.poll_read_ready(ready)?.is_ready() { // Notify the current task task::current().notify(); } @@ -210,52 +245,47 @@ where E: Evented /// Check the I/O resource's write readiness state. /// + /// This always checks for writable readiness and also checks for HUP + /// readiness on platforms that support it. + /// /// If the resource is not ready for a write then `Async::NotReady` is /// returned and the current task is notified once a new event is received. /// /// The I/O resource will remain in a write-ready state until readiness is - /// cleared by calling [`need_write`]. + /// cleared by calling [`clear_write_ready`]. /// - /// [`need_write`]: #method.need_write + /// [`clear_write_ready`]: #method.clear_write_ready /// /// # Panics /// - /// This function will panic if called from outside of a task context. + /// This function panics if: + /// + /// * `ready` contains bits besides `writable` and `hup`. + /// * called from outside of a task context. pub fn poll_write_ready(&self) -> Poll { - self.register()?; - - match self.inner.write_readiness.load(Relaxed) { - 0 => {} - mut n => { - // Check what's new with the reactor. - if let Some(ready) = self.inner.registration.take_write_ready()? { - n |= super::ready2usize(ready); - self.inner.write_readiness.store(n, Relaxed); - } - - return Ok(super::usize2ready(n).into()); - } - } - - let ready = try_ready!(self.inner.registration.poll_write_ready()); - - // Cache the value - self.inner.write_readiness.store(super::ready2usize(ready), Relaxed); - - Ok(ready.into()) + poll_ready!(self, + mio::Ready::writable(), + write_readiness, + poll_write_ready, + take_write_ready) } /// Resets the I/O resource's write readiness state and registers the current /// task to be notified once a write readiness event is received. /// - /// After calling this function, `poll_write_ready` will return `NotReady` - /// until a new read readiness event has been received. + /// This only clears writable readiness. HUP (on platforms that support HUP) + /// cannot be cleared as it is a final state. + /// + /// After calling this function, `poll_write_ready(Ready::writable())` will + /// return `NotReady` until a new read readiness event has been received. /// /// # Panics /// /// This function will panic if called from outside of a task context. - pub fn need_write(&self) -> io::Result<()> { - self.inner.write_readiness.store(0, Relaxed); + pub fn clear_write_ready(&self) -> io::Result<()> { + let ready = mio::Ready::writable(); + + self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed); if self.poll_write_ready()?.is_ready() { // Notify the current task @@ -278,14 +308,14 @@ impl Read for PollEvented where E: Evented + Read, { fn read(&mut self, buf: &mut [u8]) -> io::Result { - if let Async::NotReady = self.poll_read_ready()? { + if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? { return Err(io::ErrorKind::WouldBlock.into()) } let r = self.get_mut().read(buf); if is_wouldblock(&r) { - self.need_read()?; + self.clear_read_ready(mio::Ready::readable())?; } return r @@ -303,7 +333,7 @@ where E: Evented + Write, let r = self.get_mut().write(buf); if is_wouldblock(&r) { - self.need_write()?; + self.clear_write_ready()?; } return r @@ -317,7 +347,7 @@ where E: Evented + Write, let r = self.get_mut().flush(); if is_wouldblock(&r) { - self.need_write()?; + self.clear_write_ready()?; } return r @@ -343,14 +373,14 @@ impl<'a, E> Read for &'a PollEvented where E: Evented, &'a E: Read, { fn read(&mut self, buf: &mut [u8]) -> io::Result { - if let Async::NotReady = self.poll_read_ready()? { + if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? { return Err(io::ErrorKind::WouldBlock.into()) } let r = self.get_ref().read(buf); if is_wouldblock(&r) { - self.need_read()?; + self.clear_read_ready(mio::Ready::readable())?; } return r @@ -368,7 +398,7 @@ where E: Evented, &'a E: Write, let r = self.get_ref().write(buf); if is_wouldblock(&r) { - self.need_write()?; + self.clear_write_ready()?; } return r @@ -382,7 +412,7 @@ where E: Evented, &'a E: Write, let r = self.get_ref().flush(); if is_wouldblock(&r) { - self.need_write()?; + self.clear_write_ready()?; } return r diff --git a/tokio-reactor/src/registration.rs b/tokio-reactor/src/registration.rs index 16c4f8230..a726bdd33 100644 --- a/tokio-reactor/src/registration.rs +++ b/tokio-reactor/src/registration.rs @@ -244,7 +244,9 @@ impl Registration { /// call to `poll_read_ready`, it is returned. If it has not, the current /// task is notified once a new event is received. /// - /// Events are [edge-triggered]. + /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, + /// the function will always return `Ready(HUP)`. This should be treated as + /// the end of the readiness stream. /// /// Ensure that [`register`] has been called first. /// @@ -294,7 +296,9 @@ impl Registration { /// call to `poll_write_ready`, it is returned. If it has not, the current /// task is notified once a new event is received. /// - /// Events are [edge-triggered]. + /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, + /// the function will always return `Ready(HUP)`. This should be treated as + /// the end of the readiness stream. /// /// Ensure that [`register`] has been called first. /// @@ -481,13 +485,23 @@ impl Inner { }; let mask = direction.mask(); + let mask_no_hup = (mask - ::platform::hup()).as_usize(); let io_dispatch = inner.io_dispatch.read().unwrap(); let sched = &io_dispatch[self.token]; - let mut ready = mask & sched.readiness.fetch_and(!mask, SeqCst); + // This consumes the current readiness state **except** for HUP. HUP is + // excluded because a) it is a final state and never transitions out of + // HUP and b) both the read AND the write directions need to be able to + // observe this state. + // + // If HUP were to be cleared when `direction` is `Read`, then when + // `poll_ready` is called again with a _`direction` of `Write`, the HUP + // state would not be visible. + let mut ready = mask & mio::Ready::from_usize( + sched.readiness.fetch_and(!mask_no_hup, SeqCst)); - if ready == 0 && notify { + if ready.is_empty() && notify { // Update the task info match direction { Direction::Read => sched.reader.register(), @@ -495,13 +509,14 @@ impl Inner { } // Try again - ready = mask & sched.readiness.fetch_and(!mask, SeqCst); + ready = mask & mio::Ready::from_usize( + sched.readiness.fetch_and(!mask_no_hup, SeqCst)); } - if ready == 0 { + if ready.is_empty() { Ok(None) } else { - Ok(Some(super::usize2ready(ready))) + Ok(Some(ready)) } } }