Merge pull request #87 from alexcrichton/less-arc

Don't store an Arc in ReadinessStream
This commit is contained in:
Aaron Turon 2016-08-21 21:18:53 -07:00 committed by GitHub
commit cff73b7a9d
11 changed files with 1546 additions and 1386 deletions

View File

@ -1,118 +1,107 @@
//! A thin wrapper around a mpsc queue and mio-based channel information
//!
//! Normally the standard library's channels would suffice but we unfortunately
//! need the `Sender<T>` half to be `Sync`, so to accomplish this for now we
//! just vendor the same mpsc queue as the one in the standard library and then
//! we pair that with the `mio::channel` module's Ctl pairs to control the
//! readiness notifications on the channel.
use std::cell::Cell;
use std::io;
use std::marker;
use std::sync::Arc;
use std::sync::mpsc::TryRecvError;
use mio;
use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl};
use futures::{Future, Poll};
use futures_io::IoFuture;
use mio::channel;
use mpsc_queue::{Queue, PopResult};
use {ReadinessStream, LoopHandle};
/// The transmission half of a channel used for sending messages to a receiver.
///
/// A `Sender` can be `clone`d to have multiple threads or instances sending
/// messages to one receiver.
///
/// This type is created by the `LoopHandle::channel` method.
pub struct Sender<T> {
ctl: SenderCtl,
inner: Arc<Queue<T>>,
tx: channel::Sender<T>,
}
/// The receiving half of a channel used for processing messages sent by a
/// `Sender`.
///
/// A `Receiver` cannot be cloned and is not `Sync`, so only one thread can
/// receive messages at a time.
///
/// This type is created by the `LoopHandle::channel` method.
pub struct Receiver<T> {
ctl: ReceiverCtl,
inner: Arc<Queue<T>>,
_marker: marker::PhantomData<Cell<()>>, // this type is not Sync
rx: ReadinessStream<channel::Receiver<T>>,
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Queue::new());
let (tx, rx) = ctl_pair();
let tx = Sender {
ctl: tx,
inner: inner.clone(),
};
let rx = Receiver {
ctl: rx,
inner: inner.clone(),
_marker: marker::PhantomData,
};
(tx, rx)
impl LoopHandle {
/// Creates a new in-memory channel used for sending data across `Send +
/// 'static` boundaries, frequently threads.
///
/// This type can be used to conveniently send messages between futures.
/// Unlike the futures crate `channel` method and types, the returned tx/rx
/// pair is a multi-producer single-consumer (mpsc) channel *with no
/// backpressure*. Currently it's left up to the application to implement a
/// mechanism, if necessary, to avoid messages piling up.
///
/// The returned `Sender` can be used to send messages that are processed by
/// the returned `Receiver`. The `Sender` can be cloned to send messages
/// from multiple sources simultaneously.
pub fn channel<T>(self) -> (Sender<T>, IoFuture<Receiver<T>>)
where T: Send + 'static,
{
let (tx, rx) = channel::channel();
let rx = ReadinessStream::new(self, rx).map(|rx| Receiver { rx: rx });
(Sender { tx: tx }, rx.boxed())
}
}
impl<T> Sender<T> {
pub fn send(&self, data: T) -> io::Result<()> {
self.inner.push(data);
self.ctl.inc()
/// Sends a message to the corresponding receiver of this sender.
///
/// The message provided will be enqueued on the channel immediately, and
/// this function will return immediately. Keep in mind that the
/// underlying channel has infinite capacity, and this may not always be
/// desired.
///
/// If an I/O error happens while sending the message, or if the receiver
/// has gone away, then an error will be returned. Note that I/O errors here
/// are generally quite abnormal.
pub fn send(&self, t: T) -> io::Result<()> {
self.tx.send(t).map_err(|e| {
match e {
channel::SendError::Io(e) => e,
channel::SendError::Disconnected(_) => {
io::Error::new(io::ErrorKind::Other,
"channel has been disconnected")
}
}
impl<T> Receiver<T> {
pub fn recv(&self) -> io::Result<Option<T>> {
// Note that the underlying method is `unsafe` because it's only safe
// if one thread accesses it at a time.
//
// We, however, are the only thread with a `Receiver<T>` because this
// type is not `Sync`. and we never handed out another instance.
match unsafe { self.inner.pop() } {
PopResult::Data(t) => {
try!(self.ctl.dec());
Ok(Some(t))
}
// If the queue is either in an inconsistent or empty state, then
// we return `None` for both instances. Note that the standard
// library performs a yield loop in the event of `Inconsistent`,
// which means that there's data in the queue but a sender hasn't
// finished their operation yet.
//
// We do this because the queue will continue to be readable as
// the thread performing the push will eventually call `inc`, so
// if we return `None` and the event loop just loops aruond calling
// this method then we'll eventually get back to the same spot
// and due the retry.
//
// Basically, the inconsistent state doesn't mean we need to busy
// wait, but instead we can forge ahead and assume by the time we
// go to the kernel and come back we'll no longer be in an
// inconsistent state.
PopResult::Empty |
PopResult::Inconsistent => Ok(None),
}
}
}
// Just delegate everything to `self.ctl`
impl<T> mio::Evented for Receiver<T> {
fn register(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::EventSet,
opts: mio::PollOpt) -> io::Result<()> {
self.ctl.register(poll, token, interest, opts)
}
fn reregister(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::EventSet,
opts: mio::PollOpt) -> io::Result<()> {
self.ctl.reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
self.ctl.deregister(poll)
})
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
Sender {
ctl: self.ctl.clone(),
inner: self.inner.clone(),
Sender { tx: self.tx.clone() }
}
}
impl<T> Receiver<T> {
/// Attempts to receive a message sent on this channel.
///
/// This method will attempt to dequeue any messages sent on this channel
/// from any corresponding sender. If no message is available, but senders
/// are still detected, then `Poll::NotReady` is returned and the current
/// future task is scheduled to receive a notification when a message is
/// available.
///
/// If an I/O error happens or if all senders have gone away (the channel is
/// disconnected) then `Poll::Err` will be returned.
pub fn recv(&self) -> Poll<T, io::Error> {
match self.rx.get_ref().try_recv() {
Ok(t) => Poll::Ok(t),
Err(TryRecvError::Empty) => {
self.rx.need_read();
Poll::NotReady
}
Err(TryRecvError::Disconnected) => {
Poll::Err(io::Error::new(io::ErrorKind::Other,
"channel has been disconnected"))
}
}
}
}

File diff suppressed because it is too large Load Diff

118
src/event_loop/channel.rs Normal file
View File

@ -0,0 +1,118 @@
//! A thin wrapper around a mpsc queue and mio-based channel information
//!
//! Normally the standard library's channels would suffice but we unfortunately
//! need the `Sender<T>` half to be `Sync`, so to accomplish this for now we
//! just vendor the same mpsc queue as the one in the standard library and then
//! we pair that with the `mio::channel` module's Ctl pairs to control the
//! readiness notifications on the channel.
use std::cell::Cell;
use std::io;
use std::marker;
use std::sync::Arc;
use mio;
use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl};
use mpsc_queue::{Queue, PopResult};
pub struct Sender<T> {
ctl: SenderCtl,
inner: Arc<Queue<T>>,
}
pub struct Receiver<T> {
ctl: ReceiverCtl,
inner: Arc<Queue<T>>,
_marker: marker::PhantomData<Cell<()>>, // this type is not Sync
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Queue::new());
let (tx, rx) = ctl_pair();
let tx = Sender {
ctl: tx,
inner: inner.clone(),
};
let rx = Receiver {
ctl: rx,
inner: inner.clone(),
_marker: marker::PhantomData,
};
(tx, rx)
}
impl<T> Sender<T> {
pub fn send(&self, data: T) -> io::Result<()> {
self.inner.push(data);
self.ctl.inc()
}
}
impl<T> Receiver<T> {
pub fn recv(&self) -> io::Result<Option<T>> {
// Note that the underlying method is `unsafe` because it's only safe
// if one thread accesses it at a time.
//
// We, however, are the only thread with a `Receiver<T>` because this
// type is not `Sync`. and we never handed out another instance.
match unsafe { self.inner.pop() } {
PopResult::Data(t) => {
try!(self.ctl.dec());
Ok(Some(t))
}
// If the queue is either in an inconsistent or empty state, then
// we return `None` for both instances. Note that the standard
// library performs a yield loop in the event of `Inconsistent`,
// which means that there's data in the queue but a sender hasn't
// finished their operation yet.
//
// We do this because the queue will continue to be readable as
// the thread performing the push will eventually call `inc`, so
// if we return `None` and the event loop just loops aruond calling
// this method then we'll eventually get back to the same spot
// and due the retry.
//
// Basically, the inconsistent state doesn't mean we need to busy
// wait, but instead we can forge ahead and assume by the time we
// go to the kernel and come back we'll no longer be in an
// inconsistent state.
PopResult::Empty |
PopResult::Inconsistent => Ok(None),
}
}
}
// Just delegate everything to `self.ctl`
impl<T> mio::Evented for Receiver<T> {
fn register(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::EventSet,
opts: mio::PollOpt) -> io::Result<()> {
self.ctl.register(poll, token, interest, opts)
}
fn reregister(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::EventSet,
opts: mio::PollOpt) -> io::Result<()> {
self.ctl.reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
self.ctl.deregister(poll)
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
Sender {
ctl: self.ctl.clone(),
inner: self.inner.clone(),
}
}
}

347
src/event_loop/loop_data.rs Normal file
View File

@ -0,0 +1,347 @@
use std::sync::Arc;
use std::io;
use futures::{Future, Poll};
use futures::task;
use futures::executor::Executor;
use event_loop::{Message, Loop, LoopPin, LoopHandle, LoopFuture};
use self::dropbox::DropBox;
/// A handle to data that is owned by an event loop thread, and is only
/// accessible on that thread itself.
///
/// This structure is created by the `LoopHandle::add_loop_data` method which
/// will return a future resolving to one of these references. A `LoopData<A>`
/// handle is `Send` regardless of what `A` is, but the internal data can only
/// be accessed on the event loop thread itself.
///
/// Internally this reference also stores a handle to the event loop that the
/// data originated on, so it knows how to go back to the event loop to access
/// the data itself.
// TODO: write more once it's implemented
pub struct LoopData<A: 'static> {
data: DropBox<A>,
handle: LoopHandle,
}
pub struct Opaque {
_inner: DropBox<dropbox::MyDrop>,
}
/// Future returned from the `LoopHandle::add_loop_data` method.
///
/// This future will resolve to a `LoopData<A>` reference when completed, which
/// represents a handle to data that is "owned" by the event loop thread but can
/// migrate among threads temporarily so travel with a future itself.
pub struct AddLoopData<F, A> {
inner: LoopFuture<DropBox<A>, F>,
}
fn _assert() {
fn _assert_send<T: Send>() {}
_assert_send::<LoopData<()>>();
}
impl Loop {
/// Creates a new `LoopData<A>` handle by associating data to be directly
/// stored by this event loop.
///
/// This function is useful for when storing non-`Send` data inside of a
/// future. The `LoopData<A>` handle is itself `Send + 'static` regardless
/// of the underlying `A`. That is, for example, you can create a handle to
/// some data that contains an `Rc`, for example.
pub fn add_loop_data<A>(&self, a: A) -> LoopData<A>
where A: 'static,
{
self.pin().add_loop_data(a)
}
}
impl LoopPin {
/// Adds some data to the event loop this pin is associated with.
///
/// This method will return a handle to the data, `LoopData`, which can be
/// used to access the underlying data whenever it's on the correct event
/// loop thread.
pub fn add_loop_data<A>(&self, a: A) -> LoopData<A>
where A: 'static,
{
LoopData {
data: DropBox::new_on(a, self),
handle: self.handle.clone(),
}
}
}
impl LoopHandle {
/// Schedules a closure to add some data to event loop thread itself.
///
/// This function is useful for when storing non-`Send` data inside of a
/// future. This returns a future which will resolve to a `LoopData<A>`
/// handle, which is itself `Send + 'static` regardless of the underlying
/// `A`. That is, for example, you can create a handle to some data that
/// contains an `Rc`, for example.
///
/// This function takes a closure which may be sent to the event loop to
/// generate an instance of type `A`. The closure itself is required to be
/// `Send + 'static`, but the data it produces is only required to adhere to
/// `'static`.
///
/// If the returned future is polled on the event loop thread itself it will
/// very cheaply resolve to a handle to the data, but if it's not polled on
/// the event loop then it will send a message to the event loop to run the
/// closure `f`, generate a handle, and then the future will yield it back.
// TODO: more with examples
pub fn add_loop_data<F, A>(&self, f: F) -> AddLoopData<F, A>
where F: FnOnce() -> A + Send + 'static,
A: 'static,
{
AddLoopData {
inner: LoopFuture {
loop_handle: self.clone(),
data: Some(f),
result: None,
},
}
}
}
impl<F, A> Future for AddLoopData<F, A>
where F: FnOnce() -> A + Send + 'static,
A: 'static,
{
type Item = LoopData<A>;
type Error = io::Error;
fn poll(&mut self) -> Poll<LoopData<A>, io::Error> {
let ret = self.inner.poll(|_lp, f| {
Ok(DropBox::new(f()))
}, |f, slot| {
Message::Run(Box::new(move || {
slot.try_produce(Ok(DropBox::new(f()))).ok()
.expect("add loop data try_produce intereference");
}))
});
ret.map(|data| {
LoopData {
data: data,
handle: self.inner.loop_handle.clone(),
}
})
}
}
impl<A: 'static> LoopData<A> {
/// Gets a shared reference to the underlying data in this handle.
///
/// Returns `None` if it is not called from the event loop thread that this
/// `LoopData<A>` is associated with, or `Some` with a reference to the data
/// if we are indeed on the event loop thread.
pub fn get(&self) -> Option<&A> {
self.data.get()
}
/// Gets a mutable reference to the underlying data in this handle.
///
/// Returns `None` if it is not called from the event loop thread that this
/// `LoopData<A>` is associated with, or `Some` with a reference to the data
/// if we are indeed on the event loop thread.
pub fn get_mut(&mut self) -> Option<&mut A> {
self.data.get_mut()
}
/// Acquire the executor associated with the thread that owns this
/// `LoopData<A>`'s data.
///
/// If the `get` and `get_mut` functions above return `None`, then this data
/// is being polled on the wrong thread to access the data, and to make
/// progress a future may need to migrate to the actual thread which owns
/// the relevant data.
///
/// This executor can in turn be passed to `Task::poll_on`, which will then
/// move the entire future to be polled on the right thread.
pub fn executor(&self) -> Arc<Executor> {
self.handle.tx.clone()
}
/// Returns a reference to the handle that this data is bound to.
pub fn loop_handle(&self) -> &LoopHandle {
&self.handle
}
}
impl<A: Future> Future for LoopData<A> {
type Item = A::Item;
type Error = A::Error;
fn poll(&mut self) -> Poll<A::Item, A::Error> {
// If we're on the right thread, then we can proceed. Otherwise we need
// to go and get polled on the right thread.
if let Some(inner) = self.get_mut() {
return inner.poll()
}
task::poll_on(self.executor());
Poll::NotReady
}
}
impl<A: 'static> Drop for LoopData<A> {
fn drop(&mut self) {
// The `DropBox` we store internally will cause a memory leak if it's
// dropped on the wrong thread. While necessary for safety, we don't
// actually want a memory leak, so for all normal circumstances we take
// out the `DropBox<A>` as a `DropBox<MyDrop>` and then we send it off
// to the event loop.
//
// TODO: possible optimization is to do none of this if we're on the
// event loop thread itself
if let Some(data) = self.data.take() {
self.handle.send(Message::Drop(Opaque { _inner: data }));
}
}
}
/// A curious inner module with one `unsafe` keyword, yet quite an important
/// one!
///
/// The purpose of this module is to define a type, `DropBox<A>`, which is able
/// to be sent across thread event when the underlying data `A` is itself not
/// sendable across threads. This is then in turn used to build up the
/// `LoopData` abstraction above.
///
/// A `DropBox` currently contains two major components, an identification of
/// the thread that it originated from as well as the data itself. Right now the
/// data is stored in a `Box` as we'll transition between it and `Box<MyDrop>`,
/// but this is perhaps optimizable.
///
/// The `DropBox<A>` itself only provides a few safe methods, all of which are
/// safe to call from any thread. Access to the underlying data is only granted
/// if we're on the right thread, and otherwise the methods don't access the
/// data itself.
///
/// Finally, one crucial piece, if the data is dropped it may run code that
/// assumes it's on the original thread. For this reason we have to be sure that
/// the data is only dropped on the originating thread itself. It's currently
/// the job of the outer `LoopData` to ensure that a `DropBox` is dropped on the
/// right thread, so we don't attempt to perform any communication in this
/// `Drop` implementation. Instead, if a `DropBox` is dropped on the wrong
/// thread, it simply leaks its contents.
///
/// All that's really just a lot of words in an attempt to justify the `unsafe`
/// impl of `Send` below. The idea is that the data is only ever accessed on the
/// originating thread, even during `Drop`.
///
/// Note that this is a private module to have a visibility boundary around the
/// unsafe internals. Although there's not any unsafe blocks here, the code
/// itself is quite unsafe as it has to make sure that the data is dropped in
/// the right place, if ever.
mod dropbox {
use std::mem;
use event_loop::{CURRENT_LOOP, LoopPin};
pub struct DropBox<A: ?Sized> {
id: usize,
inner: Option<Box<A>>,
}
// We can be sent across threads due to the comment above
unsafe impl<A: ?Sized> Send for DropBox<A> {}
// We can also be shared across threads just fine as we'll only ever get a
// reference on at most one thread, regardless of `A`.
unsafe impl<A: ?Sized> Sync for DropBox<A> {}
pub trait MyDrop {}
impl<T: ?Sized> MyDrop for T {}
impl<A> DropBox<A> {
/// Creates a new `DropBox` pinned to the current threads.
///
/// Will panic if `CURRENT_LOOP` isn't set.
pub fn new(a: A) -> DropBox<A> {
DropBox {
id: CURRENT_LOOP.with(|lp| lp.id),
inner: Some(Box::new(a)),
}
}
/// Creates a new `DropBox` pinned to the thread of `LoopPin`.
pub fn new_on(a: A, lp: &LoopPin) -> DropBox<A> {
DropBox {
id: lp.handle.id,
inner: Some(Box::new(a)),
}
}
/// Consumes the contents of this `DropBox<A>`, returning a new
/// `DropBox<MyDrop>`.
///
/// This is just intended to be a simple and cheap conversion, should
/// almost always return `Some`.
pub fn take<'a>(&mut self) -> Option<DropBox<MyDrop + 'a>>
where A: 'a
{
self.inner.take().map(|d| {
DropBox { id: self.id, inner: Some(d as Box<MyDrop + 'a>) }
})
}
}
impl<A: ?Sized> DropBox<A> {
/// Returns a shared reference to the data if we're on the right
/// thread.
pub fn get(&self) -> Option<&A> {
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
if lp.id == self.id {
self.inner.as_ref().map(|b| &**b)
} else {
None
}
})
} else {
None
}
}
/// Returns a mutable reference to the data if we're on the right
/// thread.
pub fn get_mut(&mut self) -> Option<&mut A> {
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(move |lp| {
if lp.id == self.id {
self.inner.as_mut().map(|b| &mut **b)
} else {
None
}
})
} else {
None
}
}
}
impl<A: ?Sized> Drop for DropBox<A> {
fn drop(&mut self) {
// Try our safe accessor first, and if it works then we know that
// we're on the right thread. In that case we can simply drop as
// usual.
if let Some(a) = self.get_mut().take() {
return drop(a)
}
// If we're on the wrong thread but we actually have some data, then
// something in theory horrible has gone awry. Prevent memory safety
// issues by forgetting the data and then also warn about this odd
// event.
if let Some(data) = self.inner.take() {
mem::forget(data);
warn!("forgetting some data on an event loop");
}
}
}
}

596
src/event_loop/mod.rs Normal file
View File

@ -0,0 +1,596 @@
use std::cell::RefCell;
use std::io::{self, ErrorKind};
use std::marker;
use std::mem;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use futures::{Future, Poll};
use futures::task::{self, Task, Notify, TaskHandle};
use futures::executor::{ExecuteCallback, Executor};
use mio;
use slab::Slab;
use slot::{self, Slot};
use timer_wheel::{TimerWheel, Timeout};
mod channel;
mod loop_data;
mod source;
mod timeout;
pub use self::loop_data::{LoopData, AddLoopData};
pub use self::source::{AddSource, IoToken};
pub use self::timeout::{AddTimeout, TimeoutToken};
use self::channel::{Sender, Receiver, channel};
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
scoped_thread_local!(static CURRENT_LOOP: Loop);
const SLAB_CAPACITY: usize = 1024 * 64;
/// An event loop.
///
/// The event loop is the main source of blocking in an application which drives
/// all other I/O events and notifications happening. Each event loop can have
/// multiple handles pointing to it, each of which can then be used to create
/// various I/O objects to interact with the event loop in interesting ways.
// TODO: expand this
pub struct Loop {
id: usize,
io: mio::Poll,
events: mio::Events,
tx: Arc<MioSender>,
rx: Receiver<Message>,
dispatch: RefCell<Slab<Scheduled, usize>>,
_future_registration: mio::Registration,
future_readiness: Arc<mio::SetReadiness>,
// Timer wheel keeping track of all timeouts. The `usize` stored in the
// timer wheel is an index into the slab below.
//
// The slab below keeps track of the timeouts themselves as well as the
// state of the timeout itself. The `TimeoutToken` type is an index into the
// `timeouts` slab.
timer_wheel: RefCell<TimerWheel<usize>>,
timeouts: RefCell<Slab<(Timeout, TimeoutState), usize>>,
// A `Loop` cannot be sent to other threads as it's used as a proxy for data
// that belongs to the thread the loop was running on at some point. In
// other words, the safety of `DropBox` below relies on loops not crossing
// threads.
_marker: marker::PhantomData<Rc<u32>>,
}
struct MioSender {
inner: Sender<Message>,
}
/// Handle to an event loop, used to construct I/O objects, send messages, and
/// otherwise interact indirectly with the event loop itself.
///
/// Handles can be cloned, and when cloned they will still refer to the
/// same underlying event loop.
#[derive(Clone)]
pub struct LoopHandle {
id: usize,
tx: Arc<MioSender>,
}
/// A non-sendable handle to an event loop, useful for manufacturing instances
/// of `LoopData`.
#[derive(Clone)]
pub struct LoopPin {
handle: LoopHandle,
_marker: marker::PhantomData<Box<Drop>>,
}
struct Scheduled {
readiness: Arc<AtomicUsize>,
reader: Option<TaskHandle>,
writer: Option<TaskHandle>,
}
enum TimeoutState {
NotFired,
Fired,
Waiting(TaskHandle),
}
enum Direction {
Read,
Write,
}
enum Message {
DropSource(usize),
Schedule(usize, TaskHandle, Direction),
AddTimeout(Instant, Arc<Slot<io::Result<(usize, Instant)>>>),
UpdateTimeout(usize, TaskHandle),
CancelTimeout(usize),
Run(Box<ExecuteCallback>),
Drop(loop_data::Opaque),
}
impl Loop {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub fn new() -> io::Result<Loop> {
let (tx, rx) = channel();
let io = try!(mio::Poll::new());
try!(io.register(&rx,
mio::Token(0),
mio::EventSet::readable(),
mio::PollOpt::edge()));
let pair = mio::Registration::new(&io,
mio::Token(1),
mio::EventSet::readable(),
mio::PollOpt::level());
let (registration, readiness) = pair;
Ok(Loop {
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
io: io,
events: mio::Events::new(),
tx: Arc::new(MioSender { inner: tx }),
rx: rx,
_future_registration: registration,
future_readiness: Arc::new(readiness),
dispatch: RefCell::new(Slab::new_starting_at(2, SLAB_CAPACITY)),
timeouts: RefCell::new(Slab::new_starting_at(0, SLAB_CAPACITY)),
timer_wheel: RefCell::new(TimerWheel::new()),
_marker: marker::PhantomData,
})
}
/// Generates a handle to this event loop used to construct I/O objects and
/// send messages.
///
/// Handles to an event loop are cloneable as well and clones will always
/// refer to the same event loop.
pub fn handle(&self) -> LoopHandle {
LoopHandle {
id: self.id,
tx: self.tx.clone(),
}
}
/// Returns a "pin" of this event loop which cannot be sent across threads
/// but can be used as a proxy to the event loop itself.
///
/// Currently the primary use for this is to use as a handle to add data
/// to the event loop directly. The `LoopPin::add_loop_data` method can
/// be used to immediately create instances of `LoopData` structures.
pub fn pin(&self) -> LoopPin {
LoopPin {
handle: self.handle(),
_marker: marker::PhantomData,
}
}
/// Runs a future until completion, driving the event loop while we're
/// otherwise waiting for the future to complete.
///
/// This function will begin executing the event loop and will finish once
/// the provided future is resolve. Note that the future argument here
/// crucially does not require the `'static` nor `Send` bounds. As a result
/// the future will be "pinned" to not only this thread but also this stack
/// frame.
///
/// This function will returns the value that the future resolves to once
/// the future has finished. If the future never resolves then this function
/// will never return.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
///
/// Similarly, becuase the provided future will be pinned not only to this
/// thread but also to this task, any attempt to poll the future on a
/// separate thread will result in a panic. That is, calls to
/// `task::poll_on` must be avoided.
pub fn run<F>(&mut self, mut f: F) -> Result<F::Item, F::Error>
where F: Future,
{
struct MyNotify(Arc<mio::SetReadiness>);
impl Notify for MyNotify {
fn notify(&self) {
self.0.set_readiness(mio::EventSet::readable())
.expect("failed to set readiness");
}
}
// First up, create the task that will drive this future. The task here
// isn't a "normal task" but rather one where we define what to do when
// a readiness notification comes in.
//
// We translate readiness notifications to a `set_readiness` of our
// `future_readiness` structure we have stored internally.
let mut task = Task::new_notify(MyNotify(self.future_readiness.clone()));
let ready = self.future_readiness.clone();
// Next, move all that data into a dynamically dispatched closure to cut
// down on monomorphization costs. Inside this closure we unset the
// readiness of the future (as we're about to poll it) and then we check
// to see if it's done. If it's not then the event loop will turn again.
let mut res = None;
self._run(&mut || {
ready.set_readiness(mio::EventSet::none())
.expect("failed to set readiness");
assert!(res.is_none());
match task.enter(|| f.poll()) {
Poll::NotReady => {}
Poll::Ok(e) => res = Some(Ok(e)),
Poll::Err(e) => res = Some(Err(e)),
}
res.is_some()
});
res.expect("run should not return until future is done")
}
fn _run(&mut self, done: &mut FnMut() -> bool) {
// Check to see if we're done immediately, if so we shouldn't do any
// work.
if CURRENT_LOOP.set(self, || done()) {
return
}
loop {
let amt;
// On Linux, Poll::poll is epoll_wait, which may return EINTR if a
// ptracer attaches. This retry loop prevents crashing when
// attaching strace, or similar.
let start = Instant::now();
loop {
let timeout = self.timer_wheel.borrow().next_timeout().map(|t| {
if t < start {
Duration::new(0, 0)
} else {
t - start
}
});
match self.io.poll(&mut self.events, timeout) {
Ok(a) => {
amt = a;
break;
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
err @ Err(_) => {
err.unwrap();
}
}
}
debug!("loop poll - {:?}", start.elapsed());
debug!("loop time - {:?}", Instant::now());
// First up, process all timeouts that may have just occurred.
let start = Instant::now();
self.consume_timeouts(start);
// Next, process all the events that came in.
for i in 0..self.events.len() {
let event = self.events.get(i).unwrap();
let token = usize::from(event.token());
// Token 0 == our incoming message queue, so this means we
// process the whole queue of messages.
//
// Token 1 == we should poll the future, we'll do that right
// after we get through the rest of this tick of the event loop.
if token == 0 {
debug!("consuming notification queue");
CURRENT_LOOP.set(&self, || {
self.consume_queue();
});
continue
} else if token == 1 {
if CURRENT_LOOP.set(self, || done()) {
return
}
continue
}
trace!("event {:?} {:?}", event.kind(), event.token());
// For any other token we look at `dispatch` to see what we're
// supposed to do. If there's a waiter we get ready to notify
// it, and we also or-in atomically any events that have
// happened (currently read/write events).
let mut reader = None;
let mut writer = None;
if let Some(sched) = self.dispatch.borrow_mut().get_mut(token) {
if event.kind().is_readable() {
reader = sched.reader.take();
sched.readiness.fetch_or(1, Ordering::Relaxed);
}
if event.kind().is_writable() {
writer = sched.writer.take();
sched.readiness.fetch_or(2, Ordering::Relaxed);
}
} else {
debug!("notified on {} which no longer exists", token);
}
// If we actually got a waiter, then notify!
//
// TODO: don't notify the same task twice
if let Some(reader) = reader {
self.notify_handle(reader);
}
if let Some(writer) = writer {
self.notify_handle(writer);
}
}
debug!("loop process - {} events, {:?}", amt, start.elapsed());
}
}
fn consume_timeouts(&mut self, now: Instant) {
loop {
let idx = match self.timer_wheel.borrow_mut().poll(now) {
Some(idx) => idx,
None => break,
};
trace!("firing timeout: {}", idx);
let handle = self.timeouts.borrow_mut()[idx].1.fire();
if let Some(handle) = handle {
self.notify_handle(handle);
}
}
}
/// Method used to notify a task handle.
///
/// Note that this should be used instead fo `handle.unpark()` to ensure
/// that the `CURRENT_LOOP` variable is set appropriately.
fn notify_handle(&self, handle: TaskHandle) {
debug!("notifying a task handle");
CURRENT_LOOP.set(&self, || handle.unpark());
}
fn add_source(&self, source: &mio::Evented)
-> io::Result<(Arc<AtomicUsize>, usize)> {
debug!("adding a new I/O source");
let sched = Scheduled {
readiness: Arc::new(AtomicUsize::new(0)),
reader: None,
writer: None,
};
let mut dispatch = self.dispatch.borrow_mut();
if dispatch.vacant_entry().is_none() {
let amt = dispatch.count();
dispatch.grow(amt);
}
let entry = dispatch.vacant_entry().unwrap();
try!(self.io.register(source,
mio::Token(entry.index()),
mio::EventSet::readable() |
mio::EventSet::writable(),
mio::PollOpt::edge()));
Ok((sched.readiness.clone(), entry.insert(sched).index()))
}
fn drop_source(&self, token: usize) {
debug!("dropping I/O source: {}", token);
self.dispatch.borrow_mut().remove(token).unwrap();
}
fn schedule(&self, token: usize, wake: TaskHandle, dir: Direction) {
debug!("scheduling direction for: {}", token);
let to_call = {
let mut dispatch = self.dispatch.borrow_mut();
let sched = dispatch.get_mut(token).unwrap();
let (slot, bit) = match dir {
Direction::Read => (&mut sched.reader, 1),
Direction::Write => (&mut sched.writer, 2),
};
let ready = sched.readiness.load(Ordering::SeqCst);
if ready & bit != 0 {
*slot = None;
sched.readiness.store(ready & !bit, Ordering::SeqCst);
Some(wake)
} else {
*slot = Some(wake);
None
}
};
if let Some(to_call) = to_call {
debug!("schedule immediately done");
self.notify_handle(to_call);
}
}
fn add_timeout(&self, at: Instant) -> io::Result<(usize, Instant)> {
let mut timeouts = self.timeouts.borrow_mut();
if timeouts.vacant_entry().is_none() {
let len = timeouts.count();
timeouts.grow(len);
}
let entry = timeouts.vacant_entry().unwrap();
let timeout = self.timer_wheel.borrow_mut().insert(at, entry.index());
let when = *timeout.when();
let entry = entry.insert((timeout, TimeoutState::NotFired));
debug!("added a timeout: {}", entry.index());
Ok((entry.index(), when))
}
fn update_timeout(&self, token: usize, handle: TaskHandle) {
debug!("updating a timeout: {}", token);
let to_wake = self.timeouts.borrow_mut()[token].1.block(handle);
if let Some(to_wake) = to_wake {
self.notify_handle(to_wake);
}
}
fn cancel_timeout(&self, token: usize) {
debug!("cancel a timeout: {}", token);
let pair = self.timeouts.borrow_mut().remove(token);
if let Some((timeout, _state)) = pair {
self.timer_wheel.borrow_mut().cancel(&timeout);
}
}
fn consume_queue(&self) {
// TODO: can we do better than `.unwrap()` here?
while let Some(msg) = self.rx.recv().unwrap() {
self.notify(msg);
}
}
fn notify(&self, msg: Message) {
match msg {
Message::DropSource(tok) => self.drop_source(tok),
Message::Schedule(tok, wake, dir) => self.schedule(tok, wake, dir),
Message::AddTimeout(at, slot) => {
slot.try_produce(self.add_timeout(at))
.ok().expect("interference with try_produce on timeout");
}
Message::UpdateTimeout(t, handle) => self.update_timeout(t, handle),
Message::CancelTimeout(t) => self.cancel_timeout(t),
Message::Run(f) => {
debug!("running a closure");
f.call()
}
Message::Drop(data) => {
debug!("dropping some data");
drop(data);
}
}
}
}
impl LoopHandle {
fn send(&self, msg: Message) {
self.with_loop(|lp| {
match lp {
Some(lp) => {
// Need to execute all existing requests first, to ensure
// that our message is processed "in order"
lp.consume_queue();
lp.notify(msg);
}
None => {
match self.tx.inner.send(msg) {
Ok(()) => {}
// This should only happen when there was an error
// writing to the pipe to wake up the event loop,
// hopefully that never happens
Err(e) => {
panic!("error sending message to event loop: {}", e)
}
}
}
}
})
}
fn with_loop<F, R>(&self, f: F) -> R
where F: FnOnce(Option<&Loop>) -> R
{
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
if lp.id == self.id {
f(Some(lp))
} else {
f(None)
}
})
} else {
f(None)
}
}
}
impl LoopPin {
/// Returns a reference to the underlying handle to the event loop.
pub fn handle(&self) -> &LoopHandle {
&self.handle
}
/// TODO: dox
pub fn executor(&self) -> Arc<Executor> {
self.handle.tx.clone()
}
}
struct LoopFuture<T, U> {
loop_handle: LoopHandle,
data: Option<U>,
result: Option<(Arc<Slot<io::Result<T>>>, slot::Token)>,
}
impl<T, U> LoopFuture<T, U>
where T: 'static,
{
fn poll<F, G>(&mut self, f: F, g: G) -> Poll<T, io::Error>
where F: FnOnce(&Loop, U) -> io::Result<T>,
G: FnOnce(U, Arc<Slot<io::Result<T>>>) -> Message,
{
match self.result {
Some((ref result, ref mut token)) => {
result.cancel(*token);
match result.try_consume() {
Ok(t) => return t.into(),
Err(_) => {}
}
let task = task::park();
*token = result.on_full(move |_| {
task.unpark();
});
return Poll::NotReady
}
None => {
let data = &mut self.data;
let ret = self.loop_handle.with_loop(|lp| {
lp.map(|lp| f(lp, data.take().unwrap()))
});
if let Some(ret) = ret {
debug!("loop future done immediately on event loop");
return ret.into()
}
debug!("loop future needs to send info to event loop");
let task = task::park();
let result = Arc::new(Slot::new(None));
let token = result.on_full(move |_| {
task.unpark();
});
self.result = Some((result.clone(), token));
self.loop_handle.send(g(data.take().unwrap(), result));
Poll::NotReady
}
}
}
}
impl TimeoutState {
fn block(&mut self, handle: TaskHandle) -> Option<TaskHandle> {
match *self {
TimeoutState::Fired => return Some(handle),
_ => {}
}
*self = TimeoutState::Waiting(handle);
None
}
fn fire(&mut self) -> Option<TaskHandle> {
match mem::replace(self, TimeoutState::Fired) {
TimeoutState::NotFired => None,
TimeoutState::Fired => panic!("fired twice?"),
TimeoutState::Waiting(handle) => Some(handle),
}
}
}
impl Executor for MioSender {
fn execute_boxed(&self, callback: Box<ExecuteCallback>) {
self.inner.send(Message::Run(callback))
.expect("error sending a message to the event loop")
}
}

183
src/event_loop/source.rs Normal file
View File

@ -0,0 +1,183 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::io;
use futures::{Future, Poll};
use futures::task;
use mio;
use event_loop::{Message, LoopHandle, LoopFuture, Direction};
/// A future which will resolve a unique `tok` token for an I/O object.
///
/// Created through the `LoopHandle::add_source` method, this future can also
/// resolve to an error if there's an issue communicating with the event loop.
pub struct AddSource<E> {
inner: LoopFuture<(E, (Arc<AtomicUsize>, usize)), E>,
}
/// A token that identifies an active timeout.
pub struct IoToken {
token: usize,
// TODO: can we avoid this allocation? It's kind of a bummer...
readiness: Arc<AtomicUsize>,
}
impl LoopHandle {
/// Add a new source to an event loop, returning a future which will resolve
/// to the token that can be used to identify this source.
///
/// When a new I/O object is created it needs to be communicated to the
/// event loop to ensure that it's registered and ready to receive
/// notifications. The event loop with then respond back with the I/O object
/// and a token which can be used to send more messages to the event loop.
///
/// The token returned is then passed in turn to each of the methods below
/// to interact with notifications on the I/O object itself.
///
/// # Panics
///
/// The returned future will panic if the event loop this handle is
/// associated with has gone away, or if there is an error communicating
/// with the event loop.
pub fn add_source<E>(&self, source: E) -> AddSource<E>
where E: mio::Evented + Send + 'static,
{
AddSource {
inner: LoopFuture {
loop_handle: self.clone(),
data: Some(source),
result: None,
}
}
}
/// Schedule the current future task to receive a notification when the
/// corresponding I/O object is readable.
///
/// Once an I/O object has been registered with the event loop through the
/// `add_source` method, this method can be used with the assigned token to
/// notify the current future task when the next read notification comes in.
///
/// The current task will only receive a notification **once** and to
/// receive further notifications it will need to call `schedule_read`
/// again.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
///
/// # Panics
///
/// This function will panic if the event loop this handle is associated
/// with has gone away, or if there is an error communicating with the event
/// loop.
///
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_read(&self, tok: &IoToken) {
self.send(Message::Schedule(tok.token, task::park(), Direction::Read));
}
/// Schedule the current future task to receive a notification when the
/// corresponding I/O object is writable.
///
/// Once an I/O object has been registered with the event loop through the
/// `add_source` method, this method can be used with the assigned token to
/// notify the current future task when the next write notification comes
/// in.
///
/// The current task will only receive a notification **once** and to
/// receive further notifications it will need to call `schedule_write`
/// again.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
///
/// # Panics
///
/// This function will panic if the event loop this handle is associated
/// with has gone away, or if there is an error communicating with the event
/// loop.
///
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_write(&self, tok: &IoToken) {
self.send(Message::Schedule(tok.token, task::park(), Direction::Write));
}
/// Unregister all information associated with a token on an event loop,
/// deallocating all internal resources assigned to the given token.
///
/// This method should be called whenever a source of events is being
/// destroyed. This will ensure that the event loop can reuse `tok` for
/// another I/O object if necessary and also remove it from any poll
/// notifications and callbacks.
///
/// Note that wake callbacks may still be invoked after this method is
/// called as it may take some time for the message to drop a source to
/// reach the event loop. Despite this fact, this method will attempt to
/// ensure that the callbacks are **not** invoked, so pending scheduled
/// callbacks cannot be relied upon to get called.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
///
/// # Panics
///
/// This function will panic if the event loop this handle is associated
/// with has gone away, or if there is an error communicating with the event
/// loop.
pub fn drop_source(&self, tok: &IoToken) {
self.send(Message::DropSource(tok.token));
}
}
impl IoToken {
/// Consumes the last readiness notification the token this source is for
/// registered.
///
/// Currently sources receive readiness notifications on an edge-basis. That
/// is, once you receive a notification that an object can be read, you
/// won't receive any more notifications until all of that data has been
/// read.
///
/// The event loop will fill in this information and then inform futures
/// that they're ready to go with the `schedule` method, and then the `poll`
/// method can use this to figure out what happened.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
// TODO: this should really return a proper newtype/enum, not a usize
pub fn take_readiness(&self) -> usize {
self.readiness.swap(0, Ordering::SeqCst)
}
}
impl<E> Future for AddSource<E>
where E: mio::Evented + Send + 'static,
{
type Item = (E, IoToken);
type Error = io::Error;
fn poll(&mut self) -> Poll<(E, IoToken), io::Error> {
let handle = self.inner.loop_handle.clone();
let res = self.inner.poll(|lp, io| {
let pair = try!(lp.add_source(&io));
Ok((io, pair))
}, |io, slot| {
Message::Run(Box::new(move || {
let res = handle.with_loop(|lp| {
let lp = lp.unwrap();
let pair = try!(lp.add_source(&io));
Ok((io, pair))
});
slot.try_produce(res).ok()
.expect("add source try_produce intereference");
}))
});
res.map(|(io, (ready, token))| {
(io, IoToken { token: token, readiness: ready })
})
}
}

81
src/event_loop/timeout.rs Normal file
View File

@ -0,0 +1,81 @@
use std::io;
use std::time::Instant;
use futures::{Future, Poll};
use futures::task;
use event_loop::{Message, Loop, LoopHandle, LoopFuture};
impl LoopHandle {
/// Adds a new timeout to get fired at the specified instant, notifying the
/// specified task.
pub fn add_timeout(&self, at: Instant) -> AddTimeout {
AddTimeout {
inner: LoopFuture {
loop_handle: self.clone(),
data: Some(at),
result: None,
},
}
}
/// Updates a previously added timeout to notify a new task instead.
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn update_timeout(&self, timeout: &TimeoutToken) {
self.send(Message::UpdateTimeout(timeout.token, task::park()))
}
/// Cancel a previously added timeout.
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn cancel_timeout(&self, timeout: &TimeoutToken) {
debug!("cancel timeout {}", timeout.token);
self.send(Message::CancelTimeout(timeout.token))
}
}
/// Return value from the `LoopHandle::add_timeout` method, a future that will
/// resolve to a `TimeoutToken` to configure the behavior of that timeout.
pub struct AddTimeout {
inner: LoopFuture<(usize, Instant), Instant>,
}
/// A token that identifies an active timeout.
pub struct TimeoutToken {
token: usize,
when: Instant,
}
impl Future for AddTimeout {
type Item = TimeoutToken;
type Error = io::Error;
fn poll(&mut self) -> Poll<TimeoutToken, io::Error> {
self.inner.poll(Loop::add_timeout, Message::AddTimeout).map(|(t, i)| {
TimeoutToken {
token: t,
when: i,
}
})
}
}
impl TimeoutToken {
/// Returns the instant in time when this timeout token will "fire".
///
/// Note that this instant may *not* be the instant that was passed in when
/// the timeout was created. The event loop does not support high resolution
/// timers, so the exact resolution of when a timeout may fire may be
/// slightly fudged.
pub fn when(&self) -> &Instant {
&self.when
}
}

View File

@ -16,21 +16,23 @@ extern crate scoped_tls;
#[macro_use]
extern crate log;
mod readiness_stream;
mod event_loop;
mod tcp;
mod udp;
mod timeout;
mod timer_wheel;
#[path = "../../src/slot.rs"]
mod slot;
#[path = "../../src/lock.rs"]
mod lock;
mod mpsc_queue;
mod channel;
mod channel;
mod event_loop;
mod mpsc_queue;
mod readiness_stream;
mod tcp;
mod timeout;
mod timer_wheel;
mod udp;
pub use channel::{Sender, Receiver};
pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout};
pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoSource, Source};
pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken};
pub use readiness_stream::ReadinessStream;
pub use tcp::{TcpListener, TcpStream};
pub use timeout::Timeout;

View File

@ -2,8 +2,9 @@ use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{Future, Poll};
use mio;
use event_loop::{IoSource, LoopHandle, AddSource};
use event_loop::{IoToken, LoopHandle, AddSource};
/// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop.
@ -21,34 +22,35 @@ use event_loop::{IoSource, LoopHandle, AddSource};
/// It's the responsibility of the wrapper to inform the readiness stream when a
/// "would block" I/O event is seen. The readiness stream will then take care of
/// any scheduling necessary to get notified when the event is ready again.
pub struct ReadinessStream {
io_token: usize,
loop_handle: LoopHandle,
source: IoSource,
pub struct ReadinessStream<E> {
token: IoToken,
handle: LoopHandle,
readiness: AtomicUsize,
io: E,
}
pub struct ReadinessStreamNew {
inner: AddSource,
handle: Option<LoopHandle>,
source: Option<IoSource>,
pub struct ReadinessStreamNew<E> {
inner: AddSource<E>,
handle: LoopHandle,
}
impl ReadinessStream {
impl<E> ReadinessStream<E>
where E: mio::Evented + Send + 'static,
{
/// Creates a new readiness stream associated with the provided
/// `loop_handle` and for the given `source`.
///
/// This method returns a future which will resolve to the readiness stream
/// when it's ready.
pub fn new(loop_handle: LoopHandle, source: IoSource)
-> ReadinessStreamNew {
pub fn new(loop_handle: LoopHandle, source: E) -> ReadinessStreamNew<E> {
ReadinessStreamNew {
inner: loop_handle.add_source(source.clone()),
source: Some(source),
handle: Some(loop_handle),
inner: loop_handle.add_source(source),
handle: loop_handle,
}
}
}
impl<E> ReadinessStream<E> {
/// Tests to see if this source is ready to be read from or not.
///
/// If this stream is not ready for a read then `NotReady` will be returned
@ -60,11 +62,11 @@ impl ReadinessStream {
if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
return Poll::Ok(())
}
self.readiness.fetch_or(self.source.take_readiness(), Ordering::SeqCst);
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
Poll::Ok(())
} else {
self.loop_handle.schedule_read(self.io_token);
self.handle.schedule_read(&self.token);
Poll::NotReady
}
}
@ -80,11 +82,11 @@ impl ReadinessStream {
if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
return Poll::Ok(())
}
self.readiness.fetch_or(self.source.take_readiness(), Ordering::SeqCst);
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
Poll::Ok(())
} else {
self.loop_handle.schedule_write(self.io_token);
self.handle.schedule_write(&self.token);
Poll::NotReady
}
}
@ -102,7 +104,7 @@ impl ReadinessStream {
/// then again readable.
pub fn need_read(&self) {
self.readiness.fetch_and(!1, Ordering::SeqCst);
self.loop_handle.schedule_read(self.io_token);
self.handle.schedule_read(&self.token);
}
/// Indicates to this source of events that the corresponding I/O object is
@ -118,28 +120,48 @@ impl ReadinessStream {
/// then again writable.
pub fn need_write(&self) {
self.readiness.fetch_and(!2, Ordering::SeqCst);
self.loop_handle.schedule_write(self.io_token);
self.handle.schedule_write(&self.token);
}
/// Returns a reference to the event loop handle that this readiness stream
/// is associated with.
pub fn loop_handle(&self) -> &LoopHandle {
&self.handle
}
/// Returns a shared reference to the underlying I/O object this readiness
/// stream is wrapping.
pub fn get_ref(&self) -> &E {
&self.io
}
/// Returns a mutable reference to the underlying I/O object this readiness
/// stream is wrapping.
pub fn get_mut(&mut self) -> &mut E {
&mut self.io
}
}
impl Future for ReadinessStreamNew {
type Item = ReadinessStream;
impl<E> Future for ReadinessStreamNew<E>
where E: mio::Evented + Send + 'static,
{
type Item = ReadinessStream<E>;
type Error = io::Error;
fn poll(&mut self) -> Poll<ReadinessStream, io::Error> {
self.inner.poll().map(|token| {
fn poll(&mut self) -> Poll<ReadinessStream<E>, io::Error> {
self.inner.poll().map(|(io, token)| {
ReadinessStream {
io_token: token,
source: self.source.take().unwrap(),
loop_handle: self.handle.take().unwrap(),
token: token,
handle: self.handle.clone(),
io: io,
readiness: AtomicUsize::new(0),
}
})
}
}
impl Drop for ReadinessStream {
impl<E> Drop for ReadinessStream<E> {
fn drop(&mut self) {
self.loop_handle.drop_source(self.io_token)
self.handle.drop_source(&self.token)
}
}

View File

@ -2,7 +2,6 @@ use std::fmt;
use std::io::{self, ErrorKind, Read, Write};
use std::mem;
use std::net::{self, SocketAddr, Shutdown};
use std::sync::Arc;
use futures::stream::Stream;
use futures::{Future, IntoFuture, failed, Poll};
@ -10,27 +9,21 @@ use futures_io::{IoFuture, IoStream};
use mio;
use {ReadinessStream, LoopHandle};
use event_loop::Source;
/// An I/O object representing a TCP socket listening for incoming connections.
///
/// This object can be converted into a stream of incoming connections for
/// various forms of processing.
pub struct TcpListener {
loop_handle: LoopHandle,
ready: ReadinessStream,
listener: Arc<Source<mio::tcp::TcpListener>>,
io: ReadinessStream<mio::tcp::TcpListener>,
}
impl TcpListener {
fn new(listener: mio::tcp::TcpListener,
handle: LoopHandle) -> IoFuture<TcpListener> {
let listener = Arc::new(Source::new(listener));
ReadinessStream::new(handle.clone(), listener.clone()).map(|r| {
ReadinessStream::new(handle, listener).map(|io| {
TcpListener {
loop_handle: handle,
ready: r,
listener: listener,
io: io,
}
}).boxed()
}
@ -73,7 +66,7 @@ impl TcpListener {
/// Test whether this socket is ready to be read or not.
pub fn poll_read(&self) -> Poll<(), io::Error> {
self.ready.poll_read()
self.io.poll_read()
}
/// Returns the local address that this listener is bound to.
@ -81,7 +74,7 @@ impl TcpListener {
/// This can be useful, for example, when binding to port 0 to figure out
/// which port was actually bound.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.listener.io().local_addr()
self.io.get_ref().local_addr()
}
/// Consumes this listener, returning a stream of the sockets this listener
@ -99,14 +92,14 @@ impl TcpListener {
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
match self.inner.listener.io().accept() {
match self.inner.io.get_ref().accept() {
Ok(Some(pair)) => {
debug!("accepted a socket");
Poll::Ok(Some(pair))
}
Ok(None) => {
debug!("waiting to accept another socket");
self.inner.ready.need_read();
self.inner.io.need_read();
Poll::NotReady
}
Err(e) => Poll::Err(e),
@ -114,17 +107,11 @@ impl TcpListener {
}
}
let loop_handle = self.loop_handle.clone();
let loop_handle = self.io.loop_handle().clone();
Incoming { inner: self }
.and_then(move |(tcp, addr)| {
let tcp = Arc::new(Source::new(tcp));
ReadinessStream::new(loop_handle.clone(),
tcp.clone()).map(move |ready| {
let stream = TcpStream {
source: tcp,
ready: ready,
};
(stream, addr)
ReadinessStream::new(loop_handle.clone(), tcp).map(move |io| {
(TcpStream { io: io }, addr)
})
}).boxed()
}
@ -132,7 +119,7 @@ impl TcpListener {
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.listener.io().fmt(f)
self.io.get_ref().fmt(f)
}
}
@ -143,8 +130,7 @@ impl fmt::Debug for TcpListener {
/// raw underlying I/O object as well as streams for the read/write
/// notifications on the stream itself.
pub struct TcpStream {
source: Arc<Source<mio::tcp::TcpStream>>,
ready: ReadinessStream,
io: ReadinessStream<mio::tcp::TcpStream>,
}
enum TcpStreamNew {
@ -184,18 +170,8 @@ impl TcpStream {
fn new(connected_stream: mio::tcp::TcpStream,
handle: LoopHandle)
-> IoFuture<TcpStream> {
// Once we've connected, wait for the stream to be writable as that's
// when the actual connection has been initiated. Once we're writable we
// check for `take_socket_error` to see if the connect actually hit an
// error or not.
//
// If all that succeeded then we ship everything on up.
let connected_stream = Arc::new(Source::new(connected_stream));
ReadinessStream::new(handle, connected_stream.clone()).and_then(|ready| {
TcpStreamNew::Waiting(TcpStream {
source: connected_stream,
ready: ready,
})
ReadinessStream::new(handle, connected_stream).and_then(|io| {
TcpStreamNew::Waiting(TcpStream { io: io })
}).boxed()
}
@ -233,7 +209,7 @@ impl TcpStream {
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again.
pub fn poll_read(&self) -> Poll<(), io::Error> {
self.ready.poll_read()
self.io.poll_read()
}
/// Test whether this socket is writey to be written to or not.
@ -243,17 +219,17 @@ impl TcpStream {
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is writable again.
pub fn poll_write(&self) -> Poll<(), io::Error> {
self.ready.poll_write()
self.io.poll_write()
}
/// Returns the local address that this stream is bound to.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.source.io().local_addr()
self.io.get_ref().local_addr()
}
/// Returns the remote address that this stream is connected to.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.source.io().peer_addr()
self.io.get_ref().peer_addr()
}
/// Shuts down the read, write, or both halves of this connection.
@ -262,7 +238,7 @@ impl TcpStream {
/// portions to return immediately with an appropriate value (see the
/// documentation of `Shutdown`).
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.source.io().shutdown(how)
self.io.get_ref().shutdown(how)
}
/// Sets the value of the `TCP_NODELAY` option on this socket.
@ -273,12 +249,12 @@ impl TcpStream {
/// sufficient amount to send out, thereby avoiding the frequent sending of
/// small packets.
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
self.source.io().set_nodelay(nodelay)
self.io.get_ref().set_nodelay(nodelay)
}
/// Sets the keepalive time in seconds for this socket.
pub fn set_keepalive_s(&self, seconds: Option<u32>) -> io::Result<()> {
self.source.io().set_keepalive(seconds)
self.io.get_ref().set_keepalive(seconds)
}
}
@ -291,9 +267,16 @@ impl Future for TcpStreamNew {
TcpStreamNew::Waiting(s) => s,
TcpStreamNew::Empty => panic!("can't poll TCP stream twice"),
};
match stream.ready.poll_write() {
// Once we've connected, wait for the stream to be writable as that's
// when the actual connection has been initiated. Once we're writable we
// check for `take_socket_error` to see if the connect actually hit an
// error or not.
//
// If all that succeeded then we ship everything on up.
match stream.io.poll_write() {
Poll::Ok(()) => {
match stream.source.io().take_socket_error() {
match stream.io.get_ref().take_socket_error() {
Ok(()) => return Poll::Ok(stream),
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
Err(e) => return Poll::Err(e),
@ -309,9 +292,9 @@ impl Future for TcpStreamNew {
impl Read for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let r = self.source.io().read(buf);
let r = self.io.get_ref().read(buf);
if is_wouldblock(&r) {
self.ready.need_read();
self.io.need_read();
}
return r
}
@ -319,16 +302,16 @@ impl Read for TcpStream {
impl Write for TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let r = self.source.io().write(buf);
let r = self.io.get_ref().write(buf);
if is_wouldblock(&r) {
self.ready.need_write();
self.io.need_write();
}
return r
}
fn flush(&mut self) -> io::Result<()> {
let r = self.source.io().flush();
let r = self.io.get_ref().flush();
if is_wouldblock(&r) {
self.ready.need_write();
self.io.need_write();
}
return r
}
@ -336,9 +319,9 @@ impl Write for TcpStream {
impl<'a> Read for &'a TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let r = self.source.io().read(buf);
let r = self.io.get_ref().read(buf);
if is_wouldblock(&r) {
self.ready.need_read();
self.io.need_read();
}
return r
}
@ -346,17 +329,17 @@ impl<'a> Read for &'a TcpStream {
impl<'a> Write for &'a TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let r = self.source.io().write(buf);
let r = self.io.get_ref().write(buf);
if is_wouldblock(&r) {
self.ready.need_write();
self.io.need_write();
}
return r
}
fn flush(&mut self) -> io::Result<()> {
let r = self.source.io().flush();
let r = self.io.get_ref().flush();
if is_wouldblock(&r) {
self.ready.need_write();
self.io.need_write();
}
return r
}
@ -371,7 +354,7 @@ fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
impl fmt::Debug for TcpStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.source.io().fmt(f)
self.io.get_ref().fmt(f)
}
}
@ -382,13 +365,13 @@ mod sys {
impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd {
self.source.io().as_raw_fd()
self.io.get_ref().as_raw_fd()
}
}
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.listener.io().as_raw_fd()
self.io.get_ref().as_raw_fd()
}
}
}
@ -402,7 +385,7 @@ mod sys {
//
// impl AsRawHandle for TcpStream {
// fn as_raw_handle(&self) -> RawHandle {
// self.source.io().as_raw_handle()
// self.io.get_ref().as_raw_handle()
// }
// }
//

View File

@ -1,6 +1,5 @@
use std::io;
use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::fmt;
use futures::{Future, failed, Poll};
@ -8,12 +7,10 @@ use futures_io::IoFuture;
use mio;
use {ReadinessStream, LoopHandle};
use event_loop::Source;
/// An I/O object representing a UDP socket.
pub struct UdpSocket {
source: Arc<Source<mio::udp::UdpSocket>>,
ready: ReadinessStream,
io: ReadinessStream<mio::udp::UdpSocket>,
}
impl LoopHandle {
@ -34,12 +31,8 @@ impl LoopHandle {
impl UdpSocket {
fn new(socket: mio::udp::UdpSocket, handle: LoopHandle)
-> IoFuture<UdpSocket> {
let socket = Arc::new(Source::new(socket));
ReadinessStream::new(handle, socket.clone()).map(|ready| {
UdpSocket {
source: socket,
ready: ready,
}
ReadinessStream::new(handle, socket).map(|io| {
UdpSocket { io: io }
}).boxed()
}
@ -62,7 +55,7 @@ impl UdpSocket {
/// Returns the local address that this stream is bound to.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.source.io().local_addr()
self.io.get_ref().local_addr()
}
/// Test whether this socket is ready to be read or not.
@ -72,7 +65,7 @@ impl UdpSocket {
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again.
pub fn poll_read(&self) -> Poll<(), io::Error> {
self.ready.poll_read()
self.io.poll_read()
}
/// Test whether this socket is writey to be written to or not.
@ -82,7 +75,7 @@ impl UdpSocket {
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is writable again.
pub fn poll_write(&self) -> Poll<(), io::Error> {
self.ready.poll_write()
self.io.poll_write()
}
/// Sends data on the socket to the given address. On success, returns the
@ -91,10 +84,10 @@ impl UdpSocket {
/// Address type can be any implementor of `ToSocketAddrs` trait. See its
/// documentation for concrete examples.
pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
match self.source.io().send_to(buf, target) {
match self.io.get_ref().send_to(buf, target) {
Ok(Some(n)) => Ok(n),
Ok(None) => {
self.ready.need_write();
self.io.need_write();
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block"))
}
Err(e) => Err(e),
@ -104,10 +97,10 @@ impl UdpSocket {
/// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came.
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
match self.source.io().recv_from(buf) {
match self.io.get_ref().recv_from(buf) {
Ok(Some(n)) => Ok(n),
Ok(None) => {
self.ready.need_read();
self.io.need_read();
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block"))
}
Err(e) => Err(e),
@ -121,7 +114,7 @@ impl UdpSocket {
///
/// [link]: #method.set_broadcast
pub fn broadcast(&self) -> io::Result<bool> {
self.source.io().broadcast()
self.io.get_ref().broadcast()
}
/// Sets the value of the `SO_BROADCAST` option for this socket.
@ -129,7 +122,7 @@ impl UdpSocket {
/// When enabled, this socket is allowed to send packets to a broadcast
/// address.
pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
self.source.io().set_broadcast(on)
self.io.get_ref().set_broadcast(on)
}
/// Gets the value of the `IP_MULTICAST_LOOP` option for this socket.
@ -139,7 +132,7 @@ impl UdpSocket {
///
/// [link]: #method.set_multicast_loop_v4
pub fn multicast_loop_v4(&self) -> io::Result<bool> {
self.source.io().multicast_loop_v4()
self.io.get_ref().multicast_loop_v4()
}
/// Sets the value of the `IP_MULTICAST_LOOP` option for this socket.
@ -147,7 +140,7 @@ impl UdpSocket {
/// If enabled, multicast packets will be looped back to the local socket.
/// Note that this may not have any affect on IPv6 sockets.
pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
self.source.io().set_multicast_loop_v4(on)
self.io.get_ref().set_multicast_loop_v4(on)
}
/// Gets the value of the `IP_MULTICAST_TTL` option for this socket.
@ -157,7 +150,7 @@ impl UdpSocket {
///
/// [link]: #method.set_multicast_ttl_v4
pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
self.source.io().multicast_ttl_v4()
self.io.get_ref().multicast_ttl_v4()
}
/// Sets the value of the `IP_MULTICAST_TTL` option for this socket.
@ -168,7 +161,7 @@ impl UdpSocket {
///
/// Note that this may not have any affect on IPv6 sockets.
pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
self.source.io().set_multicast_ttl_v4(ttl)
self.io.get_ref().set_multicast_ttl_v4(ttl)
}
/// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket.
@ -178,7 +171,7 @@ impl UdpSocket {
///
/// [link]: #method.set_multicast_loop_v6
pub fn multicast_loop_v6(&self) -> io::Result<bool> {
self.source.io().multicast_loop_v6()
self.io.get_ref().multicast_loop_v6()
}
/// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket.
@ -186,7 +179,7 @@ impl UdpSocket {
/// Controls whether this socket sees the multicast packets it sends itself.
/// Note that this may not have any affect on IPv4 sockets.
pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
self.source.io().set_multicast_loop_v6(on)
self.io.get_ref().set_multicast_loop_v6(on)
}
/// Gets the value of the `IP_TTL` option for this socket.
@ -195,7 +188,7 @@ impl UdpSocket {
///
/// [link]: #method.set_ttl
pub fn ttl(&self) -> io::Result<u32> {
self.source.io().ttl()
self.io.get_ref().ttl()
}
/// Sets the value for the `IP_TTL` option on this socket.
@ -203,7 +196,7 @@ impl UdpSocket {
/// This value sets the time-to-live field that is used in every packet sent
/// from this socket.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.source.io().set_ttl(ttl)
self.io.get_ref().set_ttl(ttl)
}
/// Executes an operation of the `IP_ADD_MEMBERSHIP` type.
@ -216,7 +209,7 @@ impl UdpSocket {
pub fn join_multicast_v4(&self,
multiaddr: &Ipv4Addr,
interface: &Ipv4Addr) -> io::Result<()> {
self.source.io().join_multicast_v4(multiaddr, interface)
self.io.get_ref().join_multicast_v4(multiaddr, interface)
}
/// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type.
@ -227,7 +220,7 @@ impl UdpSocket {
pub fn join_multicast_v6(&self,
multiaddr: &Ipv6Addr,
interface: u32) -> io::Result<()> {
self.source.io().join_multicast_v6(multiaddr, interface)
self.io.get_ref().join_multicast_v6(multiaddr, interface)
}
/// Executes an operation of the `IP_DROP_MEMBERSHIP` type.
@ -239,7 +232,7 @@ impl UdpSocket {
pub fn leave_multicast_v4(&self,
multiaddr: &Ipv4Addr,
interface: &Ipv4Addr) -> io::Result<()> {
self.source.io().leave_multicast_v4(multiaddr, interface)
self.io.get_ref().leave_multicast_v4(multiaddr, interface)
}
/// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type.
@ -251,13 +244,13 @@ impl UdpSocket {
pub fn leave_multicast_v6(&self,
multiaddr: &Ipv6Addr,
interface: u32) -> io::Result<()> {
self.source.io().leave_multicast_v6(multiaddr, interface)
self.io.get_ref().leave_multicast_v6(multiaddr, interface)
}
}
impl fmt::Debug for UdpSocket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.source.io().fmt(f)
self.io.get_ref().fmt(f)
}
}
@ -268,7 +261,7 @@ mod sys {
impl AsRawFd for UdpSocket {
fn as_raw_fd(&self) -> RawFd {
self.source.io().as_raw_fd()
self.io.get_ref().as_raw_fd()
}
}
}
@ -282,7 +275,7 @@ mod sys {
//
// impl AsRawHandle for UdpSocket {
// fn as_raw_handle(&self) -> RawHandle {
// self.source.io().as_raw_handle()
// self.io.get_ref().as_raw_handle()
// }
// }
}