FreeBSD uses a separate kqueue filter type for lio_listio.  This change
adds support for that filter type.  Full functionality will be provided
by the mio-aio and tokio-file crates.

* Add PollEvented::into_inner

Consumes a PollEvented and returns its inner io object.  Useful for io
types that have exclusive ownership of a resource.

See also https://github.com/tokio-rs/tokio-core/commit/9400ffb
This commit is contained in:
Alan Somers 2018-02-26 10:39:57 -07:00 committed by Carl Lerche
parent 5334de5e44
commit 3b64fe9363
2 changed files with 66 additions and 22 deletions

View File

@ -582,9 +582,14 @@ mod platform {
use mio::Ready;
use mio::unix::UnixReady;
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
#[cfg(target_os = "dragonfly")]
pub fn all() -> Ready {
hup() | UnixReady::aio().into()
hup() | UnixReady::aio()
}
#[cfg(target_os = "freebsd")]
pub fn all() -> Ready {
hup() | UnixReady::aio() | UnixReady::lio()
}
#[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
@ -599,10 +604,11 @@ mod platform {
const HUP: usize = 1 << 2;
const ERROR: usize = 1 << 3;
const AIO: usize = 1 << 4;
const LIO: usize = 1 << 5;
#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
fn is_aio(ready: &Ready) -> bool {
ready.is_aio()
UnixReady::from(*ready).is_aio()
}
#[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
@ -610,12 +616,25 @@ mod platform {
false
}
#[cfg(target_os = "freebsd")]
fn is_lio(ready: &Ready) -> bool {
UnixReady::from(*ready).is_lio()
}
#[cfg(not(target_os = "freebsd"))]
fn is_lio(_ready: &Ready) -> bool {
false
}
pub fn ready2usize(ready: Ready) -> usize {
let ready = UnixReady::from(ready);
let mut bits = 0;
if is_aio(&ready) {
bits |= AIO;
}
if is_lio(&ready) {
bits |= LIO;
}
if ready.is_error() {
bits |= ERROR;
}
@ -637,11 +656,24 @@ mod platform {
// aio not available here → empty
}
#[cfg(target_os = "freebsd")]
fn usize2ready_lio(ready: &mut UnixReady) {
ready.insert(UnixReady::lio());
}
#[cfg(not(target_os = "freebsd"))]
fn usize2ready_lio(_ready: &mut UnixReady) {
// lio not available here → empty
}
pub fn usize2ready(bits: usize) -> Ready {
let mut ready = UnixReady::from(Ready::empty());
if bits & AIO != 0 {
usize2ready_aio(&mut ready);
}
if bits & LIO != 0 {
usize2ready_lio(&mut ready);
}
if bits & HUP != 0 {
ready.insert(UnixReady::hup());
}

View File

@ -17,6 +17,12 @@ use tokio_io::{AsyncRead, AsyncWrite};
use reactor::{Handle, Direction};
struct Registration {
pub token: usize,
pub handle: Handle,
pub readiness: usize,
}
/// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop.
///
@ -63,9 +69,7 @@ use reactor::{Handle, Direction};
/// method you want to also use `need_read` to signal blocking and you should
/// otherwise probably avoid using two tasks on the same `PollEvented`.
pub struct PollEvented<E> {
token: usize,
handle: Handle,
readiness: usize,
registration: Registration,
io: E,
}
@ -91,9 +95,11 @@ impl<E> PollEvented<E> {
};
Ok(PollEvented {
token,
readiness: 0,
handle: handle.clone(),
registration: Registration {
token: token,
readiness: 0,
handle: handle.clone()
},
io: io,
})
}
@ -162,19 +168,20 @@ impl<E> PollEvented<E> {
pub fn poll_ready(&mut self, mask: Ready) -> Async<Ready> {
let bits = super::ready2usize(mask);
match self.readiness & bits {
match self.registration.readiness & bits {
0 => {}
n => return Async::Ready(super::usize2ready(n)),
}
let token_readiness = self.handle.inner().map(|inner| {
let token_readiness = self.registration.handle.inner().map(|inner| {
let io_dispatch = inner.io_dispatch.read().unwrap();
io_dispatch[self.token].readiness.swap(0, Ordering::SeqCst)
let token = self.registration.token;
io_dispatch[token].readiness.swap(0, Ordering::SeqCst)
}).unwrap_or(0);
self.readiness |= token_readiness;
self.registration.readiness |= token_readiness;
match self.readiness & bits {
match self.registration.readiness & bits {
0 => {
if mask.is_writable() {
if self.need_write().is_err() {
@ -224,13 +231,13 @@ impl<E> PollEvented<E> {
/// task.
pub fn need_read(&mut self) -> io::Result<()> {
let bits = super::ready2usize(super::read_ready());
self.readiness &= !bits;
self.registration.readiness &= !bits;
let inner = match self.handle.inner() {
let inner = match self.registration.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.schedule(self.token, Direction::Read);
inner.schedule(self.registration.token, Direction::Read);
Ok(())
}
@ -264,20 +271,20 @@ impl<E> PollEvented<E> {
/// task.
pub fn need_write(&mut self) -> io::Result<()> {
let bits = super::ready2usize(Ready::writable());
self.readiness &= !bits;
self.registration.readiness &= !bits;
let inner = match self.handle.inner() {
let inner = match self.registration.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.schedule(self.token, Direction::Write);
inner.schedule(self.registration.token, Direction::Write);
Ok(())
}
/// Returns a reference to the event loop handle that this readiness stream
/// is associated with.
pub fn handle(&self) -> &Handle {
&self.handle
&self.registration.handle
}
/// Returns a shared reference to the underlying I/O object this readiness
@ -292,6 +299,11 @@ impl<E> PollEvented<E> {
&mut self.io
}
/// Consumes the `PollEvented` and returns the underlying I/O object
pub fn into_inner(self) -> E {
self.io
}
/// Deregisters this source of events from the reactor core specified.
///
/// This method can optionally be called to unregister the underlying I/O
@ -378,7 +390,7 @@ fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
}
}
impl<E> Drop for PollEvented<E> {
impl Drop for Registration {
fn drop(&mut self) {
if let Some(inner) = self.handle.inner() {
inner.drop_source(self.token);