Update to futures master

* Remove `LoopData` as it's no longer necessary
* Add `LoopHandle::spawn` to spawn new futures onto an event loop
* Add `LoopData::spawn` to also spawn new futures onto an event loop
* Rejigger the implementation of the event loop a bit (make a slab of futures),
  but otherwise everything else is pretty constant.
This commit is contained in:
Alex Crichton 2016-08-31 00:19:29 -07:00
parent 440a813c5a
commit 330ab823b0
8 changed files with 343 additions and 502 deletions

View File

@ -8,6 +8,7 @@ use std::net::SocketAddr;
use futures::Future;
use futures::stream::Stream;
use tokio_core::Loop;
use tokio_core::io::{copy, TaskIo};
fn main() {
@ -15,7 +16,8 @@ fn main() {
let addr = addr.parse::<SocketAddr>().unwrap();
// Create the event loop that will drive this server
let mut l = tokio_core::Loop::new().unwrap();
let mut l = Loop::new().unwrap();
let pin = l.pin();
// Create a TCP listener which will listen for incoming connections
let server = l.handle().tcp_listen(&addr);
@ -29,17 +31,20 @@ fn main() {
//
// We use the `io::copy` future to copy all data from the
// reading half onto the writing half.
socket.incoming().for_each(|(socket, addr)| {
socket.incoming().for_each(move |(socket, addr)| {
let socket = futures::lazy(|| futures::finished(TaskIo::new(socket)));
let pair = socket.map(|s| s.split());
let amt = pair.and_then(|(reader, writer)| copy(reader, writer));
// Once all that is done we print out how much we wrote, and then
// critically we *forget* this future which allows it to run
// critically we *spawn* this future which allows it to run
// concurrently with other connections.
amt.map(move |amt| {
let msg = amt.map(move |amt| {
println!("wrote {} bytes to {}", amt, addr)
}).forget();
}).map_err(|e| {
panic!("error: {}", e);
});
pin.spawn(msg);
Ok(())
})

View File

@ -1,348 +0,0 @@
use std::sync::Arc;
use std::io;
use futures::{Future, Poll};
use futures::task;
use futures::executor::Executor;
use event_loop::{Message, Loop, LoopPin, LoopHandle, LoopFuture};
use self::dropbox::DropBox;
/// A handle to data that is owned by an event loop thread, and is only
/// accessible on that thread itself.
///
/// This structure is created by the `LoopHandle::add_loop_data` method which
/// will return a future resolving to one of these references. A `LoopData<A>`
/// handle is `Send` regardless of what `A` is, but the internal data can only
/// be accessed on the event loop thread itself.
///
/// Internally this reference also stores a handle to the event loop that the
/// data originated on, so it knows how to go back to the event loop to access
/// the data itself.
// TODO: write more once it's implemented
pub struct LoopData<A: 'static> {
data: DropBox<A>,
handle: LoopHandle,
}
pub struct Opaque {
_inner: DropBox<dropbox::MyDrop>,
}
/// Future returned from the `LoopHandle::add_loop_data` method.
///
/// This future will resolve to a `LoopData<A>` reference when completed, which
/// represents a handle to data that is "owned" by the event loop thread but can
/// migrate among threads temporarily so travel with a future itself.
pub struct AddLoopData<F, A> {
inner: LoopFuture<DropBox<A>, F>,
}
fn _assert() {
fn _assert_send<T: Send>() {}
_assert_send::<LoopData<()>>();
}
impl Loop {
/// Creates a new `LoopData<A>` handle by associating data to be directly
/// stored by this event loop.
///
/// This function is useful for when storing non-`Send` data inside of a
/// future. The `LoopData<A>` handle is itself `Send + 'static` regardless
/// of the underlying `A`. That is, for example, you can create a handle to
/// some data that contains an `Rc`, for example.
pub fn add_loop_data<A>(&self, a: A) -> LoopData<A>
where A: 'static,
{
self.pin().add_loop_data(a)
}
}
impl LoopPin {
/// Adds some data to the event loop this pin is associated with.
///
/// This method will return a handle to the data, `LoopData`, which can be
/// used to access the underlying data whenever it's on the correct event
/// loop thread.
pub fn add_loop_data<A>(&self, a: A) -> LoopData<A>
where A: 'static,
{
LoopData {
data: DropBox::new_on(a, self),
handle: self.handle.clone(),
}
}
}
impl LoopHandle {
/// Schedules a closure to add some data to event loop thread itself.
///
/// This function is useful for when storing non-`Send` data inside of a
/// future. This returns a future which will resolve to a `LoopData<A>`
/// handle, which is itself `Send + 'static` regardless of the underlying
/// `A`. That is, for example, you can create a handle to some data that
/// contains an `Rc`, for example.
///
/// This function takes a closure which may be sent to the event loop to
/// generate an instance of type `A`. The closure itself is required to be
/// `Send + 'static`, but the data it produces is only required to adhere to
/// `'static`.
///
/// If the returned future is polled on the event loop thread itself it will
/// very cheaply resolve to a handle to the data, but if it's not polled on
/// the event loop then it will send a message to the event loop to run the
/// closure `f`, generate a handle, and then the future will yield it back.
// TODO: more with examples
pub fn add_loop_data<F, A>(&self, f: F) -> AddLoopData<F, A>
where F: FnOnce(&LoopPin) -> A + Send + 'static,
A: 'static,
{
AddLoopData {
inner: LoopFuture {
loop_handle: self.clone(),
data: Some(f),
result: None,
},
}
}
}
impl<F, A> Future for AddLoopData<F, A>
where F: FnOnce(&LoopPin) -> A + Send + 'static,
A: 'static,
{
type Item = LoopData<A>;
type Error = io::Error;
fn poll(&mut self) -> Poll<LoopData<A>, io::Error> {
let ret = self.inner.poll(|lp, f| {
Ok(DropBox::new(f(&lp.pin())))
}, |f, slot| {
Message::Run(Box::new(move || {
let pin = super::CURRENT_LOOP.with(|lp| lp.pin());
slot.try_produce(Ok(DropBox::new(f(&pin)))).ok()
.expect("add loop data try_produce intereference");
}))
});
ret.map(|data| {
LoopData {
data: data,
handle: self.inner.loop_handle.clone(),
}
})
}
}
impl<A: 'static> LoopData<A> {
/// Gets a shared reference to the underlying data in this handle.
///
/// Returns `None` if it is not called from the event loop thread that this
/// `LoopData<A>` is associated with, or `Some` with a reference to the data
/// if we are indeed on the event loop thread.
pub fn get(&self) -> Option<&A> {
self.data.get()
}
/// Gets a mutable reference to the underlying data in this handle.
///
/// Returns `None` if it is not called from the event loop thread that this
/// `LoopData<A>` is associated with, or `Some` with a reference to the data
/// if we are indeed on the event loop thread.
pub fn get_mut(&mut self) -> Option<&mut A> {
self.data.get_mut()
}
/// Acquire the executor associated with the thread that owns this
/// `LoopData<A>`'s data.
///
/// If the `get` and `get_mut` functions above return `None`, then this data
/// is being polled on the wrong thread to access the data, and to make
/// progress a future may need to migrate to the actual thread which owns
/// the relevant data.
///
/// This executor can in turn be passed to `Task::poll_on`, which will then
/// move the entire future to be polled on the right thread.
pub fn executor(&self) -> Arc<Executor> {
self.handle.tx.clone()
}
/// Returns a reference to the handle that this data is bound to.
pub fn loop_handle(&self) -> &LoopHandle {
&self.handle
}
}
impl<A: Future> Future for LoopData<A> {
type Item = A::Item;
type Error = A::Error;
fn poll(&mut self) -> Poll<A::Item, A::Error> {
// If we're on the right thread, then we can proceed. Otherwise we need
// to go and get polled on the right thread.
if let Some(inner) = self.get_mut() {
return inner.poll()
}
task::poll_on(self.executor());
Poll::NotReady
}
}
impl<A: 'static> Drop for LoopData<A> {
fn drop(&mut self) {
// The `DropBox` we store internally will cause a memory leak if it's
// dropped on the wrong thread. While necessary for safety, we don't
// actually want a memory leak, so for all normal circumstances we take
// out the `DropBox<A>` as a `DropBox<MyDrop>` and then we send it off
// to the event loop.
//
// TODO: possible optimization is to do none of this if we're on the
// event loop thread itself
if let Some(data) = self.data.take() {
self.handle.send(Message::Drop(Opaque { _inner: data }));
}
}
}
/// A curious inner module with one `unsafe` keyword, yet quite an important
/// one!
///
/// The purpose of this module is to define a type, `DropBox<A>`, which is able
/// to be sent across thread event when the underlying data `A` is itself not
/// sendable across threads. This is then in turn used to build up the
/// `LoopData` abstraction above.
///
/// A `DropBox` currently contains two major components, an identification of
/// the thread that it originated from as well as the data itself. Right now the
/// data is stored in a `Box` as we'll transition between it and `Box<MyDrop>`,
/// but this is perhaps optimizable.
///
/// The `DropBox<A>` itself only provides a few safe methods, all of which are
/// safe to call from any thread. Access to the underlying data is only granted
/// if we're on the right thread, and otherwise the methods don't access the
/// data itself.
///
/// Finally, one crucial piece, if the data is dropped it may run code that
/// assumes it's on the original thread. For this reason we have to be sure that
/// the data is only dropped on the originating thread itself. It's currently
/// the job of the outer `LoopData` to ensure that a `DropBox` is dropped on the
/// right thread, so we don't attempt to perform any communication in this
/// `Drop` implementation. Instead, if a `DropBox` is dropped on the wrong
/// thread, it simply leaks its contents.
///
/// All that's really just a lot of words in an attempt to justify the `unsafe`
/// impl of `Send` below. The idea is that the data is only ever accessed on the
/// originating thread, even during `Drop`.
///
/// Note that this is a private module to have a visibility boundary around the
/// unsafe internals. Although there's not any unsafe blocks here, the code
/// itself is quite unsafe as it has to make sure that the data is dropped in
/// the right place, if ever.
mod dropbox {
use std::mem;
use event_loop::{CURRENT_LOOP, LoopPin};
pub struct DropBox<A: ?Sized> {
id: usize,
inner: Option<Box<A>>,
}
// We can be sent across threads due to the comment above
unsafe impl<A: ?Sized> Send for DropBox<A> {}
// We can also be shared across threads just fine as we'll only ever get a
// reference on at most one thread, regardless of `A`.
unsafe impl<A: ?Sized> Sync for DropBox<A> {}
pub trait MyDrop {}
impl<T: ?Sized> MyDrop for T {}
impl<A> DropBox<A> {
/// Creates a new `DropBox` pinned to the current threads.
///
/// Will panic if `CURRENT_LOOP` isn't set.
pub fn new(a: A) -> DropBox<A> {
DropBox {
id: CURRENT_LOOP.with(|lp| lp.id),
inner: Some(Box::new(a)),
}
}
/// Creates a new `DropBox` pinned to the thread of `LoopPin`.
pub fn new_on(a: A, lp: &LoopPin) -> DropBox<A> {
DropBox {
id: lp.handle.id,
inner: Some(Box::new(a)),
}
}
/// Consumes the contents of this `DropBox<A>`, returning a new
/// `DropBox<MyDrop>`.
///
/// This is just intended to be a simple and cheap conversion, should
/// almost always return `Some`.
pub fn take<'a>(&mut self) -> Option<DropBox<MyDrop + 'a>>
where A: 'a
{
self.inner.take().map(|d| {
DropBox { id: self.id, inner: Some(d as Box<MyDrop + 'a>) }
})
}
}
impl<A: ?Sized> DropBox<A> {
/// Returns a shared reference to the data if we're on the right
/// thread.
pub fn get(&self) -> Option<&A> {
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
if lp.id == self.id {
self.inner.as_ref().map(|b| &**b)
} else {
None
}
})
} else {
None
}
}
/// Returns a mutable reference to the data if we're on the right
/// thread.
pub fn get_mut(&mut self) -> Option<&mut A> {
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(move |lp| {
if lp.id == self.id {
self.inner.as_mut().map(|b| &mut **b)
} else {
None
}
})
} else {
None
}
}
}
impl<A: ?Sized> Drop for DropBox<A> {
fn drop(&mut self) {
// Try our safe accessor first, and if it works then we know that
// we're on the right thread. In that case we can simply drop as
// usual.
if self.get().is_some() {
drop(self.inner.take().unwrap());
return;
}
// If we're on the wrong thread but we actually have some data, then
// something in theory horrible has gone awry. Prevent memory safety
// issues by forgetting the data and then also warn about this odd
// event.
if let Some(data) = self.inner.take() {
mem::forget(data);
warn!("forgetting some data on an event loop");
}
}
}
}

View File

@ -1,15 +1,13 @@
use std::cell::RefCell;
use std::io::{self, ErrorKind};
use std::marker;
use std::mem;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use futures::{Future, Poll};
use futures::task::{self, Task, Notify, TaskHandle};
use futures::executor::{ExecuteCallback, Executor};
use futures::{Future, Poll, IntoFuture};
use futures::task::{self, Unpark, Task, Spawn};
use mio;
use slab::Slab;
@ -17,10 +15,8 @@ use slot::{self, Slot};
use timer_wheel::{TimerWheel, Timeout};
mod channel;
mod loop_data;
mod source;
mod timeout;
pub use self::loop_data::{LoopData, AddLoopData};
pub use self::source::{AddSource, IoToken};
pub use self::timeout::{AddTimeout, TimeoutToken};
use self::channel::{Sender, Receiver, channel};
@ -41,11 +37,20 @@ pub struct Loop {
id: usize,
io: mio::Poll,
events: mio::Events,
tx: Arc<MioSender>,
tx: Arc<Sender<Message>>,
rx: Receiver<Message>,
dispatch: RefCell<Slab<Scheduled, usize>>,
io_dispatch: RefCell<Slab<ScheduledIo, usize>>,
task_dispatch: RefCell<Slab<ScheduledTask, usize>>,
// Incoming queue of newly spawned futures
new_futures: Rc<NewFutures>,
_new_futures_registration: mio::Registration,
// Used for determining when the future passed to `run` is ready. Once the
// registration is passed to `io` above we never touch it again, just keep
// it alive.
_future_registration: mio::Registration,
future_readiness: Arc<mio::SetReadiness>,
future_readiness: Arc<MySetReadiness>,
// Timer wheel keeping track of all timeouts. The `usize` stored in the
// timer wheel is an index into the slab below.
@ -55,16 +60,6 @@ pub struct Loop {
// `timeouts` slab.
timer_wheel: RefCell<TimerWheel<usize>>,
timeouts: RefCell<Slab<(Timeout, TimeoutState), usize>>,
// A `Loop` cannot be sent to other threads as it's used as a proxy for data
// that belongs to the thread the loop was running on at some point. In
// other words, the safety of `DropBox` below relies on loops not crossing
// threads.
_marker: marker::PhantomData<Rc<u32>>,
}
struct MioSender {
inner: Sender<Message>,
}
/// Handle to an event loop, used to construct I/O objects, send messages, and
@ -75,7 +70,7 @@ struct MioSender {
#[derive(Clone)]
pub struct LoopHandle {
id: usize,
tx: Arc<MioSender>,
tx: Arc<Sender<Message>>,
}
/// A non-sendable handle to an event loop, useful for manufacturing instances
@ -83,19 +78,30 @@ pub struct LoopHandle {
#[derive(Clone)]
pub struct LoopPin {
handle: LoopHandle,
_marker: marker::PhantomData<Box<Drop>>,
futures: Rc<NewFutures>,
}
struct Scheduled {
struct ScheduledIo {
readiness: Arc<AtomicUsize>,
reader: Option<TaskHandle>,
writer: Option<TaskHandle>,
reader: Option<Task>,
writer: Option<Task>,
}
struct ScheduledTask {
_registration: mio::Registration,
spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>,
wake: Arc<MySetReadiness>,
}
struct NewFutures {
queue: RefCell<Vec<Box<Future<Item=(), Error=()>>>>,
ready: mio::SetReadiness,
}
enum TimeoutState {
NotFired,
Fired,
Waiting(TaskHandle),
Waiting(Task),
}
enum Direction {
@ -105,14 +111,18 @@ enum Direction {
enum Message {
DropSource(usize),
Schedule(usize, TaskHandle, Direction),
Schedule(usize, Task, Direction),
AddTimeout(Instant, Arc<Slot<io::Result<(usize, Instant)>>>),
UpdateTimeout(usize, TaskHandle),
UpdateTimeout(usize, Task),
CancelTimeout(usize),
Run(Box<ExecuteCallback>),
Drop(loop_data::Opaque),
Run(Box<FnBox>),
}
const TOKEN_MESSAGES: mio::Token = mio::Token(0);
const TOKEN_FUTURE: mio::Token = mio::Token(1);
const TOKEN_NEW_FUTURES: mio::Token = mio::Token(2);
const TOKEN_START: usize = 3;
impl Loop {
/// Creates a new event loop, returning any error that happened during the
/// creation.
@ -120,26 +130,34 @@ impl Loop {
let (tx, rx) = channel();
let io = try!(mio::Poll::new());
try!(io.register(&rx,
mio::Token(0),
TOKEN_MESSAGES,
mio::Ready::readable(),
mio::PollOpt::edge()));
let pair = mio::Registration::new(&io,
mio::Token(1),
mio::Ready::readable(),
mio::PollOpt::level());
let (registration, readiness) = pair;
let future_pair = mio::Registration::new(&io,
TOKEN_FUTURE,
mio::Ready::readable(),
mio::PollOpt::level());
let new_future_pair = mio::Registration::new(&io,
TOKEN_NEW_FUTURES,
mio::Ready::readable(),
mio::PollOpt::level());
Ok(Loop {
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
io: io,
events: mio::Events::with_capacity(1024),
tx: Arc::new(MioSender { inner: tx }),
tx: Arc::new(tx),
rx: rx,
_future_registration: registration,
future_readiness: Arc::new(readiness),
dispatch: RefCell::new(Slab::new_starting_at(2, SLAB_CAPACITY)),
io_dispatch: RefCell::new(Slab::new_starting_at(0, SLAB_CAPACITY)),
task_dispatch: RefCell::new(Slab::new_starting_at(0, SLAB_CAPACITY)),
timeouts: RefCell::new(Slab::new_starting_at(0, SLAB_CAPACITY)),
timer_wheel: RefCell::new(TimerWheel::new()),
_marker: marker::PhantomData,
_future_registration: future_pair.0,
future_readiness: Arc::new(MySetReadiness(future_pair.1)),
_new_futures_registration: new_future_pair.0,
new_futures: Rc::new(NewFutures {
queue: RefCell::new(Vec::new()),
ready: new_future_pair.1,
}),
})
}
@ -164,7 +182,7 @@ impl Loop {
pub fn pin(&self) -> LoopPin {
LoopPin {
handle: self.handle(),
_marker: marker::PhantomData,
futures: self.new_futures.clone(),
}
}
@ -191,25 +209,10 @@ impl Loop {
/// thread but also to this task, any attempt to poll the future on a
/// separate thread will result in a panic. That is, calls to
/// `task::poll_on` must be avoided.
pub fn run<F>(&mut self, mut f: F) -> Result<F::Item, F::Error>
pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
where F: Future,
{
struct MyNotify(Arc<mio::SetReadiness>);
impl Notify for MyNotify {
fn notify(&self) {
self.0.set_readiness(mio::Ready::readable())
.expect("failed to set readiness");
}
}
// First up, create the task that will drive this future. The task here
// isn't a "normal task" but rather one where we define what to do when
// a readiness notification comes in.
//
// We translate readiness notifications to a `set_readiness` of our
// `future_readiness` structure we have stored internally.
let mut task = Task::new_notify(MyNotify(self.future_readiness.clone()));
let mut task = task::spawn(f);
let ready = self.future_readiness.clone();
// Next, move all that data into a dynamically dispatched closure to cut
@ -218,10 +221,8 @@ impl Loop {
// to see if it's done. If it's not then the event loop will turn again.
let mut res = None;
self._run(&mut || {
ready.set_readiness(mio::Ready::none())
.expect("failed to set readiness");
assert!(res.is_none());
match task.enter(|| f.poll()) {
match task.poll_future(ready.clone()) {
Poll::NotReady => {}
Poll::Ok(e) => res = Some(Ok(e)),
Poll::Err(e) => res = Some(Err(e)),
@ -238,7 +239,8 @@ impl Loop {
return
}
loop {
let mut finished = false;
while !finished {
let amt;
// On Linux, Poll::poll is epoll_wait, which may return EINTR if a
// ptracer attaches. This retry loop prevents crashing when
@ -273,55 +275,24 @@ impl Loop {
// Next, process all the events that came in.
for i in 0..self.events.len() {
let event = self.events.get(i).unwrap();
let token = usize::from(event.token());
// Token 0 == our incoming message queue, so this means we
// process the whole queue of messages.
//
// Token 1 == we should poll the future, we'll do that right
// after we get through the rest of this tick of the event loop.
if token == 0 {
debug!("consuming notification queue");
CURRENT_LOOP.set(&self, || {
self.consume_queue();
});
continue
} else if token == 1 {
if CURRENT_LOOP.set(self, || done()) {
return
}
continue
}
let token = event.token();
trace!("event {:?} {:?}", event.kind(), event.token());
// For any other token we look at `dispatch` to see what we're
// supposed to do. If there's a waiter we get ready to notify
// it, and we also or-in atomically any events that have
// happened (currently read/write events).
let mut reader = None;
let mut writer = None;
if let Some(sched) = self.dispatch.borrow_mut().get_mut(token) {
if event.kind().is_readable() {
reader = sched.reader.take();
sched.readiness.fetch_or(1, Ordering::Relaxed);
if token == TOKEN_MESSAGES {
CURRENT_LOOP.set(&self, || self.consume_queue());
} else if token == TOKEN_FUTURE {
self.future_readiness.0.set_readiness(mio::Ready::none()).unwrap();
if !finished && CURRENT_LOOP.set(self, || done()) {
finished = true;
}
if event.kind().is_writable() {
writer = sched.writer.take();
sched.readiness.fetch_or(2, Ordering::Relaxed);
} else if token == TOKEN_NEW_FUTURES {
self.new_futures.ready.set_readiness(mio::Ready::none()).unwrap();
let mut new_futures = self.new_futures.queue.borrow_mut();
for future in new_futures.drain(..) {
self.spawn(future);
}
} else {
debug!("notified on {} which no longer exists", token);
}
// If we actually got a waiter, then notify!
//
// TODO: don't notify the same task twice
if let Some(reader) = reader {
self.notify_handle(reader);
}
if let Some(writer) = writer {
self.notify_handle(writer);
self.dispatch(token, event.kind());
}
}
@ -329,6 +300,61 @@ impl Loop {
}
}
fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
let token = usize::from(token) - TOKEN_START;
if token % 2 == 0 {
self.dispatch_io(token / 2, ready)
} else {
self.dispatch_task(token / 2)
}
}
fn dispatch_io(&self, token: usize, ready: mio::Ready) {
let mut reader = None;
let mut writer = None;
if let Some(io) = self.io_dispatch.borrow_mut().get_mut(token) {
if ready.is_readable() {
reader = io.reader.take();
io.readiness.fetch_or(1, Ordering::Relaxed);
}
if ready.is_writable() {
writer = io.writer.take();
io.readiness.fetch_or(2, Ordering::Relaxed);
}
}
// TODO: don't notify the same task twice
if let Some(reader) = reader {
self.notify_handle(reader);
}
if let Some(writer) = writer {
self.notify_handle(writer);
}
}
fn dispatch_task(&self, token: usize) {
let (task, wake) = match self.task_dispatch.borrow_mut().get_mut(token) {
Some(slot) => (slot.spawn.take(), slot.wake.clone()),
None => return,
};
wake.0.set_readiness(mio::Ready::none()).unwrap();
let mut task = match task {
Some(task) => task,
None => return,
};
let res = CURRENT_LOOP.set(self, || task.poll_future(wake));
let mut dispatch = self.task_dispatch.borrow_mut();
match res {
Poll::NotReady => {
assert!(dispatch[token].spawn.is_none());
dispatch[token].spawn = Some(task);
}
Poll::Ok(()) |
Poll::Err(()) => {
dispatch.remove(token).unwrap();
}
}
}
fn consume_timeouts(&mut self, now: Instant) {
loop {
let idx = match self.timer_wheel.borrow_mut().poll(now) {
@ -347,7 +373,7 @@ impl Loop {
///
/// Note that this should be used instead fo `handle.unpark()` to ensure
/// that the `CURRENT_LOOP` variable is set appropriately.
fn notify_handle(&self, handle: TaskHandle) {
fn notify_handle(&self, handle: Task) {
debug!("notifying a task handle");
CURRENT_LOOP.set(&self, || handle.unpark());
}
@ -355,19 +381,19 @@ impl Loop {
fn add_source(&self, source: &mio::Evented)
-> io::Result<(Arc<AtomicUsize>, usize)> {
debug!("adding a new I/O source");
let sched = Scheduled {
let sched = ScheduledIo {
readiness: Arc::new(AtomicUsize::new(0)),
reader: None,
writer: None,
};
let mut dispatch = self.dispatch.borrow_mut();
let mut dispatch = self.io_dispatch.borrow_mut();
if dispatch.vacant_entry().is_none() {
let amt = dispatch.count();
dispatch.grow(amt);
}
let entry = dispatch.vacant_entry().unwrap();
try!(self.io.register(source,
mio::Token(entry.index()),
mio::Token(TOKEN_START + entry.index() * 2),
mio::Ready::readable() | mio::Ready::writable(),
mio::PollOpt::edge()));
Ok((sched.readiness.clone(), entry.insert(sched).index()))
@ -375,22 +401,20 @@ impl Loop {
fn drop_source(&self, token: usize) {
debug!("dropping I/O source: {}", token);
self.dispatch.borrow_mut().remove(token).unwrap();
self.io_dispatch.borrow_mut().remove(token).unwrap();
}
fn schedule(&self, token: usize, wake: TaskHandle, dir: Direction) {
fn schedule(&self, token: usize, wake: Task, dir: Direction) {
debug!("scheduling direction for: {}", token);
let to_call = {
let mut dispatch = self.dispatch.borrow_mut();
let mut dispatch = self.io_dispatch.borrow_mut();
let sched = dispatch.get_mut(token).unwrap();
let (slot, bit) = match dir {
Direction::Read => (&mut sched.reader, 1),
Direction::Write => (&mut sched.writer, 2),
};
let ready = sched.readiness.load(Ordering::SeqCst);
if ready & bit != 0 {
if sched.readiness.load(Ordering::SeqCst) & bit != 0 {
*slot = None;
sched.readiness.store(ready & !bit, Ordering::SeqCst);
Some(wake)
} else {
*slot = Some(wake);
@ -417,7 +441,7 @@ impl Loop {
Ok((entry.index(), when))
}
fn update_timeout(&self, token: usize, handle: TaskHandle) {
fn update_timeout(&self, token: usize, handle: Task) {
debug!("updating a timeout: {}", token);
let to_wake = self.timeouts.borrow_mut()[token].1.block(handle);
if let Some(to_wake) = to_wake {
@ -433,7 +457,32 @@ impl Loop {
}
}
fn spawn(&self, future: Box<Future<Item=(), Error=()>>) {
let unpark = {
let mut dispatch = self.task_dispatch.borrow_mut();
if dispatch.vacant_entry().is_none() {
let len = dispatch.count();
dispatch.grow(len);
}
let entry = dispatch.vacant_entry().unwrap();
let token = TOKEN_START + 2 * entry.index() + 1;
let pair = mio::Registration::new(&self.io,
mio::Token(token),
mio::Ready::readable(),
mio::PollOpt::level());
let unpark = Arc::new(MySetReadiness(pair.1));
let entry = entry.insert(ScheduledTask {
spawn: Some(task::spawn(future)),
wake: unpark,
_registration: pair.0,
});
entry.get().wake.clone()
};
unpark.unpark();
}
fn consume_queue(&self) {
debug!("consuming notification queue");
// TODO: can we do better than `.unwrap()` here?
while let Some(msg) = self.rx.recv().unwrap() {
self.notify(msg);
@ -451,14 +500,7 @@ impl Loop {
}
Message::UpdateTimeout(t, handle) => self.update_timeout(t, handle),
Message::CancelTimeout(t) => self.cancel_timeout(t),
Message::Run(f) => {
debug!("running a closure");
f.call()
}
Message::Drop(data) => {
debug!("dropping some data");
drop(data);
}
Message::Run(r) => r.call_box(self),
}
}
}
@ -474,7 +516,7 @@ impl LoopHandle {
lp.notify(msg);
}
None => {
match self.tx.inner.send(msg) {
match self.tx.send(msg) {
Ok(()) => {}
// This should only happen when there was an error
@ -504,6 +546,25 @@ impl LoopHandle {
f(None)
}
}
/// Spawns a new future into the event loop this handle is associated this.
///
/// This function takes a closure which is executed within the context of
/// the I/O loop itself. The future returned by the closure will be
/// scheduled on the event loop an run to completion.
///
/// Note that while the closure, `F`, requires the `Send` bound as it might
/// cross threads, the future `R` does not.
pub fn spawn<F, R>(&self, f: F)
where F: FnOnce(&LoopPin) -> R + Send + 'static,
R: IntoFuture<Item=(), Error=()>,
R::Future: 'static,
{
self.send(Message::Run(Box::new(|lp: &Loop| {
let f = f(&lp.pin());
lp.spawn(Box::new(f.into_future()));
})));
}
}
impl LoopPin {
@ -512,9 +573,12 @@ impl LoopPin {
&self.handle
}
/// TODO: dox
pub fn executor(&self) -> Arc<Executor> {
self.handle.tx.clone()
/// Spawns a new future on the event loop this pin is associated this.
pub fn spawn<F>(&self, f: F)
where F: Future<Item=(), Error=()> + 'static,
{
self.futures.queue.borrow_mut().push(Box::new(f));
self.futures.ready.set_readiness(mio::Ready::readable()).unwrap();
}
}
@ -569,7 +633,7 @@ impl<T, U> LoopFuture<T, U>
}
impl TimeoutState {
fn block(&mut self, handle: TaskHandle) -> Option<TaskHandle> {
fn block(&mut self, handle: Task) -> Option<Task> {
match *self {
TimeoutState::Fired => return Some(handle),
_ => {}
@ -578,7 +642,7 @@ impl TimeoutState {
None
}
fn fire(&mut self) -> Option<TaskHandle> {
fn fire(&mut self) -> Option<Task> {
match mem::replace(self, TimeoutState::Fired) {
TimeoutState::NotFired => None,
TimeoutState::Fired => panic!("fired twice?"),
@ -587,9 +651,21 @@ impl TimeoutState {
}
}
impl Executor for MioSender {
fn execute_boxed(&self, callback: Box<ExecuteCallback>) {
self.inner.send(Message::Run(callback))
.expect("error sending a message to the event loop")
struct MySetReadiness(mio::SetReadiness);
impl Unpark for MySetReadiness {
fn unpark(&self) {
self.0.set_readiness(mio::Ready::readable())
.expect("failed to set readiness");
}
}
trait FnBox: Send + 'static {
fn call_box(self: Box<Self>, lp: &Loop);
}
impl<F: FnOnce(&Loop) + Send + 'static> FnBox for F {
fn call_box(self: Box<Self>, lp: &Loop) {
(*self)(lp)
}
}

View File

@ -6,7 +6,7 @@ use futures::{Future, Poll};
use futures::task;
use mio;
use event_loop::{Message, LoopHandle, LoopFuture, Direction};
use event_loop::{Message, LoopHandle, LoopFuture, Direction, Loop};
/// A future which will resolve a unique `tok` token for an I/O object.
///
@ -160,17 +160,12 @@ impl<E> Future for AddSource<E>
type Error = io::Error;
fn poll(&mut self) -> Poll<(E, IoToken), io::Error> {
let handle = self.inner.loop_handle.clone();
let res = self.inner.poll(|lp, io| {
let pair = try!(lp.add_source(&io));
Ok((io, pair))
}, |io, slot| {
Message::Run(Box::new(move || {
let res = handle.with_loop(|lp| {
let lp = lp.unwrap();
let pair = try!(lp.add_source(&io));
Ok((io, pair))
});
Message::Run(Box::new(move |lp: &Loop| {
let res = lp.add_source(&io).map(|p| (io, p));
slot.try_produce(res).ok()
.expect("add source try_produce intereference");
}))

View File

@ -1,7 +1,7 @@
use std::cell::RefCell;
use std::io::{self, Read, Write};
use futures::task::TaskData;
use futures::task::TaskRc;
/// Abstraction that allows inserting an I/O object into task-local storage,
/// returning a handle that can be split.
@ -15,7 +15,7 @@ use futures::task::TaskData;
/// polled, will pin the yielded `TaskIo<T>` object to that specific task. Any
/// attempt to read or write the object on other tasks will result in a panic.
pub struct TaskIo<T> {
handle: TaskData<RefCell<T>>,
handle: TaskRc<RefCell<T>>,
}
/// The readable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
@ -23,7 +23,7 @@ pub struct TaskIo<T> {
/// This handle implements the `ReadTask` trait and can be used to split up an
/// I/O object into two distinct halves.
pub struct TaskIoRead<T> {
handle: TaskData<RefCell<T>>,
handle: TaskRc<RefCell<T>>,
}
/// The writable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
@ -31,7 +31,7 @@ pub struct TaskIoRead<T> {
/// This handle implements the `WriteTask` trait and can be used to split up an
/// I/O object into two distinct halves.
pub struct TaskIoWrite<T> {
handle: TaskData<RefCell<T>>,
handle: TaskRc<RefCell<T>>,
}
impl<T> TaskIo<T> {
@ -41,7 +41,7 @@ impl<T> TaskIo<T> {
/// The returned future will never resolve to an error.
pub fn new(t: T) -> TaskIo<T> {
TaskIo {
handle: TaskData::new(RefCell::new(t)),
handle: TaskRc::new(RefCell::new(t)),
}
}
}

View File

@ -32,7 +32,7 @@ mod udp;
pub use channel::{Sender, Receiver};
pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout};
pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken};
pub use event_loop::{TimeoutToken, IoToken};
pub use readiness_stream::ReadinessStream;
pub use tcp::{TcpListener, TcpStream};
pub use timeout::Timeout;

64
tests/poll.rs Normal file
View File

@ -0,0 +1,64 @@
extern crate env_logger;
extern crate futures;
extern crate mio;
extern crate tokio_core;
use futures::{Future, Poll};
use futures::task;
use tokio_core::{Loop, IoToken, LoopHandle};
struct Next(usize);
impl Future for Next {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if self.0 == 0 {
task::park().unpark();
self.0 += 1;
Poll::NotReady
} else {
Poll::Ok(())
}
}
}
#[test]
fn poll_after_ready() {
drop(env_logger::init());
let mut lp = Loop::new().unwrap();
let handle = lp.handle();
let (tx, rx) = mio::channel::channel::<u32>();
let (_rx, token) = lp.run(handle.add_source(rx)).unwrap();
tx.send(2).unwrap();
lp.run(Next(0)).unwrap();
lp.run(ScheduleThenPoll {
token: token,
handle: handle,
n: 0,
}).unwrap();
struct ScheduleThenPoll {
token: IoToken,
handle: LoopHandle,
n: usize,
}
impl Future for ScheduleThenPoll {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if self.n == 0 {
self.handle.schedule_read(&self.token);
self.n += 1;
Poll::NotReady
} else {
assert!(self.token.take_readiness() & 1 != 0);
Poll::Ok(())
}
}
}
}

49
tests/spawn.rs Normal file
View File

@ -0,0 +1,49 @@
extern crate tokio_core;
extern crate env_logger;
extern crate futures;
use futures::Future;
use tokio_core::Loop;
#[test]
fn simple() {
drop(env_logger::init());
let mut lp = Loop::new().unwrap();
let (tx1, rx1) = futures::oneshot();
let (tx2, rx2) = futures::oneshot();
lp.pin().spawn(futures::lazy(|| {
tx1.complete(1);
Ok(())
}));
lp.handle().spawn(|_| {
futures::lazy(|| {
tx2.complete(2);
Ok(())
})
});
assert_eq!(lp.run(rx1.join(rx2)).unwrap(), (1, 2));
}
#[test]
fn spawn_in_poll() {
drop(env_logger::init());
let mut lp = Loop::new().unwrap();
let (tx1, rx1) = futures::oneshot();
let (tx2, rx2) = futures::oneshot();
let handle = lp.handle();
lp.pin().spawn(futures::lazy(move || {
tx1.complete(1);
handle.spawn(|_| {
futures::lazy(|| {
tx2.complete(2);
Ok(())
})
});
Ok(())
}));
assert_eq!(lp.run(rx1.join(rx2)).unwrap(), (1, 2));
}