executor: move into tokio crate (#1702)

A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).

The executor implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
This commit is contained in:
Carl Lerche 2019-10-28 21:40:29 -07:00 committed by GitHub
parent 7eb264a0d0
commit c62ef2d232
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
101 changed files with 387 additions and 690 deletions

View File

@ -2,7 +2,6 @@
members = [
"tokio",
"tokio-executor",
"tokio-macros",
"tokio-sync",
"tokio-test",

View File

@ -47,9 +47,6 @@ jobs:
displayName: Test sub crates -
rust: beta
crates:
tokio-executor:
- current-thread
- thread-pool
tokio-sync:
- async-traits
tokio-macros: []
@ -58,22 +55,23 @@ jobs:
examples: []
# Test compilation failure
- template: ci/azure-test-stable.yml
parameters:
name: test_features
displayName: Test feature flags
rust: beta
crates:
tests-build:
- tokio-executor
- executor-without-current-thread
# - macros-invalid-input
# - net-no-features
# - net-with-tcp
# - net-with-udp
# - net-with-uds
# - tokio-no-features
# - tokio-with-net
# Disable pending: https://github.com/tokio-rs/tokio/pull/1695#issuecomment-547045383
# - template: ci/azure-test-stable.yml
# parameters:
# name: test_features
# displayName: Test feature flags
# rust: beta
# crates:
# tests-build:
# # - tokio-executor
# # - executor-without-current-thread
# # - macros-invalid-input
# # - net-no-features
# # - net-with-tcp
# # - net-with-udp
# # - net-with-uds
# # - tokio-no-features
# # - tokio-with-net
# Run loom tests
- template: ci/azure-loom.yml
@ -81,7 +79,6 @@ jobs:
name: loom
rust: beta
crates:
- tokio-executor
- tokio
# Try cross compiling
@ -115,7 +112,7 @@ jobs:
- clippy
- test_tokio
- test_linux
- test_features
# - test_features
- loom
# - test_nightly
- cross

View File

@ -5,8 +5,6 @@ jobs:
matrix:
Timer:
cmd: cargo test -p tokio-timer --test hammer
Threadpool:
cmd: cargo test -p tokio-executor --tests --features threadpool
pool:
vmImage: ubuntu-16.04
steps:

View File

@ -2,7 +2,6 @@
# repository.
[patch.crates-io]
tokio = { path = "tokio" }
tokio-executor = { path = "tokio-executor" }
tokio-macros = { path = "tokio-macros" }
tokio-sync = { path = "tokio-sync" }
tokio-tls = { path = "tokio-tls" }

View File

@ -6,7 +6,7 @@ edition = "2018"
publish = false
[features]
executor-without-current-thread = ["tokio-executor"]
# executor-without-current-thread = ["tokio-executor"]
# macros-invalid-input = ["tokio/rt-full"]
# net-no-features = ["tokio-net"]
# net-with-tcp = ["tokio-net/tcp"]
@ -19,7 +19,6 @@ executor-without-current-thread = ["tokio-executor"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio-executor = { path = "../tokio-executor", optional = true }
# tokio = { path = "../tokio", optional = true, default-features = false }
[dev-dependencies]

View File

@ -1,81 +0,0 @@
# 0.2.0-alpha.6 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.2.0-alpha.5 (September 19, 2019)
### Fix
- shutdown blocking pool threads when idle (#1562, #1514).
# 0.2.0-alpha.4 (August 29, 2019)
- Track tokio release.
# 0.2.0-alpha.3 (August 28, 2019)
### Changed
- use `tracing` instead of `log`
### Added
- thread pool dedicated to blocking operations (#1495).
- `Executor::spawn_with_handle` (#1492).
# 0.2.0-alpha.2 (August 17, 2019)
### Fixed
- allow running executor from within blocking clause (#1433).
### Changed
- Update `futures` dependency to 0.3.0-alpha.18.
### Added
- Import `current-thread` executor (#1447).
- Import `threadpool` executor (#1152).
# 0.2.0-alpha.1 (August 8, 2019)
### Changed
- Switch to `async`, `await`, and `std::future`.
### Removed
- `Enter::make_permanent` and `Enter::on_exit` (#???)
# 0.1.7 (March 22, 2019)
### Added
- `TypedExecutor` for spawning futures of a specific type (#993).
# 0.1.6 (January 6, 2019)
* Implement `Unpark` for `Arc<Unpark>` (#802).
* Switch to crossbeam's Parker / Unparker (#528).
# 0.1.5 (September 26, 2018)
* Implement `futures::Executor` for `DefaultExecutor` (#563).
* Add `Enter::block_on(future)` (#646)
# 0.1.4 (August 23, 2018)
* Implement `std::error::Error` for error types (#511).
# 0.1.3 (August 6, 2018)
* Implement `Executor` for `Box<E: Executor>` (#420).
* Improve `EnterError` debug message (#410).
* Implement `status`, `Send`, and `Sync` for `DefaultExecutor` (#463, #472).
* Fix race in `ParkThread` (#507).
* Handle recursive calls into `DefaultExecutor` (#473).
# 0.1.2 (March 30, 2018)
* Implement `Unpark` for `Box<Unpark>`.
# 0.1.1 (March 22, 2018)
* Optionally support futures 0.2.
# 0.1.0 (March 09, 2018)
* Initial release

View File

@ -1,52 +0,0 @@
[package]
name = "tokio-executor"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "v0.2.x" git tag.
version = "0.2.0-alpha.6"
edition = "2018"
documentation = "https://docs.rs/tokio-executor/0.2.0-alpha.6/tokio_executor"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://github.com/tokio-rs/tokio"
license = "MIT"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = """
Future execution primitives
"""
keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]
[features]
blocking = ["lazy_static"]
current-thread = ["crossbeam-channel"]
thread-pool = ["num_cpus"]
[dependencies]
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["channel"] }
tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" }
# current-thread dependencies
crossbeam-channel = { version = "0.3.8", optional = true }
# threadpool dependencies
num_cpus = { version = "1.2", optional = true }
# blocking
futures-core-preview = { version = "=0.3.0-alpha.19", optional = true }
lazy_static = { version = "1", optional = true }
[dev-dependencies]
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" }
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }
futures-core-preview = "=0.3.0-alpha.19"
loom = { version = "0.2.11", features = ["futures", "checkpoint"] }
rand = "0.7"
[package.metadata.docs.rs]
all-features = true

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# tokio-executor
Task execution related traits and utilities.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tokio by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,102 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tokio-executor/0.2.0-alpha.6")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![deny(intra_doc_link_resolution_failure)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Task execution related traits and utilities.
//!
//! In the Tokio execution model, futures are lazy. When a future is created, no
//! work is performed. In order for the work defined by the future to happen,
//! the future must be submitted to an executor. A future that is submitted to
//! an executor is called a "task".
//!
//! The executor is responsible for ensuring that [`Future::poll`] is called
//! whenever the task is notified. Notification happens when the internal
//! state of a task transitions from *not ready* to *ready*. For example, a
//! socket might have received data and a call to `read` will now be able to
//! succeed.
//!
//! This crate provides traits and utilities that are necessary for building an
//! executor, including:
//!
//! * The [`Executor`] trait spawns future object onto an executor.
//!
//! * The [`TypedExecutor`] trait spawns futures of a specific type onto an
//! executor. This is used to be generic over executors that spawn futures
//! that are either `Send` or `!Send` or implement executors that apply to
//! specific futures.
//!
//! * [`enter`] marks that the current thread is entering an execution
//! context. This prevents a second executor from accidentally starting from
//! within the context of one that is already running.
//!
//! * [`DefaultExecutor`] spawns tasks onto the default executor for the current
//! context.
//!
//! * [`Park`] abstracts over blocking and unblocking the current thread.
//!
//! # Implementing an executor
//!
//! Executors should always implement `TypedExecutor`. This usually is the bound
//! that applications and libraries will use when generic over an executor. See
//! the [trait documentation][`TypedExecutor`] for more details.
//!
//! If the executor is able to spawn all futures that are `Send`, then the
//! executor should also implement the `Executor` trait. This trait is rarely
//! used directly by applications and libraries. Instead, `tokio::spawn` is
//! configured to dispatch to type that implements `Executor`.
//!
//! [`Executor`]: trait.Executor.html
//! [`TypedExecutor`]: trait.TypedExecutor.html
//! [`enter`]: fn.enter.html
//! [`DefaultExecutor`]: struct.DefaultExecutor.html
//! [`Park`]: park/index.html
//! [`Future::poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
#[cfg(all(test, loom))]
macro_rules! thread_local {
($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
}
// At the top due to macros
#[cfg(test)]
#[macro_use]
mod tests;
mod enter;
mod error;
mod executor;
mod global;
mod loom;
pub mod park;
#[cfg(feature = "thread-pool")]
mod task;
mod typed;
#[cfg(feature = "thread-pool")]
mod util;
#[cfg(all(not(feature = "blocking"), feature = "thread-pool"))]
mod blocking;
#[cfg(feature = "blocking")]
pub mod blocking;
#[cfg(feature = "current-thread")]
pub mod current_thread;
#[cfg(feature = "thread-pool")]
pub mod thread_pool;
pub use crate::enter::{enter, exit, Enter, EnterError};
pub use crate::error::SpawnError;
pub use crate::executor::Executor;
pub use crate::global::{spawn, with_default, DefaultExecutor};
pub use crate::typed::TypedExecutor;
pub use futures_util::future::RemoteHandle;

View File

@ -21,7 +21,6 @@ categories = ["asynchronous", "testing"]
[dependencies]
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-executor = { version = "=0.2.0-alpha.6", path = "../tokio-executor" }
tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" }
bytes = "0.4"

View File

@ -22,9 +22,9 @@
//! });
//! ```
use tokio::executor::park::{Park, Unpark};
use tokio::timer::clock::{Clock, Now};
use tokio::timer::Timer;
use tokio_executor::park::{Park, Unpark};
use std::marker::PhantomData;
use std::rc::Rc;

View File

@ -1,6 +1,6 @@
//! Futures task based helpers
use tokio_executor::enter;
use tokio::executor::enter;
use pin_convert::AsPinMut;
use std::future::Future;

View File

@ -25,6 +25,7 @@ keywords = ["io", "async", "non-blocking", "futures"]
[features]
default = [
"blocking",
"fs",
"io",
"net-full",
@ -35,25 +36,25 @@ default = [
"timer",
]
fs = ["tokio-executor/blocking"]
blocking = []
fs = ["blocking"]
io-traits = ["bytes", "iovec"]
io-util = ["io-traits", "pin-project", "memchr"]
io = ["io-traits", "io-util"]
macros = ["tokio-macros"]
net-full = ["tcp", "udp", "uds"]
net-driver = ["mio", "tokio-executor/blocking", "lazy_static"]
net-driver = ["mio", "blocking", "lazy_static"]
rt-current-thread = [
"crossbeam-channel",
"timer",
"tokio-executor/current-thread",
]
rt-full = [
"macros",
"num_cpus",
"net-full",
"rt-current-thread",
"sync",
"timer",
"tokio-executor/current-thread",
"tokio-executor/thread-pool",
]
signal = [
"lazy_static",
@ -82,10 +83,11 @@ process = [
[dependencies]
futures-core-preview = "=0.3.0-alpha.19"
futures-sink-preview = "=0.3.0-alpha.19"
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink"] }
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "channel"] }
# Everything else is optional...
bytes = { version = "0.4", optional = true }
crossbeam-channel = { version = "0.3.8", optional = true }
crossbeam-utils = { version = "0.6.0", optional = true }
iovec = { version = "0.1", optional = true }
lazy_static = { version = "1.0.2", optional = true }
@ -95,7 +97,6 @@ num_cpus = { version = "1.8.0", optional = true }
pin-project = { version = "0.4", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }
tokio-executor = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-executor" }
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }
tokio-sync = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-sync", features = ["async-traits"] }
@ -113,10 +114,6 @@ version = "0.3.8"
default-features = false
optional = true
[target.'cfg(loom)'.dependencies]
# play nice with loom tests in other crates.
loom = "0.2.11"
[dev-dependencies]
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }
tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" }
@ -127,6 +124,7 @@ flate2 = { version = "1", features = ["tokio"] }
http = "0.1"
httparse = "1.0"
libc = "0.2"
loom = { version = "0.2.11", features = ["futures", "checkpoint"] }
num_cpus = "1.0"
rand = "0.7.2"
serde = { version = "1.0", features = ["derive"] }
@ -135,7 +133,6 @@ tempfile = "3.1.0"
time = "0.1"
# sharded slab tests
loom = "0.2.11"
proptest = "0.9.4"
[package.metadata.docs.rs]

View File

@ -2,7 +2,7 @@
extern crate test;
use tokio_executor::thread_pool::{Builder, Spawner, ThreadPool};
use tokio::executor::thread_pool::{Builder, Spawner};
use tokio_sync::oneshot;
use std::future::Future;

View File

@ -1,104 +0,0 @@
//! Task execution utilities.
//!
//! In the Tokio execution model, futures are lazy. When a future is created, no
//! work is performed. In order for the work defined by the future to happen,
//! the future must be submitted to an executor. A future that is submitted to
//! an executor is called a "task".
//!
//! The executor is responsible for ensuring that [`Future::poll`] is
//! called whenever the task is [notified]. Notification happens when the
//! internal state of a task transitions from "not ready" to ready. For
//! example, a socket might have received data and a call to `read` will now be
//! able to succeed.
//!
//! The specific strategy used to manage the tasks is left up to the
//! executor. There are two main flavors of executors: single-threaded and
//! multi-threaded. Tokio provides implementation for both of these in the
//! [`runtime`] module.
//!
//! # `Executor` trait.
//!
//! This module provides the [`Executor`] trait (re-exported from
//! [`tokio-executor`]), which describes the API that all executors must
//! implement.
//!
//! A free [`spawn`] function is provided that allows spawning futures onto the
//! default executor (tracked via a thread-local variable) without referencing a
//! handle. It is expected that all executors will set a value for the default
//! executor. This value will often be set to the executor itself, but it is
//! possible that the default executor might be set to a different executor.
//!
//! For example, a single threaded executor might set the default executor to a
//! thread pool instead of itself, allowing futures to spawn new tasks onto the
//! thread pool when those tasks are `Send`.
//!
//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
//! [notified]: https://docs.rs/futures/0.1/futures/executor/trait.Notify.html#tymethod.notify
//! [`runtime`]: ../runtime/index.html
//! [`tokio-executor`]: https://docs.rs/tokio-executor/0.1
//! [`Executor`]: trait.Executor.html
//! [`spawn`]: fn.spawn.html
use std::future::Future;
pub use tokio_executor::{DefaultExecutor, Executor, SpawnError, TypedExecutor};
/// Return value from the `spawn` function.
///
/// Currently this value doesn't actually provide any functionality. However, it
/// provides a way to add functionality later without breaking backwards
/// compatibility.
///
/// See [`spawn`] for more details.
///
/// [`spawn`]: fn.spawn.html
#[derive(Debug)]
pub struct Spawn(());
/// Spawns a future on the default executor.
///
/// In order for a future to do work, it must be spawned on an executor. The
/// `spawn` function is the easiest way to do this. It spawns a future on the
/// [default executor] for the current execution context (tracked using a
/// thread-local variable).
///
/// The default executor is **usually** a thread pool.
///
/// # Examples
///
/// In this example, a server is started and `spawn` is used to start a new task
/// that processes each received connection.
///
/// ```
/// use tokio::net::TcpListener;
///
/// # async fn process<T>(_t: T) {}
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
///
/// tokio::spawn(async move {
/// // Process each socket concurrently.
/// process(socket).await
/// });
/// }
/// # }
/// ```
///
/// [default executor]: struct.DefaultExecutor.html
///
/// # Panics
///
/// This function will panic if the default executor is not set or if spawning
/// onto the default executor returns an error. To avoid the panic, use
/// [`DefaultExecutor`].
///
/// [`DefaultExecutor`]: struct.DefaultExecutor.html
pub fn spawn<F>(f: F) -> Spawn
where
F: Future<Output = ()> + 'static + Send,
{
::tokio_executor::spawn(f);
Spawn(())
}

View File

@ -1,5 +1,6 @@
use super::Pool;
use crate::loom::thread;
use crate::executor::blocking::Pool;
use crate::executor::loom::thread;
use std::usize;
/// Builds a blocking thread pool with custom configuration values.

View File

@ -1,7 +1,7 @@
//! Thread pool for blocking operations
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::executor::loom::sync::{Arc, Condvar, Mutex};
use crate::executor::loom::thread;
#[cfg(feature = "blocking")]
use tokio_sync::oneshot;
@ -17,10 +17,10 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
mod builder;
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) use builder::Builder;
#[derive(Clone, Copy)]
@ -259,7 +259,7 @@ impl Drop for PoolWaiter {
///
/// ```
/// # async fn docs() {
/// tokio_executor::blocking::run(move || {
/// tokio::executor::blocking::run(move || {
/// // do some compute-heavy work or call synchronous code
/// }).await;
/// # }

View File

@ -16,12 +16,12 @@
//! [`block_on_all`]: fn.block_on_all.html
mod scheduler;
use self::scheduler::{Scheduler, TickArgs};
use crate::executor::{EnterError, Executor, SpawnError, TypedExecutor};
#[cfg(feature = "blocking")]
use crate::blocking::{Pool, PoolWaiter};
use crate::park::{Park, ParkThread, Unpark};
use crate::{EnterError, Executor, SpawnError, TypedExecutor};
use crate::executor::blocking::{Pool, PoolWaiter};
use crate::executor::park::{Park, ParkThread, Unpark};
use std::cell::Cell;
use std::error::Error;
@ -326,21 +326,21 @@ impl<P: Park> CurrentThread<P> {
where
F: Future,
{
let _enter = crate::enter().expect("failed to start `current_thread::Runtime`");
let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`");
self.enter().block_on(future)
}
/// Run the executor to completion, blocking the thread until **all**
/// spawned futures have completed.
pub fn run(&mut self) -> Result<(), RunError> {
let _enter = crate::enter().expect("failed to start `current_thread::Runtime`");
let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`");
self.enter().run()
}
/// Run the executor to completion, blocking the thread until all
/// spawned futures have completed **or** `duration` time has elapsed.
pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> {
let _enter = crate::enter().expect("failed to start `current_thread::Runtime`");
let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`");
self.enter().run_timeout(duration)
}
@ -348,7 +348,7 @@ impl<P: Park> CurrentThread<P> {
///
/// This function blocks the current thread even if the executor is idle.
pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> {
let _enter = crate::enter().expect("failed to start `current_thread::Runtime`");
let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`");
self.enter().turn(duration)
}
@ -786,7 +786,7 @@ impl<U: Unpark> Borrow<'_, U> {
current.set_spawn(spawner, || {
#[cfg(all(feature = "blocking", not(loom)))]
let res = crate::blocking::with_pool(blocking, || f());
let res = crate::executor::blocking::with_pool(blocking, || f());
#[cfg(any(not(feature = "blocking"), loom))]
let res = f();
res

View File

@ -1,5 +1,5 @@
use super::{Borrow, BorrowSpawner};
use crate::park::Unpark;
use crate::executor::current_thread::{Borrow, BorrowSpawner};
use crate::executor::park::Unpark;
use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
@ -128,7 +128,7 @@ pub(super) struct TickArgs<'a> {
pub(super) id: u64,
pub(super) num_futures: &'a AtomicUsize,
#[cfg(feature = "blocking")]
pub(super) blocking: &'a crate::blocking::PoolWaiter,
pub(super) blocking: &'a crate::executor::blocking::PoolWaiter,
}
impl<U> Scheduler<U>

View File

@ -101,7 +101,7 @@ impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
pub fn block_on<F: Future>(&mut self, mut f: F) -> F::Output {
use crate::park::{Park, ParkThread};
use crate::executor::park::{Park, ParkThread};
use std::pin::Pin;
use std::task::Context;
use std::task::Poll::Ready;

View File

@ -1,4 +1,5 @@
use crate::SpawnError;
use crate::executor::SpawnError;
use futures_util::future::{FutureExt, RemoteHandle};
use std::future::Future;
use std::pin::Pin;
@ -50,7 +51,7 @@ use std::pin::Pin;
/// # Examples
///
/// ```
/// use tokio_executor::Executor;
/// use tokio::executor::Executor;
///
/// # fn docs(my_executor: &mut dyn Executor) {
/// my_executor.spawn(Box::pin(async {
@ -78,7 +79,7 @@ pub trait Executor {
/// # Examples
///
/// ```
/// use tokio_executor::Executor;
/// use tokio::executor::Executor;
///
/// # fn docs(my_executor: &mut dyn Executor) {
/// my_executor.spawn(Box::pin(async {
@ -107,7 +108,7 @@ pub trait Executor {
/// # Examples
///
/// ```
/// use tokio_executor::Executor;
/// use tokio::executor::Executor;
///
/// # fn docs(my_executor: &mut dyn Executor) {
/// if my_executor.status().is_ok() {
@ -141,7 +142,7 @@ impl dyn Executor {
/// # Examples
///
/// ```
/// use tokio_executor::Executor;
/// use tokio::executor::Executor;
/// use futures_util::future::FutureExt;
///
/// # fn docs(my_executor: &'static mut (dyn Executor + 'static)) {

View File

@ -1,6 +1,6 @@
#[cfg(feature = "thread-pool")]
use crate::thread_pool::ThreadPool;
use crate::{Executor, SpawnError};
#[cfg(feature = "rt-full")]
use crate::executor::thread_pool::ThreadPool;
use crate::executor::{Executor, SpawnError};
use std::cell::Cell;
use std::future::Future;
@ -15,7 +15,7 @@ use std::pin::Pin;
/// executor (usually itself) that is used to spawn new tasks.
///
/// The current `DefaultExecutor` reference is tracked using a thread-local
/// variable and is set using `tokio_executor::with_default`
/// variable and is set using `tokio::executor::with_default`
#[derive(Debug, Clone)]
pub struct DefaultExecutor {
_dummy: (),
@ -45,7 +45,7 @@ impl DefaultExecutor {
let executor = unsafe { &mut *executor_ptr };
Some(f(executor))
}
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
State::ThreadPool(threadpool_ptr) => {
let mut thread_pool = unsafe { &*threadpool_ptr };
Some(f(&mut thread_pool))
@ -61,7 +61,7 @@ enum State {
Empty,
// default executor is a thread pool instance.
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
ThreadPool(*const ThreadPool),
// default executor is set to a custom executor.
@ -105,36 +105,47 @@ where
// ===== global spawn fns =====
/// Submits a future for execution on the default executor -- usually a
/// threadpool.
/// Spawns a future on the default executor.
///
/// Futures are lazy constructs. When they are defined, no work happens. In
/// order for the logic defined by the future to be run, the future must be
/// spawned on an executor. This function is the easiest way to do so.
/// In order for a future to do work, it must be spawned on an executor. The
/// `spawn` function is the easiest way to do this. It spawns a future on the
/// [default executor] for the current execution context (tracked using a
/// thread-local variable).
///
/// This function must be called from an execution context, i.e. from a future
/// that has been already spawned onto an executor.
/// The default executor is **usually** a thread pool.
///
/// Once spawned, the future will execute. The details of how that happens is
/// left up to the executor instance. If the executor is a thread pool, the
/// future will be pushed onto a queue that a worker thread polls from. If the
/// executor is a "current thread" executor, the future might be polled
/// immediately from within the call to `spawn` or it might be pushed onto an
/// internal queue.
/// # Examples
///
/// In this example, a server is started and `spawn` is used to start a new task
/// that processes each received connection.
///
/// ```
/// use tokio::net::TcpListener;
///
/// # async fn process<T>(_t: T) {}
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
///
/// tokio::spawn(async move {
/// // Process each socket concurrently.
/// process(socket).await
/// });
/// }
/// # }
/// ```
///
/// [default executor]: struct.DefaultExecutor.html
///
/// # Panics
///
/// This function will panic if the default executor is not set or if spawning
/// onto the default executor returns an error. To avoid the panic, use the
/// `DefaultExecutor` handle directly.
/// onto the default executor returns an error. To avoid the panic, use
/// [`DefaultExecutor`].
///
/// # Examples
///
/// ```no_run
/// tokio::spawn(async {
/// println!("running on the default executor");
/// });
/// ```
/// [`DefaultExecutor`]: struct.DefaultExecutor.html
pub fn spawn<T>(future: T)
where
T: Future<Output = ()> + Send + 'static,
@ -144,7 +155,7 @@ where
let executor = unsafe { &mut *executor_ptr };
executor.spawn(Box::pin(future)).unwrap();
}
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
State::ThreadPool(threadpool_ptr) => {
let thread_pool = unsafe { &*threadpool_ptr };
thread_pool.spawn_background(future);
@ -153,7 +164,7 @@ where
})
}
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) fn with_threadpool<F, R>(thread_pool: &ThreadPool, f: F) -> R
where
F: FnOnce() -> R,

View File

@ -21,7 +21,7 @@ pub(crate) mod std {
}
pub(crate) use self::std::sync;
#[cfg(any(feature = "blocking", feature = "thread-pool"))]
#[cfg(any(feature = "blocking", feature = "rt-full"))]
pub(crate) use self::std::thread;
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) use self::std::{alloc, cell, rand, sys};

View File

@ -22,7 +22,7 @@ impl AtomicUsize {
///
/// All mutations must have happened before the unsynchronized load.
/// Additionally, there must be no concurrent mutations.
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) unsafe fn unsync_load(&self) -> usize {
*(*self.inner.get()).get_mut()
}

View File

@ -1,10 +1,10 @@
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
mod atomic_u32;
mod atomic_usize;
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
mod causal_cell;
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) mod alloc {
#[derive(Debug)]
pub(crate) struct Track<T> {
@ -26,12 +26,12 @@ pub(crate) mod alloc {
}
}
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) mod cell {
pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
}
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) mod rand {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};
@ -57,21 +57,21 @@ pub(crate) mod sync {
pub(crate) use std::sync::{Arc, Condvar, Mutex};
pub(crate) mod atomic {
#[cfg(feature = "thread-pool")]
pub(crate) use crate::loom::std::atomic_u32::AtomicU32;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
#[cfg(feature = "rt-full")]
pub(crate) use crate::executor::loom::std::atomic_u32::AtomicU32;
pub(crate) use crate::executor::loom::std::atomic_usize::AtomicUsize;
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) use std::sync::atomic::{fence, spin_loop_hint, AtomicPtr};
}
}
#[cfg(feature = "thread-pool")]
#[cfg(feature = "rt-full")]
pub(crate) mod sys {
pub(crate) fn num_cpus() -> usize {
usize::max(1, num_cpus::get_physical())
}
}
#[cfg(any(feature = "blocking", feature = "thread-pool"))]
#[cfg(any(feature = "blocking", feature = "rt-full"))]
pub(crate) use std::thread;

84
tokio/src/executor/mod.rs Normal file
View File

@ -0,0 +1,84 @@
//! Task execution related traits and utilities.
//!
//! In the Tokio execution model, futures are lazy. When a future is created, no
//! work is performed. In order for the work defined by the future to happen,
//! the future must be submitted to an executor. A future that is submitted to
//! an executor is called a "task".
//!
//! The executor is responsible for ensuring that [`Future::poll`] is called
//! whenever the task is notified. Notification happens when the internal
//! state of a task transitions from *not ready* to *ready*. For example, a
//! socket might have received data and a call to `read` will now be able to
//! succeed.
//!
//! The specific strategy used to manage the tasks is left up to the
//! executor. There are two main flavors of executors: single-threaded and
//! multi-threaded. Tokio provides implementation for both of these in the
//! [`runtime`] module.
//!
//! # `Executor` trait.
//!
//! This module provides the [`Executor`] trait (re-exported from
//! [`tokio-executor`]), which describes the API that all executors must
//! implement.
//!
//! A free [`spawn`] function is provided that allows spawning futures onto the
//! default executor (tracked via a thread-local variable) without referencing a
//! handle. It is expected that all executors will set a value for the default
//! executor. This value will often be set to the executor itself, but it is
//! possible that the default executor might be set to a different executor.
//!
//! For example, a single threaded executor might set the default executor to a
//! thread pool instead of itself, allowing futures to spawn new tasks onto the
//! thread pool when those tasks are `Send`.
//!
//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
//! [notified]: https://docs.rs/futures/0.1/futures/executor/trait.Notify.html#tymethod.notify
//! [`runtime`]: ../runtime/index.html
//! [`tokio-executor`]: https://docs.rs/tokio-executor/0.1
//! [`Executor`]: trait.Executor.html
//! [`spawn`]: fn.spawn.html#[cfg(all(test, loom))]
// At the top due to macros
#[cfg(test)]
#[macro_use]
mod tests;
mod enter;
pub use self::enter::{enter, exit, Enter, EnterError};
mod error;
pub use self::error::SpawnError;
#[allow(clippy::module_inception)]
mod executor;
pub use self::executor::Executor;
mod global;
pub use self::global::{spawn, with_default, DefaultExecutor};
mod loom;
pub mod park;
#[cfg(feature = "rt-full")]
mod task;
mod typed;
pub use self::typed::TypedExecutor;
#[cfg(feature = "rt-full")]
mod util;
#[cfg(all(not(feature = "blocking"), feature = "rt-full"))]
mod blocking;
#[cfg(feature = "blocking")]
pub mod blocking;
#[cfg(feature = "rt-current-thread")]
pub mod current_thread;
#[cfg(feature = "rt-full")]
pub mod thread_pool;
pub use futures_util::future::RemoteHandle;

View File

@ -1,6 +1,6 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::park::{Park, Unpark};
use crate::executor::loom::sync::atomic::AtomicUsize;
use crate::executor::loom::sync::{Arc, Condvar, Mutex};
use crate::executor::park::{Park, Unpark};
use std::marker::PhantomData;
use std::mem;

View File

@ -1,9 +1,9 @@
use crate::loom::alloc::Track;
use crate::loom::cell::CausalCell;
use crate::task::raw::{self, Vtable};
use crate::task::state::State;
use crate::task::waker::waker_ref;
use crate::task::Schedule;
use crate::executor::loom::alloc::Track;
use crate::executor::loom::cell::CausalCell;
use crate::executor::task::raw::{self, Vtable};
use crate::executor::task::state::State;
use crate::executor::task::waker::waker_ref;
use crate::executor::task::Schedule;
use std::cell::UnsafeCell;
use std::future::Future;

View File

@ -1,8 +1,8 @@
use crate::loom::alloc::Track;
use crate::loom::cell::CausalCheck;
use crate::task::core::{Cell, Core, Header, Trailer};
use crate::task::state::Snapshot;
use crate::task::{Error, Schedule, Task};
use crate::executor::loom::alloc::Track;
use crate::executor::loom::cell::CausalCheck;
use crate::executor::task::core::{Cell, Core, Header, Trailer};
use crate::executor::task::state::Snapshot;
use crate::executor::task::{Error, Schedule, Task};
use std::future::Future;
use std::mem::{ManuallyDrop, MaybeUninit};

View File

@ -1,5 +1,5 @@
use crate::loom::alloc::Track;
use crate::task::raw::RawTask;
use crate::executor::loom::alloc::Track;
use crate::executor::task::raw::RawTask;
use std::future::Future;
use std::marker::PhantomData;

View File

@ -1,4 +1,4 @@
use crate::task::{Header, Task};
use crate::executor::task::{Header, Task};
use std::fmt;
use std::ptr::NonNull;

View File

@ -1,8 +1,8 @@
use crate::loom::alloc::Track;
use crate::task::core::Cell;
use crate::task::harness::Harness;
use crate::task::state::{Snapshot, State};
use crate::task::{Header, Schedule};
use crate::executor::loom::alloc::Track;
use crate::executor::task::core::Cell;
use crate::executor::task::harness::Harness;
use crate::executor::task::state::{Snapshot, State};
use crate::executor::task::{Header, Schedule};
use std::future::Future;
use std::ptr::NonNull;

View File

@ -1,5 +1,5 @@
use crate::loom::sync::atomic::AtomicPtr;
use crate::task::{Header, Task};
use crate::executor::loom::sync::atomic::AtomicPtr;
use crate::executor::task::{Header, Task};
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

View File

@ -1,4 +1,4 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::executor::loom::sync::atomic::AtomicUsize;
use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
@ -222,7 +222,7 @@ impl State {
///
/// Returns a snapshot of the state **after** the transition.
pub(super) fn release_task(&self) -> Snapshot {
use crate::loom::sync::atomic;
use crate::executor::loom::sync::atomic;
const DELTA: usize = RELEASED;
@ -281,7 +281,7 @@ impl State {
///
/// Returns a snapshot of the state **after** the transition.
pub(super) fn complete_join_handle(&self) -> Snapshot {
use crate::loom::sync::atomic;
use crate::executor::loom::sync::atomic;
const DELTA: usize = JOIN_INTEREST;
@ -335,7 +335,7 @@ impl State {
/// Store the join waker.
pub(super) fn store_join_waker(&self) -> Snapshot {
use crate::loom::sync::atomic;
use crate::executor::loom::sync::atomic;
const DELTA: usize = JOIN_WAKER;
@ -405,7 +405,7 @@ impl State {
/// Returns `true` if the task should be released.
pub(super) fn ref_dec(&self) -> bool {
use crate::loom::sync::atomic;
use crate::executor::loom::sync::atomic;
let prev = self.val.fetch_sub(WAKER_ONE, Release);
let next = Snapshot(prev - WAKER_ONE);

View File

@ -1,5 +1,5 @@
use crate::task;
use crate::tests::loom_schedule::LoomSchedule;
use crate::executor::task;
use crate::executor::tests::loom_schedule::LoomSchedule;
use tokio_test::{assert_err, assert_ok};

View File

@ -1,10 +1,9 @@
use crate::task::{self, Header};
use crate::executor::task::{self, Header};
use crate::executor::tests::backoff::*;
use crate::executor::tests::mock_schedule::{mock, Mock};
use crate::executor::tests::track_drop::track_drop;
use crate::sync::oneshot;
use crate::tests::backoff::*;
use crate::tests::mock_schedule::{mock, Mock};
use crate::tests::track_drop::track_drop;
use tokio::sync::oneshot;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};

View File

@ -1,5 +1,5 @@
use crate::task::harness::Harness;
use crate::task::{Header, Schedule};
use crate::executor::task::harness::Harness;
use crate::executor::task::{Header, Schedule};
use std::future::Future;
use std::marker::PhantomData;

View File

@ -1,4 +1,4 @@
use crate::task::{Schedule, Task};
use crate::executor::task::{Schedule, Task};
use loom::sync::Notify;
use std::collections::VecDeque;

View File

@ -1,6 +1,6 @@
#![allow(warnings)]
use crate::park::{Park, Unpark};
use crate::executor::park::{Park, Unpark};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

View File

@ -1,6 +1,6 @@
#![allow(warnings)]
use crate::task::{Header, Schedule, Task};
use crate::executor::task::{Header, Schedule, Task};
use std::collections::VecDeque;
use std::sync::Mutex;

View File

@ -1,9 +1,9 @@
use crate::loom::sync::Arc;
use crate::loom::sys::num_cpus;
use crate::loom::thread;
use crate::park::Park;
use crate::thread_pool::park::DefaultPark;
use crate::thread_pool::{shutdown, worker, worker::Worker, Spawner, ThreadPool};
use crate::executor::loom::sync::Arc;
use crate::executor::loom::sys::num_cpus;
use crate::executor::loom::thread;
use crate::executor::park::Park;
use crate::executor::thread_pool::park::DefaultPark;
use crate::executor::thread_pool::{shutdown, worker, worker::Worker, Spawner, ThreadPool};
use std::{fmt, usize};
@ -48,7 +48,7 @@ impl Builder {
/// # Examples
///
/// ```
/// use tokio_executor::thread_pool::Builder;
/// use tokio::executor::thread_pool::Builder;
///
/// let thread_pool = Builder::new()
/// .num_threads(4)
@ -67,7 +67,7 @@ impl Builder {
/// # Examples
///
/// ```
/// use tokio_executor::thread_pool::Builder;
/// use tokio::executor::thread_pool::Builder;
///
/// let thread_pool = Builder::new()
/// .name("my-pool")
@ -89,7 +89,7 @@ impl Builder {
/// # Examples
///
/// ```
/// use tokio_executor::thread_pool::Builder;
/// use tokio::executor::thread_pool::Builder;
///
/// let thread_pool = Builder::new()
/// .stack_size(32 * 1024)
@ -109,7 +109,7 @@ impl Builder {
/// # Examples
///
/// ```
/// use tokio_executor::thread_pool::Builder;
/// use tokio::executor::thread_pool::Builder;
///
/// let thread_pool = Builder::new()
/// .around_worker(|index, work| {
@ -134,7 +134,7 @@ impl Builder {
/// # Examples
///
/// ```
/// use tokio_executor::thread_pool::Builder;
/// use tokio::executor::thread_pool::Builder;
///
/// let thread_pool = Builder::new()
/// .build();
@ -188,7 +188,7 @@ impl Builder {
}) as Box<dyn FnOnce() + Send + 'static>
};
let mut blocking = crate::blocking::Builder::default();
let mut blocking = crate::executor::blocking::Builder::default();
blocking.name(self.name.clone());
if let Some(ss) = self.stack_size {
blocking.stack_size(ss);
@ -203,11 +203,11 @@ impl Builder {
// Spawn threads for each worker
for worker in workers {
crate::blocking::Pool::spawn(&blocking, launch_worker(worker))
crate::executor::blocking::Pool::spawn(&blocking, launch_worker(worker))
}
let spawner = Spawner::new(pool);
let blocking = crate::blocking::PoolWaiter::from(blocking);
let blocking = crate::executor::blocking::PoolWaiter::from(blocking);
ThreadPool::from_parts(spawner, shutdown_rx, blocking)
}
}
@ -242,7 +242,7 @@ impl<P> Park for BoxedPark<P>
where
P: Park,
{
type Unpark = Box<dyn crate::park::Unpark>;
type Unpark = Box<dyn crate::executor::park::Unpark>;
type Error = P::Error;
fn unpark(&self) -> Self::Unpark {

View File

@ -1,6 +1,6 @@
use crate::loom::sync::Arc;
use crate::park::Unpark;
use crate::thread_pool::{worker, Owned};
use crate::executor::loom::sync::Arc;
use crate::executor::park::Unpark;
use crate::executor::thread_pool::{worker, Owned};
use std::cell::Cell;
use std::ptr;

View File

@ -1,7 +1,7 @@
//! Coordinates idling workers
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::executor::loom::sync::atomic::AtomicUsize;
use crate::executor::loom::sync::Mutex;
use std::fmt;
use std::sync::atomic::Ordering::{self, AcqRel, Relaxed, SeqCst};

View File

@ -1,6 +1,6 @@
use crate::park::Unpark;
use crate::task;
use crate::thread_pool::Shared;
use crate::executor::park::Unpark;
use crate::executor::task;
use crate::executor::thread_pool::Shared;
use std::fmt;
use std::future::Future;

View File

@ -38,7 +38,7 @@ mod worker;
mod tests;
// Re-export `task::Error`
pub use crate::task::Error;
pub use crate::executor::task::Error;
// These exports are used in tests
#[cfg(test)]

View File

@ -1,6 +1,6 @@
use crate::task::{self, Task};
use crate::thread_pool::{queue, Shared};
use crate::util::FastRand;
use crate::executor::task::{self, Task};
use crate::executor::thread_pool::{queue, Shared};
use crate::executor::util::FastRand;
use std::cell::Cell;

View File

@ -1,6 +1,6 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::park::{Park, Unpark};
use crate::executor::loom::sync::atomic::AtomicUsize;
use crate::executor::loom::sync::{Arc, Condvar, Mutex};
use crate::executor::park::{Park, Unpark};
use std::error::Error;
use std::fmt;

View File

@ -1,6 +1,6 @@
use crate::blocking::PoolWaiter;
use crate::thread_pool::{shutdown, Builder, JoinHandle, Spawner};
use crate::Executor;
use crate::executor::blocking::PoolWaiter;
use crate::executor::thread_pool::{shutdown, Builder, JoinHandle, Spawner};
use crate::executor::Executor;
use std::fmt;
use std::future::Future;
@ -67,9 +67,9 @@ impl ThreadPool {
where
F: Future,
{
crate::global::with_threadpool(self, || {
let mut enter = crate::enter().expect("attempting to block while on a Tokio executor");
crate::blocking::with_pool(self.spawner.blocking_pool(), || enter.block_on(future))
crate::executor::global::with_threadpool(self, || {
let mut enter = crate::executor::enter().expect("attempting to block while on a Tokio executor");
crate::executor::blocking::with_pool(self.spawner.blocking_pool(), || enter.block_on(future))
})
}
@ -92,7 +92,7 @@ impl Executor for &ThreadPool {
fn spawn(
&mut self,
future: std::pin::Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Result<(), crate::SpawnError> {
) -> Result<(), crate::executor::SpawnError> {
ThreadPool::spawn_background(self, future);
Ok(())
}

View File

@ -1,6 +1,6 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::task::{Header, Task};
use crate::executor::loom::sync::atomic::AtomicUsize;
use crate::executor::loom::sync::Mutex;
use crate::executor::task::{Header, Task};
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Release};

View File

@ -1,6 +1,6 @@
use crate::loom::sync::Arc;
use crate::task::Task;
use crate::thread_pool::queue::Cluster;
use crate::executor::loom::sync::Arc;
use crate::executor::task::Task;
use crate::executor::thread_pool::queue::Cluster;
pub(crate) struct Inject<T: 'static> {
cluster: Arc<Cluster<T>>,

View File

@ -1,8 +1,8 @@
use crate::loom::cell::{CausalCell, CausalCheck};
use crate::loom::sync::atomic::{self, AtomicU32};
use crate::task::Task;
use crate::thread_pool::queue::global;
use crate::thread_pool::LOCAL_QUEUE_CAPACITY;
use crate::executor::loom::cell::{CausalCell, CausalCheck};
use crate::executor::loom::sync::atomic::{self, AtomicU32};
use crate::executor::task::Task;
use crate::executor::thread_pool::queue::global;
use crate::executor::thread_pool::LOCAL_QUEUE_CAPACITY;
use std::fmt;
use std::mem::MaybeUninit;

View File

@ -8,7 +8,7 @@ mod worker;
pub(crate) use self::inject::Inject;
pub(crate) use self::worker::Worker;
use crate::loom::sync::Arc;
use crate::executor::loom::sync::Arc;
pub(crate) fn build<T: 'static>(workers: usize) -> Vec<Worker<T>> {
let local: Vec<_> = (0..workers).map(|_| local::Queue::new()).collect();

View File

@ -1,8 +1,6 @@
use crate::task::Task;
use crate::thread_pool::queue::{local, Cluster, Inject};
// Loom primitive
use crate::loom::sync::Arc;
use crate::executor::loom::sync::Arc;
use crate::executor::task::Task;
use crate::executor::thread_pool::queue::{local, Cluster, Inject};
use std::cell::Cell;
use std::fmt;

View File

@ -2,13 +2,13 @@
//!
//! - Attempt to spin.
use crate::loom::rand::seed;
use crate::loom::sync::Arc;
use crate::park::Unpark;
use crate::task::{self, Task};
use crate::thread_pool::{current, queue, BoxFuture, Idle, JoinHandle, Owned, Shared};
use crate::util::{CachePadded, FastRand};
use crate::{Executor, SpawnError};
use crate::executor::loom::rand::seed;
use crate::executor::loom::sync::Arc;
use crate::executor::park::Unpark;
use crate::executor::task::{self, Task};
use crate::executor::thread_pool::{current, queue, BoxFuture, Idle, JoinHandle, Owned, Shared};
use crate::executor::util::{CachePadded, FastRand};
use crate::executor::{Executor, SpawnError};
use std::cell::UnsafeCell;
use std::future::Future;
@ -30,7 +30,7 @@ where
idle: Idle,
/// Pool where blocking tasks should be spawned.
pub(crate) blocking: Arc<crate::blocking::Pool>,
pub(crate) blocking: Arc<crate::executor::blocking::Pool>,
}
unsafe impl<P: Unpark> Send for Set<P> {}
@ -44,7 +44,7 @@ where
pub(crate) fn new<F>(
num_workers: usize,
mut mk_unpark: F,
blocking: Arc<crate::blocking::Pool>,
blocking: Arc<crate::executor::blocking::Pool>,
) -> Self
where
F: FnMut(usize) -> P,
@ -113,7 +113,7 @@ where
self.schedule(task);
}
pub(super) fn blocking_pool(&self) -> &Arc<crate::blocking::Pool> {
pub(super) fn blocking_pool(&self) -> &Arc<crate::executor::blocking::Pool> {
&self.blocking
}

View File

@ -1,6 +1,6 @@
use crate::park::Unpark;
use crate::task::{self, Schedule, Task};
use crate::thread_pool::worker;
use crate::executor::park::Unpark;
use crate::executor::task::{self, Schedule, Task};
use crate::executor::thread_pool::worker;
use std::ptr;

View File

@ -3,7 +3,7 @@
//! Each worker holds the `Sender` half. When all the `Sender` halves are
//! dropped, the `Receiver` receives a notification.
use crate::loom::sync::Arc;
use crate::executor::loom::sync::Arc;
use tokio_sync::oneshot;
@ -28,7 +28,7 @@ pub(super) fn channel() -> (Sender, Receiver) {
impl Receiver {
/// Block the current thread until all `Sender` handles drop.
pub(crate) fn wait(&mut self) {
use crate::enter;
use crate::executor::enter;
let mut e = match enter() {
Ok(e) => e,

View File

@ -1,6 +1,6 @@
use crate::loom::sync::Arc;
use crate::park::Unpark;
use crate::thread_pool::{worker, JoinHandle};
use crate::executor::loom::sync::Arc;
use crate::executor::park::Unpark;
use crate::executor::thread_pool::{worker, JoinHandle};
use std::fmt;
use std::future::Future;
@ -44,7 +44,7 @@ impl Spawner {
self.workers.spawn_background(future);
}
pub(super) fn blocking_pool(&self) -> &Arc<crate::blocking::Pool> {
pub(super) fn blocking_pool(&self) -> &Arc<crate::executor::blocking::Pool> {
self.workers.blocking_pool()
}

View File

@ -1,9 +1,9 @@
use crate::loom::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::{Arc, Mutex};
use crate::spawn;
use crate::tests::loom_oneshot as oneshot;
use crate::thread_pool::ThreadPool;
use crate::executor::loom::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::executor::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::executor::loom::sync::{Arc, Mutex};
use crate::executor::tests::loom_oneshot as oneshot;
use crate::executor::thread_pool::ThreadPool;
use std::future::Future;
@ -99,7 +99,7 @@ fn gated() -> impl Future<Output = &'static str> {
}
fn gated2(thread: bool) -> impl Future<Output = &'static str> {
use crate::loom::thread;
use crate::executor::loom::thread;
use futures_util::future::poll_fn;
use std::sync::Arc;
use std::task::Poll;

View File

@ -1,6 +1,6 @@
use crate::task::{self, Task};
use crate::tests::mock_schedule::{Noop, NOOP_SCHEDULE};
use crate::thread_pool::queue;
use crate::executor::task::{self, Task};
use crate::executor::tests::mock_schedule::{Noop, NOOP_SCHEDULE};
use crate::executor::thread_pool::queue;
use loom::thread;

View File

@ -1,6 +1,6 @@
use crate::task::{self, Task};
use crate::tests::mock_schedule::{Noop, NOOP_SCHEDULE};
use crate::thread_pool::{queue, LOCAL_QUEUE_CAPACITY};
use crate::executor::task::{self, Task};
use crate::executor::tests::mock_schedule::{Noop, NOOP_SCHEDULE};
use crate::executor::thread_pool::{queue, LOCAL_QUEUE_CAPACITY};
macro_rules! assert_pop {
($q:expr, $expect:expr) => {

View File

@ -1,5 +1,5 @@
use crate::tests::track_drop::track_drop;
use crate::thread_pool;
use crate::executor::tests::track_drop::track_drop;
use crate::executor::thread_pool;
use tokio_test::assert_ok;
@ -9,8 +9,8 @@ macro_rules! pool {
(pool, w.remove(0), w.remove(0), mock_park)
}};
(! $n:expr) => {{
let mut mock_park = crate::tests::mock_park::MockPark::new();
let blocking = std::sync::Arc::new(crate::blocking::Pool::default());
let mut mock_park = crate::executor::tests::mock_park::MockPark::new();
let blocking = std::sync::Arc::new(crate::executor::blocking::Pool::default());
let (pool, workers) =
thread_pool::create_pool($n, |index| mock_park.mk_park(index), blocking);
(pool, workers, mock_park)
@ -40,7 +40,7 @@ fn execute_single_task() {
#[test]
fn task_migrates() {
use std::sync::mpsc;
use tokio::sync::oneshot;
use crate::sync::oneshot;
let (p, mut w0, mut w1, ..) = pool!(2);
let (tx1, rx1) = oneshot::channel();

View File

@ -1,12 +1,12 @@
use crate::loom::sync::Arc;
use crate::park::{Park, Unpark};
use crate::task::Task;
use crate::thread_pool::{current, Owned, Shared};
use crate::executor::loom::sync::Arc;
use crate::executor::park::{Park, Unpark};
use crate::executor::task::Task;
use crate::executor::thread_pool::{current, Owned, Shared};
use std::time::Duration;
// TODO: remove this re-export
pub(super) use crate::thread_pool::set::Set;
pub(super) use crate::executor::thread_pool::set::Set;
pub(crate) struct Worker<P: Park + 'static> {
/// Entry in the set of workers.
@ -19,7 +19,7 @@ pub(crate) struct Worker<P: Park + 'static> {
pub(crate) fn create_set<F, P>(
pool_size: usize,
mk_park: F,
blocking: Arc<crate::blocking::Pool>,
blocking: Arc<crate::executor::blocking::Pool>,
) -> (Arc<Set<P::Unpark>>, Vec<Worker<P>>)
where
P: Send + Park,
@ -78,10 +78,10 @@ where
// Track the current worker
current::set(&pool, index, || {
let _enter = crate::enter().expect("executor already running on thread");
let _enter = crate::executor::enter().expect("executor already running on thread");
crate::with_default(&mut executor, || {
crate::blocking::with_pool(blocking, || entry.run(park))
crate::executor::with_default(&mut executor, || {
crate::executor::blocking::with_pool(blocking, || entry.run(park))
})
});
}

View File

@ -1,4 +1,4 @@
use crate::SpawnError;
use crate::executor::SpawnError;
/// A value that spawns futures of a specific type.
///
@ -87,7 +87,7 @@ pub trait TypedExecutor<T> {
/// # Examples
///
/// ```rust
/// use tokio_executor::TypedExecutor;
/// use tokio::executor::TypedExecutor;
///
/// use std::future::Future;
/// use std::pin::Pin;
@ -131,7 +131,7 @@ pub trait TypedExecutor<T> {
/// # Examples
///
/// ```rust
/// use tokio_executor::TypedExecutor;
/// use tokio::executor::TypedExecutor;
///
/// use std::future::Future;
/// use std::pin::Pin;

View File

@ -18,12 +18,9 @@
//! type. Adaptions also extend to traits like `std::io::Read` where methods
//! return `std::io::Result`. Be warned that these adapted methods may return
//! `std::io::ErrorKind::WouldBlock` if a *worker* thread can not be converted
//! to a *backup* thread immediately. See [tokio-executor] for more details
//! of the threading model and [`blocking`].
//! to a *backup* thread immediately.
//!
//! [`blocking`]: https://docs.rs/tokio-executor/0.2.0-alpha.2/tokio_executor/threadpool/fn.blocking.html
//! [`AsyncRead`]: https://docs.rs/tokio-io/0.1/tokio_io/trait.AsyncRead.html
//! [tokio-executor]: https://docs.rs/tokio-executor/0.2.0-alpha.2/tokio_executor/threadpool/index.html
pub(crate) mod blocking;
@ -94,5 +91,5 @@ where
mod sys {
pub(crate) use std::fs::File;
pub(crate) use tokio_executor::blocking::{run, Blocking};
pub(crate) use crate::executor::blocking::{run, Blocking};
}

View File

@ -79,6 +79,11 @@ macro_rules! if_runtime {
)*)
}
#[cfg(all(loom, test))]
macro_rules! thread_local {
($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
}
#[cfg(feature = "timer")]
pub mod clock;
@ -101,10 +106,11 @@ mod loom;
pub mod prelude;
#[cfg(feature = "process")]
#[cfg(all(feature = "process", not(loom)))]
pub mod process;
#[cfg(feature = "signal")]
#[cfg(not(loom))]
pub mod signal;
pub mod stream;

View File

@ -1,4 +1,4 @@
use tokio_executor::blocking;
use crate::executor::blocking;
use futures_util::future;
use std::io;
@ -143,7 +143,7 @@ pub(crate) mod sealed {
//! part of the `ToSocketAddrs` public API. The details will change over
//! time.
use tokio_executor::blocking::Blocking;
use crate::executor::blocking::Blocking;
use futures_core::ready;
use std::future::Future;

View File

@ -124,12 +124,12 @@
//! [`PollEvented`]: struct.PollEvented.html
//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
#[cfg(loom)]
#[cfg(all(loom, test))]
macro_rules! loom_thread_local {
($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
}
#[cfg(not(loom))]
#[cfg(any(not(loom), not(test)))]
macro_rules! loom_thread_local {
($($tts:tt)+) => { std::thread_local!{ $($tts)+ } }
}

View File

@ -1,4 +1,5 @@
use super::platform;
use crate::executor::park::{Park, Unpark};
use crate::loom::atomic::{AtomicUsize, Ordering::SeqCst};
mod dispatch;
@ -15,7 +16,6 @@ use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;
use std::{fmt, usize};
use tokio_executor::park::{Park, Unpark};
/// The core reactor, or event loop.
///

View File

@ -1,10 +1,9 @@
use crate::executor::current_thread::CurrentThread;
use crate::net::driver::Reactor;
use crate::runtime::current_thread::Runtime;
use crate::timer::clock::Clock;
use crate::timer::timer::Timer;
use tokio_executor::current_thread::CurrentThread;
use std::io;
/// Builds a Single-threaded runtime with custom configuration values.

View File

@ -63,5 +63,5 @@ mod runtime;
pub use self::builder::Builder;
pub use self::runtime::{Handle, Runtime, RunError};
pub use tokio_executor::current_thread::spawn;
pub use tokio_executor::current_thread::TaskExecutor;
pub use crate::executor::current_thread::spawn;
pub use crate::executor::current_thread::TaskExecutor;

View File

@ -1,11 +1,10 @@
use crate::executor::current_thread::Handle as ExecutorHandle;
use crate::executor::current_thread::{self, CurrentThread};
use crate::net::driver::{self, Reactor};
use crate::runtime::current_thread::Builder;
use crate::timer::clock::{self, Clock};
use crate::timer::timer::{self, Timer};
use tokio_executor::current_thread::Handle as ExecutorHandle;
use tokio_executor::current_thread::{self, CurrentThread};
use std::error::Error;
use std::fmt;
use std::future::Future;
@ -38,7 +37,7 @@ impl Handle {
///
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError>
pub fn spawn<F>(&self, future: F) -> Result<(), crate::executor::SpawnError>
where
F: Future<Output = ()> + Send + 'static,
{
@ -54,7 +53,7 @@ impl Handle {
///
/// This allows a caller to avoid creating the task if the call to `spawn`
/// has a high likelihood of failing.
pub fn status(&self) -> Result<(), tokio_executor::SpawnError> {
pub fn status(&self) -> Result<(), crate::executor::SpawnError> {
self.0.status()
}
}
@ -201,7 +200,7 @@ impl Runtime {
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, || f(executor))
crate::executor::with_default(&mut default_executor, || f(executor))
})
}
}

View File

@ -19,7 +19,7 @@
//! Creating a [`Runtime`] does the following:
//!
//! * Spawn a background thread running a [`Reactor`] instance.
//! * Start a [`ThreadPool`] for executing futures.
//! * Start a thread pool for executing futures.
//! * Run an instance of `Timer` **per** thread pool worker thread.
//!
//! The thread pool uses a work-stealing strategy and is configured to start a
@ -124,12 +124,12 @@
//! [timer]: ../timer/index.html
//! [`Runtime`]: struct.Runtime.html
//! [`Reactor`]: ../reactor/struct.Reactor.html
//! [`ThreadPool`]: https://docs.rs/tokio-executor/0.2.0-alpha.2/tokio_executor/threadpool/struct.ThreadPool.html
//! [`run`]: fn.run.html
//! [`tokio::spawn`]: ../executor/fn.spawn.html
//! [`tokio::main`]: ../../tokio_macros/attr.main.html
pub mod current_thread;
#[cfg(feature = "rt-full")]
mod threadpool;

View File

@ -1,10 +1,9 @@
use crate::executor::thread_pool;
use crate::net::driver::{self, Reactor};
use crate::runtime::threadpool::{Inner, Runtime};
use crate::timer::clock::{self, Clock};
use crate::timer::timer::{self, Timer};
use tokio_executor::thread_pool;
use std::sync::{Arc, Mutex};
use std::{fmt, io};

View File

@ -7,12 +7,12 @@ mod spawner;
pub use self::spawner::Spawner;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use tokio_executor::thread_pool::JoinHandle;
pub use crate::executor::thread_pool::JoinHandle;
use crate::net::driver;
use crate::timer::timer;
use tokio_executor::thread_pool::ThreadPool;
use crate::executor::thread_pool::ThreadPool;
use std::future::Future;
use std::io;

View File

@ -1,7 +1,6 @@
use crate::executor::thread_pool;
use crate::runtime::JoinHandle;
use tokio_executor::thread_pool;
use std::future::Future;
/// Spawns futures on the runtime

View File

@ -41,13 +41,12 @@ pub(crate) use self::registration::Registration;
mod stack;
use self::stack::Stack;
use crate::executor::park::{Park, ParkThread, Unpark};
use crate::timer::atomic::AtomicU64;
use crate::timer::clock::Clock;
use crate::timer::wheel;
use crate::timer::Error;
use tokio_executor::park::{Park, ParkThread, Unpark};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;

View File

@ -1,9 +1,9 @@
#![warn(rust_2018_idioms)]
#![cfg(not(miri))]
use tokio::executor::current_thread::{self, block_on_all, CurrentThread, TaskExecutor};
use tokio::executor::TypedExecutor;
use tokio::sync::oneshot;
use tokio_executor::current_thread::{self, block_on_all, CurrentThread, TaskExecutor};
use tokio_executor::TypedExecutor;
use std::any::Any;
use std::cell::{Cell, RefCell};
@ -100,7 +100,7 @@ mod does_not_set_global_executor_by_default {
#[test]
fn spawn() {
test(|f| tokio_executor::DefaultExecutor::current().spawn(f))
test(|f| tokio::executor::DefaultExecutor::current().spawn(f))
}
}
@ -547,7 +547,7 @@ struct MyPark {
struct MyUnpark;
impl tokio_executor::park::Park for MyPark {
impl tokio::executor::park::Park for MyPark {
type Unpark = MyUnpark;
type Error = ();
@ -569,7 +569,7 @@ impl tokio_executor::park::Park for MyPark {
}
}
impl tokio_executor::park::Unpark for MyUnpark {
impl tokio::executor::park::Unpark for MyUnpark {
fn unpark(&self) {}
}

View File

@ -1,13 +1,13 @@
#![warn(rust_2018_idioms)]
use tokio_executor::{self, DefaultExecutor};
use tokio::executor::DefaultExecutor;
use std::future::Future;
use std::pin::Pin;
mod out_of_executor_context {
use super::*;
use tokio_executor::Executor;
use tokio::executor::Executor;
fn test<F, E>(spawn: F)
where

View File

@ -2,7 +2,7 @@
#[test]
fn block_on_ready() {
let mut enter = tokio_executor::enter().unwrap();
let mut enter = tokio::executor::enter().unwrap();
let val = enter.block_on(async { 123 });
assert_eq!(val, 123);
@ -10,7 +10,7 @@ fn block_on_ready() {
#[test]
fn block_on_pending() {
let mut enter = tokio_executor::enter().unwrap();
let mut enter = tokio::executor::enter().unwrap();
let val = enter.block_on(async { 123 });
assert_eq!(val, 123);

View File

@ -1,4 +1,4 @@
use tokio_executor::{with_default, DefaultExecutor};
use tokio::executor::{with_default, DefaultExecutor};
#[test]
fn default_executor_is_send_and_sync() {
@ -10,7 +10,7 @@ fn default_executor_is_send_and_sync() {
#[test]
#[should_panic]
fn nested_default_executor_status() {
let _enter = tokio_executor::enter().unwrap();
let _enter = tokio::executor::enter().unwrap();
let mut executor = DefaultExecutor::current();
let _result = with_default(&mut executor, || ());

View File

@ -61,7 +61,7 @@ fn test_drop_on_notify() {
}
}));
let _enter = tokio_executor::enter().unwrap();
let _enter = tokio::executor::enter().unwrap();
{
let handle = reactor.handle();

View File

@ -1,5 +1,4 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "default")]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
@ -133,6 +132,6 @@ fn racy() {
// wait for runtime thread to exit
jh.join().unwrap();
let mut e = tokio_executor::enter().unwrap();
let mut e = tokio::executor::enter().unwrap();
e.block_on(rx).unwrap();
}

View File

@ -1,7 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "default")]
use tokio;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Runtime;
@ -9,7 +7,6 @@ use tokio::sync::oneshot;
use tokio::timer::delay;
use tokio_test::{assert_err, assert_ok};
use env_logger;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
@ -146,7 +143,7 @@ fn nested_enter() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
assert_err!(tokio_executor::enter());
assert_err!(tokio::executor::enter());
let res = panic::catch_unwind(move || {
let rt = Runtime::new().unwrap();

View File

@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio_executor::park::{Park, Unpark};
use tokio_executor::thread_pool::*;
use tokio::executor::park::{Park, Unpark};
use tokio::executor::thread_pool::*;
use futures_util::future::poll_fn;
use std::cell::Cell;
@ -210,7 +210,7 @@ fn many_multishot_futures() {
}
{
let mut e = tokio_executor::enter().unwrap();
let mut e = tokio::executor::enter().unwrap();
e.block_on(async move {
for mut start_tx in start_txs {
@ -232,7 +232,7 @@ fn global_executor_is_configured() {
let (signal_tx, signal_rx) = mpsc::channel();
pool.spawn(async move {
tokio_executor::spawn(async move {
tokio::executor::spawn(async move {
signal_tx.send(()).unwrap();
});
});

Some files were not shown because too many files have changed in this diff Show More