mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
767 lines
22 KiB
Rust
767 lines
22 KiB
Rust
//! A single-threaded executor which executes tasks on the same thread from which
//! they are spawned.
//!
//! The crate provides:
//!
//! * [`CurrentThread`] is the main type of this crate. It executes tasks on the
//!   current thread. The easiest way to start a new [`CurrentThread`] executor
//!   is to call [`block_on_all`] with an initial task to seed the executor.
//!   All tasks that are being managed by a [`CurrentThread`] executor are able
//!   to spawn additional tasks by calling [`spawn`].
//!
//! Application authors will not use this crate directly. Instead, they will use the
//! `tokio` crate. Library authors should only depend on `tokio-current-thread` if they
//! are building a custom task executor.
//!
//! For more details, see [executor module] documentation in the Tokio crate.
//!
//! [`CurrentThread`]: struct.CurrentThread.html
//! [`spawn`]: fn.spawn.html
//! [`block_on_all`]: fn.block_on_all.html
//! [executor module]: https://docs.rs/tokio/0.1/tokio/executor/index.html
|
|
|
|
#![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.0")]
|
|
#![deny(warnings, missing_docs, missing_debug_implementations)]
|
|
|
|
extern crate futures;
|
|
extern crate tokio_executor;
|
|
|
|
mod scheduler;
|
|
|
|
use self::scheduler::Scheduler;
|
|
|
|
use tokio_executor::{Enter, SpawnError};
|
|
use tokio_executor::park::{Park, Unpark, ParkThread};
|
|
|
|
use futures::{executor, Async, Future};
|
|
use futures::future::{Executor, ExecuteError, ExecuteErrorKind};
|
|
|
|
use std::fmt;
|
|
use std::cell::Cell;
|
|
use std::error::Error;
|
|
use std::rc::Rc;
|
|
use std::time::{Duration, Instant};
|
|
use std::sync::mpsc;
|
|
|
|
#[cfg(feature = "unstable-futures")]
|
|
use futures2;
|
|
|
|
/// Executes tasks on the current thread
|
|
pub struct CurrentThread<P: Park = ParkThread> {
|
|
/// Execute futures and receive unpark notifications.
|
|
scheduler: Scheduler<P::Unpark>,
|
|
|
|
/// Current number of futures being executed
|
|
num_futures: usize,
|
|
|
|
/// Thread park handle
|
|
park: P,
|
|
|
|
/// Handle for spawning new futures from other threads
|
|
spawn_handle: Handle,
|
|
|
|
/// Receiver for futures spawned from other threads
|
|
spawn_receiver: mpsc::Receiver<Box<Future<Item = (), Error = ()> + Send + 'static>>,
|
|
}
|
|
|
|
/// Spawns futures onto the `CurrentThread` executor running on this thread.
///
/// Every future submitted through this type is executed on the current
/// thread, so `run` waits for those futures to finish before it returns.
///
/// For more details, see the [module level](index.html) documentation.
#[derive(Debug, Clone)]
pub struct TaskExecutor {
    // `Rc` is neither `Send` nor `Sync`, so this marker keeps the handle from
    // moving across threads.
    _p: ::std::marker::PhantomData<Rc<()>>,
}
|
|
|
|
/// Outcome of a call to the `turn` function.
#[derive(Debug)]
pub struct Turn {
    polled: bool,
}

impl Turn {
    /// Reports whether any futures were polled at all during the turn
    /// (`true`) or none were (`false`).
    pub fn has_polled(&self) -> bool {
        let Turn { polled } = *self;
        polled
    }
}
|
|
|
|
/// A `CurrentThread` instance bound to a supplied execution context.
|
|
pub struct Entered<'a, P: Park + 'a> {
|
|
executor: &'a mut CurrentThread<P>,
|
|
enter: &'a mut Enter,
|
|
}
|
|
|
|
/// Error returned by the `run` function.
#[derive(Debug)]
pub struct RunError {
    _p: (),
}

impl fmt::Display for RunError {
    /// Writes the fixed message `Run error`.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // Emit the message directly rather than calling the deprecated
        // `Error::description`; with `#![deny(warnings)]` that call is a
        // hard error on current compilers.
        fmt.write_str("Run error")
    }
}

impl Error for RunError {
    /// Kept for callers that still use `description`; returns the same text
    /// as the `Display` implementation.
    fn description(&self) -> &str {
        "Run error"
    }
}
|
|
|
|
/// Error returned by the `run_timeout` function.
#[derive(Debug)]
pub struct RunTimeoutError {
    /// `true` when the error was produced because the deadline elapsed.
    timeout: bool,
}

/// Picks the human-readable message for a `RunTimeoutError`.
fn run_timeout_message(timeout: bool) -> &'static str {
    if timeout {
        "Run timeout error (timeout)"
    } else {
        "Run timeout error (not timeout)"
    }
}

impl fmt::Display for RunTimeoutError {
    /// Writes the message matching the `timeout` flag.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // Emit the message directly rather than calling the deprecated
        // `Error::description`; with `#![deny(warnings)]` that call is a
        // hard error on current compilers.
        fmt.write_str(run_timeout_message(self.timeout))
    }
}

impl Error for RunTimeoutError {
    /// Kept for callers that still use `description`; returns the same text
    /// as the `Display` implementation.
    fn description(&self) -> &str {
        run_timeout_message(self.timeout)
    }
}
|
|
|
|
/// Error returned by the `turn` function.
#[derive(Debug)]
pub struct TurnError {
    _p: (),
}

impl fmt::Display for TurnError {
    /// Writes the fixed message `Turn error`.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        // Emit the message directly rather than calling the deprecated
        // `Error::description`; with `#![deny(warnings)]` that call is a
        // hard error on current compilers.
        fmt.write_str("Turn error")
    }
}

impl Error for TurnError {
    /// Kept for callers that still use `description`; returns the same text
    /// as the `Display` implementation.
    fn description(&self) -> &str {
        "Turn error"
    }
}
|
|
|
|
/// Error returned by the `block_on` function.
#[derive(Debug)]
pub struct BlockError<T> {
    /// The error produced by the future that was blocked on, when there is
    /// one; `None` for failures that did not come from the future.
    inner: Option<T>,
}

impl<T> fmt::Display for BlockError<T> {
    /// Writes the fixed message `Block error`.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str("Block error")
    }
}

impl<T: fmt::Debug> Error for BlockError<T> {
    /// Returns the fixed message `Block error`.
    fn description(&self) -> &str {
        "Block error"
    }
}
|
|
|
|
/// This is mostly split out to make the borrow checker happy.
|
|
struct Borrow<'a, U: 'a> {
|
|
scheduler: &'a mut Scheduler<U>,
|
|
num_futures: &'a mut usize,
|
|
}
|
|
|
|
trait SpawnLocal {
|
|
fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>);
|
|
}
|
|
|
|
struct CurrentRunner {
|
|
spawn: Cell<Option<*mut SpawnLocal>>,
|
|
}
|
|
|
|
/// Current thread's task runner. This is set in `TaskRunner::with`
|
|
thread_local!(static CURRENT: CurrentRunner = CurrentRunner {
|
|
spawn: Cell::new(None),
|
|
});
|
|
|
|
/// Run the executor bootstrapping the execution with the provided future.
|
|
///
|
|
/// This creates a new [`CurrentThread`] executor, spawns the provided future,
|
|
/// and blocks the current thread until the provided future and **all**
|
|
/// subsequently spawned futures complete. In other words:
|
|
///
|
|
/// * If the provided bootstrap future does **not** spawn any additional tasks,
|
|
/// `block_on_all` returns once `future` completes.
|
|
/// * If the provided bootstrap future **does** spawn additional tasks, then
|
|
/// `block_on_all` returns once **all** spawned futures complete.
|
|
///
|
|
/// See [module level][mod] documentation for more details.
|
|
///
|
|
/// [`CurrentThread`]: struct.CurrentThread.html
|
|
/// [mod]: index.html
|
|
pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error>
|
|
where F: Future,
|
|
{
|
|
let mut current_thread = CurrentThread::new();
|
|
|
|
let ret = current_thread.block_on(future);
|
|
current_thread.run().unwrap();
|
|
|
|
ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
|
|
}
|
|
|
|
/// Executes a future on the current thread.
|
|
///
|
|
/// The provided future must complete or be canceled before `run` will return.
|
|
///
|
|
/// Unlike [`tokio::spawn`], this function will always spawn on a
|
|
/// `CurrentThread` executor and is able to spawn futures that are not `Send`.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// This function can only be invoked from the context of a `run` call; any
|
|
/// other use will result in a panic.
|
|
///
|
|
/// [`tokio::spawn`]: ../fn.spawn.html
|
|
pub fn spawn<F>(future: F)
|
|
where F: Future<Item = (), Error = ()> + 'static
|
|
{
|
|
TaskExecutor::current()
|
|
.spawn_local(Box::new(future))
|
|
.unwrap();
|
|
}
|
|
|
|
// ===== impl CurrentThread =====
|
|
|
|
impl CurrentThread<ParkThread> {
|
|
/// Create a new instance of `CurrentThread`.
|
|
pub fn new() -> Self {
|
|
CurrentThread::new_with_park(ParkThread::new())
|
|
}
|
|
}
|
|
|
|
impl<P: Park> CurrentThread<P> {
|
|
/// Create a new instance of `CurrentThread` backed by the given park
|
|
/// handle.
|
|
pub fn new_with_park(park: P) -> Self {
|
|
let unpark = park.unpark();
|
|
|
|
let (spawn_sender, spawn_receiver) = mpsc::channel();
|
|
|
|
let scheduler = Scheduler::new(unpark);
|
|
let notify = scheduler.notify();
|
|
|
|
CurrentThread {
|
|
scheduler: scheduler,
|
|
num_futures: 0,
|
|
park,
|
|
spawn_handle: Handle { sender: spawn_sender, notify: notify },
|
|
spawn_receiver: spawn_receiver,
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if the executor is currently idle.
|
|
///
|
|
/// An idle executor is defined by not currently having any spawned tasks.
|
|
pub fn is_idle(&self) -> bool {
|
|
self.num_futures == 0
|
|
}
|
|
|
|
/// Spawn the future on the executor.
|
|
///
|
|
/// This internally queues the future to be executed once `run` is called.
|
|
pub fn spawn<F>(&mut self, future: F) -> &mut Self
|
|
where F: Future<Item = (), Error = ()> + 'static,
|
|
{
|
|
self.borrow().spawn_local(Box::new(future));
|
|
self
|
|
}
|
|
|
|
/// Synchronously waits for the provided `future` to complete.
|
|
///
|
|
/// This function can be used to synchronously block the current thread
|
|
/// until the provided `future` has resolved either successfully or with an
|
|
/// error. The result of the future is then returned from this function
|
|
/// call.
|
|
///
|
|
/// Note that this function will **also** execute any spawned futures on the
|
|
/// current thread, but will **not** block until these other spawned futures
|
|
/// have completed.
|
|
///
|
|
/// The caller is responsible for ensuring that other spawned futures
|
|
/// complete execution.
|
|
pub fn block_on<F>(&mut self, future: F)
|
|
-> Result<F::Item, BlockError<F::Error>>
|
|
where F: Future
|
|
{
|
|
let mut enter = tokio_executor::enter()
|
|
.expect("failed to start `current_thread::Runtime`");
|
|
self.enter(&mut enter).block_on(future)
|
|
}
|
|
|
|
/// Run the executor to completion, blocking the thread until **all**
|
|
/// spawned futures have completed.
|
|
pub fn run(&mut self) -> Result<(), RunError> {
|
|
let mut enter = tokio_executor::enter()
|
|
.expect("failed to start `current_thread::Runtime`");
|
|
self.enter(&mut enter).run()
|
|
}
|
|
|
|
/// Run the executor to completion, blocking the thread until all
|
|
/// spawned futures have completed **or** `duration` time has elapsed.
|
|
pub fn run_timeout(&mut self, duration: Duration)
|
|
-> Result<(), RunTimeoutError>
|
|
{
|
|
let mut enter = tokio_executor::enter()
|
|
.expect("failed to start `current_thread::Runtime`");
|
|
self.enter(&mut enter).run_timeout(duration)
|
|
}
|
|
|
|
/// Perform a single iteration of the event loop.
|
|
///
|
|
/// This function blocks the current thread even if the executor is idle.
|
|
pub fn turn(&mut self, duration: Option<Duration>)
|
|
-> Result<Turn, TurnError>
|
|
{
|
|
let mut enter = tokio_executor::enter()
|
|
.expect("failed to start `current_thread::Runtime`");
|
|
self.enter(&mut enter).turn(duration)
|
|
}
|
|
|
|
/// Bind `CurrentThread` instance with an execution context.
|
|
pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> {
|
|
Entered {
|
|
executor: self,
|
|
enter,
|
|
}
|
|
}
|
|
|
|
/// Returns a reference to the underlying `Park` instance.
|
|
pub fn get_park(&self) -> &P {
|
|
&self.park
|
|
}
|
|
|
|
/// Returns a mutable reference to the underlying `Park` instance.
|
|
pub fn get_park_mut(&mut self) -> &mut P {
|
|
&mut self.park
|
|
}
|
|
|
|
fn borrow(&mut self) -> Borrow<P::Unpark> {
|
|
Borrow {
|
|
scheduler: &mut self.scheduler,
|
|
num_futures: &mut self.num_futures,
|
|
}
|
|
}
|
|
|
|
/// Get a new handle to spawn futures on the executor
|
|
///
|
|
/// Different to the executor itself, the handle can be sent to different
|
|
/// threads and can be used to spawn futures on the executor.
|
|
pub fn handle(&self) -> Handle {
|
|
self.spawn_handle.clone()
|
|
}
|
|
}
|
|
|
|
impl tokio_executor::Executor for CurrentThread {
|
|
fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
|
|
-> Result<(), SpawnError>
|
|
{
|
|
self.borrow().spawn_local(future);
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(feature = "unstable-futures")]
|
|
fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
|
|
-> Result<(), futures2::executor::SpawnError>
|
|
{
|
|
panic!("Futures 0.2 integration is not available for current_thread");
|
|
}
|
|
}
|
|
|
|
impl<P: Park> fmt::Debug for CurrentThread<P> {
    /// Prints the `scheduler` and `num_futures` fields; the remaining fields
    /// are left out.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("CurrentThread");
        out.field("scheduler", &self.scheduler);
        out.field("num_futures", &self.num_futures);
        out.finish()
    }
}
|
|
|
|
// ===== impl Entered =====
|
|
|
|
impl<'a, P: Park> Entered<'a, P> {
|
|
/// Spawn the future on the executor.
|
|
///
|
|
/// This internally queues the future to be executed once `run` is called.
|
|
pub fn spawn<F>(&mut self, future: F) -> &mut Self
|
|
where F: Future<Item = (), Error = ()> + 'static,
|
|
{
|
|
self.executor.borrow().spawn_local(Box::new(future));
|
|
self
|
|
}
|
|
|
|
/// Synchronously waits for the provided `future` to complete.
|
|
///
|
|
/// This function can be used to synchronously block the current thread
|
|
/// until the provided `future` has resolved either successfully or with an
|
|
/// error. The result of the future is then returned from this function
|
|
/// call.
|
|
///
|
|
/// Note that this function will **also** execute any spawned futures on the
|
|
/// current thread, but will **not** block until these other spawned futures
|
|
/// have completed.
|
|
///
|
|
/// The caller is responsible for ensuring that other spawned futures
|
|
/// complete execution.
|
|
pub fn block_on<F>(&mut self, future: F)
|
|
-> Result<F::Item, BlockError<F::Error>>
|
|
where F: Future
|
|
{
|
|
let mut future = executor::spawn(future);
|
|
let notify = self.executor.scheduler.notify();
|
|
|
|
loop {
|
|
let res = self.executor.borrow().enter(self.enter, || {
|
|
future.poll_future_notify(¬ify, 0)
|
|
});
|
|
|
|
match res {
|
|
Ok(Async::Ready(e)) => return Ok(e),
|
|
Err(e) => return Err(BlockError { inner: Some(e) }),
|
|
Ok(Async::NotReady) => {}
|
|
}
|
|
|
|
self.tick();
|
|
|
|
if let Err(_) = self.executor.park.park() {
|
|
return Err(BlockError { inner: None });
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Run the executor to completion, blocking the thread until **all**
|
|
/// spawned futures have completed.
|
|
pub fn run(&mut self) -> Result<(), RunError> {
|
|
self.run_timeout2(None)
|
|
.map_err(|_| RunError { _p: () })
|
|
}
|
|
|
|
/// Run the executor to completion, blocking the thread until all
|
|
/// spawned futures have completed **or** `duration` time has elapsed.
|
|
pub fn run_timeout(&mut self, duration: Duration)
|
|
-> Result<(), RunTimeoutError>
|
|
{
|
|
self.run_timeout2(Some(duration))
|
|
}
|
|
|
|
/// Perform a single iteration of the event loop.
|
|
///
|
|
/// This function blocks the current thread even if the executor is idle.
|
|
pub fn turn(&mut self, duration: Option<Duration>)
|
|
-> Result<Turn, TurnError>
|
|
{
|
|
let res = if self.executor.scheduler.has_pending_futures() {
|
|
self.executor.park.park_timeout(Duration::from_millis(0))
|
|
} else {
|
|
match duration {
|
|
Some(duration) => self.executor.park.park_timeout(duration),
|
|
None => self.executor.park.park(),
|
|
}
|
|
};
|
|
|
|
if res.is_err() {
|
|
return Err(TurnError { _p: () });
|
|
}
|
|
|
|
let polled = self.tick();
|
|
|
|
Ok(Turn { polled })
|
|
}
|
|
|
|
/// Returns a reference to the underlying `Park` instance.
|
|
pub fn get_park(&self) -> &P {
|
|
&self.executor.park
|
|
}
|
|
|
|
/// Returns a mutable reference to the underlying `Park` instance.
|
|
pub fn get_park_mut(&mut self) -> &mut P {
|
|
&mut self.executor.park
|
|
}
|
|
|
|
fn run_timeout2(&mut self, dur: Option<Duration>)
|
|
-> Result<(), RunTimeoutError>
|
|
{
|
|
if self.executor.is_idle() {
|
|
// Nothing to do
|
|
return Ok(());
|
|
}
|
|
|
|
let mut time = dur.map(|dur| (Instant::now() + dur, dur));
|
|
|
|
loop {
|
|
self.tick();
|
|
|
|
if self.executor.is_idle() {
|
|
return Ok(());
|
|
}
|
|
|
|
match time {
|
|
Some((until, rem)) => {
|
|
if let Err(_) = self.executor.park.park_timeout(rem) {
|
|
return Err(RunTimeoutError::new(false));
|
|
}
|
|
|
|
let now = Instant::now();
|
|
|
|
if now >= until {
|
|
return Err(RunTimeoutError::new(true));
|
|
}
|
|
|
|
time = Some((until, until - now));
|
|
}
|
|
None => {
|
|
if let Err(_) = self.executor.park.park() {
|
|
return Err(RunTimeoutError::new(false));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Returns `true` if any futures were processed
|
|
fn tick(&mut self) -> bool {
|
|
// Spawn any futures that were spawned from other threads by manually
|
|
// looping over the receiver stream
|
|
|
|
// FIXME: Slightly ugly but needed to make the borrow checker happy
|
|
let (mut borrow, spawn_receiver) = (
|
|
Borrow {
|
|
scheduler: &mut self.executor.scheduler,
|
|
num_futures: &mut self.executor.num_futures,
|
|
},
|
|
&mut self.executor.spawn_receiver,
|
|
);
|
|
|
|
while let Ok(future) = spawn_receiver.try_recv() {
|
|
borrow.spawn_local(future);
|
|
}
|
|
|
|
// After any pending futures were scheduled, do the actual tick
|
|
borrow.scheduler.tick(
|
|
&mut *self.enter,
|
|
borrow.num_futures)
|
|
}
|
|
}
|
|
|
|
impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
    /// Prints both fields, `executor` and `enter`.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("Entered");
        out.field("executor", &self.executor);
        out.field("enter", &self.enter);
        out.finish()
    }
}
|
|
|
|
// ===== impl Handle =====
|
|
|
|
/// Handle to spawn a future on the corresponding `CurrentThread` instance
|
|
#[derive(Clone)]
|
|
pub struct Handle {
|
|
sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>,
|
|
notify: executor::NotifyHandle,
|
|
}
|
|
|
|
// Manual implementation because the Sender does not implement Debug
|
|
impl fmt::Debug for Handle {
|
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
|
fmt.debug_struct("Handle")
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl Handle {
|
|
/// Spawn a future onto the `CurrentThread` instance corresponding to this handle
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
|
|
/// instance of the `Handle` does not exist anymore.
|
|
pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
|
|
where F: Future<Item = (), Error = ()> + Send + 'static {
|
|
self.sender.send(Box::new(future))
|
|
.expect("CurrentThread does not exist anymore");
|
|
// use 0 for the id, CurrentThread does not make use of it
|
|
self.notify.notify(0);
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
// ===== impl TaskExecutor =====
|
|
|
|
impl TaskExecutor {
|
|
/// Returns an executor that executes futures on the current thread.
|
|
///
|
|
/// The user of `TaskExecutor` must ensure that when a future is submitted,
|
|
/// that it is done within the context of a call to `run`.
|
|
///
|
|
/// For more details, see the [module level](index.html) documentation.
|
|
pub fn current() -> TaskExecutor {
|
|
TaskExecutor {
|
|
_p: ::std::marker::PhantomData,
|
|
}
|
|
}
|
|
|
|
/// Spawn a future onto the current `CurrentThread` instance.
|
|
pub fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>)
|
|
-> Result<(), SpawnError>
|
|
{
|
|
CURRENT.with(|current| {
|
|
match current.spawn.get() {
|
|
Some(spawn) => {
|
|
unsafe { (*spawn).spawn_local(future) };
|
|
Ok(())
|
|
}
|
|
None => {
|
|
Err(SpawnError::shutdown())
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
impl tokio_executor::Executor for TaskExecutor {
|
|
fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
|
|
-> Result<(), SpawnError>
|
|
{
|
|
self.spawn_local(future)
|
|
}
|
|
|
|
#[cfg(feature = "unstable-futures")]
|
|
fn spawn2(&mut self, _future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
|
|
-> Result<(), futures2::executor::SpawnError>
|
|
{
|
|
panic!("Futures 0.2 integration is not available for current_thread");
|
|
}
|
|
|
|
fn status(&self) -> Result<(), SpawnError> {
|
|
CURRENT.with(|current| {
|
|
if current.spawn.get().is_some() {
|
|
Ok(())
|
|
} else {
|
|
Err(SpawnError::shutdown())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
impl<F> Executor<F> for TaskExecutor
|
|
where F: Future<Item = (), Error = ()> + 'static
|
|
{
|
|
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
|
|
CURRENT.with(|current| {
|
|
match current.spawn.get() {
|
|
Some(spawn) => {
|
|
unsafe { (*spawn).spawn_local(Box::new(future)) };
|
|
Ok(())
|
|
}
|
|
None => {
|
|
Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future))
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// ===== impl Borrow =====
|
|
|
|
impl<'a, U: Unpark> Borrow<'a, U> {
|
|
fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R
|
|
where F: FnOnce() -> R,
|
|
{
|
|
CURRENT.with(|current| {
|
|
current.set_spawn(self, || {
|
|
f()
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> {
|
|
fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>) {
|
|
*self.num_futures += 1;
|
|
self.scheduler.schedule(future);
|
|
}
|
|
}
|
|
|
|
// ===== impl CurrentRunner =====
|
|
|
|
impl CurrentRunner {
|
|
fn set_spawn<F, R>(&self, spawn: &mut SpawnLocal, f: F) -> R
|
|
where F: FnOnce() -> R
|
|
{
|
|
struct Reset<'a>(&'a CurrentRunner);
|
|
|
|
impl<'a> Drop for Reset<'a> {
|
|
fn drop(&mut self) {
|
|
self.0.spawn.set(None);
|
|
}
|
|
}
|
|
|
|
let _reset = Reset(self);
|
|
|
|
let spawn = unsafe { hide_lt(spawn as *mut SpawnLocal) };
|
|
self.spawn.set(Some(spawn));
|
|
|
|
f()
|
|
}
|
|
}
|
|
|
|
/// Erases the lifetime bound of a `SpawnLocal` trait-object pointer by
/// transmuting it to the `'static` form.
///
/// # Safety
///
/// The returned pointer claims a `'static` bound that the pointee does not
/// necessarily satisfy; the caller must not use it after the original
/// lifetime `'a` has ended.
unsafe fn hide_lt<'a>(p: *mut (SpawnLocal + 'a)) -> *mut (SpawnLocal + 'static) {
    ::std::mem::transmute(p)
}
|
|
|
|
// ===== impl RunTimeoutError =====
|
|
|
|
impl RunTimeoutError {
|
|
fn new(timeout: bool) -> Self {
|
|
RunTimeoutError { timeout }
|
|
}
|
|
|
|
/// Returns `true` if the error was caused by the operation timing out.
|
|
pub fn is_timeout(&self) -> bool {
|
|
self.timeout
|
|
}
|
|
}
|
|
|
|
impl From<tokio_executor::EnterError> for RunTimeoutError {
|
|
fn from(_: tokio_executor::EnterError) -> Self {
|
|
RunTimeoutError::new(false)
|
|
}
|
|
}
|
|
|
|
// ===== impl BlockError =====
|
|
|
|
impl<T> BlockError<T> {
|
|
/// Returns the error yielded by the future being blocked on
|
|
pub fn into_inner(self) -> Option<T> {
|
|
self.inner
|
|
}
|
|
}
|
|
|
|
impl<T> From<tokio_executor::EnterError> for BlockError<T> {
|
|
fn from(_: tokio_executor::EnterError) -> Self {
|
|
BlockError { inner: None }
|
|
}
|
|
}
|