Improve poll_read_ready implementation (#193)

This patch updates `poll_read_ready` to take a `mask` argument, enabling
the caller to specify the desired readiness. `need_read` is renamed to
`clear_read_ready` and also takes a mask.

This enables a caller to listen for HUP events without having to read from
the I/O resource.
Carl Lerche 2018-03-07 16:24:51 -08:00 committed by GitHub
parent 5555cbc85e
commit 25dd54d263
8 changed files with 262 additions and 265 deletions
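
For illustration, a minimal sketch of the capability described in the commit message: waiting for the peer to hang up without issuing a single read. This is not part of the commit; it assumes tokio 0.1 with this patch applied, futures 0.1, and mio 0.6, is unix-only (it uses `mio::unix::UnixReady`), and mirrors the `poll_hup` test added further down.

```rust
extern crate futures;
extern crate mio;
extern crate tokio;

use std::net;
use std::thread;

use futures::{future, Future};
use mio::unix::UnixReady;
use tokio::net::TcpStream;

fn main() {
    // A plain std listener that accepts one connection and then drops it,
    // which shows up as HUP on the client side.
    let srv = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = srv.local_addr().unwrap();
    let t = thread::spawn(move || {
        drop(srv.accept().unwrap());
    });

    let stream = TcpStream::connect(&addr).wait().unwrap();

    // Wait for HUP without reading. `wait()` provides the task context that
    // `poll_read_ready` requires.
    future::poll_fn(|| stream.poll_read_ready(UnixReady::hup().into()))
        .wait()
        .unwrap();

    // The write half reports HUP as well on platforms that support it.
    future::poll_fn(|| stream.poll_write_ready()).wait().unwrap();

    t.join().unwrap();
}
```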

@ -60,3 +60,4 @@ time = "0.1"
[patch.crates-io]
tokio-io = { path = "tokio-io" }
mio = { git = "https://github.com/carllerche/mio" }
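
Presumably, `mio` is pinned to a git revision here because the reactor changes below drop the hand-rolled `ready2usize` / `usize2ready` encoding in favour of `mio::Ready::as_usize` / `Ready::from_usize`, which were not yet in a released mio at the time. A rough sketch of the round-trip the reactor now relies on, assuming a mio build that exposes these methods:

```rust
extern crate mio;

fn main() {
    // Readiness is cached in an AtomicUsize using mio's own encoding.
    let ready = mio::Ready::readable() | mio::Ready::writable();
    let bits: usize = ready.as_usize();

    // The encoding is lossless, so the cached bits can be turned back into
    // a Ready value when polling.
    assert_eq!(mio::Ready::from_usize(bits), ready);
}
```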

@ -93,12 +93,12 @@ impl TcpListener {
///
/// This function will panic if called from outside of a task context.
pub fn poll_accept_std(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> {
try_ready!(self.io.poll_read_ready());
try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
match self.io.get_ref().accept_std() {
Ok(pair) => Ok(pair.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read()?;
self.io.clear_read_ready(mio::Ready::readable())?;
Ok(Async::NotReady)
}
Err(e) => Err(e),

@ -114,6 +114,50 @@ impl TcpStream {
ConnectFuture { inner: inner }
}
/// Check the TCP stream's read readiness state.
///
/// The mask argument allows specifying what readiness to notify on. This
/// can be any value, including platform specific readiness, **except**
/// `writable`. HUP is always implicitly included on platforms that support
/// it.
///
/// If the resource is not ready for a read then `Async::NotReady` is
/// returned and the current task is notified once a new event is received.
///
/// The stream will remain in a read-ready state until calls to `poll_read`
/// return `NotReady`.
///
/// # Panics
///
/// This function panics if:
///
/// * `mask` includes writable.
/// * called from outside of a task context.
pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
self.io.poll_read_ready(mask)
}
/// Check the TCP stream's write readiness state.
///
/// This always checks for writable readiness and also checks for HUP
/// readiness on platforms that support it.
///
/// If the resource is not ready for a write then `Async::NotReady` is
/// returned and the current task is notified once a new event is received.
///
/// The I/O resource will remain in a write-ready state until calls to
/// `poll_write` return `NotReady`.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
self.io.poll_write_ready()
}
/// Returns the local address that this stream is bound to.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
@ -152,12 +196,12 @@ impl TcpStream {
///
/// This function will panic if called from outside of a task context.
pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
try_ready!(self.io.poll_read_ready());
try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
match self.io.get_ref().peek(buf) {
Ok(ret) => Ok(ret.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read()?;
self.io.clear_read_ready(mio::Ready::readable())?;
Ok(Async::NotReady)
}
Err(e) => Err(e),
@ -357,7 +401,7 @@ impl<'a> AsyncRead for &'a TcpStream {
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = self.io.poll_read_ready()? {
if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? {
return Ok(Async::NotReady)
}
@ -397,7 +441,7 @@ impl<'a> AsyncRead for &'a TcpStream {
Ok(Async::Ready(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read()?;
self.io.clear_read_ready(mio::Ready::readable())?;
Ok(Async::NotReady)
}
Err(e) => Err(e),
@ -431,7 +475,7 @@ impl<'a> AsyncWrite for &'a TcpStream {
Ok(Async::Ready(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_write()?;
self.io.clear_write_ready()?;
Ok(Async::NotReady)
}
Err(e) => Err(e),

@ -88,7 +88,7 @@ impl UdpSocket {
match self.io.get_ref().send(buf) {
Ok(n) => Ok(n.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_write()?;
self.io.clear_write_ready()?;
Ok(Async::NotReady)
}
Err(e) => Err(e),
@ -128,12 +128,12 @@ impl UdpSocket {
///
/// This function will panic if called from outside of a task context.
pub fn poll_recv(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
try_ready!(self.io.poll_read_ready());
try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
match self.io.get_ref().recv(buf) {
Ok(n) => Ok(n.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read()?;
self.io.clear_read_ready(mio::Ready::readable())?;
Ok(Async::NotReady)
}
Err(e) => Err(e),
@ -172,7 +172,7 @@ impl UdpSocket {
match self.io.get_ref().send_to(buf, target) {
Ok(n) => Ok(n.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_write()?;
self.io.clear_write_ready()?;
Ok(Async::NotReady)
}
Err(e) => Err(e),
@ -216,12 +216,12 @@ impl UdpSocket {
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
try_ready!(self.io.poll_read_ready());
try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
match self.io.get_ref().recv_from(buf) {
Ok(n) => Ok(n.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read()?;
self.io.clear_read_ready(mio::Ready::readable())?;
Ok(Async::NotReady)
}
Err(e) => Err(e),

@ -1,14 +1,14 @@
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate mio;
extern crate futures;
use std::net;
use std::{net, thread};
use std::sync::mpsc::channel;
use std::thread;
use futures::Future;
use futures::stream::Stream;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
macro_rules! t {
($e:expr) => (match $e {
@ -79,3 +79,52 @@ fn accept2() {
mine.unwrap();
t.join().unwrap();
}
#[cfg(unix)]
mod unix {
use tokio::net::TcpStream;
use tokio::prelude::*;
use env_logger;
use futures::future;
use mio::unix::UnixReady;
use std::{net, thread};
use std::time::Duration;
#[test]
fn poll_hup() {
drop(env_logger::init());
let srv = t!(net::TcpListener::bind("127.0.0.1:0"));
let addr = t!(srv.local_addr());
let t = thread::spawn(move || {
let mut client = t!(srv.accept()).0;
client.write(b"hello world").unwrap();
thread::sleep(Duration::from_millis(200));
});
let mut stream = t!(TcpStream::connect(&addr).wait());
// Poll for HUP before reading.
future::poll_fn(|| {
stream.poll_read_ready(UnixReady::hup().into())
}).wait().unwrap();
// Same for write half
future::poll_fn(|| {
stream.poll_write_ready()
}).wait().unwrap();
let mut buf = vec![0; 11];
// Read the data
future::poll_fn(|| {
stream.poll_read(&mut buf)
}).wait().unwrap();
assert_eq!(b"hello world", &buf[..]);
t.join().unwrap();
}
}

@ -352,9 +352,9 @@ impl Reactor {
let io_dispatch = self.inner.io_dispatch.read().unwrap();
if let Some(io) = io_dispatch.get(token) {
io.readiness.fetch_or(ready2usize(ready), Relaxed);
io.readiness.fetch_or(ready.as_usize(), Relaxed);
if ready.is_writable() {
if ready.is_writable() || platform::is_hup(&ready) {
io.writer.notify();
}
@ -551,9 +551,7 @@ impl Inner {
try!(self.io.register(source,
mio::Token(TOKEN_START + key),
mio::Ready::readable() |
mio::Ready::writable() |
platform::all(),
mio::Ready::all(),
mio::PollOpt::edge()));
Ok(key)
@ -582,7 +580,7 @@ impl Inner {
task.register_task(t);
if sched.readiness.load(SeqCst) & ready2usize(ready) != 0 {
if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
task.notify();
}
}
@ -602,53 +600,15 @@ impl Drop for Inner {
}
impl Direction {
fn ready(&self) -> mio::Ready {
fn mask(&self) -> mio::Ready {
match *self {
Direction::Read => read_ready(),
Direction::Write => write_ready(),
Direction::Read => {
// Everything except writable is signaled through read.
mio::Ready::all() - mio::Ready::writable()
}
Direction::Write => mio::Ready::writable() | platform::hup(),
}
}
fn mask(&self) -> usize {
ready2usize(self.ready())
}
}
// ===== misc =====
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;
fn read_ready() -> mio::Ready {
mio::Ready::readable() | platform::hup()
}
fn write_ready() -> mio::Ready {
mio::Ready::writable()
}
// === legacy
fn ready2usize(ready: mio::Ready) -> usize {
let mut bits = 0;
if ready.is_readable() {
bits |= READ;
}
if ready.is_writable() {
bits |= WRITE;
}
bits | platform::ready2usize(ready)
}
fn usize2ready(bits: usize) -> mio::Ready {
let mut ready = mio::Ready::empty();
if bits & READ != 0 {
ready.insert(mio::Ready::readable());
}
if bits & WRITE != 0 {
ready.insert(mio::Ready::writable());
}
ready | platform::usize2ready(bits)
}
#[cfg(all(unix, not(target_os = "fuchsia")))]
@ -656,105 +616,12 @@ mod platform {
use mio::Ready;
use mio::unix::UnixReady;
#[cfg(target_os = "dragonfly")]
pub fn all() -> Ready {
hup() | UnixReady::aio()
}
#[cfg(target_os = "freebsd")]
pub fn all() -> Ready {
hup() | UnixReady::aio() | UnixReady::lio()
}
#[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
pub fn all() -> Ready {
hup()
}
pub fn hup() -> Ready {
UnixReady::hup().into()
}
const HUP: usize = 1 << 2;
const ERROR: usize = 1 << 3;
const AIO: usize = 1 << 4;
const LIO: usize = 1 << 5;
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
fn is_aio(ready: &Ready) -> bool {
UnixReady::from(*ready).is_aio()
}
#[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
fn is_aio(_ready: &Ready) -> bool {
false
}
#[cfg(target_os = "freebsd")]
fn is_lio(ready: &Ready) -> bool {
UnixReady::from(*ready).is_lio()
}
#[cfg(not(target_os = "freebsd"))]
fn is_lio(_ready: &Ready) -> bool {
false
}
pub fn ready2usize(ready: Ready) -> usize {
let ready = UnixReady::from(ready);
let mut bits = 0;
if is_aio(&ready) {
bits |= AIO;
}
if is_lio(&ready) {
bits |= LIO;
}
if ready.is_error() {
bits |= ERROR;
}
if ready.is_hup() {
bits |= HUP;
}
bits
}
#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios",
target_os = "macos"))]
fn usize2ready_aio(ready: &mut UnixReady) {
ready.insert(UnixReady::aio());
}
#[cfg(not(any(target_os = "dragonfly",
target_os = "freebsd", target_os = "ios", target_os = "macos")))]
fn usize2ready_aio(_ready: &mut UnixReady) {
// aio not available here → empty
}
#[cfg(target_os = "freebsd")]
fn usize2ready_lio(ready: &mut UnixReady) {
ready.insert(UnixReady::lio());
}
#[cfg(not(target_os = "freebsd"))]
fn usize2ready_lio(_ready: &mut UnixReady) {
// lio not available here → empty
}
pub fn usize2ready(bits: usize) -> Ready {
let mut ready = UnixReady::from(Ready::empty());
if bits & AIO != 0 {
usize2ready_aio(&mut ready);
}
if bits & LIO != 0 {
usize2ready_lio(&mut ready);
}
if bits & HUP != 0 {
ready.insert(UnixReady::hup());
}
if bits & ERROR != 0 {
ready.insert(UnixReady::error());
}
ready.into()
pub fn is_hup(ready: &Ready) -> bool {
UnixReady::from(*ready).is_hup()
}
}
@ -762,20 +629,11 @@ mod platform {
mod platform {
use mio::Ready;
pub fn all() -> Ready {
// No platform-specific Readinesses for Windows
Ready::empty()
}
pub fn hup() -> Ready {
Ready::empty()
}
pub fn ready2usize(_r: Ready) -> usize {
0
}
pub fn usize2ready(_r: usize) -> Ready {
Ready::empty()
pub fn is_hup(_: &Ready) -> bool {
false
}
}

@ -43,28 +43,27 @@ use std::sync::atomic::Ordering::Relaxed;
/// [`poll_read_ready`] again will also indicate read readiness.
///
/// When the operation is attempted and is unable to succeed due to the I/O
/// resource not being ready, the caller must call [`need_read`] or
/// [`need_write`]. This clears the readiness state until a new readiness event
/// is received.
/// resource not being ready, the caller must call [`clear_read_ready`] or
/// [`clear_write_ready`]. This clears the readiness state until a new readiness
/// event is received.
///
/// This allows the caller to implement additional functions. For example,
/// [`TcpListener`] implements accept by using [`poll_read_ready`] and
/// [`need_read`].
/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
/// [`clear_read_ready`].
///
/// ```rust,ignore
/// pub fn accept(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
/// if let Async::NotReady = self.poll_evented.poll_read_ready()? {
/// return Err(io::ErrorKind::WouldBlock.into())
/// }
/// pub fn poll_accept(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> {
/// let ready = Ready::readable();
///
/// try_ready!(self.poll_evented.poll_read_ready(ready));
///
/// match self.poll_evented.get_ref().accept_std() {
/// Ok(pair) => Ok(pair),
/// Err(e) => {
/// if e.kind() == io::ErrorKind::WouldBlock {
/// self.poll_evented.need_read()?;
/// }
/// Err(e)
/// Ok(pair) => Ok(Async::Ready(pair)),
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// self.poll_evented.clear_read_ready(ready)?;
/// Ok(Async::NotReady)
/// }
/// Err(e) => Err(e),
/// }
/// }
/// ```
@ -99,6 +98,47 @@ struct Inner {
// ===== impl PollEvented =====
macro_rules! poll_ready {
($me:expr, $mask:expr, $cache:ident, $poll:ident, $take:ident) => {{
$me.register()?;
// Load cached & encoded readiness.
let mut cached = $me.inner.$cache.load(Relaxed);
let mask = $mask | ::platform::hup();
// See if the current readiness matches any bits.
let mut ret = mio::Ready::from_usize(cached) & $mask;
if ret.is_empty() {
// Readiness does not match, consume the registration's readiness
// stream. This happens in a loop to ensure that the stream gets
// drained.
loop {
let ready = try_ready!($me.inner.registration.$poll());
cached |= ready.as_usize();
// Update the cache store
$me.inner.$cache.store(cached, Relaxed);
ret |= ready & mask;
if !ret.is_empty() {
return Ok(ret.into());
}
}
} else {
// Check what's new with the registration stream. This does not
// register the current task for notification.
if let Some(ready) = $me.inner.registration.$take()? {
cached |= ready.as_usize();
$me.inner.$cache.store(cached, Relaxed);
}
Ok(mio::Ready::from_usize(cached).into())
}
}}
}
impl<E> PollEvented<E>
where E: Evented
{
@ -149,58 +189,53 @@ where E: Evented
/// Check the I/O resource's read readiness state.
///
/// The mask argument allows specifying what readiness to notify on. This
/// can be any value, including platform specific readiness, **except**
/// `writable`. HUP is always implicitly included on platforms that support
/// it.
///
/// If the resource is not ready for a read then `Async::NotReady` is
/// returned and the current task is notified once a new event is received.
///
/// The I/O resource will remain in a read-ready state until readiness is
/// cleared by calling [`need_read`].
/// cleared by calling [`clear_read_ready`].
///
/// [`need_read`]: #method.need_read
/// [`clear_read_ready`]: #method.clear_read_ready
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
self.register()?;
// Load the cached readiness
match self.inner.read_readiness.load(Relaxed) {
0 => {}
mut n => {
// Check what's new with the reactor.
if let Some(ready) = self.inner.registration.take_read_ready()? {
n |= super::ready2usize(ready);
self.inner.read_readiness.store(n, Relaxed);
}
return Ok(super::usize2ready(n).into());
}
}
let ready = try_ready!(self.inner.registration.poll_read_ready());
// Cache the value
self.inner.read_readiness.store(super::ready2usize(ready), Relaxed);
Ok(ready.into())
/// This function panics if:
///
/// * `mask` includes writable.
/// * called from outside of a task context.
pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
assert!(!mask.is_writable(), "cannot poll for write readiness");
poll_ready!(self, mask, read_readiness, poll_read_ready, take_read_ready)
}
/// Resets the I/O resource's read readiness state and registers the current
/// Clears the I/O resource's read readiness state and registers the current
/// task to be notified once a read readiness event is received.
///
/// After calling this function, `poll_read_ready` will return `NotReady`
/// until a new read readiness event has been received.
///
/// This function clears **all** readiness state **except** write readiness.
/// This includes any platform-specific readiness bits.
/// The `ready` argument specifies the readiness bits to clear. This may not
/// include `writable` or `hup`.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn need_read(&self) -> io::Result<()> {
self.inner.read_readiness.store(0, Relaxed);
/// This function panics if:
///
/// * `ready` includes writable or HUP
/// * called from outside of a task context.
pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> {
// Cannot clear write readiness
assert!(!ready.is_writable(), "cannot clear write readiness");
assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness");
if self.poll_read_ready()?.is_ready() {
self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed);
if self.poll_read_ready(ready)?.is_ready() {
// Notify the current task
task::current().notify();
}
@ -210,52 +245,47 @@ where E: Evented
/// Check the I/O resource's write readiness state.
///
/// This always checks for writable readiness and also checks for HUP
/// readiness on platforms that support it.
///
/// If the resource is not ready for a write then `Async::NotReady` is
/// returned and the current task is notified once a new event is received.
///
/// The I/O resource will remain in a write-ready state until readiness is
/// cleared by calling [`need_write`].
/// cleared by calling [`clear_write_ready`].
///
/// [`need_write`]: #method.need_write
/// [`clear_write_ready`]: #method.clear_write_ready
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
self.register()?;
match self.inner.write_readiness.load(Relaxed) {
0 => {}
mut n => {
// Check what's new with the reactor.
if let Some(ready) = self.inner.registration.take_write_ready()? {
n |= super::ready2usize(ready);
self.inner.write_readiness.store(n, Relaxed);
}
return Ok(super::usize2ready(n).into());
}
}
let ready = try_ready!(self.inner.registration.poll_write_ready());
// Cache the value
self.inner.write_readiness.store(super::ready2usize(ready), Relaxed);
Ok(ready.into())
poll_ready!(self,
mio::Ready::writable(),
write_readiness,
poll_write_ready,
take_write_ready)
}
/// Resets the I/O resource's write readiness state and registers the current
/// task to be notified once a write readiness event is received.
///
/// After calling this function, `poll_write_ready` will return `NotReady`
/// until a new read readiness event has been received.
/// This only clears writable readiness. HUP (on platforms that support HUP)
/// cannot be cleared as it is a final state.
///
/// After calling this function, `poll_write_ready` will return `NotReady`
/// until a new write readiness event has been received.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn need_write(&self) -> io::Result<()> {
self.inner.write_readiness.store(0, Relaxed);
pub fn clear_write_ready(&self) -> io::Result<()> {
let ready = mio::Ready::writable();
self.inner.write_readiness.fetch_and(!ready.as_usize(), Relaxed);
if self.poll_write_ready()?.is_ready() {
// Notify the current task
@ -278,14 +308,14 @@ impl<E> Read for PollEvented<E>
where E: Evented + Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read_ready()? {
if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().read(buf);
if is_wouldblock(&r) {
self.need_read()?;
self.clear_read_ready(mio::Ready::readable())?;
}
return r
@ -303,7 +333,7 @@ where E: Evented + Write,
let r = self.get_mut().write(buf);
if is_wouldblock(&r) {
self.need_write()?;
self.clear_write_ready()?;
}
return r
@ -317,7 +347,7 @@ where E: Evented + Write,
let r = self.get_mut().flush();
if is_wouldblock(&r) {
self.need_write()?;
self.clear_write_ready()?;
}
return r
@ -343,14 +373,14 @@ impl<'a, E> Read for &'a PollEvented<E>
where E: Evented, &'a E: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read_ready()? {
if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().read(buf);
if is_wouldblock(&r) {
self.need_read()?;
self.clear_read_ready(mio::Ready::readable())?;
}
return r
@ -368,7 +398,7 @@ where E: Evented, &'a E: Write,
let r = self.get_ref().write(buf);
if is_wouldblock(&r) {
self.need_write()?;
self.clear_write_ready()?;
}
return r
@ -382,7 +412,7 @@ where E: Evented, &'a E: Write,
let r = self.get_ref().flush();
if is_wouldblock(&r) {
self.need_write()?;
self.clear_write_ready()?;
}
return r
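
Taken together, the new `PollEvented` API makes a custom accept implementation look like the doc example above. A self-contained sketch follows; the `tokio::reactor::PollEvented` import path and the `Listener` wrapper are assumptions for illustration, while the method body mirrors the diff:

```rust
#[macro_use]
extern crate futures;
extern crate mio;
extern crate tokio;

use std::io;
use std::net::{self, SocketAddr};

use futures::{Async, Poll};
use tokio::reactor::PollEvented; // path assumed for this sketch

// Hypothetical wrapper around a mio listener driven by the reactor.
struct Listener {
    poll_evented: PollEvented<mio::net::TcpListener>,
}

impl Listener {
    fn poll_accept(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> {
        let ready = mio::Ready::readable();

        // Wait until the listener is read-ready (HUP is implied on unix).
        try_ready!(self.poll_evented.poll_read_ready(ready));

        match self.poll_evented.get_ref().accept_std() {
            Ok(pair) => Ok(Async::Ready(pair)),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // Out of connections: clear readable readiness and park the
                // task until the reactor sees a new readiness event.
                self.poll_evented.clear_read_ready(ready)?;
                Ok(Async::NotReady)
            }
            Err(e) => Err(e),
        }
    }
}
```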

@ -244,7 +244,9 @@ impl Registration {
/// call to `poll_read_ready`, it is returned. If it has not, the current
/// task is notified once a new event is received.
///
/// Events are [edge-triggered].
/// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
/// the function will always return `Ready(HUP)`. This should be treated as
/// the end of the readiness stream.
///
/// Ensure that [`register`] has been called first.
///
@ -294,7 +296,9 @@ impl Registration {
/// call to `poll_write_ready`, it is returned. If it has not, the current
/// task is notified once a new event is received.
///
/// Events are [edge-triggered].
/// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
/// the function will always return `Ready(HUP)`. This should be treated as
/// the end of the readiness stream.
///
/// Ensure that [`register`] has been called first.
///
@ -481,13 +485,23 @@ impl Inner {
};
let mask = direction.mask();
let mask_no_hup = (mask - ::platform::hup()).as_usize();
let io_dispatch = inner.io_dispatch.read().unwrap();
let sched = &io_dispatch[self.token];
let mut ready = mask & sched.readiness.fetch_and(!mask, SeqCst);
// This consumes the current readiness state **except** for HUP. HUP is
// excluded because a) it is a final state and never transitions out of
// HUP and b) both the read AND the write directions need to be able to
// observe this state.
//
// If HUP were to be cleared when `direction` is `Read`, then when
// `poll_ready` is called again with a _`direction` of `Write`, the HUP
// state would not be visible.
let mut ready = mask & mio::Ready::from_usize(
sched.readiness.fetch_and(!mask_no_hup, SeqCst));
if ready == 0 && notify {
if ready.is_empty() && notify {
// Update the task info
match direction {
Direction::Read => sched.reader.register(),
@ -495,13 +509,14 @@ impl Inner {
}
// Try again
ready = mask & sched.readiness.fetch_and(!mask, SeqCst);
ready = mask & mio::Ready::from_usize(
sched.readiness.fetch_and(!mask_no_hup, SeqCst));
}
if ready == 0 {
if ready.is_empty() {
Ok(None)
} else {
Ok(Some(super::usize2ready(ready)))
Ok(Some(ready))
}
}
}
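
Finally, since the updated `Registration` docs above describe HUP as a terminal, non-edge-triggered state, a consumer of the raw readiness stream can treat it as end-of-stream. A minimal, unix-only sketch; the `tokio::reactor::Registration` import path and the `on_readable` hook are assumptions:

```rust
#[macro_use]
extern crate futures;
extern crate mio;
extern crate tokio;

use std::io;

use futures::{Async, Poll};
use mio::unix::UnixReady;
use tokio::reactor::Registration; // path assumed for this sketch

// Drain readiness events for an already-registered I/O resource until the
// peer hangs up. Once HUP is observed, every subsequent poll returns it
// again, so it marks the end of the readiness stream.
fn drain<F>(registration: &Registration, mut on_readable: F) -> Poll<(), io::Error>
where
    F: FnMut(),
{
    loop {
        let ready = try_ready!(registration.poll_read_ready());

        if UnixReady::from(ready).is_hup() {
            return Ok(Async::Ready(()));
        }

        if ready.is_readable() {
            on_readable();
        }
    }
}
```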