chore: refine feature flags (#1785)

Removes dependencies between Tokio feature flags. For example, `process`
should not depend on `sync` simply because it uses the `mpsc` channel.
Instead, feature flags represent **public** APIs that become available
with the feature enabled. When the feature is not enabled, the
functionality is removed. If another Tokio component requires the
functionality, it stays available as `pub(crate)`.
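
A minimal sketch of that visibility rule (module and type names here are
hypothetical, not taken from this change): the item is always compiled for
internal use, and is only re-exported publicly when its own feature is
enabled.

```rust
// Always available inside the crate, regardless of feature flags.
pub(crate) mod chan {
    pub(crate) struct Sender;
}

// Only part of the public API when the `sync` feature is enabled.
#[cfg(feature = "sync")]
pub use chan::Sender;
```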

The threaded scheduler is now exposed under `rt-threaded`. This feature
flag only enables the threaded scheduler and does not include I/O,
networking, or time. Those features must be explicitly enabled.
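
For example, a runtime backed by the threaded scheduler can be built with only
`rt-threaded` enabled; the sketch below uses the `Builder` API touched later in
this diff and assumes nothing beyond `rt-threaded` is turned on.

```rust
use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    // `threaded_scheduler()` is gated on `rt-threaded`. The I/O driver,
    // networking, and timers still require their own feature flags.
    let mut runtime = Builder::new().threaded_scheduler().build()?;
    runtime.block_on(async {
        println!("running on the threaded scheduler");
    });
    Ok(())
}
```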

A `full` feature flag is added that enables all features.

`stdin`, `stdout`, and `stderr` are exposed under `io-std`.
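
A hedged usage sketch (assumes both `io-std` for the handles and `io-util` for
the `AsyncReadExt`/`AsyncWriteExt` extension traits):

```rust
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

// Read one chunk from standard input and echo it to standard output.
async fn echo_once() -> io::Result<()> {
    let mut buf = vec![0u8; 1024];
    let n = io::stdin().read(&mut buf).await?;
    io::stdout().write_all(&buf[..n]).await?;
    Ok(())
}
```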

Macros are used to scope code by feature flag.
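
For instance, `cfg_rt_core!` (defined in `tokio/src/macros/cfg.rs` below)
expands each item with `#[cfg(feature = "rt-core")]`, so call sites avoid
repeating the attribute. A condensed sketch of the pattern:

```rust
macro_rules! cfg_rt_core {
    ($($item:item)*) => {
        $( #[cfg(feature = "rt-core")] $item )*
    }
}

// Everything in the block is compiled only when `rt-core` is enabled.
cfg_rt_core! {
    pub mod task;
    pub use task::spawn;
}
```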
Carl Lerche 2019-11-18 07:00:55 -08:00 committed by GitHub
parent 13b6e9939e
commit 0d38936b35
49 changed files with 1092 additions and 915 deletions


@ -29,7 +29,7 @@ quote = "1"
syn = { version = "1.0.3", features = ["full"] } syn = { version = "1.0.3", features = ["full"] }
[dev-dependencies] [dev-dependencies]
tokio = { version = "=0.2.0-alpha.6", path = "../tokio", default-features = false, features = ["rt-full"] } tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
[package.metadata.docs.rs] [package.metadata.docs.rs]
all-features = true all-features = true


@ -24,14 +24,21 @@ categories = ["asynchronous", "network-programming"]
keywords = ["io", "async", "non-blocking", "futures"] keywords = ["io", "async", "non-blocking", "futures"]
[features] [features]
default = [ default = ["full"]
# enable everything
full = [
"blocking", "blocking",
"dns",
"fs", "fs",
"io-driver",
"io-util", "io-util",
"io", "io-std",
"macros",
"net", "net",
"process", "process",
"rt-full", "rt-core",
"rt-threaded",
"signal", "signal",
"stream", "stream",
"sync", "sync",
@ -39,15 +46,16 @@ default = [
] ]
blocking = ["rt-core"] blocking = ["rt-core"]
dns = ["blocking"] dns = ["rt-core"]
fs = ["blocking"] fs = ["rt-core"]
io-driver = ["mio", "lazy_static", "sync"] # TODO: get rid of sync io-driver = ["rt-core", "mio", "lazy_static"]
io-util = ["memchr"] io-util = ["memchr"]
io = ["io-util", "blocking"] # stdin, stdout, stderr
io-std = ["rt-core"]
macros = ["tokio-macros"] macros = ["tokio-macros"]
net = ["dns", "tcp", "udp", "uds"] net = ["dns", "tcp", "udp", "uds"]
process = [ process = [
"io-util", # TODO: Get rid of "io-driver",
"libc", "libc",
"mio-named-pipes", "mio-named-pipes",
"signal", "signal",
@ -58,14 +66,9 @@ process = [
] ]
# Includes basic task execution capabilities # Includes basic task execution capabilities
rt-core = [] rt-core = []
# TODO: rename this -> `rt-threaded` rt-threaded = [
rt-full = [
"macros",
"num_cpus", "num_cpus",
"net",
"rt-core", "rt-core",
"sync",
"time",
] ]
signal = [ signal = [
"io-driver", "io-driver",
@ -80,7 +83,7 @@ stream = ["futures-core"]
sync = ["fnv"] sync = ["fnv"]
test-util = [] test-util = []
tcp = ["io-driver"] tcp = ["io-driver"]
time = ["rt-core", "sync", "slab"] time = ["rt-core", "slab"]
udp = ["io-driver"] udp = ["io-driver"]
uds = ["io-driver", "mio-uds", "libc"] uds = ["io-driver", "mio-uds", "libc"]


@ -1,7 +1,11 @@
#![cfg_attr(not(feature = "blocking"), allow(dead_code, unused_imports))]
//! Perform blocking operations from an asynchronous context. //! Perform blocking operations from an asynchronous context.
mod pool; cfg_blocking_impl! {
pub(crate) use self::pool::{spawn_blocking, BlockingPool, Spawner}; mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
mod schedule; mod schedule;
mod task; mod task;
}


@ -116,16 +116,20 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner ===== // ===== impl Spawner =====
impl Spawner {
#[cfg(feature = "rt-full")]
pub(crate) fn spawn_background<F>(&self, func: F)
where
F: FnOnce() + Send + 'static,
{
let task = task::background(BlockingTask::new(func));
self.schedule(task);
}
cfg_rt_threaded! {
impl Spawner {
pub(crate) fn spawn_background<F>(&self, func: F)
where
F: FnOnce() + Send + 'static,
{
let task = task::background(BlockingTask::new(func));
self.schedule(task);
}
}
}
impl Spawner {
/// Set the blocking pool for the duration of the closure /// Set the blocking pool for the duration of the closure
/// ///
/// If a blocking pool is already set, it will be restored when the closure /// If a blocking pool is already set, it will be restored when the closure


@ -34,13 +34,14 @@ enum State<T> {
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>), Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
} }
impl<T> Blocking<T> { cfg_io_std! {
#[cfg(feature = "io")] impl<T> Blocking<T> {
pub(crate) fn new(inner: T) -> Blocking<T> { pub(crate) fn new(inner: T) -> Blocking<T> {
Blocking { Blocking {
inner: Some(inner), inner: Some(inner),
state: State::Idle(Some(Buf::with_capacity(0))), state: State::Idle(Some(Buf::with_capacity(0))),
need_flush: false, need_flush: false,
}
} }
} }
} }
@ -264,12 +265,15 @@ impl Buf {
self.buf.clear(); self.buf.clear();
res res
} }
}
#[cfg(feature = "fs")] cfg_fs! {
pub(crate) fn discard_read(&mut self) -> i64 { impl Buf {
let ret = -(self.bytes().len() as i64); pub(crate) fn discard_read(&mut self) -> i64 {
self.pos = 0; let ret = -(self.bytes().len() as i64);
self.buf.truncate(0); self.pos = 0;
ret self.buf.truncate(0);
ret
}
} }
} }


@ -36,8 +36,9 @@
//! [`ErrorKind`]: enum.ErrorKind.html //! [`ErrorKind`]: enum.ErrorKind.html
//! [`Result`]: type.Result.html //! [`Result`]: type.Result.html
#[cfg(any(feature = "io", feature = "fs"))] cfg_io_blocking! {
pub(crate) mod blocking; pub(crate) mod blocking;
}
mod async_buf_read; mod async_buf_read;
pub use self::async_buf_read::AsyncBufRead; pub use self::async_buf_read::AsyncBufRead;
@ -48,43 +49,43 @@ pub use self::async_read::AsyncRead;
mod async_write; mod async_write;
pub use self::async_write::AsyncWrite; pub use self::async_write::AsyncWrite;
#[cfg(feature = "io-util")] cfg_io_std! {
pub mod split; mod stderr;
#[cfg(feature = "io-util")] pub use stderr::{stderr, Stderr};
pub use self::split::split;
#[cfg(feature = "io-util")] mod stdin;
mod util; pub use stdin::{stdin, Stdin};
#[cfg(feature = "io-util")]
pub use self::util::{
copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream,
BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take,
};
#[cfg(feature = "io")] mod stdout;
mod stderr; pub use stdout::{stdout, Stdout};
#[cfg(feature = "io")] }
pub use self::stderr::{stderr, Stderr};
cfg_io_util! {
#[cfg(feature = "io")] pub mod split;
mod stdin; pub use split::split;
#[cfg(feature = "io")]
pub use self::stdin::{stdin, Stdin}; pub(crate) mod util;
pub use util::{
#[cfg(feature = "io")] copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream,
mod stdout; BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take,
#[cfg(feature = "io")] };
pub use self::stdout::{stdout, Stdout};
// Re-export io::Error so that users don't have to deal with conflicts when
// Re-export io::Error so that users don't have to deal // `use`ing `tokio::io` and `std::io`.
// with conflicts when `use`ing `tokio::io` and `std::io`. pub use std::io::{Error, ErrorKind, Result};
#[cfg(feature = "io-util")] }
pub use std::io::{Error, ErrorKind, Result};
cfg_not_io_util! {
/// Types in this module can be mocked out in tests. cfg_process! {
#[cfg(any(feature = "io", feature = "fs"))] pub(crate) mod util;
mod sys { }
// TODO: don't rename }
pub(crate) use crate::blocking::spawn_blocking as run;
pub(crate) use crate::task::JoinHandle as Blocking; cfg_io_blocking! {
/// Types in this module can be mocked out in tests.
mod sys {
// TODO: don't rename
pub(crate) use crate::blocking::spawn_blocking as run;
pub(crate) use crate::task::JoinHandle as Blocking;
}
} }


@ -1,71 +1,75 @@
mod async_buf_read_ext; #![allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::async_buf_read_ext::AsyncBufReadExt;
mod async_read_ext; cfg_io_util! {
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 mod async_buf_read_ext;
pub use self::async_read_ext::AsyncReadExt; pub use async_buf_read_ext::AsyncBufReadExt;
mod async_write_ext; mod async_read_ext;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use async_read_ext::AsyncReadExt;
pub use self::async_write_ext::AsyncWriteExt;
mod buf_reader; mod async_write_ext;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use async_write_ext::AsyncWriteExt;
pub use self::buf_reader::BufReader;
mod buf_stream; mod buf_reader;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use buf_reader::BufReader;
pub use self::buf_stream::BufStream;
mod buf_writer; mod buf_stream;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use buf_stream::BufStream;
pub use self::buf_writer::BufWriter;
mod chain; mod buf_writer;
pub use buf_writer::BufWriter;
mod copy; mod chain;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::copy::{copy, Copy};
mod empty; mod copy;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use copy::{copy, Copy};
pub use self::empty::{empty, Empty};
mod flush; mod empty;
pub use empty::{empty, Empty};
mod lines; mod flush;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::lines::Lines;
mod read; mod lines;
mod read_exact; pub use lines::Lines;
mod read_line;
mod read_to_end;
mod read_to_string;
mod read_until;
mod repeat; mod read;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 mod read_exact;
pub use self::repeat::{repeat, Repeat}; mod read_line;
mod shutdown; mod read_to_end;
cfg_process! {
pub(crate) use read_to_end::read_to_end;
}
mod sink; mod read_to_string;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 mod read_until;
pub use self::sink::{sink, Sink};
mod split; mod repeat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use repeat::{repeat, Repeat};
pub use self::split::Split;
mod take; mod shutdown;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take::Take;
mod write; mod sink;
mod write_all; pub use sink::{sink, Sink};
// used by `BufReader` and `BufWriter` mod split;
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 pub use split::Split;
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
mod take;
pub use take::Take;
mod write;
mod write_all;
// used by `BufReader` and `BufWriter`
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
}
cfg_not_io_util! {
cfg_process! {
mod read_to_end;
// Used by process
pub(crate) use read_to_end::read_to_end;
}
}


@ -70,75 +70,68 @@
//! } //! }
//! ``` //! ```
#[cfg(all(loom, test))] // macros used internally
macro_rules! thread_local {
($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
}
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
}
};
}
// At the top due to macros
#[cfg(test)]
#[macro_use] #[macro_use]
mod tests; mod macros;
#[cfg(feature = "blocking")] // Blocking task implementation
pub(crate) mod blocking; pub(crate) mod blocking;
#[cfg(feature = "fs")] cfg_fs! {
pub mod fs; pub mod fs;
}
mod future; mod future;
pub mod io; pub mod io;
#[cfg(feature = "io-driver")]
pub mod net; pub mod net;
mod loom; mod loom;
pub mod prelude; pub mod prelude;
#[cfg(feature = "process")] cfg_process! {
#[cfg(not(loom))] pub mod process;
pub mod process; }
pub mod runtime; pub mod runtime;
#[cfg(feature = "signal")] cfg_signal! {
#[cfg(not(loom))] pub mod signal;
pub mod signal; }
#[cfg(feature = "sync")] cfg_sync! {
pub mod sync; pub mod sync;
}
cfg_not_sync! {
mod sync;
}
#[cfg(feature = "rt-core")] cfg_rt_core! {
pub mod task; pub mod task;
#[cfg(feature = "rt-core")] pub use task::spawn;
pub use crate::task::spawn; }
#[cfg(feature = "time")] cfg_time! {
pub mod time; pub mod time;
}
#[cfg(feature = "rt-full")] cfg_rt_threaded! {
mod util; mod util;
}
#[cfg(not(test))] // Work around for rust-lang/rust#62127 cfg_macros! {
#[cfg(feature = "macros")] #[cfg(not(test))] // Work around for rust-lang/rust#62127
#[doc(inline)] pub use tokio_macros::main;
pub use tokio_macros::main; pub use tokio_macros::test;
}
#[cfg(feature = "macros")] // Tests
#[doc(inline)] #[cfg(test)]
pub use tokio_macros::test; mod tests;
// TODO: rm
#[cfg(feature = "io-util")] #[cfg(feature = "io-util")]
#[cfg(test)] #[cfg(test)]
fn is_unpin<T: Unpin>() {} fn is_unpin<T: Unpin>() {}


@ -1,4 +1,4 @@
#![cfg_attr(not(feature = "rt-full"), allow(unused_imports, dead_code))] #![cfg_attr(not(feature = "full"), allow(unused_imports, dead_code))]
mod atomic_u32; mod atomic_u32;
mod atomic_u64; mod atomic_u64;
@ -11,7 +11,7 @@ pub(crate) mod cell {
pub(crate) use super::causal_cell::{CausalCell, CausalCheck}; pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
} }
#[cfg(feature = "sync")] #[cfg(any(feature = "sync", feature = "io-driver"))]
pub(crate) mod future { pub(crate) mod future {
pub(crate) use crate::sync::AtomicWaker; pub(crate) use crate::sync::AtomicWaker;
} }
@ -51,12 +51,12 @@ pub(crate) mod sync {
} }
pub(crate) mod sys { pub(crate) mod sys {
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
pub(crate) fn num_cpus() -> usize { pub(crate) fn num_cpus() -> usize {
usize::max(1, num_cpus::get_physical()) usize::max(1, num_cpus::get_physical())
} }
#[cfg(not(feature = "rt-full"))] #[cfg(not(feature = "rt-threaded"))]
pub(crate) fn num_cpus() -> usize { pub(crate) fn num_cpus() -> usize {
1 1
} }


@ -0,0 +1,19 @@
/// Assert option is some
macro_rules! assert_some {
($e:expr) => {{
match $e {
Some(v) => v,
_ => panic!("expected some, was none"),
}
}};
}
/// Assert option is none
macro_rules! assert_none {
($e:expr) => {{
match $e {
Some(v) => panic!("expected none, was {:?}", v),
_ => {}
}
}};
}

tokio/src/macros/cfg.rs (new file, 217 lines)

@ -0,0 +1,217 @@
#![allow(unused_macros)]
macro_rules! cfg_atomic_waker {
($($item:item)*) => {
$( #[cfg(any(feature = "io-driver", feature = "time"))] $item )*
}
}
macro_rules! cfg_blocking {
($($item:item)*) => {
$( #[cfg(feature = "blocking")] $item )*
}
}
/// Enable blocking API internals
macro_rules! cfg_blocking_impl {
($($item:item)*) => {
$(
#[cfg(any(
feature = "blocking",
feature = "fs",
feature = "dns",
feature = "io-std",
feature = "rt-threaded",
))]
$item
)*
}
}
/// Enable blocking API internals
macro_rules! cfg_not_blocking_impl {
($($item:item)*) => {
$(
#[cfg(not(any(
feature = "blocking",
feature = "fs",
feature = "dns",
feature = "io-std",
feature = "rt-threaded",
)))]
$item
)*
}
}
macro_rules! cfg_dns {
($($item:item)*) => {
$( #[cfg(feature = "dns")] $item )*
}
}
macro_rules! cfg_fs {
($($item:item)*) => { $( #[cfg(feature = "fs")] $item )* }
}
macro_rules! cfg_io_blocking {
($($item:item)*) => {
$( #[cfg(any(feature = "io-std", feature = "fs"))] $item )*
}
}
macro_rules! cfg_io_driver {
($($item:item)*) => {
$( #[cfg(feature = "io-driver")] $item )*
}
}
macro_rules! cfg_not_io_driver {
($($item:item)*) => {
$( #[cfg(not(feature = "io-driver"))] $item )*
}
}
macro_rules! cfg_io_std {
($($item:item)*) => {
$( #[cfg(feature = "io-std")] $item )*
}
}
macro_rules! cfg_io_util {
($($item:item)*) => {
$( #[cfg(feature = "io-util")] $item )*
}
}
macro_rules! cfg_not_io_util {
($($item:item)*) => {
$( #[cfg(not(feature = "io-util"))] $item )*
}
}
macro_rules! cfg_loom {
($($item:item)*) => {
$( #[cfg(loom)] $item )*
}
}
macro_rules! cfg_not_loom {
($($item:item)*) => {
$( #[cfg(not(loom))] $item )*
}
}
macro_rules! cfg_macros {
($($item:item)*) => {
$(
#[cfg(feature = "macros")]
#[doc(inline)]
$item
)*
}
}
macro_rules! cfg_process {
($($item:item)*) => {
$(
#[cfg(feature = "process")]
#[cfg(not(loom))]
$item
)*
}
}
macro_rules! cfg_signal {
($($item:item)*) => {
$(
#[cfg(feature = "signal")]
#[cfg(not(loom))]
$item
)*
}
}
macro_rules! cfg_stream {
($($item:item)*) => {
$( #[cfg(feature = "stream")] $item )*
}
}
macro_rules! cfg_sync {
($($item:item)*) => {
$( #[cfg(feature = "sync")] $item )*
}
}
macro_rules! cfg_not_sync {
($($item:item)*) => {
$( #[cfg(not(feature = "sync"))] $item )*
}
}
macro_rules! cfg_rt_core {
($($item:item)*) => {
$( #[cfg(feature = "rt-core")] $item )*
}
}
macro_rules! cfg_not_rt_core {
($($item:item)*) => {
$( #[cfg(not(feature = "rt-core"))] $item )*
}
}
macro_rules! cfg_rt_threaded {
($($item:item)*) => {
$( #[cfg(feature = "rt-threaded")] $item )*
}
}
macro_rules! cfg_not_rt_threaded {
($($item:item)*) => {
$( #[cfg(not(feature = "rt-threaded"))] $item )*
}
}
macro_rules! cfg_tcp {
($($item:item)*) => {
$( #[cfg(feature = "tcp")] $item )*
}
}
macro_rules! cfg_test_util {
($($item:item)*) => {
$( #[cfg(feature = "test-util")] $item )*
}
}
macro_rules! cfg_not_test_util {
($($item:item)*) => {
$( #[cfg(not(feature = "test-util"))] $item )*
}
}
macro_rules! cfg_time {
($($item:item)*) => {
$( #[cfg(feature = "time")] $item )*
}
}
macro_rules! cfg_not_time {
($($item:item)*) => {
$( #[cfg(not(feature = "time"))] $item )*
}
}
macro_rules! cfg_udp {
($($item:item)*) => {
$( #[cfg(feature = "udp")] $item )*
}
}
macro_rules! cfg_uds {
($($item:item)*) => {
$( #[cfg(all(unix, feature = "uds"))] $item )*
}
}

tokio/src/macros/loom.rs (new file, 12 lines)

@ -0,0 +1,12 @@
macro_rules! if_loom {
($($t:tt)*) => {{
#[cfg(loom)]
const LOOM: bool = true;
#[cfg(not(loom))]
const LOOM: bool = false;
if LOOM {
$($t)*
}
}}
}

tokio/src/macros/mod.rs (new file, 17 lines)

@ -0,0 +1,17 @@
#![cfg_attr(not(feature = "full"), allow(unused_macros))]
#[macro_use]
#[cfg(test)]
mod assert;
#[macro_use]
mod cfg;
#[macro_use]
mod loom;
#[macro_use]
mod ready;
#[macro_use]
mod thread_local;


@ -0,0 +1,8 @@
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
}
};
}


@ -0,0 +1,4 @@
#[cfg(all(loom, test))]
macro_rules! thread_local {
($($tts:tt)+) => { loom::thread_local!{ $($tts)+ } }
}


@ -16,6 +16,22 @@ pub trait ToSocketAddrs: sealed::ToSocketAddrsPriv {}
type ReadyFuture<T> = future::Ready<io::Result<T>>; type ReadyFuture<T> = future::Ready<io::Result<T>>;
// ===== impl &impl ToSocketAddrs =====
impl<T: ToSocketAddrs + ?Sized> ToSocketAddrs for &T {}
impl<T> sealed::ToSocketAddrsPriv for &T
where
T: sealed::ToSocketAddrsPriv + ?Sized,
{
type Iter = T::Iter;
type Future = T::Future;
fn to_socket_addrs(&self) -> Self::Future {
(**self).to_socket_addrs()
}
}
// ===== impl SocketAddr ===== // ===== impl SocketAddr =====
impl ToSocketAddrs for SocketAddr {} impl ToSocketAddrs for SocketAddr {}
@ -56,75 +72,6 @@ impl sealed::ToSocketAddrsPriv for SocketAddrV6 {
} }
} }
// ===== impl str =====
#[cfg(feature = "dns")]
impl ToSocketAddrs for str {}
#[cfg(feature = "dns")]
impl sealed::ToSocketAddrsPriv for str {
type Iter = sealed::OneOrMore;
type Future = sealed::MaybeReady;
fn to_socket_addrs(&self) -> Self::Future {
use crate::blocking;
use sealed::MaybeReady;
// First check if the input parses as a socket address
let res: Result<SocketAddr, _> = self.parse();
if let Ok(addr) = res {
return MaybeReady::Ready(Some(addr));
}
// Run DNS lookup on the blocking pool
let s = self.to_owned();
MaybeReady::Blocking(blocking::spawn_blocking(move || {
std::net::ToSocketAddrs::to_socket_addrs(&s)
}))
}
}
// ===== impl (&str, u16) =====
#[cfg(feature = "dns")]
impl ToSocketAddrs for (&str, u16) {}
#[cfg(feature = "dns")]
impl sealed::ToSocketAddrsPriv for (&str, u16) {
type Iter = sealed::OneOrMore;
type Future = sealed::MaybeReady;
fn to_socket_addrs(&self) -> Self::Future {
use crate::blocking;
use sealed::MaybeReady;
let (host, port) = *self;
// try to parse the host as a regular IP address first
if let Ok(addr) = host.parse::<Ipv4Addr>() {
let addr = SocketAddrV4::new(addr, port);
let addr = SocketAddr::V4(addr);
return MaybeReady::Ready(Some(addr));
}
if let Ok(addr) = host.parse::<Ipv6Addr>() {
let addr = SocketAddrV6::new(addr, port, 0, 0);
let addr = SocketAddr::V6(addr);
return MaybeReady::Ready(Some(addr));
}
let host = host.to_owned();
MaybeReady::Blocking(blocking::spawn_blocking(move || {
std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port))
}))
}
}
// ===== impl (IpAddr, u16) ===== // ===== impl (IpAddr, u16) =====
impl ToSocketAddrs for (IpAddr, u16) {} impl ToSocketAddrs for (IpAddr, u16) {}
@ -167,34 +114,83 @@ impl sealed::ToSocketAddrsPriv for (Ipv6Addr, u16) {
} }
} }
// ===== impl String ===== cfg_dns! {
// ===== impl str =====
#[cfg(feature = "dns")] impl ToSocketAddrs for str {}
impl ToSocketAddrs for String {}
#[cfg(feature = "dns")] impl sealed::ToSocketAddrsPriv for str {
impl sealed::ToSocketAddrsPriv for String { type Iter = sealed::OneOrMore;
type Iter = <str as sealed::ToSocketAddrsPriv>::Iter; type Future = sealed::MaybeReady;
type Future = <str as sealed::ToSocketAddrsPriv>::Future;
fn to_socket_addrs(&self) -> Self::Future { fn to_socket_addrs(&self) -> Self::Future {
(&self[..]).to_socket_addrs() use crate::blocking;
use sealed::MaybeReady;
// First check if the input parses as a socket address
let res: Result<SocketAddr, _> = self.parse();
if let Ok(addr) = res {
return MaybeReady::Ready(Some(addr));
}
// Run DNS lookup on the blocking pool
let s = self.to_owned();
MaybeReady::Blocking(blocking::spawn_blocking(move || {
std::net::ToSocketAddrs::to_socket_addrs(&s)
}))
}
} }
}
// ===== impl &impl ToSocketAddrs ===== // ===== impl (&str, u16) =====
impl<T: ToSocketAddrs + ?Sized> ToSocketAddrs for &T {} impl ToSocketAddrs for (&str, u16) {}
impl<T> sealed::ToSocketAddrsPriv for &T impl sealed::ToSocketAddrsPriv for (&str, u16) {
where type Iter = sealed::OneOrMore;
T: sealed::ToSocketAddrsPriv + ?Sized, type Future = sealed::MaybeReady;
{
type Iter = T::Iter;
type Future = T::Future;
fn to_socket_addrs(&self) -> Self::Future { fn to_socket_addrs(&self) -> Self::Future {
(**self).to_socket_addrs() use crate::blocking;
use sealed::MaybeReady;
let (host, port) = *self;
// try to parse the host as a regular IP address first
if let Ok(addr) = host.parse::<Ipv4Addr>() {
let addr = SocketAddrV4::new(addr, port);
let addr = SocketAddr::V4(addr);
return MaybeReady::Ready(Some(addr));
}
if let Ok(addr) = host.parse::<Ipv6Addr>() {
let addr = SocketAddrV6::new(addr, port, 0, 0);
let addr = SocketAddr::V6(addr);
return MaybeReady::Ready(Some(addr));
}
let host = host.to_owned();
MaybeReady::Blocking(blocking::spawn_blocking(move || {
std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port))
}))
}
}
// ===== impl String =====
impl ToSocketAddrs for String {}
impl sealed::ToSocketAddrsPriv for String {
type Iter = <str as sealed::ToSocketAddrsPriv>::Iter;
type Future = <str as sealed::ToSocketAddrsPriv>::Future;
fn to_socket_addrs(&self) -> Self::Future {
(&self[..]).to_socket_addrs()
}
} }
} }
@ -203,20 +199,18 @@ pub(crate) mod sealed {
//! part of the `ToSocketAddrs` public API. The details will change over //! part of the `ToSocketAddrs` public API. The details will change over
//! time. //! time.
#[cfg(feature = "dns")]
use crate::task::JoinHandle;
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
#[cfg(feature = "dns")]
use std::option; cfg_dns! {
#[cfg(feature = "dns")] use crate::task::JoinHandle;
use std::pin::Pin;
#[cfg(feature = "dns")] use std::option;
use std::task::{Context, Poll}; use std::pin::Pin;
#[cfg(feature = "dns")] use std::task::{Context, Poll};
use std::vec; use std::vec;
}
#[doc(hidden)] #[doc(hidden)]
pub trait ToSocketAddrsPriv { pub trait ToSocketAddrsPriv {
@ -226,56 +220,54 @@ pub(crate) mod sealed {
fn to_socket_addrs(&self) -> Self::Future; fn to_socket_addrs(&self) -> Self::Future;
} }
#[doc(hidden)] cfg_dns! {
#[derive(Debug)] #[doc(hidden)]
#[cfg(feature = "dns")] #[derive(Debug)]
pub enum MaybeReady { pub enum MaybeReady {
Ready(Option<SocketAddr>), Ready(Option<SocketAddr>),
Blocking(JoinHandle<io::Result<vec::IntoIter<SocketAddr>>>), Blocking(JoinHandle<io::Result<vec::IntoIter<SocketAddr>>>),
} }
#[doc(hidden)] #[doc(hidden)]
#[derive(Debug)] #[derive(Debug)]
#[cfg(feature = "dns")] pub enum OneOrMore {
pub enum OneOrMore { One(option::IntoIter<SocketAddr>),
One(option::IntoIter<SocketAddr>), More(vec::IntoIter<SocketAddr>),
More(vec::IntoIter<SocketAddr>), }
}
#[cfg(feature = "dns")] impl Future for MaybeReady {
impl Future for MaybeReady { type Output = io::Result<OneOrMore>;
type Output = io::Result<OneOrMore>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match *self { match *self {
MaybeReady::Ready(ref mut i) => { MaybeReady::Ready(ref mut i) => {
let iter = OneOrMore::One(i.take().into_iter()); let iter = OneOrMore::One(i.take().into_iter());
Poll::Ready(Ok(iter)) Poll::Ready(Ok(iter))
} }
MaybeReady::Blocking(ref mut rx) => { MaybeReady::Blocking(ref mut rx) => {
let res = ready!(Pin::new(rx).poll(cx))?.map(OneOrMore::More); let res = ready!(Pin::new(rx).poll(cx))?.map(OneOrMore::More);
Poll::Ready(res) Poll::Ready(res)
}
} }
} }
} }
}
#[cfg(feature = "dns")] impl Iterator for OneOrMore {
impl Iterator for OneOrMore { type Item = SocketAddr;
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
match self { match self {
OneOrMore::One(i) => i.next(), OneOrMore::One(i) => i.next(),
OneOrMore::More(i) => i.next(), OneOrMore::More(i) => i.next(),
}
} }
}
fn size_hint(&self) -> (usize, Option<usize>) { fn size_hint(&self) -> (usize, Option<usize>) {
match self { match self {
OneOrMore::One(i) => i.size_hint(), OneOrMore::One(i) => i.size_hint(),
OneOrMore::More(i) => i.size_hint(), OneOrMore::More(i) => i.size_hint(),
}
} }
} }
} }


@ -24,24 +24,22 @@
mod addr; mod addr;
pub use addr::ToSocketAddrs; pub use addr::ToSocketAddrs;
pub mod driver; cfg_io_driver! {
pub mod driver;
pub mod util;
}
pub mod util; cfg_tcp! {
pub mod tcp;
pub use tcp::{TcpListener, TcpStream};
}
#[cfg(feature = "tcp")] cfg_udp! {
pub mod tcp; pub mod udp;
pub use udp::UdpSocket;
}
#[cfg(feature = "tcp")] cfg_uds! {
pub use self::tcp::{TcpListener, TcpStream}; pub mod unix;
pub use unix::{UnixDatagram, UnixListener, UnixStream};
#[cfg(feature = "udp")] }
pub mod udp;
#[cfg(feature = "udp")]
pub use self::udp::UdpSocket;
#[cfg(all(unix, feature = "uds"))]
pub mod unix;
#[cfg(all(unix, feature = "uds"))]
pub use self::unix::{UnixDatagram, UnixListener, UnixStream};


@ -12,6 +12,8 @@
//! The prelude may grow over time as additional items see ubiquitous use. //! The prelude may grow over time as additional items see ubiquitous use.
pub use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite}; pub use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
#[cfg(feature = "io-util")]
#[doc(no_inline)] cfg_io_util! {
pub use crate::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _}; #[doc(no_inline)]
pub use crate::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _};
}


@ -116,7 +116,7 @@ mod imp;
mod kill; mod kill;
use crate::io::{AsyncRead, AsyncReadExt, AsyncWrite}; use crate::io::{AsyncRead, AsyncWrite};
use crate::process::kill::Kill; use crate::process::kill::Kill;
use std::ffi::OsStr; use std::ffi::OsStr;
@ -771,7 +771,7 @@ impl Child {
async fn read_to_end<A: AsyncRead + Unpin>(io: Option<A>) -> io::Result<Vec<u8>> { async fn read_to_end<A: AsyncRead + Unpin>(io: Option<A>) -> io::Result<Vec<u8>> {
let mut vec = Vec::new(); let mut vec = Vec::new();
if let Some(mut io) = io { if let Some(mut io) = io {
AsyncReadExt::read_to_end(&mut io, &mut vec).await?; crate::io::util::read_to_end(&mut io, &mut vec).await?;
} }
Ok(vec) Ok(vec)
} }


@ -3,10 +3,7 @@
//! shells. This isolates the complexity of dealing with conditional //! shells. This isolates the complexity of dealing with conditional
//! compilation. //! compilation.
pub(crate) use self::variant::*; cfg_blocking_impl! {
#[cfg(feature = "blocking")]
mod variant {
pub(crate) use crate::blocking::BlockingPool; pub(crate) use crate::blocking::BlockingPool;
pub(crate) use crate::blocking::Spawner; pub(crate) use crate::blocking::Spawner;
@ -17,8 +14,7 @@ mod variant {
} }
} }
#[cfg(not(feature = "blocking"))] cfg_not_blocking_impl! {
mod variant {
use crate::runtime::Builder; use crate::runtime::Builder;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]


@ -62,7 +62,7 @@ enum Kind {
Shell, Shell,
#[cfg(feature = "rt-core")] #[cfg(feature = "rt-core")]
Basic, Basic,
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
ThreadPool, ThreadPool,
} }
@ -132,7 +132,7 @@ impl Builder {
} }
/// Use a multi-threaded scheduler for executing tasks. /// Use a multi-threaded scheduler for executing tasks.
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
pub fn threaded_scheduler(&mut self) -> &mut Self { pub fn threaded_scheduler(&mut self) -> &mut Self {
self.kind = Kind::ThreadPool; self.kind = Kind::ThreadPool;
self self
@ -255,7 +255,7 @@ impl Builder {
Kind::Shell => self.build_shell_runtime(), Kind::Shell => self.build_shell_runtime(),
#[cfg(feature = "rt-core")] #[cfg(feature = "rt-core")]
Kind::Basic => self.build_basic_runtime(), Kind::Basic => self.build_basic_runtime(),
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
Kind::ThreadPool => self.build_threaded_runtime(), Kind::ThreadPool => self.build_threaded_runtime(),
} }
} }
@ -287,121 +287,127 @@ impl Builder {
blocking_pool, blocking_pool,
}) })
} }
}
#[cfg(feature = "rt-core")] cfg_rt_core! {
fn build_basic_runtime(&mut self) -> io::Result<Runtime> { impl Builder {
use crate::runtime::{BasicScheduler, Kind}; fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Kind};
let clock = time::create_clock(); let clock = time::create_clock();
// Create I/O driver // Create I/O driver
let (io_driver, handle) = io::create_driver()?;
let io_handles = vec![handle];
let (driver, handle) = time::create_driver(io_driver, clock.clone());
let time_handles = vec![handle];
// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = BasicScheduler::new(driver);
let spawner = scheduler.spawner();
// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
kind: Kind::Basic(scheduler),
handle: Handle {
kind: handle::Kind::Basic(spawner),
io_handles,
time_handles,
clock,
blocking_spawner,
},
blocking_pool,
})
}
#[cfg(feature = "rt-full")]
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{Kind, ThreadPool};
use std::sync::Mutex;
let clock = time::create_clock();
let mut io_handles = Vec::new();
let mut time_handles = Vec::new();
let mut drivers = Vec::new();
for _ in 0..self.num_threads {
// Create I/O driver and handle
let (io_driver, handle) = io::create_driver()?; let (io_driver, handle) = io::create_driver()?;
io_handles.push(handle); let io_handles = vec![handle];
// Create a new timer. let (driver, handle) = time::create_driver(io_driver, clock.clone());
let (time_driver, handle) = time::create_driver(io_driver, clock.clone()); let time_handles = vec![handle];
time_handles.push(handle);
drivers.push(Mutex::new(Some(time_driver)));
}
// Create the blocking pool // And now put a single-threaded scheduler on top of the timer. When
let blocking_pool = blocking::create_blocking_pool(self); // there are no futures ready to do something, it'll let the timer or
let blocking_spawner = blocking_pool.spawner().clone(); // the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = BasicScheduler::new(driver);
let spawner = scheduler.spawner();
let scheduler = { // Blocking pool
let clock = clock.clone(); let blocking_pool = blocking::create_blocking_pool(self);
let io_handles = io_handles.clone(); let blocking_spawner = blocking_pool.spawner().clone();
let time_handles = time_handles.clone();
let after_start = self.after_start.clone(); Ok(Runtime {
let before_stop = self.before_stop.clone(); kind: Kind::Basic(scheduler),
handle: Handle {
let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| { kind: handle::Kind::Basic(spawner),
// Configure the I/O driver io_handles,
let _io = io::set_default(&io_handles[index]); time_handles,
clock,
// Configure time blocking_spawner,
time::with_default(&time_handles[index], &clock, || { },
// Call the start callback blocking_pool,
if let Some(after_start) = after_start.as_ref() {
after_start();
}
// Run the worker
next();
// Call the after call back
if let Some(before_stop) = before_stop.as_ref() {
before_stop();
}
})
}) })
as Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>); }
}
}
ThreadPool::new( cfg_rt_threaded! {
self.num_threads, impl Builder {
blocking_pool.spawner().clone(), fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
around_worker, use crate::runtime::{Kind, ThreadPool};
move |index| drivers[index].lock().unwrap().take().unwrap(), use std::sync::Mutex;
)
};
let spawner = scheduler.spawner().clone(); let clock = time::create_clock();
Ok(Runtime { let mut io_handles = Vec::new();
kind: Kind::ThreadPool(scheduler), let mut time_handles = Vec::new();
handle: Handle { let mut drivers = Vec::new();
kind: handle::Kind::ThreadPool(spawner),
io_handles, for _ in 0..self.num_threads {
time_handles, // Create I/O driver and handle
clock, let (io_driver, handle) = io::create_driver()?;
blocking_spawner, io_handles.push(handle);
},
blocking_pool, // Create a new timer.
}) let (time_driver, handle) = time::create_driver(io_driver, clock.clone());
time_handles.push(handle);
drivers.push(Mutex::new(Some(time_driver)));
}
// Create the blocking pool
let blocking_pool = blocking::create_blocking_pool(self);
let blocking_spawner = blocking_pool.spawner().clone();
let scheduler = {
let clock = clock.clone();
let io_handles = io_handles.clone();
let time_handles = time_handles.clone();
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
let around_worker = Arc::new(Box::new(move |index, next: &mut dyn FnMut()| {
// Configure the I/O driver
let _io = io::set_default(&io_handles[index]);
// Configure time
time::with_default(&time_handles[index], &clock, || {
// Call the start callback
if let Some(after_start) = after_start.as_ref() {
after_start();
}
// Run the worker
next();
// Call the after call back
if let Some(before_stop) = before_stop.as_ref() {
before_stop();
}
})
})
as Box<dyn Fn(usize, &mut dyn FnMut()) + Send + Sync>);
ThreadPool::new(
self.num_threads,
blocking_pool.spawner().clone(),
around_worker,
move |index| drivers[index].lock().unwrap().take().unwrap(),
)
};
let spawner = scheduler.spawner().clone();
Ok(Runtime {
kind: Kind::ThreadPool(scheduler),
handle: Handle {
kind: handle::Kind::ThreadPool(spawner),
io_handles,
time_handles,
clock,
blocking_spawner,
},
blocking_pool,
})
}
} }
} }


@ -1,7 +1,5 @@
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::fmt; use std::fmt;
#[cfg(feature = "rt-full")]
use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
@ -48,58 +46,62 @@ pub(crate) fn try_enter() -> Option<Enter> {
// //
// This is hidden for a reason. Do not use without fully understanding // This is hidden for a reason. Do not use without fully understanding
// executors. Misuing can easily cause your program to deadlock. // executors. Misuing can easily cause your program to deadlock.
#[cfg(feature = "rt-full")] cfg_rt_threaded! {
pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R { #[cfg(feature = "blocking")]
// Reset in case the closure panics pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
struct Reset; // Reset in case the closure panics
impl Drop for Reset { struct Reset;
fn drop(&mut self) { impl Drop for Reset {
ENTERED.with(|c| { fn drop(&mut self) {
c.set(true); ENTERED.with(|c| {
}); c.set(true);
});
}
} }
ENTERED.with(|c| {
debug_assert!(c.get());
c.set(false);
});
let reset = Reset;
let ret = f();
::std::mem::forget(reset);
ENTERED.with(|c| {
assert!(!c.get(), "closure claimed permanent executor");
c.set(true);
});
ret
} }
ENTERED.with(|c| { impl Enter {
debug_assert!(c.get()); /// Blocks the thread on the specified future, returning the value with
c.set(false); /// which that future completes.
}); pub(crate) fn block_on<F>(&mut self, mut f: F) -> F::Output
where
F: std::future::Future,
{
use crate::runtime::park::{CachedParkThread, Park};
use std::pin::Pin;
use std::task::Context;
use std::task::Poll::Ready;
let reset = Reset; let mut park = CachedParkThread::new();
let ret = f(); let waker = park.unpark().into_waker();
::std::mem::forget(reset); let mut cx = Context::from_waker(&waker);
ENTERED.with(|c| { // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can
assert!(!c.get(), "closure claimed permanent executor"); // no longer be accessed, making the pinning safe.
c.set(true); let mut f = unsafe { Pin::new_unchecked(&mut f) };
});
ret loop {
} if let Ready(v) = f.as_mut().poll(&mut cx) {
return v;
impl Enter { }
/// Blocks the thread on the specified future, returning the value with park.park().unwrap();
/// which that future completes.
#[cfg(feature = "rt-full")]
pub(crate) fn block_on<F: Future>(&mut self, mut f: F) -> F::Output {
use crate::runtime::park::{CachedParkThread, Park};
use std::pin::Pin;
use std::task::Context;
use std::task::Poll::Ready;
let mut park = CachedParkThread::new();
let waker = park.unpark().into_waker();
let mut cx = Context::from_waker(&waker);
// `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can
// no longer be accessed, making the pinning safe.
let mut f = unsafe { Pin::new_unchecked(&mut f) };
loop {
if let Ready(v) = f.as_mut().poll(&mut cx) {
return v;
} }
park.park().unwrap();
} }
} }
} }


@ -1,9 +1,6 @@
use crate::runtime::basic_scheduler; use crate::runtime::basic_scheduler;
use crate::task::JoinHandle; use crate::task::JoinHandle;
#[cfg(feature = "rt-full")]
use crate::runtime::thread_pool;
use std::cell::Cell; use std::cell::Cell;
use std::future::Future; use std::future::Future;
@ -16,7 +13,7 @@ enum State {
Basic(*const basic_scheduler::SchedulerPriv), Basic(*const basic_scheduler::SchedulerPriv),
// default executor is a thread pool instance. // default executor is a thread pool instance.
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
ThreadPool(*const thread_pool::Spawner), ThreadPool(*const thread_pool::Spawner),
} }
@ -34,7 +31,7 @@ where
T::Output: Send + 'static, T::Output: Send + 'static,
{ {
EXECUTOR.with(|current_executor| match current_executor.get() { EXECUTOR.with(|current_executor| match current_executor.get() {
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
State::ThreadPool(thread_pool_ptr) => { State::ThreadPool(thread_pool_ptr) => {
let thread_pool = unsafe { &*thread_pool_ptr }; let thread_pool = unsafe { &*thread_pool_ptr };
thread_pool.spawn(future) thread_pool.spawn(future)
@ -75,12 +72,15 @@ pub(super) fn basic_scheduler_is_current(basic_scheduler: &basic_scheduler::Sche
}) })
} }
#[cfg(feature = "rt-full")] cfg_rt_threaded! {
pub(super) fn with_thread_pool<F, R>(thread_pool: &thread_pool::Spawner, f: F) -> R use crate::runtime::thread_pool;
where
F: FnOnce() -> R, pub(super) fn with_thread_pool<F, R>(thread_pool: &thread_pool::Spawner, f: F) -> R
{ where
with_state(State::ThreadPool(thread_pool as *const _), f) F: FnOnce() -> R,
{
with_state(State::ThreadPool(thread_pool as *const _), f)
}
} }
fn with_state<F, R>(state: State, f: F) -> R fn with_state<F, R>(state: State, f: F) -> R


@ -1,13 +1,15 @@
#[cfg(feature = "rt-core")]
use crate::runtime::basic_scheduler;
#[cfg(feature = "rt-full")]
use crate::runtime::thread_pool;
use crate::runtime::{blocking, io, time}; use crate::runtime::{blocking, io, time};
#[cfg(feature = "rt-core")]
use crate::task::JoinHandle;
#[cfg(feature = "rt-core")] cfg_rt_core! {
use std::future::Future; use crate::runtime::basic_scheduler;
use crate::task::JoinHandle;
use std::future::Future;
}
cfg_rt_threaded! {
use crate::runtime::thread_pool;
}
/// Handle to the runtime /// Handle to the runtime
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -31,57 +33,11 @@ pub(super) enum Kind {
Shell, Shell,
#[cfg(feature = "rt-core")] #[cfg(feature = "rt-core")]
Basic(basic_scheduler::Spawner), Basic(basic_scheduler::Spawner),
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
ThreadPool(thread_pool::Spawner), ThreadPool(thread_pool::Spawner),
} }
impl Handle { impl Handle {
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
/// let handle = rt.handle();
///
/// // Spawn a future onto the runtime
/// handle.spawn(async {
/// println!("now running on a worker thread");
/// });
/// # }
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
#[cfg(feature = "rt-core")]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match &self.kind {
Kind::Shell => panic!("spawning not enabled for runtime"),
#[cfg(feature = "rt-core")]
Kind::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-full")]
Kind::ThreadPool(spawner) => spawner.spawn(future),
}
}
/// Enter the runtime context /// Enter the runtime context
pub fn enter<F, R>(&self, f: F) -> R pub fn enter<F, R>(&self, f: F) -> R
where where
@ -94,9 +50,58 @@ impl Handle {
Kind::Shell => f(), Kind::Shell => f(),
#[cfg(feature = "rt-core")] #[cfg(feature = "rt-core")]
Kind::Basic(spawner) => spawner.enter(f), Kind::Basic(spawner) => spawner.enter(f),
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
Kind::ThreadPool(spawner) => spawner.enter(f), Kind::ThreadPool(spawner) => spawner.enter(f),
}) })
}) })
} }
} }
cfg_rt_core! {
impl Handle {
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
/// let handle = rt.handle();
///
/// // Spawn a future onto the runtime
/// handle.spawn(async {
/// println!("now running on a worker thread");
/// });
/// # }
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match &self.kind {
Kind::Shell => panic!("spawning not enabled for runtime"),
#[cfg(feature = "rt-core")]
Kind::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(spawner) => spawner.spawn(future),
}
}
}
}


@ -3,13 +3,10 @@
//! shells. This isolates the complexity of dealing with conditional //! shells. This isolates the complexity of dealing with conditional
//! compilation. //! compilation.
pub(crate) use self::variant::*;
/// Re-exported for convenience. /// Re-exported for convenience.
pub(crate) use std::io::Result; pub(crate) use std::io::Result;
#[cfg(feature = "io-driver")] cfg_io_driver! {
mod variant {
use crate::net::driver; use crate::net::driver;
use std::io; use std::io;
@ -38,8 +35,7 @@ mod variant {
} }
} }
#[cfg(not(feature = "io-driver"))] cfg_not_io_driver! {
mod variant {
use crate::runtime::park::ParkThread; use crate::runtime::park::ParkThread;
use std::io; use std::io;


@ -132,10 +132,10 @@
#[macro_use] #[macro_use]
mod tests; mod tests;
#[cfg(feature = "rt-core")] cfg_rt_core! {
mod basic_scheduler; mod basic_scheduler;
#[cfg(feature = "rt-core")] use basic_scheduler::BasicScheduler;
use self::basic_scheduler::BasicScheduler; }
mod blocking; mod blocking;
use blocking::BlockingPool; use blocking::BlockingPool;
@ -146,10 +146,10 @@ pub use self::builder::Builder;
pub(crate) mod enter; pub(crate) mod enter;
use self::enter::enter; use self::enter::enter;
#[cfg(feature = "rt-core")] cfg_rt_core! {
mod global; mod global;
#[cfg(feature = "rt-core")] pub(crate) use global::spawn;
pub(crate) use self::global::spawn; }
mod handle; mod handle;
pub use self::handle::Handle; pub use self::handle::Handle;
@ -164,13 +164,14 @@ use self::shell::Shell;
mod time; mod time;
#[cfg(feature = "rt-full")] cfg_rt_threaded! {
pub(crate) mod thread_pool; pub(crate) mod thread_pool;
#[cfg(feature = "rt-full")] use self::thread_pool::ThreadPool;
use self::thread_pool::ThreadPool; }
#[cfg(feature = "rt-core")] cfg_rt_core! {
use crate::task::JoinHandle; use crate::task::JoinHandle;
}
use std::future::Future; use std::future::Future;
@ -223,7 +224,7 @@ enum Kind {
Basic(BasicScheduler<time::Driver>), Basic(BasicScheduler<time::Driver>),
/// Execute tasks across multiple threads. /// Execute tasks across multiple threads.
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
ThreadPool(ThreadPool), ThreadPool(ThreadPool),
} }
@ -254,10 +255,10 @@ impl Runtime {
/// ///
/// [mod]: index.html /// [mod]: index.html
pub fn new() -> io::Result<Self> { pub fn new() -> io::Result<Self> {
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
let ret = Builder::new().threaded_scheduler().build(); let ret = Builder::new().threaded_scheduler().build();
#[cfg(all(not(feature = "rt-full"), feature = "rt-core"))] #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))]
let ret = Builder::new().basic_scheduler().build(); let ret = Builder::new().basic_scheduler().build();
#[cfg(not(feature = "rt-core"))] #[cfg(not(feature = "rt-core"))]
@ -304,7 +305,7 @@ impl Runtime {
{ {
match &self.kind { match &self.kind {
Kind::Shell(_) => panic!("task execution disabled"), Kind::Shell(_) => panic!("task execution disabled"),
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.spawn(future), Kind::ThreadPool(exec) => exec.spawn(future),
Kind::Basic(exec) => exec.spawn(future), Kind::Basic(exec) => exec.spawn(future),
} }
@ -330,7 +331,7 @@ impl Runtime {
Kind::Shell(exec) => exec.block_on(future), Kind::Shell(exec) => exec.block_on(future),
#[cfg(feature = "rt-core")] #[cfg(feature = "rt-core")]
Kind::Basic(exec) => exec.block_on(future), Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.block_on(future), Kind::ThreadPool(exec) => exec.block_on(future),
}) })
} }


@ -45,7 +45,7 @@
//! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html //! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html
mod thread; mod thread;
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
pub(crate) use self::thread::CachedParkThread; pub(crate) use self::thread::CachedParkThread;
#[cfg(not(feature = "io-driver"))] #[cfg(not(feature = "io-driver"))]
pub(crate) use self::thread::ParkThread; pub(crate) use self::thread::ParkThread;


@ -168,7 +168,7 @@ impl CachedParkThread {
/// ///
/// This type cannot be moved to other threads, so it should be created on /// This type cannot be moved to other threads, so it should be created on
/// the thread that the caller intends to park. /// the thread that the caller intends to park.
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
pub(crate) fn new() -> CachedParkThread { pub(crate) fn new() -> CachedParkThread {
CachedParkThread { CachedParkThread {
_anchor: PhantomData, _anchor: PhantomData,
@ -217,7 +217,7 @@ impl Unpark for UnparkThread {
} }
} }
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
mod waker { mod waker {
use super::{Inner, UnparkThread}; use super::{Inner, UnparkThread};
use crate::loom::sync::Arc; use crate::loom::sync::Arc;


@ -22,7 +22,9 @@ mod shutdown;
mod worker; mod worker;
pub(crate) use worker::block_in_place; cfg_blocking! {
pub(crate) use worker::block_in_place;
}
/// Unit tests /// Unit tests
#[cfg(test)] #[cfg(test)]


@ -15,24 +15,26 @@ thread_local! {
static ON_BLOCK: Cell<Option<*const dyn Fn()>> = Cell::new(None) static ON_BLOCK: Cell<Option<*const dyn Fn()>> = Cell::new(None)
} }
pub(crate) fn block_in_place<F, R>(f: F) -> R cfg_blocking! {
where pub(crate) fn block_in_place<F, R>(f: F) -> R
F: FnOnce() -> R, where
{ F: FnOnce() -> R,
// Make the current worker give away its Worker to another thread so that we can safely block {
// this one without preventing progress on other futures the worker owns. // Make the current worker give away its Worker to another thread so that we can safely block
ON_BLOCK.with(|ob| { // this one without preventing progress on other futures the worker owns.
let allow_blocking = ob ON_BLOCK.with(|ob| {
.get() let allow_blocking = ob
.expect("can only call blocking when on Tokio runtime"); .get()
.expect("can only call blocking when on Tokio runtime");
// This is safe, because ON_BLOCK was set from an &mut dyn FnMut in the worker that wraps // This is safe, because ON_BLOCK was set from an &mut dyn FnMut in the worker that wraps
// the worker's operation, and is unset just prior to when the FnMut is dropped. // the worker's operation, and is unset just prior to when the FnMut is dropped.
let allow_blocking = unsafe { &*allow_blocking }; let allow_blocking = unsafe { &*allow_blocking };
allow_blocking(); allow_blocking();
f() f()
}) })
}
} }
pub(crate) struct Worker<P: Park + 'static> { pub(crate) struct Worker<P: Park + 'static> {


@ -3,10 +3,7 @@
//! shells. This isolates the complexity of dealing with conditional //! shells. This isolates the complexity of dealing with conditional
//! compilation. //! compilation.
pub(crate) use self::variant::*; cfg_time! {
#[cfg(feature = "time")]
mod variant {
use crate::runtime::io; use crate::runtime::io;
use crate::time::{self, driver}; use crate::time::{self, driver};
@ -35,8 +32,7 @@ mod variant {
} }
} }
#[cfg(not(feature = "time"))] cfg_not_time! {
mod variant {
use crate::runtime::io; use crate::runtime::io;
pub(crate) type Clock = (); pub(crate) type Clock = ();


@ -13,44 +13,41 @@
//! - [watch](watch/index.html), a single-producer, multi-consumer channel that //! - [watch](watch/index.html), a single-producer, multi-consumer channel that
//! only stores the **most recently** sent value. //! only stores the **most recently** sent value.
macro_rules! debug { cfg_sync! {
($($t:tt)*) => { mod barrier;
if false { pub use barrier::{Barrier, BarrierWaitResult};
println!($($t)*);
} pub mod mpsc;
mod mutex;
pub use mutex::{Mutex, MutexGuard};
pub mod oneshot;
pub(crate) mod semaphore;
mod task;
pub(crate) use task::AtomicWaker;
pub mod watch;
}
cfg_not_sync! {
cfg_atomic_waker! {
mod task;
pub(crate) use task::AtomicWaker;
}
cfg_rt_threaded! {
pub(crate) mod oneshot;
}
cfg_signal! {
pub(crate) mod mpsc;
pub(crate) mod semaphore;
} }
} }
macro_rules! if_loom {
($($t:tt)*) => {{
#[cfg(loom)]
const LOOM: bool = true;
#[cfg(not(loom))]
const LOOM: bool = false;
if LOOM {
$($t)*
}
}}
}
mod barrier;
pub use barrier::{Barrier, BarrierWaitResult};
pub mod mpsc;
mod mutex;
pub use mutex::{Mutex, MutexGuard};
pub mod oneshot;
pub mod semaphore;
mod task;
pub(crate) use task::AtomicWaker;
pub mod watch;
/// Unit tests /// Unit tests
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;


@ -161,12 +161,13 @@ impl<T> Receiver<T> {
impl<T> Unpin for Receiver<T> {} impl<T> Unpin for Receiver<T> {}
#[cfg(feature = "stream")] cfg_stream! {
impl<T> futures_core::Stream for Receiver<T> { impl<T> futures_core::Stream for Receiver<T> {
type Item = T; type Item = T;
fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.poll_recv(cx) self.poll_recv(cx)
}
} }
} }


@ -299,12 +299,6 @@ where
// second time here. // second time here.
try_recv!(); try_recv!();
debug!(
"recv; rx_closed = {:?}; is_idle = {:?}",
rx_fields.rx_closed,
self.inner.semaphore.is_idle()
);
if rx_fields.rx_closed && self.inner.semaphore.is_idle() { if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
Ready(None) Ready(None)
} else { } else {


@ -169,7 +169,6 @@ impl<T> Tx<T> {
} }
pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) { pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
debug!("+ reclaim_block({:p})", block);
// The block has been removed from the linked list and ownership // The block has been removed from the linked list and ownership
// is reclaimed. // is reclaimed.
// //
@ -206,7 +205,6 @@ impl<T> Tx<T> {
} }
if !reused { if !reused {
debug!(" + block freed {:p}", block);
let _ = Box::from_raw(block.as_ptr()); let _ = Box::from_raw(block.as_ptr());
} }
} }
@ -226,7 +224,6 @@ impl<T> Rx<T> {
pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> { pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
// Advance `head`, if needed // Advance `head`, if needed
if !self.try_advancing_head() { if !self.try_advancing_head() {
debug!("+ !self.try_advancing_head() -> false");
return None; return None;
} }
@ -276,8 +273,6 @@ impl<T> Rx<T> {
} }
fn reclaim_blocks(&mut self, tx: &Tx<T>) { fn reclaim_blocks(&mut self, tx: &Tx<T>) {
debug!("+ reclaim_blocks()");
while self.free_head != self.head { while self.free_head != self.head {
unsafe { unsafe {
// Get a handle to the block that will be freed and update // Get a handle to the block that will be freed and update
@ -316,7 +311,6 @@ impl<T> Rx<T> {
/// Effectively `Drop` all the blocks. Should only be called once, when /// Effectively `Drop` all the blocks. Should only be called once, when
/// the list is dropping. /// the list is dropping.
pub(super) unsafe fn free_blocks(&mut self) { pub(super) unsafe fn free_blocks(&mut self) {
debug!("+ free_blocks()");
debug_assert_ne!(self.free_head, NonNull::dangling()); debug_assert_ne!(self.free_head, NonNull::dangling());
let mut cur = Some(self.free_head); let mut cur = Some(self.free_head);
@ -331,7 +325,6 @@ impl<T> Rx<T> {
while let Some(block) = cur { while let Some(block) = cur {
cur = block.as_ref().load_next(Relaxed); cur = block.as_ref().load_next(Relaxed);
debug!(" + free: block = {:p}", block);
drop(Box::from_raw(block.as_ptr())); drop(Box::from_raw(block.as_ptr()));
} }
} }


@ -1,3 +1,5 @@
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
//! A multi-producer, single-consumer queue for sending values across //! A multi-producer, single-consumer queue for sending values across
//! asynchronous tasks. //! asynchronous tasks.
//! //!


@ -1,3 +1,5 @@
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
//! A channel for sending a single message between asynchronous tasks. //! A channel for sending a single message between asynchronous tasks.
use crate::loom::cell::CausalCell; use crate::loom::cell::CausalCell;


@ -1,3 +1,5 @@
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
//! Thread-safe, asynchronous counting semaphore. //! Thread-safe, asynchronous counting semaphore.
//! //!
//! A `Semaphore` instance holds a set of permits. Permits are used to //! A `Semaphore` instance holds a set of permits. Permits are used to
@ -24,7 +26,7 @@ use std::task::{Context, Poll};
use std::usize; use std::usize;
/// Futures-aware semaphore. /// Futures-aware semaphore.
pub struct Semaphore { pub(crate) struct Semaphore {
/// Tracks both the waiter queue tail pointer and the number of remaining /// Tracks both the waiter queue tail pointer and the number of remaining
/// permits. /// permits.
state: AtomicUsize, state: AtomicUsize,
@ -51,18 +53,18 @@ pub struct Semaphore {
/// is the user's responsibility to ensure that `Permit::release` is called /// is the user's responsibility to ensure that `Permit::release` is called
/// before dropping the permit. /// before dropping the permit.
#[derive(Debug)] #[derive(Debug)]
pub struct Permit { pub(crate) struct Permit {
waiter: Option<Arc<WaiterNode>>, waiter: Option<Arc<WaiterNode>>,
state: PermitState, state: PermitState,
} }
/// Error returned by `Permit::poll_acquire`. /// Error returned by `Permit::poll_acquire`.
#[derive(Debug)] #[derive(Debug)]
pub struct AcquireError(()); pub(crate) struct AcquireError(());
/// Error returned by `Permit::try_acquire`. /// Error returned by `Permit::try_acquire`.
#[derive(Debug)] #[derive(Debug)]
pub struct TryAcquireError { pub(crate) struct TryAcquireError {
kind: ErrorKind, kind: ErrorKind,
} }
@ -150,7 +152,7 @@ impl Semaphore {
/// # Panics /// # Panics
/// ///
/// Panics if `permits` is zero. /// Panics if `permits` is zero.
pub fn new(permits: usize) -> Semaphore { pub(crate) fn new(permits: usize) -> Semaphore {
let stub = Box::new(WaiterNode::new()); let stub = Box::new(WaiterNode::new());
let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap(); let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();
@ -168,7 +170,7 @@ impl Semaphore {
} }
/// Returns the current number of available permits /// Returns the current number of available permits
pub fn available_permits(&self) -> usize { pub(crate) fn available_permits(&self) -> usize {
let curr = SemState::load(&self.state, Acquire); let curr = SemState::load(&self.state, Acquire);
curr.available_permits() curr.available_permits()
} }
@ -181,8 +183,6 @@ impl Semaphore {
// Load the current state // Load the current state
let mut curr = SemState::load(&self.state, Acquire); let mut curr = SemState::load(&self.state, Acquire);
debug!(" + poll_permit; sem-state = {:?}", curr);
// Tracks a *mut WaiterNode representing an Arc clone. // Tracks a *mut WaiterNode representing an Arc clone.
// //
// This avoids having to bump the ref count unless required. // This avoids having to bump the ref count unless required.
@ -210,8 +210,6 @@ impl Semaphore {
} }
if !next.acquire_permit(&self.stub) { if !next.acquire_permit(&self.stub) {
debug!(" + poll_permit -- no permits");
debug_assert!(curr.waiter().is_some()); debug_assert!(curr.waiter().is_some());
if maybe_strong.is_none() { if maybe_strong.is_none() {
@ -223,10 +221,7 @@ impl Semaphore {
waiter.register(cx); waiter.register(cx);
debug!(" + poll_permit -- to_queued_waiting");
if !waiter.to_queued_waiting() { if !waiter.to_queued_waiting() {
debug!(" + poll_permit; waiter already queued");
// The node is already queued, there is no further work // The node is already queued, there is no further work
// to do. // to do.
return Pending; return Pending;
@ -243,14 +238,11 @@ impl Semaphore {
next.set_waiter(maybe_strong.unwrap()); next.set_waiter(maybe_strong.unwrap());
} }
debug!(" + poll_permit -- pre-CAS; next = {:?}", next);
debug_assert_ne!(curr.0, 0); debug_assert_ne!(curr.0, 0);
debug_assert_ne!(next.0, 0); debug_assert_ne!(next.0, 0);
match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
Ok(_) => { Ok(_) => {
debug!(" + poll_permit -- CAS ok");
match curr.waiter() { match curr.waiter() {
Some(prev_waiter) => { Some(prev_waiter) => {
let waiter = maybe_strong.unwrap(); let waiter = maybe_strong.unwrap();
@ -260,13 +252,9 @@ impl Semaphore {
prev_waiter.as_ref().next.store(waiter.as_ptr(), Release); prev_waiter.as_ref().next.store(waiter.as_ptr(), Release);
} }
debug!(" + poll_permit -- waiter pushed");
return Pending; return Pending;
} }
None => { None => {
debug!(" + poll_permit -- permit acquired");
undo_strong!(); undo_strong!();
return Ready(Ok(())); return Ready(Ok(()));
@ -282,15 +270,11 @@ impl Semaphore {
/// Close the semaphore. This prevents the semaphore from issuing new /// Close the semaphore. This prevents the semaphore from issuing new
/// permits and notifies all pending waiters. /// permits and notifies all pending waiters.
pub fn close(&self) { pub(crate) fn close(&self) {
debug!("+ Semaphore::close");
// Acquire the `rx_lock`, setting the "closed" flag on the lock. // Acquire the `rx_lock`, setting the "closed" flag on the lock.
let prev = self.rx_lock.fetch_or(1, AcqRel); let prev = self.rx_lock.fetch_or(1, AcqRel);
debug!(" + close -- rx_lock.fetch_add(1)");
if prev != 0 { if prev != 0 {
debug!("+ close -- locked; prev = {}", prev);
// Another thread has the lock and will be responsible for notifying // Another thread has the lock and will be responsible for notifying
// pending waiters. // pending waiters.
return; return;
@ -300,9 +284,7 @@ impl Semaphore {
} }
/// Add `n` new permits to the semaphore. /// Add `n` new permits to the semaphore.
pub fn add_permits(&self, n: usize) { pub(crate) fn add_permits(&self, n: usize) {
debug!(" + add_permits; n = {}", n);
if n == 0 { if n == 0 {
return; return;
} }
@ -310,10 +292,8 @@ impl Semaphore {
// TODO: Handle overflow. A panic is not sufficient, the process must // TODO: Handle overflow. A panic is not sufficient, the process must
// abort. // abort.
let prev = self.rx_lock.fetch_add(n << 1, AcqRel); let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n);
if prev != 0 { if prev != 0 {
debug!(" + add_permits -- locked; prev = {}", prev);
// Another thread has the lock and will be responsible for notifying // Another thread has the lock and will be responsible for notifying
// pending waiters. // pending waiters.
return; return;
@ -324,11 +304,6 @@ impl Semaphore {
fn add_permits_locked(&self, mut rem: usize, mut closed: bool) { fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
while rem > 0 || closed { while rem > 0 || closed {
debug!(
" + add_permits_locked -- iter; rem = {}; closed = {:?}",
rem, closed
);
if closed { if closed {
SemState::fetch_set_closed(&self.state, AcqRel); SemState::fetch_set_closed(&self.state, AcqRel);
} }
@ -340,28 +315,16 @@ impl Semaphore {
let actual = if closed { let actual = if closed {
let actual = self.rx_lock.fetch_sub(n | 1, AcqRel); let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
debug!(
" + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}",
n, actual
);
closed = false; closed = false;
actual actual
} else { } else {
let actual = self.rx_lock.fetch_sub(n, AcqRel); let actual = self.rx_lock.fetch_sub(n, AcqRel);
debug!(
" + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}",
n, actual
);
closed = actual & 1 == 1; closed = actual & 1 == 1;
actual actual
}; };
rem = (actual >> 1) - rem; rem = (actual >> 1) - rem;
} }
debug!(" + add_permits; done");
} }
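A hedged reading of the `rx_lock` arithmetic above (not stated explicitly in the diff): the closed flag appears to live in bit 0 and the pending permit count in the bits above it, as the following toy calculation illustrates:

    fn main() {
        let mut rx_lock: usize = 0;

        // add_permits(3) queues three permits: rx_lock.fetch_add(3 << 1)
        rx_lock += 3 << 1;
        // close() sets the low bit: rx_lock.fetch_or(1)
        rx_lock |= 1;

        assert_eq!(rx_lock >> 1, 3); // queued permits
        assert_eq!(rx_lock & 1, 1);  // closed flag
    }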
/// Release a specific amount of permits to the semaphore /// Release a specific amount of permits to the semaphore
@ -377,11 +340,8 @@ impl Semaphore {
} }
}; };
debug!(" + release_n -- notify");
if waiter.notify(closed) { if waiter.notify(closed) {
n = n.saturating_sub(1); n = n.saturating_sub(1);
debug!(" + release_n -- dec");
} }
} }
} }
@ -392,8 +352,6 @@ impl Semaphore {
/// there are no more waiters to pop, `rem` is used to set the available /// there are no more waiters to pop, `rem` is used to set the available
/// permits. /// permits.
fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> { fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> {
debug!(" + pop; rem = {}", rem);
'outer: loop { 'outer: loop {
unsafe { unsafe {
let mut head = self.head.with(|head| *head); let mut head = self.head.with(|head| *head);
@ -402,8 +360,6 @@ impl Semaphore {
let stub = self.stub(); let stub = self.stub();
if head == stub { if head == stub {
debug!(" + pop; head == stub");
let next = match NonNull::new(next_ptr) { let next = match NonNull::new(next_ptr) {
Some(next) => next, Some(next) => next,
None => { None => {
@ -429,7 +385,6 @@ impl Semaphore {
loop { loop {
if curr.has_waiter(&self.stub) { if curr.has_waiter(&self.stub) {
// Inconsistent // Inconsistent
debug!(" + pop; inconsistent 1");
thread::yield_now(); thread::yield_now();
continue 'outer; continue 'outer;
} }
@ -456,8 +411,6 @@ impl Semaphore {
} }
}; };
debug!(" + pop; got next waiter");
self.head.with_mut(|head| *head = next); self.head.with_mut(|head| *head = next);
head = next; head = next;
next_ptr = next.as_ref().next.load(Acquire); next_ptr = next.as_ref().next.load(Acquire);
@ -476,7 +429,6 @@ impl Semaphore {
if tail != head { if tail != head {
// Inconsistent // Inconsistent
debug!(" + pop; inconsistent 2");
thread::yield_now(); thread::yield_now();
continue 'outer; continue 'outer;
} }
@ -492,7 +444,6 @@ impl Semaphore {
} }
// Inconsistent state, loop // Inconsistent state, loop
debug!(" + pop; inconsistent 3");
thread::yield_now(); thread::yield_now();
} }
} }
@ -549,16 +500,7 @@ impl Permit {
/// Create a new `Permit`. /// Create a new `Permit`.
/// ///
/// The permit begins in the "unacquired" state. /// The permit begins in the "unacquired" state.
/// pub(crate) fn new() -> Permit {
/// # Examples
///
/// ```
/// use tokio::sync::semaphore::Permit;
///
/// let permit = Permit::new();
/// assert!(!permit.is_acquired());
/// ```
pub fn new() -> Permit {
Permit { Permit {
waiter: None, waiter: None,
state: PermitState::Idle, state: PermitState::Idle,
@ -566,13 +508,13 @@ impl Permit {
} }
/// Returns true if the permit has been acquired /// Returns true if the permit has been acquired
pub fn is_acquired(&self) -> bool { pub(crate) fn is_acquired(&self) -> bool {
self.state == PermitState::Acquired self.state == PermitState::Acquired
} }
/// Try to acquire the permit. If no permits are available, the current task /// Try to acquire the permit. If no permits are available, the current task
/// is notified once a new permit becomes available. /// is notified once a new permit becomes available.
pub fn poll_acquire( pub(crate) fn poll_acquire(
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
semaphore: &Semaphore, semaphore: &Semaphore,
@ -607,7 +549,7 @@ impl Permit {
} }
/// Try to acquire the permit. /// Try to acquire the permit.
pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { pub(crate) fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
match self.state { match self.state {
PermitState::Idle => {} PermitState::Idle => {}
PermitState::Waiting => { PermitState::Waiting => {
@ -635,7 +577,7 @@ impl Permit {
} }
/// Release a permit back to the semaphore /// Release a permit back to the semaphore
pub fn release(&mut self, semaphore: &Semaphore) { pub(crate) fn release(&mut self, semaphore: &Semaphore) {
if self.forget2() { if self.forget2() {
semaphore.add_permits(1); semaphore.add_permits(1);
} }
@ -648,7 +590,7 @@ impl Permit {
/// ///
/// Repeatedly calling `forget` without associated calls to `add_permit` /// Repeatedly calling `forget` without associated calls to `add_permit`
/// will result in the semaphore losing all permits. /// will result in the semaphore losing all permits.
pub fn forget(&mut self) { pub(crate) fn forget(&mut self) {
self.forget2(); self.forget2();
} }
@ -711,7 +653,7 @@ impl TryAcquireError {
} }
/// Returns true if the error was caused by a closed semaphore. /// Returns true if the error was caused by a closed semaphore.
pub fn is_closed(&self) -> bool { pub(crate) fn is_closed(&self) -> bool {
match self.kind { match self.kind {
ErrorKind::Closed => true, ErrorKind::Closed => true,
_ => false, _ => false,
@ -720,7 +662,7 @@ impl TryAcquireError {
/// Returns true if the error was caused by calling `try_acquire` on a /// Returns true if the error was caused by calling `try_acquire` on a
/// semaphore with no available permits. /// semaphore with no available permits.
pub fn is_no_permits(&self) -> bool { pub(crate) fn is_no_permits(&self) -> bool {
match self.kind { match self.kind {
ErrorKind::NoPermits => true, ErrorKind::NoPermits => true,
_ => false, _ => false,
@ -857,14 +799,10 @@ impl WaiterNode {
match next.compare_exchange(&self.state, curr, AcqRel, Acquire) { match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
Ok(_) => match curr { Ok(_) => match curr {
QueuedWaiting => { QueuedWaiting => {
debug!(" + notify -- task notified");
self.waker.wake(); self.waker.wake();
return true; return true;
} }
other => { _ => return false,
debug!(" + notify -- not notified; state = {:?}", other);
return false;
}
}, },
Err(actual) => curr = actual, Err(actual) => curr = actual,
} }
@ -1021,7 +959,6 @@ impl SemState {
/// Load the state from an AtomicUsize. /// Load the state from an AtomicUsize.
fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState { fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState {
let value = cell.load(ordering); let value = cell.load(ordering);
debug!(" + SemState::load; value = {}", value);
SemState(value) SemState(value)
} }
@ -1044,13 +981,6 @@ impl SemState {
let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure); let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure);
debug!(
" + SemState::compare_exchange; prev = {}; next = {}; result = {:?}",
prev.to_usize(),
self.to_usize(),
res
);
res.map(SemState).map_err(SemState) res.map(SemState).map_err(SemState)
} }

View File

@ -170,10 +170,8 @@ impl AtomicWaker {
where where
W: WakerRef, W: WakerRef,
{ {
debug!(" + register_task");
match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
WAITING => { WAITING => {
debug!(" + WAITING");
unsafe { unsafe {
// Lock acquired, update the waker cell // Lock acquired, update the waker cell
self.waker.with_mut(|t| *t = Some(waker.into_waker())); self.waker.with_mut(|t| *t = Some(waker.into_waker()));
@ -214,7 +212,6 @@ impl AtomicWaker {
} }
} }
WAKING => { WAKING => {
debug!(" + WAKING");
// Currently in the process of waking the task, i.e., // Currently in the process of waking the task, i.e.,
// `wake` is currently being called on the old waker. // `wake` is currently being called on the old waker.
// So, we call wake on the new waker. // So, we call wake on the new waker.
@ -240,7 +237,6 @@ impl AtomicWaker {
/// ///
/// If `register` has not been called yet, then this does nothing. /// If `register` has not been called yet, then this does nothing.
pub(crate) fn wake(&self) { pub(crate) fn wake(&self) {
debug!(" + wake");
if let Some(waker) = self.take_waker() { if let Some(waker) = self.take_waker() {
waker.wake(); waker.wake();
} }
@ -249,24 +245,20 @@ impl AtomicWaker {
/// Attempts to take the `Waker` value out of the `AtomicWaker` with the /// Attempts to take the `Waker` value out of the `AtomicWaker` with the
/// intention that the caller will wake the task later. /// intention that the caller will wake the task later.
pub(crate) fn take_waker(&self) -> Option<Waker> { pub(crate) fn take_waker(&self) -> Option<Waker> {
debug!(" + take_waker");
// AcqRel ordering is used in order to acquire the value of the `waker` // AcqRel ordering is used in order to acquire the value of the `waker`
// cell as well as to establish a `release` ordering with whatever // cell as well as to establish a `release` ordering with whatever
// memory the `AtomicWaker` is associated with. // memory the `AtomicWaker` is associated with.
match self.state.fetch_or(WAKING, AcqRel) { match self.state.fetch_or(WAKING, AcqRel) {
WAITING => { WAITING => {
debug!(" + WAITING");
// The waking lock has been acquired. // The waking lock has been acquired.
let waker = unsafe { self.waker.with_mut(|t| (*t).take()) }; let waker = unsafe { self.waker.with_mut(|t| (*t).take()) };
// Release the lock // Release the lock
self.state.fetch_and(!WAKING, Release); self.state.fetch_and(!WAKING, Release);
debug!(" + Done taking");
waker waker
} }
state => { state => {
debug!(" + state = {:?}", state);
// There is a concurrent thread currently updating the // There is a concurrent thread currently updating the
// associated waker. // associated waker.
// //
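The register/take_waker/wake protocol above follows the usual atomic-waker pattern. A hedged sketch of that pattern using the public futures::task::AtomicWaker (tokio's own type stays crate-private; the types and names below are illustrative):

    use futures::task::AtomicWaker;
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll};

    struct Inner {
        set: AtomicBool,
        waker: AtomicWaker,
    }

    struct Signal(Arc<Inner>);

    impl Future for Signal {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            // Register first, then re-check the flag, so a concurrent wake is not lost.
            self.0.waker.register(cx.waker());
            if self.0.set.load(Ordering::Acquire) {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }
    }

    fn notify(inner: &Inner) {
        inner.set.store(true, Ordering::Release);
        inner.waker.wake();
    }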

View File

@ -21,17 +21,14 @@ fn smoke() {
for i in 0..NUM_MSG { for i in 0..NUM_MSG {
tx.push((th, i)); tx.push((th, i));
} }
debug!(" + tx thread done");
}); });
} }
let mut next = vec![0; NUM_TX]; let mut next = vec![0; NUM_TX];
loop { loop {
debug!(" + rx.pop()");
match rx.pop(&tx) { match rx.pop(&tx) {
Some(Value((th, v))) => { Some(Value((th, v))) => {
debug!(" + pop() -> Some(Value({}))", v);
assert_eq!(v, next[th]); assert_eq!(v, next[th]);
next[th] += 1; next[th] += 1;
@ -43,7 +40,6 @@ fn smoke() {
panic!(); panic!();
} }
None => { None => {
debug!(" + pop() -> None");
thread::yield_now(); thread::yield_now();
} }
} }

View File

@ -1,17 +1,12 @@
#[cfg(not(loom))] cfg_not_loom! {
mod atomic_waker; mod atomic_waker;
mod semaphore;
}
#[cfg(loom)] cfg_loom! {
mod loom_atomic_waker; mod loom_atomic_waker;
#[cfg(loom)]
mod loom_list; mod loom_list;
#[cfg(loom)]
mod loom_mpsc; mod loom_mpsc;
#[cfg(loom)]
mod loom_oneshot; mod loom_oneshot;
#[cfg(loom)]
mod loom_semaphore; mod loom_semaphore;
}
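The cfg_loom!/cfg_not_loom! wrappers used above are presumably thin macros that attach the corresponding #[cfg] attribute to every item they enclose. A plausible definition (an assumption, not necessarily the crate's exact one):

    macro_rules! cfg_loom {
        ($($item:item)*) => {
            $( #[cfg(loom)] $item )*
        };
    }

    macro_rules! cfg_not_loom {
        ($($item:item)*) => {
            $( #[cfg(not(loom))] $item )*
        };
    }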

View File

@ -1,6 +1,4 @@
#![warn(rust_2018_idioms)]
use tokio::sync::semaphore::{Permit, Semaphore}; use crate::sync::semaphore::{Permit, Semaphore};
use tokio_test::task; use tokio_test::task;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok}; use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};

View File

@ -1,62 +1,65 @@
use crate::blocking; use crate::blocking;
use crate::task::JoinHandle; use crate::task::JoinHandle;
/// Run the provided blocking function without blocking the executor. cfg_rt_threaded! {
/// /// Run the provided blocking function without blocking the executor.
/// In general, issuing a blocking call or performing a lot of compute in a ///
/// future without yielding is not okay, as it may prevent the executor from /// In general, issuing a blocking call or performing a lot of compute in a
/// driving other futures forward. If you run a closure through this method, /// future without yielding is not okay, as it may prevent the executor from
/// the current executor thread will relegate all its executor duties to another /// driving other futures forward. If you run a closure through this method,
/// (possibly new) thread, and only then poll the task. Note that this requires /// the current executor thread will relegate all its executor duties to another
/// additional synchronization. /// (possibly new) thread, and only then poll the task. Note that this requires
/// /// additional synchronization.
/// # Examples ///
/// /// # Examples
/// ``` ///
/// use tokio::task; /// ```
/// /// use tokio::task;
/// # async fn docs() { ///
/// task::block_in_place(move || { /// # async fn docs() {
/// // do some compute-heavy work or call synchronous code /// task::block_in_place(move || {
/// }); /// // do some compute-heavy work or call synchronous code
/// # } /// });
/// ``` /// # }
#[cfg(feature = "rt-full")] /// ```
pub fn block_in_place<F, R>(f: F) -> R pub fn block_in_place<F, R>(f: F) -> R
where where
F: FnOnce() -> R, F: FnOnce() -> R,
{ {
use crate::runtime::{enter, thread_pool}; use crate::runtime::{enter, thread_pool};
enter::exit(|| thread_pool::block_in_place(f)) enter::exit(|| thread_pool::block_in_place(f))
}
} }
/// Run the provided closure on a thread where blocking is acceptable. cfg_blocking! {
/// /// Run the provided closure on a thread where blocking is acceptable.
/// In general, issuing a blocking call or performing a lot of compute in a future without ///
/// yielding is not okay, as it may prevent the executor from driving other futures forward. /// In general, issuing a blocking call or performing a lot of compute in a future without
/// A closure that is run through this method will instead be run on a dedicated thread pool for /// yielding is not okay, as it may prevent the executor from driving other futures forward.
/// such blocking tasks without holding up the main futures executor. /// A closure that is run through this method will instead be run on a dedicated thread pool for
/// /// such blocking tasks without holding up the main futures executor.
/// # Examples ///
/// /// # Examples
/// ``` ///
/// use tokio::task; /// ```
/// /// use tokio::task;
/// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{ ///
/// let res = task::spawn_blocking(move || { /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
/// // do some compute-heavy work or call synchronous code /// let res = task::spawn_blocking(move || {
/// "done computing" /// // do some compute-heavy work or call synchronous code
/// }).await?; /// "done computing"
/// /// }).await?;
/// assert_eq!(res, "done computing"); ///
/// # Ok(()) /// assert_eq!(res, "done computing");
/// # } /// # Ok(())
/// ``` /// # }
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R> /// ```
where pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
F: FnOnce() -> R + Send + 'static, where
R: Send + 'static, F: FnOnce() -> R + Send + 'static,
{ R: Send + 'static,
blocking::spawn_blocking(f) {
blocking::spawn_blocking(f)
}
} }

View File

@ -1,11 +1,13 @@
//! Asynchronous green-threads. //! Asynchronous green-threads.
#[cfg(feature = "blocking")] cfg_blocking! {
mod blocking; mod blocking;
#[cfg(feature = "blocking")]
pub use blocking::spawn_blocking; pub use blocking::spawn_blocking;
}
#[cfg(feature = "rt-full")] cfg_rt_threaded! {
pub use blocking::block_in_place; pub use blocking::block_in_place;
}
mod core; mod core;
use self::core::Cell; use self::core::Cell;
@ -17,10 +19,11 @@ pub use self::error::JoinError;
mod harness; mod harness;
use self::harness::Harness; use self::harness::Harness;
mod join; cfg_rt_core! {
#[cfg(feature = "rt-core")] mod join;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::join::JoinHandle; pub use self::join::JoinHandle;
}
mod list; mod list;
pub(crate) use self::list::OwnedList; pub(crate) use self::list::OwnedList;
@ -28,10 +31,10 @@ pub(crate) use self::list::OwnedList;
mod raw; mod raw;
use self::raw::RawTask; use self::raw::RawTask;
#[cfg(feature = "rt-core")] cfg_rt_core! {
mod spawn; mod spawn;
#[cfg(feature = "rt-core")] pub use spawn::spawn;
pub use spawn::spawn; }
mod stack; mod stack;
pub(crate) use self::stack::TransferStack; pub(crate) use self::stack::TransferStack;
@ -81,16 +84,17 @@ pub(crate) trait Schedule: Send + Sync + Sized + 'static {
fn schedule(&self, task: Task<Self>); fn schedule(&self, task: Task<Self>);
} }
/// Create a new task without an associated join handle cfg_rt_threaded! {
#[cfg(feature = "rt-full")] /// Create a new task without an associated join handle
pub(crate) fn background<T, S>(task: T) -> Task<S> pub(crate) fn background<T, S>(task: T) -> Task<S>
where where
T: Future + Send + 'static, T: Future + Send + 'static,
S: Schedule, S: Schedule,
{ {
Task { Task {
raw: RawTask::new_background::<_, S>(task), raw: RawTask::new_background::<_, S>(task),
_p: PhantomData, _p: PhantomData,
}
} }
} }

View File

@ -54,16 +54,19 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
} }
} }
impl RawTask { cfg_rt_threaded! {
#[cfg(feature = "rt-full")] impl RawTask {
pub(super) fn new_background<T, S>(task: T) -> RawTask pub(super) fn new_background<T, S>(task: T) -> RawTask
where where
T: Future + Send + 'static, T: Future + Send + 'static,
S: Schedule, S: Schedule,
{ {
RawTask::new::<_, S>(task, State::new_background()) RawTask::new::<_, S>(task, State::new_background())
}
} }
}
impl RawTask {
pub(super) fn new_joinable<T, S>(task: T) -> RawTask pub(super) fn new_joinable<T, S>(task: T) -> RawTask
where where
T: Future + Send + 'static, T: Future + Send + 'static,

View File

@ -58,7 +58,7 @@ const INITIAL_STATE: usize = NOTIFIED;
/// unambiguous modification order. /// unambiguous modification order.
impl State { impl State {
/// Starts with a ref count of 1 /// Starts with a ref count of 1
#[cfg(feature = "rt-full")] #[cfg(feature = "rt-threaded")]
pub(super) fn new_background() -> State { pub(super) fn new_background() -> State {
State { State {
val: AtomicUsize::new(INITIAL_STATE), val: AtomicUsize::new(INITIAL_STATE),

View File

@ -1,25 +1,3 @@
#[macro_export]
/// Assert option is some
macro_rules! assert_some {
($e:expr) => {{
match $e {
Some(v) => v,
_ => panic!("expected some, was none"),
}
}};
}
#[macro_export]
/// Assert option is none
macro_rules! assert_none {
($e:expr) => {{
match $e {
Some(v) => panic!("expected none, was {:?}", v),
_ => {}
}
}};
}
#[cfg(not(loom))] #[cfg(not(loom))]
pub(crate) mod backoff; pub(crate) mod backoff;

View File

@ -4,15 +4,7 @@
//! `test-util` feature flag is enabled, the values returned for `now()` are //! `test-util` feature flag is enabled, the values returned for `now()` are
//! configurable. //! configurable.
#[cfg(feature = "test-util")] cfg_not_test_util! {
pub(crate) use self::variant::now;
pub(crate) use self::variant::Clock;
#[cfg(feature = "test-util")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::variant::{advance, pause, resume};
#[cfg(not(feature = "test-util"))]
mod variant {
use crate::time::Instant; use crate::time::Instant;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -40,8 +32,7 @@ mod variant {
} }
} }
#[cfg(feature = "test-util")] cfg_test_util! {
mod variant {
use crate::time::{Duration, Instant}; use crate::time::{Duration, Instant};
use std::cell::Cell; use std::cell::Cell;
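With `test-util` enabled, the re-exports visible above (pause, advance, resume) let tests drive the clock manually. A hedged sketch of the intent — exact paths and signatures are assumed here, not confirmed by this diff:

    async fn fast_forward() {
        tokio::time::pause();
        // Start something that would normally wait a minute, then skip ahead:
        tokio::time::advance(std::time::Duration::from_secs(60)).await;
        // Timers scheduled within that window now fire without real time passing.
    }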

View File

@ -9,6 +9,16 @@ macro_rules! ready {
}; };
} }
#[macro_export]
macro_rules! cfg_fs {
($($item:item)*) => { $($item)* }
}
#[macro_export]
macro_rules! cfg_io_std {
($($item:item)*) => { $($item)* }
}
use futures::future; use futures::future;
// Load source // Load source
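Unlike the crate-internal cfg_* macros, the cfg_fs!/cfg_io_std! shims added above simply pass their items through unchanged, so feature-gated code compiles unconditionally in this test crate. A hedged usage sketch (the item name is illustrative):

    cfg_fs! {
        fn exercises_fs_items() {
            // Items wrapped this way are always compiled here, while
            // mirroring the macro syntax used inside tokio itself.
        }
    }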