mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
Add PollEvented::poll_ready
This commit adds a general-purpose method for querying the readiness of a `PollEvented` type. This new method, `poll_ready`, takes a blanket `mio::Ready` and tests if any part of it is ready. The purpose of this is to expose platform-specific events through `PollReady` such as `hup` and `error` events other than just the platform-agnostic readable/writable events. The semanatics of this method are: * The `poll_ready` function takes a mask, and the return value is either `Async::Ready` with a subset of these events that are ready or `None` if none of them are ready. * There can be up to two tasks blocked on a `PollEvented`, so we need to pick which one is suitable for these new events. Currently all events are routed to the `read` task unless the writable bit is set. This is mostly only relevant for multi-task usage or if you're manually calling `need_read` and/or `need_write`, and hopefully the docs will cover this now.
This commit is contained in:
parent
b92fd2d22a
commit
4dd3d30f2a
@ -139,13 +139,6 @@ enum Message {
|
||||
Run(Box<FnBox>),
|
||||
}
|
||||
|
||||
#[repr(usize)]
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
enum Readiness {
|
||||
Readable = 1,
|
||||
Writable = 2,
|
||||
}
|
||||
|
||||
const TOKEN_MESSAGES: mio::Token = mio::Token(0);
|
||||
const TOKEN_FUTURE: mio::Token = mio::Token(1);
|
||||
const TOKEN_START: usize = 2;
|
||||
@ -330,15 +323,12 @@ impl Core {
|
||||
let mut writer = None;
|
||||
let mut inner = self.inner.borrow_mut();
|
||||
if let Some(io) = inner.io_dispatch.get_mut(token) {
|
||||
if ready.is_readable() || platform::is_hup(&ready) {
|
||||
reader = io.reader.take();
|
||||
io.readiness.fetch_or(Readiness::Readable as usize,
|
||||
Ordering::Relaxed);
|
||||
}
|
||||
io.readiness.fetch_or(ready2usize(ready), Ordering::Relaxed);
|
||||
if ready.is_writable() {
|
||||
writer = io.writer.take();
|
||||
io.readiness.fetch_or(Readiness::Writable as usize,
|
||||
Ordering::Relaxed);
|
||||
}
|
||||
if !(ready & (!mio::Ready::writable())).is_empty() {
|
||||
reader = io.reader.take();
|
||||
}
|
||||
}
|
||||
drop(inner);
|
||||
@ -497,14 +487,16 @@ impl Inner {
|
||||
-> Option<Task> {
|
||||
debug!("scheduling direction for: {}", token);
|
||||
let sched = self.io_dispatch.get_mut(token).unwrap();
|
||||
let (slot, bit) = match dir {
|
||||
Direction::Read => (&mut sched.reader, Readiness::Readable as usize),
|
||||
Direction::Write => (&mut sched.writer, Readiness::Writable as usize),
|
||||
let (slot, ready) = match dir {
|
||||
Direction::Read => (&mut sched.reader, !mio::Ready::writable()),
|
||||
Direction::Write => (&mut sched.writer, mio::Ready::writable()),
|
||||
};
|
||||
if sched.readiness.load(Ordering::SeqCst) & bit != 0 {
|
||||
if sched.readiness.load(Ordering::SeqCst) & ready2usize(ready) != 0 {
|
||||
debug!("cancelling block");
|
||||
*slot = None;
|
||||
Some(wake)
|
||||
} else {
|
||||
debug!("blocking");
|
||||
*slot = Some(wake);
|
||||
None
|
||||
}
|
||||
@ -754,29 +746,84 @@ impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
|
||||
}
|
||||
}
|
||||
|
||||
fn read_ready() -> mio::Ready {
|
||||
mio::Ready::readable() | platform::hup()
|
||||
}
|
||||
|
||||
const READ: usize = 1 << 0;
|
||||
const WRITE: usize = 1 << 1;
|
||||
|
||||
fn ready2usize(ready: mio::Ready) -> usize {
|
||||
let mut bits = 0;
|
||||
if ready.is_readable() {
|
||||
bits |= READ;
|
||||
}
|
||||
if ready.is_writable() {
|
||||
bits |= WRITE;
|
||||
}
|
||||
bits | platform::ready2usize(ready)
|
||||
}
|
||||
|
||||
fn usize2ready(bits: usize) -> mio::Ready {
|
||||
let mut ready = mio::Ready::empty();
|
||||
if bits & READ != 0 {
|
||||
ready.insert(mio::Ready::readable());
|
||||
}
|
||||
if bits & WRITE != 0 {
|
||||
ready.insert(mio::Ready::writable());
|
||||
}
|
||||
ready | platform::usize2ready(bits)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
mod platform {
|
||||
use mio::Ready;
|
||||
use mio::unix::UnixReady;
|
||||
|
||||
pub fn is_hup(event: &Ready) -> bool {
|
||||
UnixReady::from(*event).is_hup()
|
||||
}
|
||||
|
||||
pub fn hup() -> Ready {
|
||||
UnixReady::hup().into()
|
||||
}
|
||||
|
||||
const HUP: usize = 1 << 2;
|
||||
const ERROR: usize = 1 << 3;
|
||||
|
||||
pub fn ready2usize(ready: Ready) -> usize {
|
||||
let ready = UnixReady::from(ready);
|
||||
let mut bits = 0;
|
||||
if ready.is_error() {
|
||||
bits |= ERROR;
|
||||
}
|
||||
if ready.is_hup() {
|
||||
bits |= HUP;
|
||||
}
|
||||
bits
|
||||
}
|
||||
|
||||
pub fn usize2ready(bits: usize) -> Ready {
|
||||
let mut ready = UnixReady::from(Ready::empty());
|
||||
if bits & HUP != 0 {
|
||||
ready.insert(UnixReady::hup());
|
||||
}
|
||||
if bits & ERROR != 0 {
|
||||
ready.insert(UnixReady::error());
|
||||
}
|
||||
ready.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
mod platform {
|
||||
use mio::Ready;
|
||||
|
||||
pub fn is_hup(_event: &Ready) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn hup() -> Ready {
|
||||
Ready::empty()
|
||||
}
|
||||
|
||||
pub fn ready2usize(_r: Ready) -> usize {
|
||||
0
|
||||
}
|
||||
|
||||
pub fn usize2ready(_r: usize) -> Ready {
|
||||
Ready::empty()
|
||||
}
|
||||
}
|
||||
|
@ -12,10 +12,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use futures::{Async, Poll};
|
||||
use mio::event::Evented;
|
||||
use mio::Ready;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use reactor::{Handle, Remote};
|
||||
use reactor::Readiness::*;
|
||||
use reactor::io_token::IoToken;
|
||||
|
||||
/// A concrete implementation of a stream of readiness notifications for I/O
|
||||
@ -25,6 +25,10 @@ use reactor::io_token::IoToken;
|
||||
/// associated with a specific event loop and source of events that will be
|
||||
/// registered with an event loop.
|
||||
///
|
||||
/// An instance of `PollEvented` is essentially the bridge between the `mio`
|
||||
/// world and the `tokio-core` world, providing abstractions to receive
|
||||
/// notifications about changes to an object's `mio::Ready` state.
|
||||
///
|
||||
/// Each readiness stream has a number of methods to test whether the underlying
|
||||
/// object is readable or writable. Once the methods return that an object is
|
||||
/// readable/writable, then it will continue to do so until the `need_read` or
|
||||
@ -38,6 +42,27 @@ use reactor::io_token::IoToken;
|
||||
/// You can find more information about creating a custom I/O object [online].
|
||||
///
|
||||
/// [online]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/#custom-io
|
||||
///
|
||||
/// ## Readiness to read/write
|
||||
///
|
||||
/// A `PollEvented` allows listenting and waiting for an arbitrary `mio::Ready`
|
||||
/// instance, including the platform-specific contents of `mio::Ready`. At most
|
||||
/// two future tasks, however, can be waiting on a `PollEvented`. The
|
||||
/// `need_read` and `need_write` methods can block two separate tasks, one on
|
||||
/// reading and one on writing. Not all I/O events correspond to read/write,
|
||||
/// however!
|
||||
///
|
||||
/// To account for this a `PollEvented` gets a little interesting when working
|
||||
/// with an arbitrary instance of `mio::Ready` that may not map precisely to
|
||||
/// "write" and "read" tasks. Currently it is defined that instances of
|
||||
/// `mio::Ready` that do *not* return true from `is_writable` are all notified
|
||||
/// through `need_read`, or the read task.
|
||||
///
|
||||
/// In other words, `poll_ready` with the `mio::UnixReady::hup` event will block
|
||||
/// the read task of this `PollEvented` if the `hup` event isn't available.
|
||||
/// Essentially a good rule of thumb is that if you're using the `poll_ready`
|
||||
/// method you want to also use `need_read` to signal blocking and you should
|
||||
/// otherwise probably avoid using two tasks on the same `PollEvented`.
|
||||
pub struct PollEvented<E> {
|
||||
token: IoToken,
|
||||
handle: Remote,
|
||||
@ -98,17 +123,16 @@ impl<E> PollEvented<E> {
|
||||
/// the stream is readable again. In other words, this method is only safe
|
||||
/// to call from within the context of a future's task, typically done in a
|
||||
/// `Future::poll` method.
|
||||
///
|
||||
/// This is mostly equivalent to `self.poll_ready(Ready::readable())`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if called outside the context of a future's
|
||||
/// task.
|
||||
pub fn poll_read(&self) -> Async<()> {
|
||||
if self.readiness.load(Ordering::SeqCst) & Readable as usize != 0 {
|
||||
return Async::Ready(())
|
||||
}
|
||||
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
|
||||
if self.readiness.load(Ordering::SeqCst) & Readable as usize != 0 {
|
||||
Async::Ready(())
|
||||
} else {
|
||||
self.token.schedule_read(&self.handle);
|
||||
Async::NotReady
|
||||
}
|
||||
self.poll_ready(super::read_ready())
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
/// Tests to see if this source is ready to be written to or not.
|
||||
@ -118,16 +142,58 @@ impl<E> PollEvented<E> {
|
||||
/// the stream is writable again. In other words, this method is only safe
|
||||
/// to call from within the context of a future's task, typically done in a
|
||||
/// `Future::poll` method.
|
||||
///
|
||||
/// This is mostly equivalent to `self.poll_ready(Ready::writable())`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if called outside the context of a future's
|
||||
/// task.
|
||||
pub fn poll_write(&self) -> Async<()> {
|
||||
if self.readiness.load(Ordering::SeqCst) & Writable as usize != 0 {
|
||||
return Async::Ready(())
|
||||
self.poll_ready(Ready::writable())
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
/// Test to see whether this source fulfills any condition listed in `mask`
|
||||
/// provided.
|
||||
///
|
||||
/// The `mask` given here is a mio `Ready` set of possible events. This can
|
||||
/// contain any events like read/write but also platform-specific events
|
||||
/// such as hup and error. The `mask` indicates events that are interested
|
||||
/// in being ready.
|
||||
///
|
||||
/// If any event in `mask` is ready then it is returned through
|
||||
/// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty
|
||||
/// and contains all events that are currently ready in the `mask` provided.
|
||||
///
|
||||
/// If no events are ready in the `mask` provided then the current task is
|
||||
/// scheduled to receive a notification when any of them become ready. If
|
||||
/// the `writable` event is contained within `mask` then this
|
||||
/// `PollEvented`'s `write` task will be blocked and otherwise the `read`
|
||||
/// task will be blocked. This is generally only relevant if you're working
|
||||
/// with this `PollEvented` object on multiple tasks.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if called outside the context of a future's
|
||||
/// task.
|
||||
pub fn poll_ready(&self, mask: Ready) -> Async<Ready> {
|
||||
let bits = super::ready2usize(mask);
|
||||
match self.readiness.load(Ordering::SeqCst) & bits {
|
||||
0 => {}
|
||||
n => return Async::Ready(super::usize2ready(n)),
|
||||
}
|
||||
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
|
||||
if self.readiness.load(Ordering::SeqCst) & Writable as usize != 0 {
|
||||
Async::Ready(())
|
||||
} else {
|
||||
self.token.schedule_write(&self.handle);
|
||||
Async::NotReady
|
||||
match self.readiness.load(Ordering::SeqCst) & bits {
|
||||
0 => {
|
||||
if mask.is_writable() {
|
||||
self.need_write();
|
||||
} else {
|
||||
self.need_read();
|
||||
}
|
||||
Async::NotReady
|
||||
}
|
||||
n => Async::Ready(super::usize2ready(n)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,15 +205,24 @@ impl<E> PollEvented<E> {
|
||||
/// informs this readiness stream that the underlying object is no longer
|
||||
/// readable, typically because a "would block" error was seen.
|
||||
///
|
||||
/// The flag indicating that this stream is readable is unset and the
|
||||
/// current task is scheduled to receive a notification when the stream is
|
||||
/// then again readable.
|
||||
/// *All* readiness bits associated with this stream except the writable bit
|
||||
/// will be reset when this method is called. The current task is then
|
||||
/// scheduled to receive a notification whenever anything changes other than
|
||||
/// the writable bit. Note that this typically just means the readable bit
|
||||
/// is used here, but if you're using a custom I/O object for events like
|
||||
/// hup/error this may also be relevant.
|
||||
///
|
||||
/// Note that it is also only valid to call this method if `poll_read`
|
||||
/// previously indicated that the object is readable. That is, this function
|
||||
/// must always be paired with calls to `poll_read` previously.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if called outside the context of a future's
|
||||
/// task.
|
||||
pub fn need_read(&self) {
|
||||
self.readiness.fetch_and(!(Readable as usize), Ordering::SeqCst);
|
||||
let bits = super::ready2usize(super::read_ready());
|
||||
self.readiness.fetch_and(!bits, Ordering::SeqCst);
|
||||
self.token.schedule_read(&self.handle)
|
||||
}
|
||||
|
||||
@ -166,8 +241,14 @@ impl<E> PollEvented<E> {
|
||||
/// Note that it is also only valid to call this method if `poll_write`
|
||||
/// previously indicated that the object is writable. That is, this function
|
||||
/// must always be paired with calls to `poll_write` previously.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if called outside the context of a future's
|
||||
/// task.
|
||||
pub fn need_write(&self) {
|
||||
self.readiness.fetch_and(!(Writable as usize), Ordering::SeqCst);
|
||||
let bits = super::ready2usize(Ready::writable());
|
||||
self.readiness.fetch_and(!bits, Ordering::SeqCst);
|
||||
self.token.schedule_write(&self.handle)
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user