Provide optional features on tokio crate (#808)

Disabling all features means the only dependency is `futures`.

Relevant pieces of the API can then be enabled with the following features:

- `codec`
- `fs`
- `io`
- `reactor`
- `tcp`
- `timer`
- `udp`
- `uds`

This also introduces the beginnings of enabling only certain pieces of the `Runtime`. As a start, the entire default runtime API is enabled via the `rt-full` feature.
This commit is contained in:
Sean McArthur 2019-01-04 11:42:33 -08:00 committed by GitHub
parent 39dc5706b7
commit 76198f63d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 557 additions and 435 deletions

View File

@ -22,6 +22,22 @@ matrix:
- env: TARGET=i686-unknown-freebsd
- env: TARGET=i686-unknown-linux-gnu
# Test combinations of enabled features.
- rust: stable
script: |
shopt -s expand_aliases
alias check="cargo check --no-default-features"
check
check --features codec
check --features fs
check --features io
check --features reactor
check --features rt-full
check --features tcp
check --features timer
check --features udp
check --features uds
# Test the async / await preview. We don't want to block PRs on this failing
# though.
- rust: nightly

View File

@ -43,6 +43,35 @@ members = [
]
[features]
default = [
"codec",
"fs",
"io",
"reactor",
"rt-full",
"tcp",
"timer",
"udp",
"uds",
]
codec = ["tokio-codec"]
fs = ["tokio-fs"]
io = ["bytes", "tokio-io"]
reactor = ["io", "mio", "tokio-reactor"]
rt-full = [
"num_cpus",
"reactor",
"timer",
"tokio-current-thread",
"tokio-executor",
"tokio-threadpool",
]
tcp = ["tokio-tcp"]
timer = ["tokio-timer"]
udp = ["tokio-udp"]
uds = ["tokio-uds"]
# This feature comes with no promise of stability. Things will
# break with each patch release. Use at your own risk.
async-await-preview = [
@ -54,29 +83,31 @@ travis-ci = { repository = "tokio-rs/tokio" }
appveyor = { repository = "carllerche/tokio", id = "s83yxhy9qeb58va7" }
[dependencies]
bytes = "0.4"
num_cpus = "1.8.0"
tokio-codec = { version = "0.1.0", path = "tokio-codec" }
tokio-current-thread = { version = "0.1.3", path = "tokio-current-thread" }
tokio-io = { version = "0.1.6", path = "tokio-io" }
tokio-executor = { version = "0.1.5", path = "tokio-executor" }
tokio-reactor = { version = "0.1.1", path = "tokio-reactor" }
tokio-threadpool = { version = "0.1.4", path = "tokio-threadpool" }
tokio-tcp = { version = "0.1.0", path = "tokio-tcp" }
tokio-udp = { version = "0.1.0", path = "tokio-udp" }
tokio-timer = { version = "0.2.8", path = "tokio-timer" }
tokio-fs = { version = "0.1.3", path = "tokio-fs" }
# Only non-optional dependency...
futures = "0.1.20"
# Everything else is optional...
bytes = { version = "0.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
tokio-codec = { version = "0.1.0", path = "tokio-codec", optional = true }
tokio-current-thread = { version = "0.1.3", path = "tokio-current-thread", optional = true }
tokio-fs = { version = "0.1.3", path = "tokio-fs", optional = true }
tokio-io = { version = "0.1.6", path = "tokio-io", optional = true }
tokio-executor = { version = "0.1.5", path = "tokio-executor", optional = true }
tokio-reactor = { version = "0.1.1", path = "tokio-reactor", optional = true }
tokio-threadpool = { version = "0.1.4", path = "tokio-threadpool", optional = true }
tokio-tcp = { version = "0.1.0", path = "tokio-tcp", optional = true }
tokio-udp = { version = "0.1.0", path = "tokio-udp", optional = true }
tokio-timer = { version = "0.2.8", path = "tokio-timer", optional = true }
# Needed until `reactor` is removed from `tokio`.
mio = "0.6.14"
mio = { version = "0.6.14", optional = true }
# Needed for async/await preview support
tokio-async-await = { version = "0.1.0", path = "tokio-async-await", optional = true }
[target.'cfg(unix)'.dependencies]
tokio-uds = { version = "0.2.1", path = "tokio-uds" }
tokio-uds = { version = "0.2.1", path = "tokio-uds", optional = true }
[dev-dependencies]
env_logger = { version = "0.5", default-features = false }
@ -92,18 +123,18 @@ serde_json = "1.0"
time = "0.1"
[patch.crates-io]
tokio = { path = "." }
tokio-async-await = { path = "./tokio-async-await" }
tokio-codec = { path = "./tokio-codec" }
tokio-current-thread = { path = "./tokio-current-thread" }
tokio-executor = { path = "./tokio-executor" }
tokio-fs = { path = "./tokio-fs" }
tokio-io = { path = "./tokio-io" }
tokio-reactor = { path = "./tokio-reactor" }
tokio-signal = { path = "./tokio-signal" }
tokio-tcp = { path = "./tokio-tcp" }
tokio-threadpool = { path = "./tokio-threadpool" }
tokio-timer = { path = "./tokio-timer" }
tokio-tls = { path = "./tokio-tls" }
tokio-udp = { path = "./tokio-udp" }
tokio-uds = { path = "./tokio-uds" }
#tokio = { path = "." }
#tokio-async-await = { path = "./tokio-async-await" }
#tokio-codec = { path = "./tokio-codec" }
#tokio-current-thread = { path = "./tokio-current-thread" }
#tokio-executor = { path = "./tokio-executor" }
#tokio-fs = { path = "./tokio-fs" }
#tokio-io = { path = "./tokio-io" }
#tokio-reactor = { path = "./tokio-reactor" }
#tokio-signal = { path = "./tokio-signal" }
#tokio-tcp = { path = "./tokio-tcp" }
#tokio-threadpool = { path = "./tokio-threadpool" }
#tokio-timer = { path = "./tokio-timer" }
#tokio-tls = { path = "./tokio-tls" }
#tokio-udp = { path = "./tokio-udp" }
#tokio-uds = { path = "./tokio-uds" }

View File

@ -51,6 +51,7 @@ pub use tokio_io::{
};
// standard input, output, and error
#[cfg(feature = "fs")]
pub use tokio_fs::{
stdin,
Stdin,

View File

@ -72,42 +72,72 @@
//! }
//! ```
extern crate bytes;
#[macro_use]
macro_rules! if_runtime {
($($i:item)*) => ($(
#[cfg(any(feature = "rt-full"))]
$i
)*)
}
#[cfg_attr(feature = "rt-full", macro_use)]
extern crate futures;
#[cfg(feature = "io")]
extern crate bytes;
#[cfg(feature = "reactor")]
extern crate mio;
#[cfg(feature = "rt-full")]
extern crate num_cpus;
#[cfg(feature = "rt-full")]
extern crate tokio_current_thread;
#[cfg(feature = "io")]
extern crate tokio_io;
extern crate tokio_executor;
#[cfg(feature = "codec")]
extern crate tokio_codec;
#[cfg(feature = "fs")]
extern crate tokio_fs;
#[cfg(feature = "reactor")]
extern crate tokio_reactor;
#[cfg(feature = "rt-full")]
extern crate tokio_threadpool;
#[cfg(feature = "timer")]
extern crate tokio_timer;
#[cfg(feature = "tcp")]
extern crate tokio_tcp;
#[cfg(feature = "udp")]
extern crate tokio_udp;
#[cfg(feature = "async-await-preview")]
extern crate tokio_async_await;
#[cfg(unix)]
#[cfg(all(unix, feature = "uds"))]
extern crate tokio_uds;
#[cfg(feature = "timer")]
pub mod clock;
#[cfg(feature = "codec")]
pub mod codec;
pub mod executor;
#[cfg(feature = "fs")]
pub mod fs;
#[cfg(feature = "io")]
pub mod io;
#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
pub mod net;
pub mod prelude;
#[cfg(feature = "reactor")]
pub mod reactor;
pub mod runtime;
#[cfg(feature = "timer")]
pub mod timer;
pub mod util;
pub use executor::spawn;
pub use runtime::run;
if_runtime! {
extern crate tokio_executor;
pub mod executor;
pub mod runtime;
pub use executor::spawn;
pub use runtime::run;
}
// ===== Experimental async/await support =====

View File

@ -22,6 +22,7 @@
//! [`UnixDatagram`]: struct.UnixDatagram.html
//! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html
#[cfg(feature = "tcp")]
pub mod tcp {
//! TCP bindings for `tokio`.
//!
@ -42,15 +43,19 @@ pub mod tcp {
//! [`Incoming`]: struct.Incoming.html
pub use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream};
}
#[cfg(feature = "tcp")]
pub use self::tcp::{TcpListener, TcpStream};
#[cfg(feature = "tcp")]
#[deprecated(note = "use `tokio::net::tcp::ConnectFuture` instead")]
#[doc(hidden)]
pub type ConnectFuture = self::tcp::ConnectFuture;
#[cfg(feature = "tcp")]
#[deprecated(note = "use `tokio::net::tcp::Incoming` instead")]
#[doc(hidden)]
pub type Incoming = self::tcp::Incoming;
#[cfg(feature = "udp")]
pub mod udp {
//! UDP bindings for `tokio`.
//!
@ -68,16 +73,19 @@ pub mod udp {
//! [`framed`]: struct.UdpSocket.html#method.framed
pub use tokio_udp::{RecvDgram, SendDgram, UdpFramed, UdpSocket};
}
#[cfg(feature = "udp")]
pub use self::udp::{UdpFramed, UdpSocket};
#[cfg(feature = "udp")]
#[deprecated(note = "use `tokio::net::udp::RecvDgram` instead")]
#[doc(hidden)]
pub type RecvDgram<T> = self::udp::RecvDgram<T>;
#[cfg(feature = "udp")]
#[deprecated(note = "use `tokio::net::udp::SendDgram` instead")]
#[doc(hidden)]
pub type SendDgram<T> = self::udp::SendDgram<T>;
#[cfg(unix)]
#[cfg(all(unix, feature = "uds"))]
pub mod unix {
//! Unix domain socket bindings for `tokio` (only available on unix systems).
@ -86,5 +94,5 @@ pub mod unix {
UnixListener, UnixStream,
};
}
#[cfg(unix)]
#[cfg(all(unix, feature = "uds"))]
pub use self::unix::{UnixDatagram, UnixDatagramFramed, UnixListener, UnixStream};

View File

@ -10,6 +10,7 @@
//!
//! The prelude may grow over time as additional items see ubiquitous use.
#[cfg(feature = "io")]
pub use tokio_io::{
AsyncRead,
AsyncWrite,

View File

@ -90,3 +90,18 @@ where
r.run().expect("failed to resolve remaining futures");
Ok(v)
}
/// Start a current-thread runtime using the supplied future to bootstrap execution.
///
/// # Panics
///
/// This function panics if called from the context of an executor.
pub fn run<F>(future: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
let mut r = Runtime::new().expect("failed to start runtime on current thread");
r.spawn(future);
r.run().expect("failed to resolve remaining futures");
}

View File

@ -112,399 +112,14 @@
//! [`tokio::spawn`]: ../executor/fn.spawn.html
//! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html
mod builder;
pub mod current_thread;
mod shutdown;
mod task_executor;
mod threadpool;
pub use self::builder::Builder;
pub use self::shutdown::Shutdown;
pub use self::task_executor::TaskExecutor;
pub use self::threadpool::{
Builder,
Runtime,
Shutdown,
TaskExecutor,
run,
};
use reactor::{Handle, Reactor};
use std::io;
use std::sync::Mutex;
use tokio_executor::enter;
use tokio_threadpool as threadpool;
use futures;
use futures::future::Future;
/// Handle to the Tokio runtime.
///
/// The Tokio runtime includes a reactor as well as an executor for running
/// tasks.
///
/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
/// most users will use [`tokio::run`], which uses a `Runtime` internally.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
/// [`new`]: #method.new
/// [`Builder`]: struct.Builder.html
/// [`tokio::run`]: fn.run.html
#[derive(Debug)]
pub struct Runtime {
inner: Option<Inner>,
}
#[derive(Debug)]
struct Inner {
/// A handle to the reactor in the background thread.
reactor_handle: Handle,
// TODO: This should go away in 0.2
reactor: Mutex<Option<Reactor>>,
/// Task execution pool.
pool: threadpool::ThreadPool,
}
// ===== impl Runtime =====
/// Start the Tokio runtime using the supplied future to bootstrap execution.
///
/// This function is used to bootstrap the execution of a Tokio application. It
/// does the following:
///
/// * Start the Tokio runtime using a default configuration.
/// * Spawn the given future onto the thread pool.
/// * Block the current thread until the runtime shuts down.
///
/// Note that the function will not return immediately once `future` has
/// completed. Instead it waits for the entire runtime to become idle.
///
/// See the [module level][mod] documentation for more details.
///
/// # Examples
///
/// ```rust
/// # extern crate tokio;
/// # extern crate futures;
/// # use futures::{Future, Stream};
/// use tokio::net::TcpListener;
///
/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
/// # unimplemented!();
/// # }
/// # fn dox() {
/// # let addr = "127.0.0.1:8080".parse().unwrap();
/// let listener = TcpListener::bind(&addr).unwrap();
///
/// let server = listener.incoming()
/// .map_err(|e| println!("error = {:?}", e))
/// .for_each(|socket| {
/// tokio::spawn(process(socket))
/// });
///
/// tokio::run(server);
/// # }
/// # pub fn main() {}
/// ```
///
/// # Panics
///
/// This function panics if called from the context of an executor.
///
/// [mod]: ../index.html
pub fn run<F>(future: F)
where F: Future<Item = (), Error = ()> + Send + 'static,
{
// Check enter before creating a new Runtime...
let mut entered = enter().expect("nested tokio::run");
let mut runtime = Runtime::new().expect("failed to start new Runtime");
runtime.spawn(future);
entered
.block_on(runtime.shutdown_on_idle())
.expect("shutdown cannot error")
}
impl Runtime {
/// Create a new runtime instance with default configuration values.
///
/// This results in a reactor, thread pool, and timer being initialized. The
/// thread pool will not spawn any worker threads until it needs to, i.e.
/// tasks are scheduled to run.
///
/// Most users will not need to call this function directly, instead they
/// will use [`tokio::run`](fn.run.html).
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// Creating a new `Runtime` with default configuration values.
///
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::prelude::*;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
///
/// // Shutdown the runtime
/// rt.shutdown_now()
/// .wait().unwrap();
/// ```
///
/// [mod]: index.html
pub fn new() -> io::Result<Self> {
Builder::new().build()
}
#[deprecated(since = "0.1.5", note = "use `reactor` instead")]
#[doc(hidden)]
pub fn handle(&self) -> &Handle {
#[allow(deprecated)]
self.reactor()
}
/// Return a reference to the reactor handle for this runtime instance.
///
/// The returned handle reference can be cloned in order to get an owned
/// value of the handle. This handle can be used to initialize I/O resources
/// (like TCP or UDP sockets) that will not be used on the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// let reactor_handle = rt.reactor().clone();
///
/// // use `reactor_handle`
/// ```
#[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")]
pub fn reactor(&self) -> &Handle {
let mut reactor = self.inner().reactor.lock().unwrap();
if let Some(reactor) = reactor.take() {
if let Ok(background) = reactor.background() {
background.forget();
}
}
&self.inner().reactor_handle
}
/// Return a handle to the runtime's executor.
///
/// The returned handle can be used to spawn tasks that run on this runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// let executor_handle = rt.executor();
///
/// // use `executor_handle`
/// ```
pub fn executor(&self) -> TaskExecutor {
let inner = self.inner().pool.sender().clone();
TaskExecutor { inner }
}
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```rust
/// # extern crate tokio;
/// # extern crate futures;
/// # use futures::{future, Future, Stream};
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let mut rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|| {
/// println!("now running on a worker thread");
/// Ok(())
/// }));
/// # }
/// # pub fn main() {}
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where F: Future<Item = (), Error = ()> + Send + 'static,
{
self.inner_mut().pool.sender().spawn(future).unwrap();
self
}
/// Run a future to completion on the Tokio runtime.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// This method should not be called from an asynchronous context.
///
/// # Panics
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let mut entered = enter().expect("nested block_on");
let (tx, rx) = futures::sync::oneshot::channel();
self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
entered.block_on(rx).unwrap()
}
/// Run a future to completion on the Tokio runtime, then wait for all
/// background futures to complete too.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, waiting for background futures to complete, and yielding
/// its resolved result. Any tasks or timers which the future spawns
/// internally will be executed on the runtime and waited for completion.
///
/// This method should not be called from an asynchronous context.
///
/// # Panics
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E>
where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let mut entered = enter().expect("nested block_on_all");
let (tx, rx) = futures::sync::oneshot::channel();
self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
let block = rx
.map_err(|_| unreachable!())
.and_then(move |r| {
self.shutdown_on_idle()
.map(move |()| r)
});
entered.block_on(block).unwrap()
}
/// Signals the runtime to shutdown once it becomes idle.
///
/// Returns a future that completes once the shutdown operation has
/// completed.
///
/// This function can be used to perform a graceful shutdown of the runtime.
///
/// The runtime enters an idle state once **all** of the following occur.
///
/// * The thread pool has no tasks to execute, i.e., all tasks that were
/// spawned have completed.
/// * The reactor is not managing any I/O resources.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::prelude::*;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
///
/// // Shutdown the runtime
/// rt.shutdown_on_idle()
/// .wait().unwrap();
/// ```
///
/// [mod]: index.html
pub fn shutdown_on_idle(mut self) -> Shutdown {
let inner = self.inner.take().unwrap();
let inner = inner.pool.shutdown_on_idle();
Shutdown { inner }
}
/// Signals the runtime to shutdown immediately.
///
/// Returns a future that completes once the shutdown operation has
/// completed.
///
/// This function will forcibly shutdown the runtime, causing any
/// in-progress work to become canceled. The shutdown steps are:
///
/// * Drain any scheduled work queues.
/// * Drop any futures that have not yet completed.
/// * Drop the reactor.
///
/// Once the reactor has dropped, any outstanding I/O resources bound to
/// that reactor will no longer function. Calling any method on them will
/// result in an error.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::prelude::*;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
///
/// // Shutdown the runtime
/// rt.shutdown_now()
/// .wait().unwrap();
/// ```
///
/// [mod]: index.html
pub fn shutdown_now(mut self) -> Shutdown {
let inner = self.inner.take().unwrap();
Shutdown::shutdown_now(inner)
}
fn inner(&self) -> &Inner {
self.inner.as_ref().unwrap()
}
fn inner_mut(&mut self) -> &mut Inner {
self.inner.as_mut().unwrap()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let shutdown = Shutdown::shutdown_now(inner);
let _ = shutdown.wait();
}
}
}

View File

@ -1,4 +1,4 @@
use runtime::{Inner, Runtime};
use super::{Inner, Runtime};
use reactor::Reactor;

View File

@ -0,0 +1,395 @@
mod builder;
mod shutdown;
mod task_executor;
pub use self::builder::Builder;
pub use self::shutdown::Shutdown;
pub use self::task_executor::TaskExecutor;
use reactor::{Handle, Reactor};
use std::io;
use std::sync::Mutex;
use tokio_executor::enter;
use tokio_threadpool as threadpool;
use futures;
use futures::future::Future;
/// Handle to the Tokio runtime.
///
/// The Tokio runtime includes a reactor as well as an executor for running
/// tasks.
///
/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
/// most users will use [`tokio::run`], which uses a `Runtime` internally.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
/// [`new`]: #method.new
/// [`Builder`]: struct.Builder.html
/// [`tokio::run`]: fn.run.html
#[derive(Debug)]
pub struct Runtime {
inner: Option<Inner>,
}
#[derive(Debug)]
struct Inner {
/// A handle to the reactor in the background thread.
reactor_handle: Handle,
// TODO: This should go away in 0.2
reactor: Mutex<Option<Reactor>>,
/// Task execution pool.
pool: threadpool::ThreadPool,
}
// ===== impl Runtime =====
/// Start the Tokio runtime using the supplied future to bootstrap execution.
///
/// This function is used to bootstrap the execution of a Tokio application. It
/// does the following:
///
/// * Start the Tokio runtime using a default configuration.
/// * Spawn the given future onto the thread pool.
/// * Block the current thread until the runtime shuts down.
///
/// Note that the function will not return immediately once `future` has
/// completed. Instead it waits for the entire runtime to become idle.
///
/// See the [module level][mod] documentation for more details.
///
/// # Examples
///
/// ```rust
/// # extern crate tokio;
/// # extern crate futures;
/// # use futures::{Future, Stream};
/// use tokio::net::TcpListener;
///
/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
/// # unimplemented!();
/// # }
/// # fn dox() {
/// # let addr = "127.0.0.1:8080".parse().unwrap();
/// let listener = TcpListener::bind(&addr).unwrap();
///
/// let server = listener.incoming()
/// .map_err(|e| println!("error = {:?}", e))
/// .for_each(|socket| {
/// tokio::spawn(process(socket))
/// });
///
/// tokio::run(server);
/// # }
/// # pub fn main() {}
/// ```
///
/// # Panics
///
/// This function panics if called from the context of an executor.
///
/// [mod]: ../index.html
pub fn run<F>(future: F)
where F: Future<Item = (), Error = ()> + Send + 'static,
{
// Check enter before creating a new Runtime...
let mut entered = enter().expect("nested tokio::run");
let mut runtime = Runtime::new().expect("failed to start new Runtime");
runtime.spawn(future);
entered
.block_on(runtime.shutdown_on_idle())
.expect("shutdown cannot error")
}
impl Runtime {
/// Create a new runtime instance with default configuration values.
///
/// This results in a reactor, thread pool, and timer being initialized. The
/// thread pool will not spawn any worker threads until it needs to, i.e.
/// tasks are scheduled to run.
///
/// Most users will not need to call this function directly, instead they
/// will use [`tokio::run`](fn.run.html).
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// Creating a new `Runtime` with default configuration values.
///
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::prelude::*;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
///
/// // Shutdown the runtime
/// rt.shutdown_now()
/// .wait().unwrap();
/// ```
///
/// [mod]: index.html
pub fn new() -> io::Result<Self> {
Builder::new().build()
}
#[deprecated(since = "0.1.5", note = "use `reactor` instead")]
#[doc(hidden)]
pub fn handle(&self) -> &Handle {
#[allow(deprecated)]
self.reactor()
}
/// Return a reference to the reactor handle for this runtime instance.
///
/// The returned handle reference can be cloned in order to get an owned
/// value of the handle. This handle can be used to initialize I/O resources
/// (like TCP or UDP sockets) that will not be used on the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// let reactor_handle = rt.reactor().clone();
///
/// // use `reactor_handle`
/// ```
#[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")]
pub fn reactor(&self) -> &Handle {
let mut reactor = self.inner().reactor.lock().unwrap();
if let Some(reactor) = reactor.take() {
if let Ok(background) = reactor.background() {
background.forget();
}
}
&self.inner().reactor_handle
}
/// Return a handle to the runtime's executor.
///
/// The returned handle can be used to spawn tasks that run on this runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// let executor_handle = rt.executor();
///
/// // use `executor_handle`
/// ```
pub fn executor(&self) -> TaskExecutor {
let inner = self.inner().pool.sender().clone();
TaskExecutor { inner }
}
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```rust
/// # extern crate tokio;
/// # extern crate futures;
/// # use futures::{future, Future, Stream};
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let mut rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|| {
/// println!("now running on a worker thread");
/// Ok(())
/// }));
/// # }
/// # pub fn main() {}
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where F: Future<Item = (), Error = ()> + Send + 'static,
{
self.inner_mut().pool.sender().spawn(future).unwrap();
self
}
/// Run a future to completion on the Tokio runtime.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// This method should not be called from an asynchronous context.
///
/// # Panics
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let mut entered = enter().expect("nested block_on");
let (tx, rx) = futures::sync::oneshot::channel();
self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
entered.block_on(rx).unwrap()
}
/// Run a future to completion on the Tokio runtime, then wait for all
/// background futures to complete too.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, waiting for background futures to complete, and yielding
/// its resolved result. Any tasks or timers which the future spawns
/// internally will be executed on the runtime and waited for completion.
///
/// This method should not be called from an asynchronous context.
///
/// # Panics
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E>
where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let mut entered = enter().expect("nested block_on_all");
let (tx, rx) = futures::sync::oneshot::channel();
self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
let block = rx
.map_err(|_| unreachable!())
.and_then(move |r| {
self.shutdown_on_idle()
.map(move |()| r)
});
entered.block_on(block).unwrap()
}
/// Signals the runtime to shutdown once it becomes idle.
///
/// Returns a future that completes once the shutdown operation has
/// completed.
///
/// This function can be used to perform a graceful shutdown of the runtime.
///
/// The runtime enters an idle state once **all** of the following occur.
///
/// * The thread pool has no tasks to execute, i.e., all tasks that were
/// spawned have completed.
/// * The reactor is not managing any I/O resources.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::prelude::*;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
///
/// // Shutdown the runtime
/// rt.shutdown_on_idle()
/// .wait().unwrap();
/// ```
///
/// [mod]: index.html
pub fn shutdown_on_idle(mut self) -> Shutdown {
let inner = self.inner.take().unwrap();
let inner = inner.pool.shutdown_on_idle();
Shutdown { inner }
}
/// Signals the runtime to shutdown immediately.
///
/// Returns a future that completes once the shutdown operation has
/// completed.
///
/// This function will forcibly shutdown the runtime, causing any
/// in-progress work to become canceled. The shutdown steps are:
///
/// * Drain any scheduled work queues.
/// * Drop any futures that have not yet completed.
/// * Drop the reactor.
///
/// Once the reactor has dropped, any outstanding I/O resources bound to
/// that reactor will no longer function. Calling any method on them will
/// result in an error.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::prelude::*;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
///
/// // Shutdown the runtime
/// rt.shutdown_now()
/// .wait().unwrap();
/// ```
///
/// [mod]: index.html
pub fn shutdown_now(mut self) -> Shutdown {
let inner = self.inner.take().unwrap();
Shutdown::shutdown_now(inner)
}
fn inner(&self) -> &Inner {
self.inner.as_ref().unwrap()
}
fn inner_mut(&mut self) -> &mut Inner {
self.inner.as_mut().unwrap()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let shutdown = Shutdown::shutdown_now(inner);
let _ = shutdown.wait();
}
}
}

View File

@ -1,4 +1,4 @@
use runtime::Inner;
use super::Inner;
use tokio_threadpool as threadpool;
use std::fmt;

View File

@ -1,9 +1,12 @@
#[cfg(feature = "timer")]
#[allow(deprecated)]
use tokio_timer::Deadline;
#[cfg(feature = "timer")]
use tokio_timer::Timeout;
use futures::Future;
#[cfg(feature = "timer")]
use std::time::{Instant, Duration};
@ -55,12 +58,14 @@ pub trait FutureExt: Future {
/// tokio::run(future);
/// # }
/// ```
#[cfg(feature = "timer")]
fn timeout(self, timeout: Duration) -> Timeout<Self>
where Self: Sized,
{
Timeout::new(self, timeout)
}
#[cfg(feature = "timer")]
#[deprecated(since = "0.1.8", note = "use `timeout` instead")]
#[allow(deprecated)]
#[doc(hidden)]
@ -78,6 +83,7 @@ mod test {
use super::*;
use prelude::future;
#[cfg(feature = "timer")]
#[test]
fn timeout_polls_at_least_once() {
let base_future = future::result::<(), ()>(Ok(()));

View File

@ -1,3 +1,4 @@
#[cfg(feature = "timer")]
use tokio_timer::{
throttle::Throttle,
Timeout,
@ -5,6 +6,7 @@ use tokio_timer::{
use futures::Stream;
#[cfg(feature = "timer")]
use std::time::Duration;
@ -25,6 +27,7 @@ pub trait StreamExt: Stream {
/// Throttle down the stream by enforcing a fixed delay between items.
///
/// Errors are also delayed.
#[cfg(feature = "timer")]
fn throttle(self, duration: Duration) -> Throttle<Self>
where Self: Sized
{
@ -63,6 +66,7 @@ pub trait StreamExt: Stream {
/// tokio::run(stream);
/// # }
/// ```
#[cfg(feature = "timer")]
fn timeout(self, timeout: Duration) -> Timeout<Self>
where Self: Sized,
{