Mirror of https://github.com/tokio-rs/tokio.git, synced 2025-09-28 12:10:37 +00:00.

This patch disables various Unix-specific platform features that are not enabled on Fuchsia. It also updates the mio version to 0.6.10, which is the first release that supports Fuchsia.
895 lines · 28 KiB · Rust
//! The core reactor driving all I/O
//!
//! This module contains the `Core` type which is the reactor for all I/O
//! happening in `tokio-core`. This reactor (or event loop) is used to run
//! futures, schedule tasks, issue I/O requests, etc.
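//!
//! A minimal, illustrative sketch of driving a future to completion on the
//! reactor, using only items defined in this module:
//!
//! ```no_run
//! extern crate futures;
//! extern crate tokio_core;
//!
//! use futures::future;
//! use tokio_core::reactor::Core;
//!
//! fn main() {
//!     // Create the event loop, then run a trivial future to completion on it.
//!     let mut core = Core::new().unwrap();
//!     let value = core.run(future::ok::<u32, ()>(42)).unwrap();
//!     assert_eq!(value, 42);
//! }
//! ```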

use std::cell::RefCell;
use std::cmp;
use std::fmt;
use std::io::{self, ErrorKind};
use std::mem;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};

use futures::{Future, IntoFuture, Async};
use futures::future::{self, Executor, ExecuteError};
use futures::executor::{self, Spawn, Notify};
use futures::sync::mpsc;
use futures::task::Task;
use mio;
use mio::event::Evented;
use slab::Slab;

use heap::{Heap, Slot};

mod io_token;
mod timeout_token;

mod poll_evented;
mod timeout;
mod interval;
pub use self::poll_evented::PollEvented;
pub use self::timeout::Timeout;
pub use self::interval::Interval;

static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
scoped_thread_local!(static CURRENT_LOOP: Core);

/// An event loop.
///
/// The event loop is the main source of blocking in an application which drives
/// all other I/O events and notifications happening. Each event loop can have
/// multiple handles pointing to it, each of which can then be used to create
/// various I/O objects to interact with the event loop in interesting ways.
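///
/// A minimal sketch of creating a loop and waiting on a `Timeout` constructed
/// from its handle:
///
/// ```no_run
/// use std::time::Duration;
/// use tokio_core::reactor::{Core, Timeout};
///
/// let mut core = Core::new().unwrap();
/// let handle = core.handle();
///
/// // `Timeout` is an I/O object bound to the loop through its handle.
/// let timeout = Timeout::new(Duration::from_millis(100), &handle).unwrap();
/// core.run(timeout).unwrap();
/// ```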
// TODO: expand this
pub struct Core {
    events: mio::Events,
    tx: mpsc::UnboundedSender<Message>,
    rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
    _rx_registration: mio::Registration,
    rx_readiness: Arc<MySetReadiness>,

    inner: Rc<RefCell<Inner>>,

    // Used for determining when the future passed to `run` is ready. Once the
    // registration is passed to `io` above we never touch it again, just keep
    // it alive.
    _future_registration: mio::Registration,
    future_readiness: Arc<MySetReadiness>,
}

struct Inner {
    id: usize,
    io: mio::Poll,

    // Dispatch slabs for I/O and futures events
    io_dispatch: Slab<ScheduledIo>,
    task_dispatch: Slab<ScheduledTask>,

    // Timer heap keeping track of all timeouts. The `usize` stored in the
    // heap is an index into the slab below.
    //
    // The slab below keeps track of the timeouts themselves as well as the
    // state of each timeout. The `TimeoutToken` type is an index into the
    // `timeouts` slab.
    timer_heap: Heap<(Instant, usize)>,
    timeouts: Slab<(Option<Slot>, TimeoutState)>,
}

/// A unique ID for a Core
///
/// An ID by which different cores may be distinguished. Can be compared and used as an index in
/// a `HashMap`.
///
/// The ID is globally unique and never reused.
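///
/// A small sketch of using the ID as a map key:
///
/// ```no_run
/// use std::collections::HashMap;
/// use tokio_core::reactor::Core;
///
/// let core = Core::new().unwrap();
/// let mut labels = HashMap::new();
/// labels.insert(core.id(), "main reactor");
/// ```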
#[derive(Clone,Copy,Eq,PartialEq,Hash,Debug)]
pub struct CoreId(usize);

/// Handle to an event loop, used to construct I/O objects, send messages, and
/// otherwise interact indirectly with the event loop itself.
///
/// Handles can be cloned, and when cloned they will still refer to the
/// same underlying event loop.
#[derive(Clone)]
pub struct Remote {
    id: usize,
    tx: mpsc::UnboundedSender<Message>,
}

/// A non-sendable handle to an event loop, useful for constructing I/O objects
/// and spawning futures that must stay on the event loop's thread.
#[derive(Clone)]
pub struct Handle {
    remote: Remote,
    inner: Weak<RefCell<Inner>>,
}

struct ScheduledIo {
    readiness: Arc<AtomicUsize>,
    reader: Option<Task>,
    writer: Option<Task>,
}

struct ScheduledTask {
    _registration: mio::Registration,
    spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>,
    wake: Option<Arc<MySetReadiness>>,
}

enum TimeoutState {
    NotFired,
    Fired,
    Waiting(Task),
}

enum Direction {
    Read,
    Write,
}

enum Message {
    DropSource(usize),
    Schedule(usize, Task, Direction),
    UpdateTimeout(usize, Task),
    ResetTimeout(usize, Instant),
    CancelTimeout(usize),
    Run(Box<FnBox>),
}

// Tokens 0 and 1 are reserved for the message channel and the future passed
// to `run`; all later tokens encode slab indices (even offsets for I/O
// sources, odd offsets for spawned tasks).
const TOKEN_MESSAGES: mio::Token = mio::Token(0);
const TOKEN_FUTURE: mio::Token = mio::Token(1);
const TOKEN_START: usize = 2;

impl Core {
    /// Creates a new event loop, returning any error that happened during the
    /// creation.
    pub fn new() -> io::Result<Core> {
        let io = try!(mio::Poll::new());
        let future_pair = mio::Registration::new2();
        try!(io.register(&future_pair.0,
                         TOKEN_FUTURE,
                         mio::Ready::readable(),
                         mio::PollOpt::level()));
        let (tx, rx) = mpsc::unbounded();
        let channel_pair = mio::Registration::new2();
        try!(io.register(&channel_pair.0,
                         TOKEN_MESSAGES,
                         mio::Ready::readable(),
                         mio::PollOpt::level()));
        let rx_readiness = Arc::new(MySetReadiness(channel_pair.1));
        rx_readiness.notify(0);

        Ok(Core {
            events: mio::Events::with_capacity(1024),
            tx: tx,
            rx: RefCell::new(executor::spawn(rx)),
            _rx_registration: channel_pair.0,
            rx_readiness: rx_readiness,

            _future_registration: future_pair.0,
            future_readiness: Arc::new(MySetReadiness(future_pair.1)),

            inner: Rc::new(RefCell::new(Inner {
                id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
                io: io,
                io_dispatch: Slab::with_capacity(1),
                task_dispatch: Slab::with_capacity(1),
                timeouts: Slab::with_capacity(1),
                timer_heap: Heap::new(),
            })),
        })
    }

    /// Returns a handle to this event loop which cannot be sent across threads
    /// but can be used as a proxy to the event loop itself.
    ///
    /// Handles are cloneable and clones always refer to the same event loop.
    /// This handle is typically passed into functions that create I/O objects
    /// to bind them to this event loop.
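    ///
    /// A rough sketch, assuming the TCP listener type from `tokio_core::net`:
    ///
    /// ```no_run
    /// use tokio_core::reactor::Core;
    /// use tokio_core::net::TcpListener;
    ///
    /// let core = Core::new().unwrap();
    /// let handle = core.handle();
    /// let addr = "127.0.0.1:8080".parse().unwrap();
    /// // The handle binds the new listener to this event loop.
    /// let listener = TcpListener::bind(&addr, &handle).unwrap();
    /// ```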
    pub fn handle(&self) -> Handle {
        Handle {
            remote: self.remote(),
            inner: Rc::downgrade(&self.inner),
        }
    }

    /// Generates a remote handle to this event loop which can be used to spawn
    /// tasks from other threads into this event loop.
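    ///
    /// A rough sketch of spawning work from another thread via the remote:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use std::thread;
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let core = Core::new().unwrap();
    ///     let remote = core.remote();
    ///     thread::spawn(move || {
    ///         // `Remote` is `Send`, so work can be spawned from other threads.
    ///         remote.spawn(|_handle| future::ok::<(), ()>(()));
    ///     });
    /// }
    /// ```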
    pub fn remote(&self) -> Remote {
        Remote {
            id: self.inner.borrow().id,
            tx: self.tx.clone(),
        }
    }

    /// Runs a future until completion, driving the event loop while we're
    /// otherwise waiting for the future to complete.
    ///
    /// This function will begin executing the event loop and will finish once
    /// the provided future is resolved. Note that the future argument here
    /// crucially does not require the `'static` nor `Send` bounds. As a result
    /// the future will be "pinned" to not only this thread but also this stack
    /// frame.
    ///
    /// This function will return the value that the future resolves to once
    /// the future has finished. If the future never resolves then this function
    /// will never return.
    ///
    /// # Panics
    ///
    /// This method will **not** catch panics from polling the future `f`. If
    /// the future panics then it's the responsibility of the caller to catch
    /// that panic and handle it as appropriate.
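    ///
    /// A brief, illustrative sketch:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::Future;
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let mut core = Core::new().unwrap();
    ///     // No `Send` or `'static` bounds are required of the future passed in.
    ///     let sum = core.run(future::ok::<u32, ()>(2).map(|n| n + 2)).unwrap();
    ///     assert_eq!(sum, 4);
    /// }
    /// ```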
    pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
        where F: Future,
    {
        let mut task = executor::spawn(f);
        let mut future_fired = true;

        loop {
            if future_fired {
                let res = try!(CURRENT_LOOP.set(self, || {
                    task.poll_future_notify(&self.future_readiness, 0)
                }));
                if let Async::Ready(e) = res {
                    return Ok(e)
                }
            }
            future_fired = self.poll(None);
        }
    }

    /// Performs one iteration of the event loop, blocking on waiting for events
    /// for at most `max_wait` (forever if `None`).
    ///
    /// It only makes sense to call this method if you've previously spawned
    /// a future onto this event loop.
    ///
    /// `loop { lp.turn(None) }` is equivalent to calling `run` with an
    /// empty future (one that never finishes).
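    ///
    /// A rough sketch of manually turning the loop (assuming work has already
    /// been spawned onto it):
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use tokio_core::reactor::Core;
    ///
    /// let mut core = Core::new().unwrap();
    /// // ... spawn futures onto `core.handle()` here ...
    /// loop {
    ///     // Dispatch pending events, waiting at most 100ms for new ones.
    ///     core.turn(Some(Duration::from_millis(100)));
    /// }
    /// ```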
    pub fn turn(&mut self, max_wait: Option<Duration>) {
        self.poll(max_wait);
    }

    fn poll(&mut self, max_wait: Option<Duration>) -> bool {
        // Given the `max_wait` variable specified, figure out the actual
        // timeout that we're going to pass to `poll`. This involves taking a
        // look at active timers on our heap as well.
        let start = Instant::now();
        let timeout = self.inner.borrow_mut().timer_heap.peek().map(|t| {
            if t.0 < start {
                Duration::new(0, 0)
            } else {
                t.0 - start
            }
        });
        let timeout = match (max_wait, timeout) {
            (Some(d1), Some(d2)) => Some(cmp::min(d1, d2)),
            (max_wait, timeout) => max_wait.or(timeout),
        };

        // Block waiting for an event to happen, peeling out how many events
        // happened.
        let amt = match self.inner.borrow_mut().io.poll(&mut self.events, timeout) {
            Ok(a) => a,
            Err(ref e) if e.kind() == ErrorKind::Interrupted => return false,
            Err(e) => panic!("error in poll: {}", e),
        };

        let after_poll = Instant::now();
        debug!("loop poll - {:?}", after_poll - start);
        debug!("loop time - {:?}", after_poll);

        // Process all timeouts that may have just occurred, using the time we
        // observed after `poll` returned as the current time.
        self.consume_timeouts(after_poll);

        // Process all the events that came in, dispatching appropriately
        let mut fired = false;
        for i in 0..self.events.len() {
            let event = self.events.get(i).unwrap();
            let token = event.token();
            trace!("event {:?} {:?}", event.readiness(), event.token());

            if token == TOKEN_MESSAGES {
                self.rx_readiness.0.set_readiness(mio::Ready::empty()).unwrap();
                CURRENT_LOOP.set(&self, || self.consume_queue());
            } else if token == TOKEN_FUTURE {
                self.future_readiness.0.set_readiness(mio::Ready::empty()).unwrap();
                fired = true;
            } else {
                self.dispatch(token, event.readiness());
            }
        }
        debug!("loop process - {} events, {:?}", amt, after_poll.elapsed());
        return fired
    }

    fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
        // Strip off the reserved tokens; the remainder encodes a slab index,
        // with even values for I/O sources and odd values for spawned tasks
        // (see `add_source` and `spawn` below).
        let token = usize::from(token) - TOKEN_START;
        if token % 2 == 0 {
            self.dispatch_io(token / 2, ready)
        } else {
            self.dispatch_task(token / 2)
        }
    }

    fn dispatch_io(&mut self, token: usize, ready: mio::Ready) {
        let mut reader = None;
        let mut writer = None;
        let mut inner = self.inner.borrow_mut();
        if let Some(io) = inner.io_dispatch.get_mut(token) {
            io.readiness.fetch_or(ready2usize(ready), Ordering::Relaxed);
            if ready.is_writable() {
                writer = io.writer.take();
            }
            if !(ready & (!mio::Ready::writable())).is_empty() {
                reader = io.reader.take();
            }
        }
        drop(inner);
        // TODO: don't notify the same task twice
        if let Some(reader) = reader {
            self.notify_handle(reader);
        }
        if let Some(writer) = writer {
            self.notify_handle(writer);
        }
    }

    fn dispatch_task(&mut self, token: usize) {
        let mut inner = self.inner.borrow_mut();
        let (task, wake) = match inner.task_dispatch.get_mut(token) {
            Some(slot) => (slot.spawn.take(), slot.wake.take()),
            None => return,
        };
        let (mut task, wake) = match (task, wake) {
            (Some(task), Some(wake)) => (task, wake),
            _ => return,
        };
        wake.0.set_readiness(mio::Ready::empty()).unwrap();
        drop(inner);
        let res = CURRENT_LOOP.set(self, || {
            task.poll_future_notify(&wake, 0)
        });
        // Hold any task removed below so it is dropped only after `inner`'s
        // borrow is released (dropping the future may re-enter the reactor).
        let _task_to_drop;
        inner = self.inner.borrow_mut();
        match res {
            Ok(Async::NotReady) => {
                assert!(inner.task_dispatch[token].spawn.is_none());
                inner.task_dispatch[token].spawn = Some(task);
                inner.task_dispatch[token].wake = Some(wake);
            }
            Ok(Async::Ready(())) |
            Err(()) => {
                _task_to_drop = inner.task_dispatch.remove(token).unwrap();
            }
        }
        drop(inner);
    }

    fn consume_timeouts(&mut self, now: Instant) {
        loop {
            let mut inner = self.inner.borrow_mut();
            match inner.timer_heap.peek() {
                Some(head) if head.0 <= now => {}
                Some(_) => break,
                None => break,
            };
            let (_, slab_idx) = inner.timer_heap.pop().unwrap();

            trace!("firing timeout: {}", slab_idx);
            inner.timeouts[slab_idx].0.take().unwrap();
            let handle = inner.timeouts[slab_idx].1.fire();
            drop(inner);
            if let Some(handle) = handle {
                self.notify_handle(handle);
            }
        }
    }

    /// Method used to notify a task handle.
    ///
    /// Note that this should be used instead of `handle.notify()` to ensure
    /// that the `CURRENT_LOOP` variable is set appropriately.
    fn notify_handle(&self, handle: Task) {
        debug!("notifying a task handle");
        CURRENT_LOOP.set(&self, || handle.notify());
    }

    fn consume_queue(&self) {
        debug!("consuming notification queue");
        // TODO: can we do better than `.unwrap()` here?
        loop {
            let msg = self.rx.borrow_mut().poll_stream_notify(&self.rx_readiness, 0).unwrap();
            match msg {
                Async::Ready(Some(msg)) => self.notify(msg),
                Async::NotReady |
                Async::Ready(None) => break,
            }
        }
    }

    fn notify(&self, msg: Message) {
        match msg {
            Message::DropSource(tok) => self.inner.borrow_mut().drop_source(tok),
            Message::Schedule(tok, wake, dir) => {
                let task = self.inner.borrow_mut().schedule(tok, wake, dir);
                if let Some(task) = task {
                    self.notify_handle(task);
                }
            }
            Message::UpdateTimeout(t, handle) => {
                let task = self.inner.borrow_mut().update_timeout(t, handle);
                if let Some(task) = task {
                    self.notify_handle(task);
                }
            }
            Message::ResetTimeout(t, at) => {
                self.inner.borrow_mut().reset_timeout(t, at);
            }
            Message::CancelTimeout(t) => {
                self.inner.borrow_mut().cancel_timeout(t)
            }
            Message::Run(r) => r.call_box(self),
        }
    }

    /// Get the ID of this loop
    pub fn id(&self) -> CoreId {
        CoreId(self.inner.borrow().id)
    }
}

impl<F> Executor<F> for Core
    where F: Future<Item = (), Error = ()> + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        self.handle().execute(future)
    }
}

impl fmt::Debug for Core {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Core")
            .field("id", &self.id())
            .finish()
    }
}

impl Inner {
    fn add_source(&mut self, source: &Evented)
                  -> io::Result<(Arc<AtomicUsize>, usize)> {
        debug!("adding a new I/O source");
        let sched = ScheduledIo {
            readiness: Arc::new(AtomicUsize::new(0)),
            reader: None,
            writer: None,
        };
        if self.io_dispatch.vacant_entry().is_none() {
            let amt = self.io_dispatch.len();
            self.io_dispatch.reserve_exact(amt);
        }
        let entry = self.io_dispatch.vacant_entry().unwrap();
        // I/O sources occupy even token offsets past `TOKEN_START` (see `dispatch`).
        try!(self.io.register(source,
                              mio::Token(TOKEN_START + entry.index() * 2),
                              mio::Ready::readable() |
                              mio::Ready::writable() |
                              platform::all(),
                              mio::PollOpt::edge()));
        Ok((sched.readiness.clone(), entry.insert(sched).index()))
    }

    fn deregister_source(&mut self, source: &Evented) -> io::Result<()> {
        self.io.deregister(source)
    }

    fn drop_source(&mut self, token: usize) {
        debug!("dropping I/O source: {}", token);
        self.io_dispatch.remove(token).unwrap();
    }

    fn schedule(&mut self, token: usize, wake: Task, dir: Direction)
                -> Option<Task> {
        debug!("scheduling direction for: {}", token);
        let sched = self.io_dispatch.get_mut(token).unwrap();
        let (slot, ready) = match dir {
            Direction::Read => (&mut sched.reader, !mio::Ready::writable()),
            Direction::Write => (&mut sched.writer, mio::Ready::writable()),
        };
        if sched.readiness.load(Ordering::SeqCst) & ready2usize(ready) != 0 {
            debug!("cancelling block");
            *slot = None;
            Some(wake)
        } else {
            debug!("blocking");
            *slot = Some(wake);
            None
        }
    }

    fn add_timeout(&mut self, at: Instant) -> usize {
        if self.timeouts.vacant_entry().is_none() {
            let len = self.timeouts.len();
            self.timeouts.reserve_exact(len);
        }
        let entry = self.timeouts.vacant_entry().unwrap();
        let slot = self.timer_heap.push((at, entry.index()));
        let entry = entry.insert((Some(slot), TimeoutState::NotFired));
        debug!("added a timeout: {}", entry.index());
        return entry.index();
    }

    fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> {
        debug!("updating a timeout: {}", token);
        self.timeouts[token].1.block(handle)
    }

    fn reset_timeout(&mut self, token: usize, at: Instant) {
        let pair = &mut self.timeouts[token];
        // TODO: avoid remove + push and instead just do one sift of the heap?
        // In theory we could update it in place and then do the percolation
        // as necessary
        if let Some(slot) = pair.0.take() {
            self.timer_heap.remove(slot);
        }
        let slot = self.timer_heap.push((at, token));
        *pair = (Some(slot), TimeoutState::NotFired);
        debug!("set a timeout: {}", token);
    }

    fn cancel_timeout(&mut self, token: usize) {
        debug!("cancel a timeout: {}", token);
        let pair = self.timeouts.remove(token);
        if let Some((Some(slot), _state)) = pair {
            self.timer_heap.remove(slot);
        }
    }

    fn spawn(&mut self, future: Box<Future<Item=(), Error=()>>) {
        if self.task_dispatch.vacant_entry().is_none() {
            let len = self.task_dispatch.len();
            self.task_dispatch.reserve_exact(len);
        }
        let entry = self.task_dispatch.vacant_entry().unwrap();
        // Spawned tasks occupy odd token offsets past `TOKEN_START` (see `dispatch`).
        let token = TOKEN_START + 2 * entry.index() + 1;
        let pair = mio::Registration::new2();
        self.io.register(&pair.0,
                         mio::Token(token),
                         mio::Ready::readable(),
                         mio::PollOpt::level())
            .expect("cannot fail future registration with mio");
        let unpark = Arc::new(MySetReadiness(pair.1));
        unpark.notify(0);
        entry.insert(ScheduledTask {
            spawn: Some(executor::spawn(future)),
            wake: Some(unpark),
            _registration: pair.0,
        });
    }
}

impl Remote {
    fn send(&self, msg: Message) {
        self.with_loop(|lp| {
            match lp {
                Some(lp) => {
                    // Need to execute all existing requests first, to ensure
                    // that our message is processed "in order"
                    lp.consume_queue();
                    lp.notify(msg);
                }
                None => {
                    match mpsc::UnboundedSender::send(&self.tx, msg) {
                        Ok(()) => {}

                        // TODO: this error should punt upwards and we should
                        //       notify the caller that the message wasn't
                        //       received. This is tokio-core#17
                        Err(e) => drop(e),
                    }
                }
            }
        })
    }

    fn with_loop<F, R>(&self, f: F) -> R
        where F: FnOnce(Option<&Core>) -> R
    {
        if CURRENT_LOOP.is_set() {
            CURRENT_LOOP.with(|lp| {
                let same = lp.inner.borrow().id == self.id;
                if same {
                    f(Some(lp))
                } else {
                    f(None)
                }
            })
        } else {
            f(None)
        }
    }

    /// Spawns a new future into the event loop this remote is associated with.
    ///
    /// This function takes a closure which is executed within the context of
    /// the I/O loop itself. The future returned by the closure will be
    /// scheduled on the event loop and run to completion.
    ///
    /// Note that while the closure, `F`, requires the `Send` bound as it might
    /// cross threads, the future `R` does not.
    ///
    /// # Panics
    ///
    /// This method will **not** catch panics from polling the future `f`. If
    /// the future panics then it's the responsibility of the caller to catch
    /// that panic and handle it as appropriate.
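    ///
    /// A rough sketch of spawning from another thread; the closure runs on the
    /// event loop thread and receives a `Handle`, so the future it builds can
    /// own I/O objects:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use std::thread;
    /// use std::time::Duration;
    /// use futures::Future;
    /// use tokio_core::reactor::{Core, Timeout};
    ///
    /// fn main() {
    ///     let core = Core::new().unwrap();
    ///     let remote = core.remote();
    ///     thread::spawn(move || {
    ///         remote.spawn(|handle| {
    ///             Timeout::new(Duration::from_millis(100), handle)
    ///                 .unwrap()
    ///                 .map_err(|_| ())
    ///         });
    ///     });
    /// }
    /// ```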
    pub fn spawn<F, R>(&self, f: F)
        where F: FnOnce(&Handle) -> R + Send + 'static,
              R: IntoFuture<Item=(), Error=()>,
              R::Future: 'static,
    {
        self.send(Message::Run(Box::new(|lp: &Core| {
            let f = f(&lp.handle());
            lp.inner.borrow_mut().spawn(Box::new(f.into_future()));
        })));
    }

    /// Return the ID of the represented Core
    pub fn id(&self) -> CoreId {
        CoreId(self.id)
    }

    /// Attempts to "promote" this remote to a handle, if possible.
    ///
    /// This function is intended for structures which typically work through a
    /// `Remote` but want to optimize runtime when the remote doesn't actually
    /// leave the thread of the original reactor. This will attempt to return a
    /// handle if the `Remote` is on the same thread as the event loop and the
    /// event loop is running.
    ///
    /// If this `Remote` has moved to a different thread or if the event loop
    /// isn't running, then `None` may be returned. If you need to guarantee
    /// access to a `Handle`, then you can call this function and fall back to
    /// using `spawn` above if it returns `None`.
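    ///
    /// A rough sketch of that fallback pattern:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::future;
    /// use tokio_core::reactor::Remote;
    ///
    /// fn spawn_unit(remote: &Remote) {
    ///     match remote.handle() {
    ///         // On the loop's thread: spawn directly without `Send`.
    ///         Some(handle) => handle.spawn(future::ok::<(), ()>(())),
    ///         // Elsewhere (or loop not running): fall back to `Remote::spawn`.
    ///         None => remote.spawn(|_| future::ok::<(), ()>(())),
    ///     }
    /// }
    /// ```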
    pub fn handle(&self) -> Option<Handle> {
        if CURRENT_LOOP.is_set() {
            CURRENT_LOOP.with(|lp| {
                let same = lp.inner.borrow().id == self.id;
                if same {
                    Some(lp.handle())
                } else {
                    None
                }
            })
        } else {
            None
        }
    }
}

impl<F> Executor<F> for Remote
    where F: Future<Item = (), Error = ()> + Send + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        self.spawn(|_| future);
        Ok(())
    }
}

impl fmt::Debug for Remote {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Remote")
            .field("id", &self.id())
            .finish()
    }
}

impl Handle {
    /// Returns a reference to the underlying remote handle to the event loop.
    pub fn remote(&self) -> &Remote {
        &self.remote
    }

    /// Spawns a new future on the event loop this handle is associated with.
    ///
    /// # Panics
    ///
    /// This method will **not** catch panics from polling the future `f`. If
    /// the future panics then it's the responsibility of the caller to catch
    /// that panic and handle it as appropriate.
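    ///
    /// A minimal sketch of spawning a trivial future through a handle:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let core = Core::new().unwrap();
    ///     let handle = core.handle();
    ///     // The spawned future must be `'static`, but it need not be `Send`.
    ///     handle.spawn(future::ok::<(), ()>(()));
    /// }
    /// ```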
    pub fn spawn<F>(&self, f: F)
        where F: Future<Item=(), Error=()> + 'static,
    {
        let inner = match self.inner.upgrade() {
            Some(inner) => inner,
            None => return,
        };
        inner.borrow_mut().spawn(Box::new(f));
    }

    /// Spawns a closure on this event loop.
    ///
    /// This function is a convenience wrapper around the `spawn` function above
    /// for running a closure wrapped in `futures::lazy`. It will spawn the
    /// function `f` provided onto the event loop, and continue to run the
    /// future returned by `f` on the event loop as well.
    ///
    /// # Panics
    ///
    /// This method will **not** catch panics from polling the future `f`. If
    /// the future panics then it's the responsibility of the caller to catch
    /// that panic and handle it as appropriate.
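    ///
    /// A minimal sketch:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let core = Core::new().unwrap();
    ///     let handle = core.handle();
    ///     handle.spawn_fn(|| {
    ///         // Runs on the event loop; the returned future then runs there too.
    ///         future::ok::<(), ()>(())
    ///     });
    /// }
    /// ```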
    pub fn spawn_fn<F, R>(&self, f: F)
        where F: FnOnce() -> R + 'static,
              R: IntoFuture<Item=(), Error=()> + 'static,
    {
        self.spawn(future::lazy(f))
    }

    /// Return the ID of the represented Core
    pub fn id(&self) -> CoreId {
        self.remote.id()
    }
}

impl<F> Executor<F> for Handle
    where F: Future<Item = (), Error = ()> + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        self.spawn(future);
        Ok(())
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Handle")
            .field("id", &self.id())
            .finish()
    }
}

impl TimeoutState {
    fn block(&mut self, handle: Task) -> Option<Task> {
        match *self {
            TimeoutState::Fired => return Some(handle),
            _ => {}
        }
        *self = TimeoutState::Waiting(handle);
        None
    }

    fn fire(&mut self) -> Option<Task> {
        match mem::replace(self, TimeoutState::Fired) {
            TimeoutState::NotFired => None,
            TimeoutState::Fired => panic!("fired twice?"),
            TimeoutState::Waiting(handle) => Some(handle),
        }
    }
}

struct MySetReadiness(mio::SetReadiness);

impl Notify for MySetReadiness {
    fn notify(&self, _id: usize) {
        self.0.set_readiness(mio::Ready::readable())
              .expect("failed to set readiness");
    }
}

trait FnBox: Send + 'static {
    fn call_box(self: Box<Self>, lp: &Core);
}

impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
    fn call_box(self: Box<Self>, lp: &Core) {
        (*self)(lp)
    }
}

fn read_ready() -> mio::Ready {
    mio::Ready::readable() | platform::hup()
}

// Readiness is tracked in an `AtomicUsize` bitset; `READ` and `WRITE` are the
// portable bits, and the `platform` module below defines OS-specific ones.
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;

fn ready2usize(ready: mio::Ready) -> usize {
    let mut bits = 0;
    if ready.is_readable() {
        bits |= READ;
    }
    if ready.is_writable() {
        bits |= WRITE;
    }
    bits | platform::ready2usize(ready)
}

fn usize2ready(bits: usize) -> mio::Ready {
    let mut ready = mio::Ready::empty();
    if bits & READ != 0 {
        ready.insert(mio::Ready::readable());
    }
    if bits & WRITE != 0 {
        ready.insert(mio::Ready::writable());
    }
    ready | platform::usize2ready(bits)
}

#[cfg(all(unix, not(target_os = "fuchsia")))]
mod platform {
    use mio::Ready;
    use mio::unix::UnixReady;

    pub fn aio() -> Ready {
        UnixReady::aio().into()
    }

    pub fn all() -> Ready {
        hup() | aio()
    }

    pub fn hup() -> Ready {
        UnixReady::hup().into()
    }

    const HUP: usize = 1 << 2;
    const ERROR: usize = 1 << 3;
    const AIO: usize = 1 << 4;

    pub fn ready2usize(ready: Ready) -> usize {
        let ready = UnixReady::from(ready);
        let mut bits = 0;
        if ready.is_aio() {
            bits |= AIO;
        }
        if ready.is_error() {
            bits |= ERROR;
        }
        if ready.is_hup() {
            bits |= HUP;
        }
        bits
    }

    pub fn usize2ready(bits: usize) -> Ready {
        let mut ready = UnixReady::from(Ready::empty());
        if bits & AIO != 0 {
            ready.insert(UnixReady::aio());
        }
        if bits & HUP != 0 {
            ready.insert(UnixReady::hup());
        }
        if bits & ERROR != 0 {
            ready.insert(UnixReady::error());
        }
        ready.into()
    }
}

#[cfg(any(windows, target_os = "fuchsia"))]
mod platform {
    use mio::Ready;

    pub fn all() -> Ready {
        // No platform-specific readiness flags on Windows or Fuchsia
        Ready::empty()
    }

    pub fn hup() -> Ready {
        Ready::empty()
    }

    pub fn ready2usize(_r: Ready) -> usize {
        0
    }

    pub fn usize2ready(_r: usize) -> Ready {
        Ready::empty()
    }
}