diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index f359f89f2..8a22d1a70 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -582,9 +582,14 @@ mod platform { use mio::Ready; use mio::unix::UnixReady; - #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] + #[cfg(target_os = "dragonfly")] pub fn all() -> Ready { - hup() | UnixReady::aio().into() + hup() | UnixReady::aio() + } + + #[cfg(target_os = "freebsd")] + pub fn all() -> Ready { + hup() | UnixReady::aio() | UnixReady::lio() } #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))] @@ -599,10 +604,11 @@ mod platform { const HUP: usize = 1 << 2; const ERROR: usize = 1 << 3; const AIO: usize = 1 << 4; + const LIO: usize = 1 << 5; #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] fn is_aio(ready: &Ready) -> bool { - ready.is_aio() + UnixReady::from(*ready).is_aio() } #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))] @@ -610,12 +616,25 @@ mod platform { false } + #[cfg(target_os = "freebsd")] + fn is_lio(ready: &Ready) -> bool { + UnixReady::from(*ready).is_lio() + } + + #[cfg(not(target_os = "freebsd"))] + fn is_lio(_ready: &Ready) -> bool { + false + } + pub fn ready2usize(ready: Ready) -> usize { let ready = UnixReady::from(ready); let mut bits = 0; if is_aio(&ready) { bits |= AIO; } + if is_lio(&ready) { + bits |= LIO; + } if ready.is_error() { bits |= ERROR; } @@ -637,11 +656,24 @@ mod platform { // aio not available here → empty } + #[cfg(target_os = "freebsd")] + fn usize2ready_lio(ready: &mut UnixReady) { + ready.insert(UnixReady::lio()); + } + + #[cfg(not(target_os = "freebsd"))] + fn usize2ready_lio(_ready: &mut UnixReady) { + // lio not available here → empty + } + pub fn usize2ready(bits: usize) -> Ready { let mut ready = UnixReady::from(Ready::empty()); if bits & AIO != 0 { usize2ready_aio(&mut ready); } + if bits & LIO != 0 { + usize2ready_lio(&mut ready); + } if bits & HUP != 0 { ready.insert(UnixReady::hup()); } diff --git a/src/reactor/poll_evented.rs b/src/reactor/poll_evented.rs index fcd36e5d8..00b71e06a 100644 --- a/src/reactor/poll_evented.rs +++ b/src/reactor/poll_evented.rs @@ -17,6 +17,12 @@ use tokio_io::{AsyncRead, AsyncWrite}; use reactor::{Handle, Direction}; +struct Registration { + pub token: usize, + pub handle: Handle, + pub readiness: usize, +} + /// A concrete implementation of a stream of readiness notifications for I/O /// objects that originates from an event loop. /// @@ -63,9 +69,7 @@ use reactor::{Handle, Direction}; /// method you want to also use `need_read` to signal blocking and you should /// otherwise probably avoid using two tasks on the same `PollEvented`. pub struct PollEvented { - token: usize, - handle: Handle, - readiness: usize, + registration: Registration, io: E, } @@ -91,9 +95,11 @@ impl PollEvented { }; Ok(PollEvented { - token, - readiness: 0, - handle: handle.clone(), + registration: Registration { + token: token, + readiness: 0, + handle: handle.clone() + }, io: io, }) } @@ -162,19 +168,20 @@ impl PollEvented { pub fn poll_ready(&mut self, mask: Ready) -> Async { let bits = super::ready2usize(mask); - match self.readiness & bits { + match self.registration.readiness & bits { 0 => {} n => return Async::Ready(super::usize2ready(n)), } - let token_readiness = self.handle.inner().map(|inner| { + let token_readiness = self.registration.handle.inner().map(|inner| { let io_dispatch = inner.io_dispatch.read().unwrap(); - io_dispatch[self.token].readiness.swap(0, Ordering::SeqCst) + let token = self.registration.token; + io_dispatch[token].readiness.swap(0, Ordering::SeqCst) }).unwrap_or(0); - self.readiness |= token_readiness; + self.registration.readiness |= token_readiness; - match self.readiness & bits { + match self.registration.readiness & bits { 0 => { if mask.is_writable() { if self.need_write().is_err() { @@ -224,13 +231,13 @@ impl PollEvented { /// task. pub fn need_read(&mut self) -> io::Result<()> { let bits = super::ready2usize(super::read_ready()); - self.readiness &= !bits; + self.registration.readiness &= !bits; - let inner = match self.handle.inner() { + let inner = match self.registration.handle.inner() { Some(inner) => inner, None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), }; - inner.schedule(self.token, Direction::Read); + inner.schedule(self.registration.token, Direction::Read); Ok(()) } @@ -264,20 +271,20 @@ impl PollEvented { /// task. pub fn need_write(&mut self) -> io::Result<()> { let bits = super::ready2usize(Ready::writable()); - self.readiness &= !bits; + self.registration.readiness &= !bits; - let inner = match self.handle.inner() { + let inner = match self.registration.handle.inner() { Some(inner) => inner, None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), }; - inner.schedule(self.token, Direction::Write); + inner.schedule(self.registration.token, Direction::Write); Ok(()) } /// Returns a reference to the event loop handle that this readiness stream /// is associated with. pub fn handle(&self) -> &Handle { - &self.handle + &self.registration.handle } /// Returns a shared reference to the underlying I/O object this readiness @@ -292,6 +299,11 @@ impl PollEvented { &mut self.io } + /// Consumes the `PollEvented` and returns the underlying I/O object + pub fn into_inner(self) -> E { + self.io + } + /// Deregisters this source of events from the reactor core specified. /// /// This method can optionally be called to unregister the underlying I/O @@ -378,7 +390,7 @@ fn is_wouldblock(r: &io::Result) -> bool { } } -impl Drop for PollEvented { +impl Drop for Registration { fn drop(&mut self) { if let Some(inner) = self.handle.inner() { inner.drop_source(self.token);