Delete the IoToken type (#83)

This was added oh-so-long ago and nowadays is just used by one consumer,
`PollEvented`. Let's just inline the already small implementation directly into
`PollEvented` which should make it easier to modify in the future as well
This commit is contained in:
Alex Crichton 2018-01-18 12:48:59 -06:00 committed by Carl Lerche
parent 025f52aadc
commit 826e27685c
3 changed files with 35 additions and 178 deletions

View File

@ -1,168 +0,0 @@
use std::sync::atomic::Ordering;
use std::io;
use mio::event::Evented;
use reactor::{Handle, Direction};
/// A token that identifies an active I/O resource.
pub struct IoToken {
token: usize,
handle: Handle,
}
impl IoToken {
/// Add a new source to an event loop, returning a token that can be used to
/// identify this source.
///
/// When a new I/O object is created it needs to be communicated to the
/// event loop to ensure that it's registered and ready to receive
/// notifications. The event loop will then respond back with the I/O object
/// and a token which can be used to send more messages to the event loop.
///
/// The token returned is then passed in turn to each of the methods below
/// to interact with notifications on the I/O object itself.
///
/// # Panics
///
/// The returned future will panic if the event loop this handle is
/// associated with has gone away, or if there is an error communicating
/// with the event loop.
pub fn new(source: &Evented, handle: &Handle) -> io::Result<IoToken> {
match handle.inner() {
Some(inner) => {
let token = try!(inner.add_source(source));
let handle = handle.clone();
Ok(IoToken { token, handle })
}
None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")),
}
}
/// Returns a reference to this I/O token's event loop's handle.
pub fn handle(&self) -> &Handle {
&self.handle
}
/// Consumes the last readiness notification the token this source is for
/// registered.
///
/// Currently sources receive readiness notifications on an edge-basis. That
/// is, once you receive a notification that an object can be read, you
/// won't receive any more notifications until all of that data has been
/// read.
///
/// The event loop will fill in this information and then inform futures
/// that they're ready to go with the `schedule` method, and then the `poll`
/// method can use this to figure out what happened.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
// TODO: this should really return a proper newtype/enum, not a usize
pub fn take_readiness(&self) -> usize {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return 0,
};
let io_dispatch = inner.io_dispatch.read().unwrap();
io_dispatch[self.token].readiness.swap(0, Ordering::SeqCst)
}
/// Schedule the current future task to receive a notification when the
/// corresponding I/O object is readable.
///
/// Once an I/O object has been registered with the event loop through the
/// `add_source` method, this method can be used with the assigned token to
/// notify the current future task when the next read notification comes in.
///
/// The current task will only receive a notification **once** and to
/// receive further notifications it will need to call `schedule_read`
/// again.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
///
/// # Panics
///
/// This function will panic if the event loop this handle is associated
/// with has gone away, or if there is an error communicating with the event
/// loop.
///
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_read(&self) -> io::Result<()> {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.schedule(self.token, Direction::Read);
Ok(())
}
/// Schedule the current future task to receive a notification when the
/// corresponding I/O object is writable.
///
/// Once an I/O object has been registered with the event loop through the
/// `add_source` method, this method can be used with the assigned token to
/// notify the current future task when the next write notification comes
/// in.
///
/// The current task will only receive a notification **once** and to
/// receive further notifications it will need to call `schedule_write`
/// again.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
///
/// # Panics
///
/// This function will panic if the event loop this handle is associated
/// with has gone away, or if there is an error communicating with the event
/// loop.
///
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_write(&self) -> io::Result<()> {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.schedule(self.token, Direction::Write);
Ok(())
}
/// Unregister all information associated with a token on an event loop,
/// deallocating all internal resources assigned to the given token.
///
/// This method should be called whenever a source of events is being
/// destroyed. This will ensure that the event loop can reuse the `token`
/// for another I/O object if necessary and also remove it from any poll
/// notifications and callbacks.
///
/// Note that wake callbacks may still be invoked after this method is
/// called as it may take some time for the message to drop a source to
/// reach the event loop. Despite this fact, this method will attempt to
/// ensure that the callbacks are **not** invoked, so pending scheduled
/// callbacks cannot be relied upon to get called.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
///
/// # Panics
///
/// This function will panic if the event loop this handle is associated
/// with has gone away, or if there is an error communicating with the event
/// loop.
pub fn drop_source(&self) {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};
inner.drop_source(self.token)
}
}

View File

@ -33,7 +33,6 @@ use mio;
use mio::event::Evented;
use slab::Slab;
mod io_token;
mod global;
mod poll_evented;

View File

@ -15,8 +15,7 @@ use mio::event::Evented;
use mio::Ready;
use tokio_io::{AsyncRead, AsyncWrite};
use reactor::Handle;
use reactor::io_token::IoToken;
use reactor::{Handle, Direction};
/// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop.
@ -64,7 +63,8 @@ use reactor::io_token::IoToken;
/// method you want to also use `need_read` to signal blocking and you should
/// otherwise probably avoid using two tasks on the same `PollEvented`.
pub struct PollEvented<E> {
token: IoToken,
handle: Handle,
token: usize,
readiness: AtomicUsize,
io: E,
}
@ -83,10 +83,16 @@ impl<E> PollEvented<E> {
pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>>
where E: Evented,
{
let token = IoToken::new(&io, handle)?;
let token = match handle.inner() {
Some(inner) => inner.add_source(&io)?,
None => {
return Err(io::Error::new(io::ErrorKind::Other, "event loop gone"))
}
};
Ok(PollEvented {
token,
handle: handle.clone(),
readiness: AtomicUsize::new(0),
io: io,
})
@ -159,7 +165,13 @@ impl<E> PollEvented<E> {
0 => {}
n => return Async::Ready(super::usize2ready(n)),
}
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
let token_readiness = self.handle.inner().map(|inner| {
let io_dispatch = inner.io_dispatch.read().unwrap();
io_dispatch[self.token].readiness.swap(0, Ordering::SeqCst)
}).unwrap_or(0);
self.readiness.fetch_or(token_readiness, Ordering::SeqCst);
match self.readiness.load(Ordering::SeqCst) & bits {
0 => {
if mask.is_writable() {
@ -211,7 +223,13 @@ impl<E> PollEvented<E> {
pub fn need_read(&self) -> io::Result<()> {
let bits = super::ready2usize(super::read_ready());
self.readiness.fetch_and(!bits, Ordering::SeqCst);
self.token.schedule_read()
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.schedule(self.token, Direction::Read);
Ok(())
}
/// Indicates to this source of events that the corresponding I/O object is
@ -245,13 +263,19 @@ impl<E> PollEvented<E> {
pub fn need_write(&self) -> io::Result<()> {
let bits = super::ready2usize(Ready::writable());
self.readiness.fetch_and(!bits, Ordering::SeqCst);
self.token.schedule_write()
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.schedule(self.token, Direction::Write);
Ok(())
}
/// Returns a reference to the event loop handle that this readiness stream
/// is associated with.
pub fn handle(&self) -> &Handle {
self.token.handle()
&self.handle
}
/// Returns a shared reference to the underlying I/O object this readiness
@ -399,6 +423,8 @@ fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
impl<E> Drop for PollEvented<E> {
fn drop(&mut self) {
self.token.drop_source();
if let Some(inner) = self.handle.inner() {
inner.drop_source(self.token);
}
}
}