diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index 000d3308f..e2b509583 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -139,13 +139,6 @@ enum Message { Run(Box), } -#[repr(usize)] -#[derive(Clone, Copy, Debug, PartialEq)] -enum Readiness { - Readable = 1, - Writable = 2, -} - const TOKEN_MESSAGES: mio::Token = mio::Token(0); const TOKEN_FUTURE: mio::Token = mio::Token(1); const TOKEN_START: usize = 2; @@ -330,15 +323,12 @@ impl Core { let mut writer = None; let mut inner = self.inner.borrow_mut(); if let Some(io) = inner.io_dispatch.get_mut(token) { - if ready.is_readable() || platform::is_hup(&ready) { - reader = io.reader.take(); - io.readiness.fetch_or(Readiness::Readable as usize, - Ordering::Relaxed); - } + io.readiness.fetch_or(ready2usize(ready), Ordering::Relaxed); if ready.is_writable() { writer = io.writer.take(); - io.readiness.fetch_or(Readiness::Writable as usize, - Ordering::Relaxed); + } + if !(ready & (!mio::Ready::writable())).is_empty() { + reader = io.reader.take(); } } drop(inner); @@ -497,14 +487,16 @@ impl Inner { -> Option { debug!("scheduling direction for: {}", token); let sched = self.io_dispatch.get_mut(token).unwrap(); - let (slot, bit) = match dir { - Direction::Read => (&mut sched.reader, Readiness::Readable as usize), - Direction::Write => (&mut sched.writer, Readiness::Writable as usize), + let (slot, ready) = match dir { + Direction::Read => (&mut sched.reader, !mio::Ready::writable()), + Direction::Write => (&mut sched.writer, mio::Ready::writable()), }; - if sched.readiness.load(Ordering::SeqCst) & bit != 0 { + if sched.readiness.load(Ordering::SeqCst) & ready2usize(ready) != 0 { + debug!("cancelling block"); *slot = None; Some(wake) } else { + debug!("blocking"); *slot = Some(wake); None } @@ -754,29 +746,84 @@ impl FnBox for F { } } +fn read_ready() -> mio::Ready { + mio::Ready::readable() | platform::hup() +} + +const READ: usize = 1 << 0; +const WRITE: usize = 1 << 1; + +fn ready2usize(ready: mio::Ready) -> usize { + let mut bits = 0; + if ready.is_readable() { + bits |= READ; + } + if ready.is_writable() { + bits |= WRITE; + } + bits | platform::ready2usize(ready) +} + +fn usize2ready(bits: usize) -> mio::Ready { + let mut ready = mio::Ready::empty(); + if bits & READ != 0 { + ready.insert(mio::Ready::readable()); + } + if bits & WRITE != 0 { + ready.insert(mio::Ready::writable()); + } + ready | platform::usize2ready(bits) +} + #[cfg(unix)] mod platform { use mio::Ready; use mio::unix::UnixReady; - pub fn is_hup(event: &Ready) -> bool { - UnixReady::from(*event).is_hup() - } - pub fn hup() -> Ready { UnixReady::hup().into() } + + const HUP: usize = 1 << 2; + const ERROR: usize = 1 << 3; + + pub fn ready2usize(ready: Ready) -> usize { + let ready = UnixReady::from(ready); + let mut bits = 0; + if ready.is_error() { + bits |= ERROR; + } + if ready.is_hup() { + bits |= HUP; + } + bits + } + + pub fn usize2ready(bits: usize) -> Ready { + let mut ready = UnixReady::from(Ready::empty()); + if bits & HUP != 0 { + ready.insert(UnixReady::hup()); + } + if bits & ERROR != 0 { + ready.insert(UnixReady::error()); + } + ready.into() + } } #[cfg(windows)] mod platform { use mio::Ready; - pub fn is_hup(_event: &Ready) -> bool { - false - } - pub fn hup() -> Ready { Ready::empty() } + + pub fn ready2usize(_r: Ready) -> usize { + 0 + } + + pub fn usize2ready(_r: usize) -> Ready { + Ready::empty() + } } diff --git a/src/reactor/poll_evented.rs b/src/reactor/poll_evented.rs index 26934b69d..401a922db 100644 --- a/src/reactor/poll_evented.rs +++ b/src/reactor/poll_evented.rs @@ -12,10 +12,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use futures::{Async, Poll}; use mio::event::Evented; +use mio::Ready; use tokio_io::{AsyncRead, AsyncWrite}; use reactor::{Handle, Remote}; -use reactor::Readiness::*; use reactor::io_token::IoToken; /// A concrete implementation of a stream of readiness notifications for I/O @@ -25,6 +25,10 @@ use reactor::io_token::IoToken; /// associated with a specific event loop and source of events that will be /// registered with an event loop. /// +/// An instance of `PollEvented` is essentially the bridge between the `mio` +/// world and the `tokio-core` world, providing abstractions to receive +/// notifications about changes to an object's `mio::Ready` state. +/// /// Each readiness stream has a number of methods to test whether the underlying /// object is readable or writable. Once the methods return that an object is /// readable/writable, then it will continue to do so until the `need_read` or @@ -38,6 +42,27 @@ use reactor::io_token::IoToken; /// You can find more information about creating a custom I/O object [online]. /// /// [online]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/#custom-io +/// +/// ## Readiness to read/write +/// +/// A `PollEvented` allows listenting and waiting for an arbitrary `mio::Ready` +/// instance, including the platform-specific contents of `mio::Ready`. At most +/// two future tasks, however, can be waiting on a `PollEvented`. The +/// `need_read` and `need_write` methods can block two separate tasks, one on +/// reading and one on writing. Not all I/O events correspond to read/write, +/// however! +/// +/// To account for this a `PollEvented` gets a little interesting when working +/// with an arbitrary instance of `mio::Ready` that may not map precisely to +/// "write" and "read" tasks. Currently it is defined that instances of +/// `mio::Ready` that do *not* return true from `is_writable` are all notified +/// through `need_read`, or the read task. +/// +/// In other words, `poll_ready` with the `mio::UnixReady::hup` event will block +/// the read task of this `PollEvented` if the `hup` event isn't available. +/// Essentially a good rule of thumb is that if you're using the `poll_ready` +/// method you want to also use `need_read` to signal blocking and you should +/// otherwise probably avoid using two tasks on the same `PollEvented`. pub struct PollEvented { token: IoToken, handle: Remote, @@ -98,17 +123,16 @@ impl PollEvented { /// the stream is readable again. In other words, this method is only safe /// to call from within the context of a future's task, typically done in a /// `Future::poll` method. + /// + /// This is mostly equivalent to `self.poll_ready(Ready::readable())`. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. pub fn poll_read(&self) -> Async<()> { - if self.readiness.load(Ordering::SeqCst) & Readable as usize != 0 { - return Async::Ready(()) - } - self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); - if self.readiness.load(Ordering::SeqCst) & Readable as usize != 0 { - Async::Ready(()) - } else { - self.token.schedule_read(&self.handle); - Async::NotReady - } + self.poll_ready(super::read_ready()) + .map(|_| ()) } /// Tests to see if this source is ready to be written to or not. @@ -118,16 +142,58 @@ impl PollEvented { /// the stream is writable again. In other words, this method is only safe /// to call from within the context of a future's task, typically done in a /// `Future::poll` method. + /// + /// This is mostly equivalent to `self.poll_ready(Ready::writable())`. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. pub fn poll_write(&self) -> Async<()> { - if self.readiness.load(Ordering::SeqCst) & Writable as usize != 0 { - return Async::Ready(()) + self.poll_ready(Ready::writable()) + .map(|_| ()) + } + + /// Test to see whether this source fulfills any condition listed in `mask` + /// provided. + /// + /// The `mask` given here is a mio `Ready` set of possible events. This can + /// contain any events like read/write but also platform-specific events + /// such as hup and error. The `mask` indicates events that are interested + /// in being ready. + /// + /// If any event in `mask` is ready then it is returned through + /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty + /// and contains all events that are currently ready in the `mask` provided. + /// + /// If no events are ready in the `mask` provided then the current task is + /// scheduled to receive a notification when any of them become ready. If + /// the `writable` event is contained within `mask` then this + /// `PollEvented`'s `write` task will be blocked and otherwise the `read` + /// task will be blocked. This is generally only relevant if you're working + /// with this `PollEvented` object on multiple tasks. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_ready(&self, mask: Ready) -> Async { + let bits = super::ready2usize(mask); + match self.readiness.load(Ordering::SeqCst) & bits { + 0 => {} + n => return Async::Ready(super::usize2ready(n)), } self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); - if self.readiness.load(Ordering::SeqCst) & Writable as usize != 0 { - Async::Ready(()) - } else { - self.token.schedule_write(&self.handle); - Async::NotReady + match self.readiness.load(Ordering::SeqCst) & bits { + 0 => { + if mask.is_writable() { + self.need_write(); + } else { + self.need_read(); + } + Async::NotReady + } + n => Async::Ready(super::usize2ready(n)), } } @@ -139,15 +205,24 @@ impl PollEvented { /// informs this readiness stream that the underlying object is no longer /// readable, typically because a "would block" error was seen. /// - /// The flag indicating that this stream is readable is unset and the - /// current task is scheduled to receive a notification when the stream is - /// then again readable. + /// *All* readiness bits associated with this stream except the writable bit + /// will be reset when this method is called. The current task is then + /// scheduled to receive a notification whenever anything changes other than + /// the writable bit. Note that this typically just means the readable bit + /// is used here, but if you're using a custom I/O object for events like + /// hup/error this may also be relevant. /// /// Note that it is also only valid to call this method if `poll_read` /// previously indicated that the object is readable. That is, this function /// must always be paired with calls to `poll_read` previously. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. pub fn need_read(&self) { - self.readiness.fetch_and(!(Readable as usize), Ordering::SeqCst); + let bits = super::ready2usize(super::read_ready()); + self.readiness.fetch_and(!bits, Ordering::SeqCst); self.token.schedule_read(&self.handle) } @@ -166,8 +241,14 @@ impl PollEvented { /// Note that it is also only valid to call this method if `poll_write` /// previously indicated that the object is writable. That is, this function /// must always be paired with calls to `poll_write` previously. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. pub fn need_write(&self) { - self.readiness.fetch_and(!(Writable as usize), Ordering::SeqCst); + let bits = super::ready2usize(Ready::writable()); + self.readiness.fetch_and(!bits, Ordering::SeqCst); self.token.schedule_write(&self.handle) }