Merge 'master' and 'tokio-1.20.1'

This commit is contained in:
Alice Ryhl 2022-07-25 14:21:40 +02:00
commit 0906351daf
154 changed files with 1781 additions and 643 deletions

View File

@ -34,16 +34,18 @@ jobs:
runs-on: ubuntu-latest
needs:
- test
- test-unstable
- test-parking_lot
- valgrind
- test-unstable
- miri
- asan
- cross
- features
- minrust
- minimal-versions
- fmt
- clippy
- docs
- valgrind
- loom-compile
- check-readme
- test-hyper
@ -195,7 +197,7 @@ jobs:
- name: Install Rust ${{ env.rust_nightly }}
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2022-07-10
toolchain: nightly
components: miri
override: true
- uses: Swatinem/rust-cache@v1
@ -489,9 +491,23 @@ jobs:
- name: Install cargo-wasi
run: cargo install cargo-wasi
# TODO: Expand this when full WASI support lands.
# Currently, this is a bare bones regression test
# for features that work today with wasi.
- name: WASI test tokio full
run: cargo test -p tokio --target wasm32-wasi --features full
env:
CARGO_TARGET_WASM32_WASI_RUNNER: "wasmtime run --"
RUSTFLAGS: --cfg tokio_unstable -Dwarnings
- name: WASI test tokio-util full
run: cargo test -p tokio-util --target wasm32-wasi --features full
env:
CARGO_TARGET_WASM32_WASI_RUNNER: "wasmtime run --"
RUSTFLAGS: --cfg tokio_unstable -Dwarnings
- name: WASI test tokio-stream
run: cargo test -p tokio-stream --target wasm32-wasi --features time,net,io-util,sync
env:
CARGO_TARGET_WASM32_WASI_RUNNER: "wasmtime run --"
RUSTFLAGS: --cfg tokio_unstable -Dwarnings
- name: test tests-integration --features wasi-rt
# TODO: this should become: `cargo hack wasi test --each-feature`

View File

@ -12,7 +12,7 @@ env:
rust_stable: stable
jobs:
stess-test:
stress-test:
name: Stress Test
runs-on: ubuntu-latest
strategy:

View File

@ -562,7 +562,7 @@ Tokio ≥1.0.0 comes with LTS guarantees:
The goal of these guarantees is to provide stability to the ecosystem.
## Mininum Supported Rust Version (MSRV)
## Minimum Supported Rust Version (MSRV)
* All Tokio ≥1.0.0 releases will support at least a 6-month old Rust
compiler release.

View File

@ -1,13 +1,13 @@
## Report a security issue
The Tokio project team welcomes security reports and is committed to providing prompt attention to security issues. Security issues should be reported privately via [security@tokio.rs](mailto:security@tokio.rs). Security issues should not be reported via the public Github Issue tracker.
The Tokio project team welcomes security reports and is committed to providing prompt attention to security issues. Security issues should be reported privately via [security@tokio.rs](mailto:security@tokio.rs). Security issues should not be reported via the public GitHub Issue tracker.
## Vulnerability coordination
Remediation of security vulnerabilities is prioritized by the project team. The project team coordinates remediation with third-party project stakeholders via [Github Security Advisories](https://help.github.com/en/github/managing-security-vulnerabilities/about-github-security-advisories). Third-party stakeholders may include the reporter of the issue, affected direct or indirect users of Tokio, and maintainers of upstream dependencies if applicable.
Remediation of security vulnerabilities is prioritized by the project team. The project team coordinates remediation with third-party project stakeholders via [GitHub Security Advisories](https://help.github.com/en/github/managing-security-vulnerabilities/about-github-security-advisories). Third-party stakeholders may include the reporter of the issue, affected direct or indirect users of Tokio, and maintainers of upstream dependencies if applicable.
Downstream project maintainers and Tokio users can request participation in coordination of applicable security issues by sending your contact email address, Github username(s) and any other salient information to [security@tokio.rs](mailto:security@tokio.rs). Participation in security issue coordination processes is at the discretion of the Tokio team.
Downstream project maintainers and Tokio users can request participation in coordination of applicable security issues by sending your contact email address, GitHub username(s) and any other salient information to [security@tokio.rs](mailto:security@tokio.rs). Participation in security issue coordination processes is at the discretion of the Tokio team.
## Security advisories
The project team is committed to transparency in the security issue disclosure process. The Tokio team announces security issues via [project Github Release notes](https://github.com/tokio-rs/tokio/releases) and the [RustSec advisory database](https://github.com/RustSec/advisory-db) (i.e. `cargo-audit`).
The project team is committed to transparency in the security issue disclosure process. The Tokio team announces security issues via [project GitHub Release notes](https://github.com/tokio-rs/tokio/releases) and the [RustSec advisory database](https://github.com/RustSec/advisory-db) (i.e. `cargo-audit`).

View File

@ -7,7 +7,6 @@ publish = false
[[bin]]
name = "test-cat"
required-features = ["rt-process-io-util"]
[[bin]]
name = "test-mem"
@ -31,7 +30,6 @@ name = "rt_yield"
required-features = ["rt", "macros", "sync"]
[features]
rt-process-io-util = ["tokio/rt", "tokio/macros", "tokio/process", "tokio/io-util", "tokio/io-std"]
# For mem check
rt-net = ["tokio/rt", "tokio/rt-multi-thread", "tokio/net"]
# For test-process-signal

View File

@ -1,14 +1,20 @@
//! A cat-like utility that can be used as a subprocess to test I/O
//! stream communication.
use tokio::io::AsyncWriteExt;
use std::io;
use std::io::Write;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let mut stdin = tokio::io::stdin();
let mut stdout = tokio::io::stdout();
tokio::io::copy(&mut stdin, &mut stdout).await.unwrap();
stdout.flush().await.unwrap();
fn main() {
let stdin = io::stdin();
let mut stdout = io::stdout();
let mut line = String::new();
loop {
line.clear();
stdin.read_line(&mut line).unwrap();
if line.is_empty() {
break;
}
stdout.write_all(line.as_bytes()).unwrap();
}
stdout.flush().unwrap();
}

View File

@ -1,4 +1,8 @@
#![cfg(all(feature = "macros", feature = "rt-multi-thread"))]
#![cfg(all(
feature = "macros",
feature = "rt-multi-thread",
not(target_os = "wasi")
))]
#[tokio::main]
async fn basic_main() -> usize {

View File

@ -4,6 +4,7 @@ use futures::channel::oneshot;
use futures::executor::block_on;
use std::thread;
#[cfg_attr(target_os = "wasi", ignore = "WASI: std::thread::spawn not supported")]
#[test]
fn join_with_select() {
block_on(async {

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))]
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::join;
@ -8,22 +8,12 @@ use tokio_test::assert_ok;
use futures::future::{self, FutureExt};
use std::convert::TryInto;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
// so, we need to change this back as a test, but for now this doesn't work because of:
// https://github.com/rust-lang/rust/pull/95469
//
// undo when this is closed: https://github.com/tokio-rs/tokio/issues/4802
// fn cat() -> Command {
// let mut cmd = Command::new(std::env!("CARGO_BIN_EXE_test-cat"));
// cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
// cmd
// }
fn cat() -> Command {
let mut cmd = Command::new("cat");
let mut cmd = Command::new(env!("CARGO_BIN_EXE_test-cat"));
cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
cmd
}

View File

@ -38,6 +38,7 @@ parking_lot = "0.12.0"
tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
proptest = "1"
[package.metadata.docs.rs]

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "time")]
#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery
use parking_lot::{const_mutex, Mutex};
use std::error::Error;

View File

@ -325,6 +325,7 @@ fn one_ready_many_none() {
}
}
#[cfg(not(target_os = "wasi"))]
proptest::proptest! {
#[test]
fn fuzz_pending_complete_mix(kinds: Vec<bool>) {

View File

@ -260,7 +260,7 @@ macro_rules! assert_err {
}};
}
/// Asserts that an exact duration has elapsed since since the start instant ±1ms.
/// Asserts that an exact duration has elapsed since the start instant ±1ms.
///
/// ```rust
/// use tokio::time::{self, Instant};

View File

@ -29,6 +29,7 @@ cfg_codec! {
}
cfg_net! {
#[cfg(not(target_arch = "wasm32"))]
pub mod udp;
pub mod net;
}

View File

@ -200,7 +200,7 @@ where
/// `parent` MUST have been a parent of the node when they both got locked,
/// otherwise there is a potential for a deadlock as invariant #2 would be violated.
///
/// To aquire the locks for node and parent, use [with_locked_node_and_parent].
/// To acquire the locks for node and parent, use [with_locked_node_and_parent].
fn move_children_to_parent(node: &mut Inner, parent: &mut Inner) {
// Pre-allocate in the parent, for performance
parent.children.reserve(node.children.len());
@ -218,7 +218,7 @@ fn move_children_to_parent(node: &mut Inner, parent: &mut Inner) {
/// Removes a child from the parent.
///
/// `parent` MUST be the parent of `node`.
/// To aquire the locks for node and parent, use [with_locked_node_and_parent].
/// To acquire the locks for node and parent, use [with_locked_node_and_parent].
fn remove_child(parent: &mut Inner, mut node: MutexGuard<'_, Inner>) {
// Query the position from where to remove a node
let pos = node.parent_idx;

View File

@ -80,7 +80,7 @@ fn drop_token_no_child() {
}
#[test]
fn drop_token_with_childs() {
fn drop_token_with_children() {
loom::model(|| {
let token1 = CancellationToken::new();
let child_token1 = token1.child_token();

View File

@ -2,7 +2,9 @@
#[cfg(tokio_unstable)]
mod join_map;
#[cfg(not(target_os = "wasi"))]
mod spawn_pinned;
#[cfg(not(target_os = "wasi"))]
pub use spawn_pinned::LocalPoolHandle;
#[cfg(tokio_unstable)]

View File

@ -190,7 +190,7 @@ impl<T> SlabStorage<T> {
let key_contained = self.key_map.contains_key(&key.into());
if key_contained {
// It's possible that a `compact` call creates capacitiy in `self.inner` in
// It's possible that a `compact` call creates capacity in `self.inner` in
// such a way that a `self.inner.insert` call creates a `key` which was
// previously given out during an `insert` call prior to the `compact` call.
// If `key` is contained in `self.key_map`, we have encountered this exact situation,

View File

@ -1,4 +1,5 @@
#![cfg(feature = "rt")]
#![cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#![warn(rust_2018_idioms)]
use tokio::runtime::Builder;

View File

@ -130,7 +130,7 @@ fn write_hits_backpressure() {
_ => unreachable!(),
}
// Push a new new chunk
// Push a new chunk
mock.calls.push_back(Ok(b[..].to_vec()));
}
// 1 'wouldblock', 4 * 2KB buffers, 1 b-byte buffer

View File

@ -1,4 +1,5 @@
#![cfg(feature = "io-util")]
#![cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
use std::error::Error;
use std::io::{Cursor, Read, Result as IoResult};

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support panic recovery
use parking_lot::{const_mutex, Mutex};
use std::error::Error;

View File

@ -1,4 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
use std::rc::Rc;
use std::sync::Arc;

View File

@ -258,7 +258,7 @@ fn cancel_only_all_descendants() {
let child2_token = token.child_token();
let grandchild_token = child1_token.child_token();
let grandchild2_token = child1_token.child_token();
let grandgrandchild_token = grandchild_token.child_token();
let great_grandchild_token = grandchild_token.child_token();
assert!(!parent_token.is_cancelled());
assert!(!token.is_cancelled());
@ -267,7 +267,7 @@ fn cancel_only_all_descendants() {
assert!(!child2_token.is_cancelled());
assert!(!grandchild_token.is_cancelled());
assert!(!grandchild2_token.is_cancelled());
assert!(!grandgrandchild_token.is_cancelled());
assert!(!great_grandchild_token.is_cancelled());
let parent_fut = parent_token.cancelled();
let fut = token.cancelled();
@ -276,7 +276,7 @@ fn cancel_only_all_descendants() {
let child2_fut = child2_token.cancelled();
let grandchild_fut = grandchild_token.cancelled();
let grandchild2_fut = grandchild2_token.cancelled();
let grandgrandchild_fut = grandgrandchild_token.cancelled();
let great_grandchild_fut = great_grandchild_token.cancelled();
pin!(parent_fut);
pin!(fut);
@ -285,7 +285,7 @@ fn cancel_only_all_descendants() {
pin!(child2_fut);
pin!(grandchild_fut);
pin!(grandchild2_fut);
pin!(grandgrandchild_fut);
pin!(great_grandchild_fut);
assert_eq!(
Poll::Pending,
@ -321,7 +321,7 @@ fn cancel_only_all_descendants() {
);
assert_eq!(
Poll::Pending,
grandgrandchild_fut
great_grandchild_fut
.as_mut()
.poll(&mut Context::from_waker(&waker))
);
@ -339,7 +339,7 @@ fn cancel_only_all_descendants() {
assert!(child2_token.is_cancelled());
assert!(grandchild_token.is_cancelled());
assert!(grandchild2_token.is_cancelled());
assert!(grandgrandchild_token.is_cancelled());
assert!(great_grandchild_token.is_cancelled());
assert_eq!(
Poll::Ready(()),
@ -367,7 +367,7 @@ fn cancel_only_all_descendants() {
);
assert_eq!(
Poll::Ready(()),
grandgrandchild_fut
great_grandchild_fut
.as_mut()
.poll(&mut Context::from_waker(&waker))
);

View File

@ -778,6 +778,7 @@ async fn compact_change_deadline() {
assert!(entry.is_none());
}
#[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
#[tokio::test(start_paused = true)]
async fn remove_after_compact() {
let now = Instant::now();
@ -794,6 +795,7 @@ async fn remove_after_compact() {
assert!(panic.is_err());
}
#[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
#[tokio::test(start_paused = true)]
async fn remove_after_compact_poll() {
let now = Instant::now();

View File

@ -1,4 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(not(target_os = "wasi"))] // Wasi doesn't support UDP
use tokio::net::UdpSocket;
use tokio_stream::StreamExt;

View File

@ -51,13 +51,14 @@ net = [
"mio/os-poll",
"mio/os-ext",
"mio/net",
"socket2",
"winapi/fileapi",
"winapi/handleapi",
"winapi/namedpipeapi",
"winapi/winbase",
"winapi/winnt",
"winapi/minwindef",
"winapi/accctrl",
"winapi/aclapi"
]
process = [
"bytes",
@ -68,11 +69,11 @@ process = [
"mio/net",
"signal-hook-registry",
"winapi/handleapi",
"winapi/minwindef",
"winapi/processthreadsapi",
"winapi/threadpoollegacyapiset",
"winapi/winbase",
"winapi/winnt",
"winapi/minwindef",
]
# Includes basic task execution capabilities
rt = ["once_cell"]
@ -112,11 +113,13 @@ pin-project-lite = "0.2.0"
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.8.1", optional = true }
socket2 = { version = "0.4.4", optional = true, features = [ "all" ] }
mio = { version = "0.8.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.12.0", optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
socket2 = { version = "0.4.4", features = [ "all" ] }
# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_unstable)'.dependencies]
@ -149,10 +152,12 @@ async-stream = "0.3"
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
proptest = "1"
rand = "0.8.0"
socket2 = "0.4"
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
[target.'cfg(not(all(target_arch = "wasm32", target_os = "unknown")))'.dev-dependencies]
rand = "0.8.0"
[target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dev-dependencies]
wasm-bindgen-test = "0.3.0"
[target.'cfg(target_os = "freebsd")'.dev-dependencies]

View File

@ -34,6 +34,10 @@ fn main() {
enable_const_thread_local = true;
}
}
if !ac.probe_rustc_version(1, 51) {
autocfg::emit("tokio_no_addr_of")
}
}
Err(e) => {

View File

@ -188,9 +188,9 @@ readiness, the driver's tick is packed into the atomic `usize`.
The `ScheduledIo` readiness `AtomicUsize` is structured as:
```
| reserved | generation | driver tick | readinesss |
|----------+------------+--------------+------------|
| 1 bit | 7 bits + 8 bits + 16 bits |
| reserved | generation | driver tick | readiness |
|----------+------------+--------------+-----------|
| 1 bit | 7 bits + 8 bits + 16 bits |
```
The `reserved` and `generation` components exist today.

View File

@ -207,7 +207,7 @@ cfg_coop! {
mod test {
use super::*;
#[cfg(target_arch = "wasm32")]
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test as test;
fn get() -> Budget {
@ -215,7 +215,7 @@ mod test {
}
#[test]
fn bugeting() {
fn budgeting() {
use futures::future::poll_fn;
use tokio_test::*;

View File

@ -34,8 +34,9 @@ enum State<T> {
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
}
cfg_io_std! {
cfg_io_blocking! {
impl<T> Blocking<T> {
#[cfg_attr(feature = "fs", allow(dead_code))]
pub(crate) fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),

View File

@ -72,6 +72,8 @@ pub(super) struct Inner {
io_dispatch: RwLock<IoDispatcher>,
/// Used to wake up the reactor from a call to `turn`.
/// Not supported on Wasi due to lack of threading support.
#[cfg(not(target_os = "wasi"))]
waker: mio::Waker,
metrics: IoDriverMetrics,
@ -115,6 +117,7 @@ impl Driver {
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
let poll = mio::Poll::new()?;
#[cfg(not(target_os = "wasi"))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;
@ -129,6 +132,7 @@ impl Driver {
inner: Arc::new(Inner {
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(target_os = "wasi"))]
waker,
metrics: IoDriverMetrics::default(),
}),
@ -164,6 +168,11 @@ impl Driver {
match self.poll.poll(&mut events, max_wait) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
#[cfg(target_os = "wasi")]
Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
// In case of wasm32_wasi this error happens, when trying to poll without subscriptions
// just return from the park, as there would be nothing, which wakes us up.
}
Err(e) => return Err(e),
}
@ -273,6 +282,7 @@ cfg_not_rt! {
///
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
#[track_caller]
pub(super) fn current() -> Self {
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
@ -300,6 +310,7 @@ impl Handle {
/// blocked in `turn`, then the next call to `turn` will not block and
/// return immediately.
fn wakeup(&self) {
#[cfg(not(target_os = "wasi"))]
self.inner.waker.wake().expect("failed to wake I/O driver");
}
}

View File

@ -66,6 +66,14 @@ cfg_io_readiness! {
_p: PhantomPinned,
}
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
&self.pointers
}
}
}
/// Future returned by `readiness()`.
struct Readiness<'a> {
scheduled_io: &'a ScheduledIo,
@ -399,8 +407,8 @@ cfg_io_readiness! {
ptr
}
unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
NonNull::from(&mut target.as_mut().pointers)
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
Waiter::addr_of_pointers(target)
}
}

View File

@ -211,9 +211,11 @@ cfg_io_driver_impl! {
pub use driver::{Interest, Ready};
}
#[cfg_attr(target_os = "wasi", allow(unused_imports))]
mod poll_evented;
#[cfg(not(loom))]
#[cfg_attr(target_os = "wasi", allow(unused_imports))]
pub(crate) use poll_evented::PollEvented;
}

View File

@ -77,6 +77,7 @@ impl<E: Source> PollEvented<E> {
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[track_caller]
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
@ -97,6 +98,7 @@ impl<E: Source> PollEvented<E> {
/// a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
/// function.
#[track_caller]
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
Self::new_with_interest_and_handle(io, interest, Handle::current())
@ -134,7 +136,7 @@ impl<E: Source> PollEvented<E> {
}
feature! {
#![any(feature = "net", feature = "process")]
#![any(feature = "net", all(unix, feature = "process"))]
use crate::io::ReadBuf;
use std::task::{Context, Poll};

View File

@ -393,6 +393,20 @@ compile_error! {
"Tokio requires the platform pointer width to be 32, 64, or 128 bits"
}
#[cfg(all(
not(tokio_unstable),
target_arch = "wasm32",
any(
feature = "fs",
feature = "io-std",
feature = "net",
feature = "process",
feature = "rt-multi-thread",
feature = "signal"
)
))]
compile_error!("Only features sync,macros,io-util,rt are supported on wasm.");
// Includes re-exports used by macros.
//
// This module is not intended to be part of the public API. In general, any
@ -417,7 +431,12 @@ cfg_process! {
pub mod process;
}
#[cfg(any(feature = "net", feature = "fs", feature = "io-std"))]
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
all(windows, feature = "process"),
))]
mod blocking;
cfg_rt! {

View File

@ -0,0 +1,53 @@
//! This module defines a macro that lets you go from a raw pointer to a struct
//! to a raw pointer to a field of the struct.
#[cfg(not(tokio_no_addr_of))]
macro_rules! generate_addr_of_methods {
(
impl<$($gen:ident)*> $struct_name:ty {$(
$(#[$attrs:meta])*
$vis:vis unsafe fn $fn_name:ident(self: NonNull<Self>) -> NonNull<$field_type:ty> {
&self$(.$field_name:tt)+
}
)*}
) => {
impl<$($gen)*> $struct_name {$(
$(#[$attrs])*
$vis unsafe fn $fn_name(me: ::core::ptr::NonNull<Self>) -> ::core::ptr::NonNull<$field_type> {
let me = me.as_ptr();
let field = ::std::ptr::addr_of_mut!((*me) $(.$field_name)+ );
::core::ptr::NonNull::new_unchecked(field)
}
)*}
};
}
// The `addr_of_mut!` macro is only available for MSRV at least 1.51.0. This
// version of the macro uses a workaround for older versions of rustc.
#[cfg(tokio_no_addr_of)]
macro_rules! generate_addr_of_methods {
(
impl<$($gen:ident)*> $struct_name:ty {$(
$(#[$attrs:meta])*
$vis:vis unsafe fn $fn_name:ident(self: NonNull<Self>) -> NonNull<$field_type:ty> {
&self$(.$field_name:tt)+
}
)*}
) => {
impl<$($gen)*> $struct_name {$(
$(#[$attrs])*
$vis unsafe fn $fn_name(me: ::core::ptr::NonNull<Self>) -> ::core::ptr::NonNull<$field_type> {
let me = me.as_ptr();
let me_u8 = me as *mut u8;
let field_offset = {
let me_ref = &*me;
let field_ref_u8 = (&me_ref $(.$field_name)+ ) as *const $field_type as *const u8;
field_ref_u8.offset_from(me_u8)
};
::core::ptr::NonNull::new_unchecked(me_u8.offset(field_offset).cast())
}
)*}
};
}

View File

@ -61,6 +61,7 @@ macro_rules! cfg_fs {
($($item:item)*) => {
$(
#[cfg(feature = "fs")]
#[cfg(not(target_os = "wasi"))]
#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
$item
)*
@ -69,7 +70,11 @@ macro_rules! cfg_fs {
macro_rules! cfg_io_blocking {
($($item:item)*) => {
$( #[cfg(any(feature = "io-std", feature = "fs"))] $item )*
$( #[cfg(any(
feature = "io-std",
feature = "fs",
all(windows, feature = "process"),
))] $item )*
}
}
@ -78,12 +83,12 @@ macro_rules! cfg_io_driver {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
#[cfg_attr(docsrs, doc(cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))))]
$item
@ -96,7 +101,7 @@ macro_rules! cfg_io_driver_impl {
$(
#[cfg(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
$item
@ -109,7 +114,7 @@ macro_rules! cfg_not_io_driver {
$(
#[cfg(not(any(
feature = "net",
feature = "process",
all(unix, feature = "process"),
all(unix, feature = "signal"),
)))]
$item
@ -247,6 +252,7 @@ macro_rules! cfg_process {
#[cfg(feature = "process")]
#[cfg_attr(docsrs, doc(cfg(feature = "process")))]
#[cfg(not(loom))]
#[cfg(not(target_os = "wasi"))]
$item
)*
}
@ -275,6 +281,7 @@ macro_rules! cfg_signal {
#[cfg(feature = "signal")]
#[cfg_attr(docsrs, doc(cfg(feature = "signal")))]
#[cfg(not(loom))]
#[cfg(not(target_os = "wasi"))]
$item
)*
}
@ -334,7 +341,7 @@ macro_rules! cfg_not_rt {
macro_rules! cfg_rt_multi_thread {
($($item:item)*) => {
$(
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
$item
)*
@ -451,7 +458,8 @@ macro_rules! cfg_has_atomic_u64 {
target_arch = "arm",
target_arch = "mips",
target_arch = "powerpc",
target_arch = "riscv32"
target_arch = "riscv32",
target_arch = "wasm32"
)))]
$item
)*
@ -465,9 +473,28 @@ macro_rules! cfg_not_has_atomic_u64 {
target_arch = "arm",
target_arch = "mips",
target_arch = "powerpc",
target_arch = "riscv32"
target_arch = "riscv32",
target_arch = "wasm32"
))]
$item
)*
}
}
macro_rules! cfg_not_wasi {
($($item:item)*) => {
$(
#[cfg(not(target_os = "wasi"))]
$item
)*
}
}
macro_rules! cfg_is_wasm_not_wasi {
($($item:item)*) => {
$(
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
$item
)*
}
}

View File

@ -15,6 +15,9 @@ mod ready;
#[macro_use]
mod thread_local;
#[macro_use]
mod addr_of;
cfg_trace! {
#[macro_use]
mod trace;

View File

@ -23,8 +23,10 @@
//! [`UnixDatagram`]: UnixDatagram
mod addr;
#[cfg(feature = "net")]
pub(crate) use addr::to_socket_addrs;
cfg_not_wasi! {
#[cfg(feature = "net")]
pub(crate) use addr::to_socket_addrs;
}
pub use addr::ToSocketAddrs;
cfg_net! {
@ -33,11 +35,13 @@ cfg_net! {
pub mod tcp;
pub use tcp::listener::TcpListener;
pub use tcp::socket::TcpSocket;
pub use tcp::stream::TcpStream;
cfg_not_wasi! {
pub use tcp::socket::TcpSocket;
mod udp;
pub use udp::UdpSocket;
mod udp;
pub use udp::UdpSocket;
}
}
cfg_net_unix! {

View File

@ -1,6 +1,9 @@
use crate::io::{Interest, PollEvented};
use crate::net::tcp::TcpStream;
use crate::net::{to_socket_addrs, ToSocketAddrs};
cfg_not_wasi! {
use crate::net::{to_socket_addrs, ToSocketAddrs};
}
use std::convert::TryFrom;
use std::fmt;
@ -55,68 +58,70 @@ cfg_net! {
}
impl TcpListener {
/// Creates a new TcpListener, which will be bound to the specified address.
///
/// The returned listener is ready for accepting connections.
///
/// Binding with a port number of 0 will request that the OS assigns a port
/// to this listener. The port allocated can be queried via the `local_addr`
/// method.
///
/// The address type can be any implementor of the [`ToSocketAddrs`] trait.
/// If `addr` yields multiple addresses, bind will be attempted with each of
/// the addresses until one succeeds and returns the listener. If none of
/// the addresses succeed in creating a listener, the error returned from
/// the last attempt (the last address) is returned.
///
/// This function sets the `SO_REUSEADDR` option on the socket.
///
/// To configure the socket before binding, you can use the [`TcpSocket`]
/// type.
///
/// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
/// [`TcpSocket`]: struct@crate::net::TcpSocket
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpListener;
///
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:2345").await?;
///
/// // use the listener
///
/// # let _ = listener;
/// Ok(())
/// }
/// ```
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
let addrs = to_socket_addrs(addr).await?;
cfg_not_wasi! {
/// Creates a new TcpListener, which will be bound to the specified address.
///
/// The returned listener is ready for accepting connections.
///
/// Binding with a port number of 0 will request that the OS assigns a port
/// to this listener. The port allocated can be queried via the `local_addr`
/// method.
///
/// The address type can be any implementor of the [`ToSocketAddrs`] trait.
/// If `addr` yields multiple addresses, bind will be attempted with each of
/// the addresses until one succeeds and returns the listener. If none of
/// the addresses succeed in creating a listener, the error returned from
/// the last attempt (the last address) is returned.
///
/// This function sets the `SO_REUSEADDR` option on the socket.
///
/// To configure the socket before binding, you can use the [`TcpSocket`]
/// type.
///
/// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
/// [`TcpSocket`]: struct@crate::net::TcpSocket
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpListener;
///
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:2345").await?;
///
/// // use the listener
///
/// # let _ = listener;
/// Ok(())
/// }
/// ```
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
let addrs = to_socket_addrs(addr).await?;
let mut last_err = None;
let mut last_err = None;
for addr in addrs {
match TcpListener::bind_addr(addr) {
Ok(listener) => return Ok(listener),
Err(e) => last_err = Some(e),
for addr in addrs {
match TcpListener::bind_addr(addr) {
Ok(listener) => return Ok(listener),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
}
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
let listener = mio::net::TcpListener::bind(addr)?;
TcpListener::new(listener)
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
let listener = mio::net::TcpListener::bind(addr)?;
TcpListener::new(listener)
}
}
/// Accepts a new incoming connection from this listener.
@ -216,11 +221,13 @@ impl TcpListener {
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[track_caller]
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener);
let io = PollEvented::new(io)?;
@ -267,11 +274,22 @@ impl TcpListener {
.map(|io| io.into_raw_socket())
.map(|raw_socket| unsafe { std::net::TcpListener::from_raw_socket(raw_socket) })
}
#[cfg(target_os = "wasi")]
{
use std::os::wasi::io::{FromRawFd, IntoRawFd};
self.io
.into_inner()
.map(|io| io.into_raw_fd())
.map(|raw_fd| unsafe { std::net::TcpListener::from_raw_fd(raw_fd) })
}
}
pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
let io = PollEvented::new(listener)?;
Ok(TcpListener { io })
cfg_not_wasi! {
pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
let io = PollEvented::new(listener)?;
Ok(TcpListener { io })
}
}
/// Returns the local address that this listener is bound to.
@ -384,6 +402,20 @@ mod sys {
}
}
cfg_unstable! {
#[cfg(target_os = "wasi")]
mod sys {
use super::TcpListener;
use std::os::wasi::prelude::*;
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
}
}
#[cfg(windows)]
mod sys {
use super::TcpListener;

View File

@ -2,7 +2,9 @@
pub(crate) mod listener;
pub(crate) mod socket;
cfg_not_wasi! {
pub(crate) mod socket;
}
mod split;
pub use split::{ReadHalf, WriteHalf};

View File

@ -1,8 +1,12 @@
use crate::future::poll_fn;
cfg_not_wasi! {
use crate::future::poll_fn;
use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::time::Duration;
}
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
use std::fmt;
@ -10,7 +14,6 @@ use std::io;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
cfg_io_util! {
use bytes::BufMut;
@ -70,86 +73,88 @@ cfg_net! {
}
impl TcpStream {
/// Opens a TCP connection to a remote host.
///
/// `addr` is an address of the remote host. Anything which implements the
/// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
/// yields multiple addresses, connect will be attempted with each of the
/// addresses until a connection is successful. If none of the addresses
/// result in a successful connection, the error returned from the last
/// connection attempt (the last address) is returned.
///
/// To configure the socket before connecting, you can use the [`TcpSocket`]
/// type.
///
/// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
/// [`TcpSocket`]: struct@crate::net::TcpSocket
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use tokio::io::AsyncWriteExt;
/// use std::error::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// // Write some data.
/// stream.write_all(b"hello world!").await?;
///
/// Ok(())
/// }
/// ```
///
/// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
let addrs = to_socket_addrs(addr).await?;
cfg_not_wasi! {
/// Opens a TCP connection to a remote host.
///
/// `addr` is an address of the remote host. Anything which implements the
/// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
/// yields multiple addresses, connect will be attempted with each of the
/// addresses until a connection is successful. If none of the addresses
/// result in a successful connection, the error returned from the last
/// connection attempt (the last address) is returned.
///
/// To configure the socket before connecting, you can use the [`TcpSocket`]
/// type.
///
/// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
/// [`TcpSocket`]: struct@crate::net::TcpSocket
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use tokio::io::AsyncWriteExt;
/// use std::error::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// // Write some data.
/// stream.write_all(b"hello world!").await?;
///
/// Ok(())
/// }
/// ```
///
/// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
///
/// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
let addrs = to_socket_addrs(addr).await?;
let mut last_err = None;
let mut last_err = None;
for addr in addrs {
match TcpStream::connect_addr(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => last_err = Some(e),
for addr in addrs {
match TcpStream::connect_addr(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
}
/// Establishes a connection to the specified `addr`.
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(addr)?;
TcpStream::connect_mio(sys).await
}
pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
let stream = TcpStream::new(sys)?;
// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
// writable we check for `take_socket_error` to see if the connect
// actually hit an error or not.
//
// If all that succeeded then we ship everything on up.
poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
if let Some(e) = stream.io.take_error()? {
return Err(e);
/// Establishes a connection to the specified `addr`.
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(addr)?;
TcpStream::connect_mio(sys).await
}
Ok(stream)
pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
let stream = TcpStream::new(sys)?;
// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
// writable we check for `take_socket_error` to see if the connect
// actually hit an error or not.
//
// If all that succeeded then we ship everything on up.
poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
if let Some(e) = stream.io.take_error()? {
return Err(e);
}
Ok(stream)
}
}
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
@ -181,11 +186,13 @@ impl TcpStream {
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[track_caller]
pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_std(stream);
let io = PollEvented::new(io)?;
@ -244,6 +251,15 @@ impl TcpStream {
.map(|io| io.into_raw_socket())
.map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
}
#[cfg(target_os = "wasi")]
{
use std::os::wasi::io::{FromRawFd, IntoRawFd};
self.io
.into_inner()
.map(|io| io.into_raw_fd())
.map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
}
}
/// Returns the local address that this stream is bound to.
@ -1077,52 +1093,54 @@ impl TcpStream {
self.io.set_nodelay(nodelay)
}
/// Reads the linger duration for this socket by getting the `SO_LINGER`
/// option.
///
/// For more information about this option, see [`set_linger`].
///
/// [`set_linger`]: TcpStream::set_linger
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.linger()?);
/// # Ok(())
/// # }
/// ```
pub fn linger(&self) -> io::Result<Option<Duration>> {
socket2::SockRef::from(self).linger()
}
cfg_not_wasi! {
/// Reads the linger duration for this socket by getting the `SO_LINGER`
/// option.
///
/// For more information about this option, see [`set_linger`].
///
/// [`set_linger`]: TcpStream::set_linger
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.linger()?);
/// # Ok(())
/// # }
/// ```
pub fn linger(&self) -> io::Result<Option<Duration>> {
socket2::SockRef::from(self).linger()
}
/// Sets the linger duration of this socket by setting the SO_LINGER option.
///
/// This option controls the action taken when a stream has unsent messages and the stream is
/// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
/// data or until the time expires.
///
/// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
/// way that allows the process to continue as quickly as possible.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// stream.set_linger(None)?;
/// # Ok(())
/// # }
/// ```
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
socket2::SockRef::from(self).set_linger(dur)
/// Sets the linger duration of this socket by setting the SO_LINGER option.
///
/// This option controls the action taken when a stream has unsent messages and the stream is
/// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
/// data or until the time expires.
///
/// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
/// way that allows the process to continue as quickly as possible.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// stream.set_linger(None)?;
/// # Ok(())
/// # }
/// ```
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
socket2::SockRef::from(self).set_linger(dur)
}
}
/// Gets the value of the `IP_TTL` option for this socket.
@ -1315,3 +1333,15 @@ mod sys {
}
}
}
#[cfg(all(tokio_unstable, target_os = "wasi"))]
mod sys {
use super::TcpStream;
use std::os::wasi::prelude::*;
impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
}

View File

@ -170,6 +170,7 @@ impl UdpSocket {
UdpSocket::new(sys)
}
#[track_caller]
fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> {
let io = PollEvented::new(socket)?;
Ok(UdpSocket { io })
@ -210,6 +211,7 @@ impl UdpSocket {
/// # Ok(())
/// # }
/// ```
#[track_caller]
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_std(socket);
UdpSocket::new(io)

View File

@ -430,7 +430,8 @@ impl UnixDatagram {
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a Tokio runtime, otherwise runtime can be set
@ -457,6 +458,7 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
#[track_caller]
pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
let socket = mio::net::UnixDatagram::from_std(datagram);
let io = PollEvented::new(socket)?;

View File

@ -54,11 +54,13 @@ impl UnixListener {
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[track_caller]
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
@ -77,11 +79,13 @@ impl UnixListener {
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[track_caller]
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
let listener = mio::net::UnixListener::from_std(listener);
let io = PollEvented::new(listener)?;

View File

@ -699,11 +699,13 @@ impl UnixStream {
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[track_caller]
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
let stream = mio::net::UnixStream::from_std(stream);
let io = PollEvented::new(stream)?;

View File

@ -1955,6 +1955,106 @@ impl ServerOptions {
self
}
/// Requests permission to modify the pipe's discretionary access control list.
///
/// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
///
/// # Examples
///
/// ```
/// use std::{io, os::windows::prelude::AsRawHandle, ptr};
//
/// use tokio::net::windows::named_pipe::ServerOptions;
/// use winapi::{
/// shared::winerror::ERROR_SUCCESS,
/// um::{accctrl::SE_KERNEL_OBJECT, aclapi::SetSecurityInfo, winnt::DACL_SECURITY_INFORMATION},
/// };
///
/// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
///
/// # #[tokio::main] async fn main() -> io::Result<()> {
/// let mut pipe_template = ServerOptions::new();
/// pipe_template.write_dac(true);
/// let pipe = pipe_template.create(PIPE_NAME)?;
///
/// unsafe {
/// assert_eq!(
/// ERROR_SUCCESS,
/// SetSecurityInfo(
/// pipe.as_raw_handle(),
/// SE_KERNEL_OBJECT,
/// DACL_SECURITY_INFORMATION,
/// ptr::null_mut(),
/// ptr::null_mut(),
/// ptr::null_mut(),
/// ptr::null_mut(),
/// )
/// );
/// }
///
/// # Ok(()) }
/// ```
///
/// ```
/// use std::{io, os::windows::prelude::AsRawHandle, ptr};
//
/// use tokio::net::windows::named_pipe::ServerOptions;
/// use winapi::{
/// shared::winerror::ERROR_ACCESS_DENIED,
/// um::{accctrl::SE_KERNEL_OBJECT, aclapi::SetSecurityInfo, winnt::DACL_SECURITY_INFORMATION},
/// };
///
/// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
///
/// # #[tokio::main] async fn main() -> io::Result<()> {
/// let mut pipe_template = ServerOptions::new();
/// pipe_template.write_dac(false);
/// let pipe = pipe_template.create(PIPE_NAME)?;
///
/// unsafe {
/// assert_eq!(
/// ERROR_ACCESS_DENIED,
/// SetSecurityInfo(
/// pipe.as_raw_handle(),
/// SE_KERNEL_OBJECT,
/// DACL_SECURITY_INFORMATION,
/// ptr::null_mut(),
/// ptr::null_mut(),
/// ptr::null_mut(),
/// ptr::null_mut(),
/// )
/// );
/// }
///
/// # Ok(()) }
/// ```
///
/// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pub fn write_dac(&mut self, requested: bool) -> &mut Self {
bool_flag!(self.open_mode, requested, winnt::WRITE_DAC);
self
}
/// Requests permission to modify the pipe's owner.
///
/// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
///
/// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pub fn write_owner(&mut self, requested: bool) -> &mut Self {
bool_flag!(self.open_mode, requested, winnt::WRITE_OWNER);
self
}
/// Requests permission to modify the pipe's system access control list.
///
/// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
///
/// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
bool_flag!(self.open_mode, requested, winnt::ACCESS_SYSTEM_SECURITY);
self
}
/// Indicates whether this server can accept remote clients or not. Remote
/// clients are disabled by default.
///
@ -2020,6 +2120,7 @@ impl ServerOptions {
/// let builder = ServerOptions::new().max_instances(255);
/// # Ok(()) }
/// ```
#[track_caller]
pub fn max_instances(&mut self, instances: usize) -> &mut Self {
assert!(instances < 255, "cannot specify more than 254 instances");
self.max_instances = instances as DWORD;

View File

@ -64,7 +64,11 @@ impl Park for ParkThread {
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
// Wasi doesn't have threads, so just sleep.
#[cfg(not(target_os = "wasi"))]
self.inner.park_timeout(duration);
#[cfg(target_os = "wasi")]
std::thread::sleep(duration);
Ok(())
}

View File

@ -1285,41 +1285,39 @@ impl ChildStderr {
impl AsyncWrite for ChildStdin {
fn poll_write(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
Pin::new(&mut self.inner).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
impl AsyncRead for ChildStdout {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}
impl AsyncRead for ChildStderr {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}

View File

@ -29,7 +29,7 @@ use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;
use crate::io::PollEvented;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::signal::unix::driver::Handle as SignalHandle;
@ -177,8 +177,8 @@ impl AsRawFd for Pipe {
}
}
pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
let mut fd = io.into_inner()?.fd;
pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> {
let mut fd = io.inner.into_inner()?.fd;
// Ensure that the fd to be inherited is set to *blocking* mode, as this
// is the default that virtually all programs expect to have. Those
@ -213,7 +213,50 @@ impl Source for Pipe {
}
}
pub(crate) type ChildStdio = PollEvented<Pipe>;
pub(crate) struct ChildStdio {
inner: PollEvented<Pipe>,
}
impl fmt::Debug for ChildStdio {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(fmt)
}
}
impl AsRawFd for ChildStdio {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
impl AsyncWrite for ChildStdio {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl AsyncRead for ChildStdio {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: pipes support reading into uninitialized memory
unsafe { self.inner.poll_read(cx, buf) }
}
}
fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
@ -238,7 +281,7 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>
Ok(())
}
pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawFd,
{
@ -246,5 +289,5 @@ where
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;
PollEvented::new(pipe)
PollEvented::new(pipe).map(|inner| ChildStdio { inner })
}

View File

@ -15,22 +15,22 @@
//! `RegisterWaitForSingleObject` and then wait on the other end of the oneshot
//! from then on out.
use crate::io::PollEvented;
use crate::io::{blocking::Blocking, AsyncRead, AsyncWrite, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
use crate::sync::oneshot;
use mio::windows::NamedPipe;
use std::fmt;
use std::fs::File as StdFile;
use std::future::Future;
use std::io;
use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::os::windows::prelude::{AsRawHandle, IntoRawHandle, RawHandle};
use std::pin::Pin;
use std::process::Stdio;
use std::process::{Child as StdChild, Command as StdCommand, ExitStatus};
use std::ptr;
use std::task::Context;
use std::task::Poll;
use std::sync::Arc;
use std::task::{Context, Poll};
use winapi::shared::minwindef::{DWORD, FALSE};
use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE};
use winapi::um::processthreadsapi::GetCurrentProcess;
@ -167,28 +167,97 @@ unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) {
let _ = complete.take().unwrap().send(());
}
pub(crate) type ChildStdio = PollEvented<NamedPipe>;
#[derive(Debug)]
struct ArcFile(Arc<StdFile>);
pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<NamedPipe>>
impl io::Read for ArcFile {
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
(&*self.0).read(bytes)
}
}
impl io::Write for ArcFile {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
(&*self.0).write(bytes)
}
fn flush(&mut self) -> io::Result<()> {
(&*self.0).flush()
}
}
#[derive(Debug)]
pub(crate) struct ChildStdio {
// Used for accessing the raw handle, even if the io version is busy
raw: Arc<StdFile>,
// For doing I/O operations asynchronously
io: Blocking<ArcFile>,
}
impl AsRawHandle for ChildStdio {
fn as_raw_handle(&self) -> RawHandle {
self.raw.as_raw_handle()
}
}
impl AsyncRead for ChildStdio {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_read(cx, buf)
}
}
impl AsyncWrite for ChildStdio {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.io).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_shutdown(cx)
}
}
pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawHandle,
{
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
PollEvented::new(pipe)
use std::os::windows::prelude::FromRawHandle;
let raw = Arc::new(unsafe { StdFile::from_raw_handle(io.into_raw_handle()) });
let io = Blocking::new(ArcFile(raw.clone()));
Ok(ChildStdio { raw, io })
}
pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> {
let named_pipe = io.into_inner()?;
pub(crate) fn convert_to_stdio(child_stdio: ChildStdio) -> io::Result<Stdio> {
let ChildStdio { raw, io } = child_stdio;
drop(io); // Try to drop the Arc count here
Arc::try_unwrap(raw)
.or_else(|raw| duplicate_handle(&*raw))
.map(Stdio::from)
}
fn duplicate_handle<T: AsRawHandle>(io: &T) -> io::Result<StdFile> {
use std::os::windows::prelude::FromRawHandle;
// Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually
// duplicate the handle here...
unsafe {
let mut dup_handle = INVALID_HANDLE_VALUE;
let cur_proc = GetCurrentProcess();
let status = DuplicateHandle(
cur_proc,
named_pipe.as_raw_handle(),
io.as_raw_handle(),
cur_proc,
&mut dup_handle,
0 as DWORD,
@ -200,6 +269,6 @@ pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio>
return Err(io::Error::last_os_error());
}
Ok(Stdio::from_raw_handle(dup_handle))
Ok(StdFile::from_raw_handle(dup_handle))
}
}

View File

@ -4,7 +4,7 @@
//! compilation.
mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, Spawner, Task};
pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, SpawnError, Spawner, Task};
cfg_fs! {
pub(crate) use pool::spawn_mandatory_blocking;

View File

@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, ToHandle};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::time::Duration;
pub(crate) struct BlockingPool {
@ -82,6 +83,25 @@ pub(crate) enum Mandatory {
NonMandatory,
}
pub(crate) enum SpawnError {
/// Pool is shutting down and the task was not scheduled
ShuttingDown,
/// There are no worker threads available to take the task
/// and the OS failed to spawn a new one
NoThreads(io::Error),
}
impl From<SpawnError> for io::Error {
fn from(e: SpawnError) -> Self {
match e {
SpawnError::ShuttingDown => {
io::Error::new(io::ErrorKind::Other, "blocking pool shutting down")
}
SpawnError::NoThreads(e) => e,
}
}
}
impl Task {
pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task {
Task { task, mandatory }
@ -105,6 +125,7 @@ const KEEP_ALIVE: Duration = Duration::from_secs(10);
/// Tasks will be scheduled as non-mandatory, meaning they may not get executed
/// in case of runtime shutdown.
#[track_caller]
#[cfg_attr(target_os = "wasi", allow(dead_code))]
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
@ -220,7 +241,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====
impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), ()> {
pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), SpawnError> {
let mut shared = self.inner.shared.lock();
if shared.shutdown {
@ -230,7 +251,7 @@ impl Spawner {
task.task.shutdown();
// no need to even push this task; it would never get picked up
return Err(());
return Err(SpawnError::ShuttingDown);
}
shared.queue.push_back(task);
@ -261,7 +282,7 @@ impl Spawner {
Err(e) => {
// The OS refused to spawn the thread and there is no thread
// to pick up the task that has just been pushed to the queue.
panic!("OS can't spawn worker thread: {}", e)
return Err(SpawnError::NoThreads(e));
}
}
}

View File

@ -174,7 +174,7 @@ pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync +
pub(crate) enum Kind {
CurrentThread,
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
MultiThread,
}
@ -197,14 +197,16 @@ impl Builder {
Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL)
}
/// Returns a new builder with the multi thread scheduler selected.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61, 61)
cfg_not_wasi! {
/// Returns a new builder with the multi thread scheduler selected.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61, 61)
}
}
/// Returns a new runtime builder initialized with default configuration
@ -270,7 +272,11 @@ impl Builder {
/// .unwrap();
/// ```
pub fn enable_all(&mut self) -> &mut Self {
#[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))]
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal")
))]
self.enable_io();
#[cfg(feature = "time")]
self.enable_time();
@ -287,11 +293,7 @@ impl Builder {
///
/// The default value is the number of cores available to the system.
///
/// # Panics
///
/// When using the `current_thread` runtime this method will panic, since
/// those variants do not allow setting worker thread counts.
///
/// When using the `current_thread` runtime this method has no effect.
///
/// # Examples
///
@ -617,7 +619,7 @@ impl Builder {
pub fn build(&mut self) -> io::Result<Runtime> {
match &self.kind {
Kind::CurrentThread => self.build_basic_runtime(),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Kind::MultiThread => self.build_threaded_runtime(),
}
}
@ -626,7 +628,7 @@ impl Builder {
driver::Cfg {
enable_pause_time: match self.kind {
Kind::CurrentThread => true,
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Kind::MultiThread => false,
},
enable_io: self.enable_io,
@ -736,7 +738,7 @@ impl Builder {
/// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
/// shutdown immediately when a spawned task panics even if that
/// task's `JoinHandle` has not been dropped. All other spawned tasks
/// will immediatetly terminate and further calls to
/// will immediately terminate and further calls to
/// [`Runtime::block_on`] will panic.
///
/// # Unstable

View File

@ -24,6 +24,7 @@ pub(crate) fn current() -> Handle {
}
cfg_io_driver! {
#[track_caller]
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();

View File

@ -23,7 +23,11 @@ pub struct Handle {
pub(crate) struct HandleInner {
/// Handles to the I/O drivers
#[cfg_attr(
not(any(feature = "net", feature = "process", all(unix, feature = "signal"))),
not(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
)),
allow(dead_code)
)]
pub(super) io_handle: driver::IoHandle,
@ -341,7 +345,7 @@ impl HandleInner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, _was_spawned) = if cfg!(debug_assertions)
let (join_handle, spawn_result) = if cfg!(debug_assertions)
&& std::mem::size_of::<F>() > 2048
{
self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None, rt)
@ -349,7 +353,14 @@ impl HandleInner {
self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None, rt)
};
join_handle
match spawn_result {
Ok(()) => join_handle,
// Compat: do not panic here, return the join_handle even though it will never resolve
Err(blocking::SpawnError::ShuttingDown) => join_handle,
Err(blocking::SpawnError::NoThreads(e)) => {
panic!("OS can't spawn worker thread: {}", e)
}
}
}
cfg_fs! {
@ -363,7 +374,7 @@ impl HandleInner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
let (join_handle, spawn_result) = if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
self.spawn_blocking_inner(
Box::new(func),
blocking::Mandatory::Mandatory,
@ -379,7 +390,7 @@ impl HandleInner {
)
};
if was_spawned {
if spawn_result.is_ok() {
Some(join_handle)
} else {
None
@ -394,7 +405,7 @@ impl HandleInner {
is_mandatory: blocking::Mandatory,
name: Option<&str>,
rt: &dyn ToHandle,
) -> (JoinHandle<R>, bool)
) -> (JoinHandle<R>, Result<(), blocking::SpawnError>)
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@ -424,7 +435,7 @@ impl HandleInner {
let spawned = self
.blocking_spawner
.spawn(blocking::Task::new(task, is_mandatory), rt);
(handle, spawned.is_ok())
(handle, spawned)
}
}

View File

@ -131,7 +131,7 @@ impl RuntimeMetrics {
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to indentify the worker throughout the lifetime
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics

View File

@ -204,6 +204,7 @@ cfg_rt! {
mod blocking;
use blocking::BlockingPool;
#[cfg_attr(target_os = "wasi", allow(unused_imports))]
pub(crate) use blocking::spawn_blocking;
cfg_trace! {
@ -303,7 +304,7 @@ cfg_rt! {
CurrentThread(BasicScheduler),
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
ThreadPool(ThreadPool),
}
@ -311,39 +312,41 @@ cfg_rt! {
type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
impl Runtime {
/// Creates a new runtime instance with default configuration values.
///
/// This results in the multi threaded scheduler, I/O driver, and time driver being
/// initialized.
///
/// Most applications will not need to call this function directly. Instead,
/// they will use the [`#[tokio::main]` attribute][main]. When a more complex
/// configuration is necessary, the [runtime builder] may be used.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// Creating a new `Runtime` with default configuration values.
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
/// ```
///
/// [mod]: index.html
/// [main]: ../attr.main.html
/// [threaded scheduler]: index.html#threaded-scheduler
/// [basic scheduler]: index.html#basic-scheduler
/// [runtime builder]: crate::runtime::Builder
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new() -> std::io::Result<Runtime> {
Builder::new_multi_thread().enable_all().build()
cfg_not_wasi! {
/// Creates a new runtime instance with default configuration values.
///
/// This results in the multi threaded scheduler, I/O driver, and time driver being
/// initialized.
///
/// Most applications will not need to call this function directly. Instead,
/// they will use the [`#[tokio::main]` attribute][main]. When a more complex
/// configuration is necessary, the [runtime builder] may be used.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// Creating a new `Runtime` with default configuration values.
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
/// ```
///
/// [mod]: index.html
/// [main]: ../attr.main.html
/// [threaded scheduler]: index.html#threaded-scheduler
/// [basic scheduler]: index.html#basic-scheduler
/// [runtime builder]: crate::runtime::Builder
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new() -> std::io::Result<Runtime> {
Builder::new_multi_thread().enable_all().build()
}
}
/// Returns a handle to the runtime's spawner.
@ -472,6 +475,7 @@ cfg_rt! {
/// ```
///
/// [handle]: fn@Handle::block_on
#[track_caller]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "block_on", None, task::Id::next().as_u64());
@ -480,7 +484,7 @@ cfg_rt! {
match &self.kind {
Kind::CurrentThread(exec) => exec.block_on(future),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Kind::ThreadPool(exec) => exec.block_on(future),
}
}
@ -573,7 +577,7 @@ cfg_rt! {
/// may result in a resource leak (in that any blocking tasks are still running until they
/// return.
///
/// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`.
/// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
///
/// ```
/// use tokio::runtime::Runtime;
@ -610,7 +614,7 @@ cfg_rt! {
},
}
},
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Kind::ThreadPool(_) => {
// The threaded scheduler drops its tasks on its worker threads, which is
// already in the runtime's context.

View File

@ -10,13 +10,13 @@ cfg_rt_multi_thread! {
#[derive(Debug, Clone)]
pub(crate) enum Spawner {
Basic(basic_scheduler::Spawner),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
ThreadPool(thread_pool::Spawner),
}
impl Spawner {
pub(crate) fn shutdown(&mut self) {
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
{
if let Spawner::ThreadPool(spawner) = self {
spawner.shutdown();
@ -31,7 +31,7 @@ impl Spawner {
{
match self {
Spawner::Basic(spawner) => spawner.spawn(future, id),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Spawner::ThreadPool(spawner) => spawner.spawn(future, id),
}
}
@ -39,7 +39,7 @@ impl Spawner {
pub(crate) fn as_handle_inner(&self) -> &HandleInner {
match self {
Spawner::Basic(spawner) => spawner.as_handle_inner(),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Spawner::ThreadPool(spawner) => spawner.as_handle_inner(),
}
}
@ -52,7 +52,7 @@ cfg_metrics! {
pub(crate) fn num_workers(&self) -> usize {
match self {
Spawner::Basic(_) => 1,
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Spawner::ThreadPool(spawner) => spawner.num_workers(),
}
}
@ -60,7 +60,7 @@ cfg_metrics! {
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Spawner::Basic(spawner) => spawner.scheduler_metrics(),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Spawner::ThreadPool(spawner) => spawner.scheduler_metrics(),
}
}
@ -68,7 +68,7 @@ cfg_metrics! {
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
match self {
Spawner::Basic(spawner) => spawner.worker_metrics(worker),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Spawner::ThreadPool(spawner) => spawner.worker_metrics(worker),
}
}
@ -76,7 +76,7 @@ cfg_metrics! {
pub(crate) fn injection_queue_depth(&self) -> usize {
match self {
Spawner::Basic(spawner) => spawner.injection_queue_depth(),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Spawner::ThreadPool(spawner) => spawner.injection_queue_depth(),
}
}
@ -84,7 +84,7 @@ cfg_metrics! {
pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
match self {
Spawner::Basic(spawner) => spawner.worker_metrics(worker).queue_depth(),
#[cfg(feature = "rt-multi-thread")]
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Spawner::ThreadPool(spawner) => spawner.worker_local_queue_depth(worker),
}
}

View File

@ -60,8 +60,6 @@ pub(crate) struct Header {
/// Task state.
pub(super) state: State,
pub(super) owned: UnsafeCell<linked_list::Pointers<Header>>,
/// Pointer to next task, used with the injection queue.
pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,
@ -89,12 +87,23 @@ pub(crate) struct Header {
unsafe impl Send for Header {}
unsafe impl Sync for Header {}
/// Cold data is stored after the future.
/// Cold data is stored after the future. Data is considered cold if it is only
/// used during creation or shutdown of the task.
pub(super) struct Trailer {
/// Pointers for the linked list in the `OwnedTasks` that owns this task.
pub(super) owned: linked_list::Pointers<Header>,
/// Consumer task waiting on completion of this task.
pub(super) waker: UnsafeCell<Option<Waker>>,
}
generate_addr_of_methods! {
impl<> Trailer {
pub(super) unsafe fn addr_of_owned(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Header>> {
&self.owned
}
}
}
/// Either the future or the output.
pub(super) enum Stage<T: Future> {
Running(T),
@ -108,10 +117,9 @@ impl<T: Future, S: Schedule> Cell<T, S> {
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let id = future.id();
Box::new(Cell {
let result = Box::new(Cell {
header: Header {
state,
owned: UnsafeCell::new(linked_list::Pointers::new()),
queue_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
owner_id: UnsafeCell::new(0),
@ -127,8 +135,19 @@ impl<T: Future, S: Schedule> Cell<T, S> {
},
trailer: Trailer {
waker: UnsafeCell::new(None),
owned: linked_list::Pointers::new(),
},
})
});
#[cfg(debug_assertions)]
{
let trailer_addr = (&result.trailer) as *const Trailer as usize;
let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(&result.header)) };
assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize);
}
result
}
}
@ -240,6 +259,17 @@ impl Header {
// the safety requirements on `set_owner_id`.
unsafe { self.owner_id.with(|ptr| *ptr) }
}
/// Gets a pointer to the `Trailer` of the task containing this `Header`.
///
/// # Safety
///
/// The provided raw pointer must point at the header of a task.
pub(super) unsafe fn get_trailer(me: NonNull<Header>) -> NonNull<Trailer> {
let offset = me.as_ref().vtable.trailer_offset;
let trailer = me.as_ptr().cast::<u8>().add(offset).cast::<Trailer>();
NonNull::new_unchecked(trailer)
}
}
impl Trailer {

View File

@ -164,7 +164,7 @@ impl<S: 'static> OwnedTasks<S> {
// safety: We just checked that the provided task is not in some other
// linked list.
unsafe { self.inner.lock().list.remove(task.header().into()) }
unsafe { self.inner.lock().list.remove(task.header_ptr()) }
}
pub(crate) fn is_empty(&self) -> bool {

View File

@ -334,6 +334,10 @@ impl<S: 'static> Task<S> {
fn header(&self) -> &Header {
self.raw.header()
}
fn header_ptr(&self) -> NonNull<Header> {
self.raw.header_ptr()
}
}
impl<S: 'static> Notified<S> {
@ -365,7 +369,7 @@ cfg_rt_multi_thread! {
}
impl<S: Schedule> Task<S> {
/// Pre-emptively cancels the task as part of the shutdown process.
/// Preemptively cancels the task as part of the shutdown process.
pub(crate) fn shutdown(self) {
let raw = self.raw;
mem::forget(self);
@ -473,8 +477,7 @@ unsafe impl<S> linked_list::Link for Task<S> {
}
unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
// Not super great as it avoids some of looms checking...
NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr))
self::core::Trailer::addr_of_owned(Header::get_trailer(target))
}
}

View File

@ -1,4 +1,5 @@
use crate::future::Future;
use crate::runtime::task::core::{Core, Trailer};
use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State};
use std::ptr::NonNull;
@ -35,6 +36,9 @@ pub(super) struct Vtable {
/// Scheduler is being shutdown.
pub(super) shutdown: unsafe fn(NonNull<Header>),
/// The number of bytes that the `trailer` field is offset from the header.
pub(super) trailer_offset: usize,
}
/// Get the vtable for the requested `T` and `S` generics.
@ -48,9 +52,55 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
drop_abort_handle: drop_abort_handle::<T, S>,
remote_abort: remote_abort::<T, S>,
shutdown: shutdown::<T, S>,
trailer_offset: TrailerOffsetHelper::<T, S>::OFFSET,
}
}
/// Calling `get_trailer_offset` directly in vtable doesn't work because it
/// prevents the vtable from being promoted to a static reference.
///
/// See this thread for more info:
/// <https://users.rust-lang.org/t/custom-vtables-with-integers/78508>
struct TrailerOffsetHelper<T, S>(T, S);
impl<T: Future, S: Schedule> TrailerOffsetHelper<T, S> {
// Pass `size_of`/`align_of` as arguments rather than calling them directly
// inside `get_trailer_offset` because trait bounds on generic parameters
// of const fn are unstable on our MSRV.
const OFFSET: usize = get_trailer_offset(
std::mem::size_of::<Header>(),
std::mem::size_of::<Core<T, S>>(),
std::mem::align_of::<Core<T, S>>(),
std::mem::align_of::<Trailer>(),
);
}
/// Compute the offset of the `Trailer` field in `Cell<T, S>` using the
/// `#[repr(C)]` algorithm.
///
/// Pseudo-code for the `#[repr(C)]` algorithm can be found here:
/// <https://doc.rust-lang.org/reference/type-layout.html#reprc-structs>
const fn get_trailer_offset(
header_size: usize,
core_size: usize,
core_align: usize,
trailer_align: usize,
) -> usize {
let mut offset = header_size;
let core_misalign = offset % core_align;
if core_misalign > 0 {
offset += core_align - core_misalign;
}
offset += core_size;
let trailer_misalign = offset % trailer_align;
if trailer_misalign > 0 {
offset += trailer_align - trailer_misalign;
}
offset
}
impl RawTask {
pub(super) fn new<T, S>(task: T, scheduler: S, id: Id) -> RawTask
where

View File

@ -13,7 +13,7 @@ pub(super) struct WakerRef<'a, S: 'static> {
_p: PhantomData<(&'a Header, S)>,
}
/// Returns a `WakerRef` which avoids having to pre-emptively increase the
/// Returns a `WakerRef` which avoids having to preemptively increase the
/// refcount if there is no need to do so.
pub(super) fn waker_ref<T, S>(header: &NonNull<Header>) -> WakerRef<'_, S>
where

View File

@ -112,6 +112,14 @@ struct Waiter {
_p: PhantomPinned,
}
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
&self.pointers
}
}
}
impl Semaphore {
/// The maximum number of permits which a semaphore can hold.
///
@ -704,12 +712,6 @@ impl std::error::Error for TryAcquireError {}
///
/// `Waiter` is forced to be !Unpin.
unsafe impl linked_list::Link for Waiter {
// XXX: ideally, we would be able to use `Pin` here, to enforce the
// invariant that list entries may not move while in the list. However, we
// can't do this currently, as using `Pin<&'a mut Waiter>` as the `Handle`
// type would require `Semaphore` to be generic over a lifetime. We can't
// use `Pin<*mut Waiter>`, as raw pointers are `Unpin` regardless of whether
// or not they dereference to an `!Unpin` target.
type Handle = NonNull<Waiter>;
type Target = Waiter;
@ -721,7 +723,7 @@ unsafe impl linked_list::Link for Waiter {
ptr
}
unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
NonNull::from(&mut target.as_mut().pointers)
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
Waiter::addr_of_pointers(target)
}
}

View File

@ -361,6 +361,14 @@ struct Waiter {
_p: PhantomPinned,
}
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
&self.pointers
}
}
}
struct RecvGuard<'a, T> {
slot: RwLockReadGuard<'a, Slot<T>>,
}
@ -1140,8 +1148,8 @@ unsafe impl linked_list::Link for Waiter {
ptr
}
unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
NonNull::from(&mut target.as_mut().pointers)
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
Waiter::addr_of_pointers(target)
}
}

View File

@ -214,7 +214,6 @@ enum NotificationType {
}
#[derive(Debug)]
#[repr(C)] // required by `linked_list::Link` impl
struct Waiter {
/// Intrusive linked-list pointers.
pointers: linked_list::Pointers<Waiter>,
@ -229,6 +228,14 @@ struct Waiter {
_p: PhantomPinned,
}
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
&self.pointers
}
}
}
/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
@ -950,7 +957,7 @@ unsafe impl linked_list::Link for Waiter {
}
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
target.cast()
Waiter::addr_of_pointers(target)
}
}

View File

@ -12,7 +12,7 @@ impl AssertSync for AtomicWaker {}
impl AssertSend for Waker {}
impl AssertSync for Waker {}
#[cfg(target_arch = "wasm32")]
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]

View File

@ -4,7 +4,7 @@ use std::mem::ManuallyDrop;
use std::sync::Arc;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};
#[cfg(target_arch = "wasm32")]
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]

View File

@ -1,7 +1,7 @@
use crate::sync::batch_semaphore::Semaphore;
use tokio_test::*;
#[cfg(target_arch = "wasm32")]
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]

View File

@ -665,7 +665,7 @@ impl<T> Sender<T> {
///
/// The `modify` closure must return `true` if the value has actually
/// been modified during the mutable borrow. It should only return `false`
/// if the value is guaranteed to be unnmodified despite the mutable
/// if the value is guaranteed to be unmodified despite the mutable
/// borrow.
///
/// Receivers are only notified if the closure returned `true`. If the

View File

@ -102,11 +102,21 @@ cfg_rt! {
/// their own. If you want to spawn an ordinary thread, you should use
/// [`thread::spawn`] instead.
///
/// Closures spawned using `spawn_blocking` cannot be cancelled. When you shut
/// down the executor, it will wait indefinitely for all blocking operations to
/// Closures spawned using `spawn_blocking` cannot be cancelled abruptly; there
/// is no standard low level API to cause a thread to stop running. However,
/// a useful pattern is to pass some form of "cancellation token" into
/// the thread. This could be an [`AtomicBool`] that the task checks periodically.
/// Another approach is to have the thread primarily read or write from a channel,
/// and to exit when the channel closes; assuming the other side of the channel is dropped
/// when cancellation occurs, this will cause the blocking task thread to exit
/// soon after as well.
///
/// When you shut down the executor, it will wait indefinitely for all blocking operations to
/// finish. You can use [`shutdown_timeout`] to stop waiting for them after a
/// certain timeout. Be aware that this will still not cancel the tasks — they
/// are simply allowed to keep running after the method returns.
/// are simply allowed to keep running after the method returns. It is possible
/// for a blocking task to be cancelled if it has not yet started running, but this
/// is not guaranteed.
///
/// Note that if you are using the single threaded runtime, this function will
/// still spawn additional threads for blocking operations. The basic
@ -140,6 +150,7 @@ cfg_rt! {
/// [`thread::spawn`]: fn@std::thread::spawn
/// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
/// [bridgesync]: https://tokio.rs/tokio/topics/bridging
/// [`AtomicBool`]: struct@std::sync::atomic::AtomicBool
///
/// # Examples
///

View File

@ -3,7 +3,7 @@ use crate::{
runtime::{context, Handle},
task::{JoinHandle, LocalSet},
};
use std::future::Future;
use std::{future::Future, io};
/// Factory which is used to configure the properties of a new task.
///
@ -48,7 +48,7 @@ use std::future::Future;
/// .spawn(async move {
/// // Process each socket concurrently.
/// process(socket).await
/// });
/// })?;
/// }
/// }
/// ```
@ -83,12 +83,12 @@ impl<'a> Builder<'a> {
/// See [`task::spawn`](crate::task::spawn) for
/// more details.
#[track_caller]
pub fn spawn<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
pub fn spawn<Fut>(self, future: Fut) -> io::Result<JoinHandle<Fut::Output>>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
super::spawn::spawn_inner(future, self.name)
Ok(super::spawn::spawn_inner(future, self.name))
}
/// Spawn a task with this builder's settings on the provided [runtime
@ -99,12 +99,16 @@ impl<'a> Builder<'a> {
/// [runtime handle]: crate::runtime::Handle
/// [`Handle::spawn`]: crate::runtime::Handle::spawn
#[track_caller]
pub fn spawn_on<Fut>(&mut self, future: Fut, handle: &Handle) -> JoinHandle<Fut::Output>
pub fn spawn_on<Fut>(
&mut self,
future: Fut,
handle: &Handle,
) -> io::Result<JoinHandle<Fut::Output>>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
handle.spawn_named(future, self.name)
Ok(handle.spawn_named(future, self.name))
}
/// Spawns `!Send` a task on the current [`LocalSet`] with this builder's
@ -122,12 +126,12 @@ impl<'a> Builder<'a> {
/// [`task::spawn_local`]: crate::task::spawn_local
/// [`LocalSet`]: crate::task::LocalSet
#[track_caller]
pub fn spawn_local<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
pub fn spawn_local<Fut>(self, future: Fut) -> io::Result<JoinHandle<Fut::Output>>
where
Fut: Future + 'static,
Fut::Output: 'static,
{
super::local::spawn_local_inner(future, self.name)
Ok(super::local::spawn_local_inner(future, self.name))
}
/// Spawns `!Send` a task on the provided [`LocalSet`] with this builder's
@ -138,12 +142,16 @@ impl<'a> Builder<'a> {
/// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local
/// [`LocalSet`]: crate::task::LocalSet
#[track_caller]
pub fn spawn_local_on<Fut>(self, future: Fut, local_set: &LocalSet) -> JoinHandle<Fut::Output>
pub fn spawn_local_on<Fut>(
self,
future: Fut,
local_set: &LocalSet,
) -> io::Result<JoinHandle<Fut::Output>>
where
Fut: Future + 'static,
Fut::Output: 'static,
{
local_set.spawn_named(future, self.name)
Ok(local_set.spawn_named(future, self.name))
}
/// Spawns blocking code on the blocking threadpool.
@ -155,7 +163,10 @@ impl<'a> Builder<'a> {
/// See [`task::spawn_blocking`](crate::task::spawn_blocking)
/// for more details.
#[track_caller]
pub fn spawn_blocking<Function, Output>(self, function: Function) -> JoinHandle<Output>
pub fn spawn_blocking<Function, Output>(
self,
function: Function,
) -> io::Result<JoinHandle<Output>>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
@ -174,18 +185,20 @@ impl<'a> Builder<'a> {
self,
function: Function,
handle: &Handle,
) -> JoinHandle<Output>
) -> io::Result<JoinHandle<Output>>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
use crate::runtime::Mandatory;
let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner(
let (join_handle, spawn_result) = handle.as_inner().spawn_blocking_inner(
function,
Mandatory::NonMandatory,
self.name,
handle,
);
join_handle
spawn_result?;
Ok(join_handle)
}
}

View File

@ -4,7 +4,7 @@ use std::task::Poll;
/// runtime *if* the task's coop budget was exhausted.
///
/// The task will only yield if its entire coop budget has been exhausted.
/// This function can can be used in order to insert optional yield points into long
/// This function can be used in order to insert optional yield points into long
/// computations that do not use Tokio resources like sockets or semaphores,
/// without redundantly yielding to the runtime each time.
///

View File

@ -101,13 +101,15 @@ impl<T: 'static> JoinSet<T> {
/// use tokio::task::JoinSet;
///
/// #[tokio::main]
/// async fn main() {
/// async fn main() -> std::io::Result<()> {
/// let mut set = JoinSet::new();
///
/// // Use the builder to configure a task's name before spawning it.
/// set.build_task()
/// .name("my_task")
/// .spawn(async { /* ... */ });
/// .spawn(async { /* ... */ })?;
///
/// Ok(())
/// }
/// ```
#[cfg(all(tokio_unstable, feature = "tracing"))]
@ -377,13 +379,13 @@ impl<'a, T: 'static> Builder<'a, T> {
///
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn<F>(self, future: F) -> AbortHandle
pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle>
where
F: Future<Output = T>,
F: Send + 'static,
T: Send,
{
self.joinset.insert(self.builder.spawn(future))
Ok(self.joinset.insert(self.builder.spawn(future)?))
}
/// Spawn the provided task on the provided [runtime handle] with this
@ -397,13 +399,13 @@ impl<'a, T: 'static> Builder<'a, T> {
/// [`AbortHandle`]: crate::task::AbortHandle
/// [runtime handle]: crate::runtime::Handle
#[track_caller]
pub fn spawn_on<F>(mut self, future: F, handle: &Handle) -> AbortHandle
pub fn spawn_on<F>(mut self, future: F, handle: &Handle) -> std::io::Result<AbortHandle>
where
F: Future<Output = T>,
F: Send + 'static,
T: Send,
{
self.joinset.insert(self.builder.spawn_on(future, handle))
Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?))
}
/// Spawn the provided task on the current [`LocalSet`] with this builder's
@ -420,12 +422,12 @@ impl<'a, T: 'static> Builder<'a, T> {
/// [`LocalSet`]: crate::task::LocalSet
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn_local<F>(self, future: F) -> AbortHandle
pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle>
where
F: Future<Output = T>,
F: 'static,
{
self.joinset.insert(self.builder.spawn_local(future))
Ok(self.joinset.insert(self.builder.spawn_local(future)?))
}
/// Spawn the provided task on the provided [`LocalSet`] with this builder's
@ -438,13 +440,14 @@ impl<'a, T: 'static> Builder<'a, T> {
/// [`LocalSet`]: crate::task::LocalSet
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> AbortHandle
pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle>
where
F: Future<Output = T>,
F: 'static,
{
self.joinset
.insert(self.builder.spawn_local_on(future, local_set))
Ok(self
.joinset
.insert(self.builder.spawn_local_on(future, local_set)?))
}
}

View File

@ -10,6 +10,7 @@ use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Poll;
use pin_project_lite::pin_project;
@ -159,7 +160,7 @@ cfg_rt! {
/// tokio::task::spawn_local(run_task(new_task));
/// }
/// // If the while loop returns, then all the LocalSpawner
/// // objects have have been dropped.
/// // objects have been dropped.
/// });
///
/// // This will return once all senders are dropped and all
@ -215,7 +216,7 @@ cfg_rt! {
tick: Cell<u8>,
/// State available from thread-local.
context: Context,
context: Rc<Context>,
/// This type should not be Send.
_not_send: PhantomData<*const ()>,
@ -260,7 +261,7 @@ pin_project! {
}
}
scoped_thread_local!(static CURRENT: Context);
thread_local!(static CURRENT: Cell<Option<Rc<Context>>> = Cell::new(None));
cfg_rt! {
/// Spawns a `!Send` future on the local task set.
@ -310,10 +311,12 @@ cfg_rt! {
F::Output: 'static
{
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");
let ctx = clone_rc(maybe_cx);
match ctx {
None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
Some(cx) => cx.spawn(future, name)
}
cx.spawn(future, name)
})
}
}
@ -327,12 +330,29 @@ const MAX_TASKS_PER_TICK: usize = 61;
/// How often it check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
/// Context guard for LocalSet
pub struct LocalEnterGuard(Option<Rc<Context>>);
impl Drop for LocalEnterGuard {
fn drop(&mut self) {
CURRENT.with(|ctx| {
ctx.replace(self.0.take());
})
}
}
impl fmt::Debug for LocalEnterGuard {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LocalEnterGuard").finish()
}
}
impl LocalSet {
/// Returns a new local task set.
pub fn new() -> LocalSet {
LocalSet {
tick: Cell::new(0),
context: Context {
context: Rc::new(Context {
owned: LocalOwnedTasks::new(),
queue: VecDequeCell::with_capacity(INITIAL_CAPACITY),
shared: Arc::new(Shared {
@ -342,11 +362,24 @@ impl LocalSet {
unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
}),
unhandled_panic: Cell::new(false),
},
}),
_not_send: PhantomData,
}
}
/// Enters the context of this `LocalSet`.
///
/// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
/// context you are inside.
///
/// [`spawn_local`]: fn@crate::task::spawn_local
pub fn enter(&self) -> LocalEnterGuard {
CURRENT.with(|ctx| {
let old = ctx.replace(Some(self.context.clone()));
LocalEnterGuard(old)
})
}
/// Spawns a `!Send` task onto the local task set.
///
/// This task is guaranteed to be run on the current thread.
@ -454,6 +487,7 @@ impl LocalSet {
/// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
/// [in-place blocking]: fn@crate::task::block_in_place
/// [`spawn_blocking`]: fn@crate::task::spawn_blocking
#[track_caller]
#[cfg(feature = "rt")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
@ -579,7 +613,25 @@ impl LocalSet {
}
fn with<T>(&self, f: impl FnOnce() -> T) -> T {
CURRENT.set(&self.context, f)
CURRENT.with(|ctx| {
struct Reset<'a> {
ctx_ref: &'a Cell<Option<Rc<Context>>>,
val: Option<Rc<Context>>,
}
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
self.ctx_ref.replace(self.val.take());
}
}
let old = ctx.replace(Some(self.context.clone()));
let _reset = Reset {
ctx_ref: ctx,
val: old,
};
f()
})
}
}
@ -600,7 +652,7 @@ cfg_unstable! {
/// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
/// shutdown immediately when a spawned task panics even if that
/// task's `JoinHandle` has not been dropped. All other spawned tasks
/// will immediatetly terminate and further calls to
/// will immediately terminate and further calls to
/// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
///
/// # Panics
@ -645,8 +697,9 @@ cfg_unstable! {
/// [`JoinHandle`]: struct@crate::task::JoinHandle
pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
// TODO: This should be set as a builder
Arc::get_mut(&mut self.context.shared)
.expect("TODO: we shouldn't panic")
Rc::get_mut(&mut self.context)
.and_then(|ctx| Arc::get_mut(&mut ctx.shared))
.expect("Unhandled Panic behavior modified after starting LocalSet")
.unhandled_panic = behavior;
self
}
@ -769,23 +822,33 @@ impl<T: Future> Future for RunUntil<'_, T> {
}
}
fn clone_rc<T>(rc: &Cell<Option<Rc<T>>>) -> Option<Rc<T>> {
let value = rc.take();
let cloned = value.clone();
rc.set(value);
cloned
}
impl Shared {
/// Schedule the provided task on the scheduler.
fn schedule(&self, task: task::Notified<Arc<Self>>) {
CURRENT.with(|maybe_cx| match maybe_cx {
Some(cx) if cx.shared.ptr_eq(self) => {
cx.queue.push_back(task);
}
_ => {
// First check whether the queue is still there (if not, the
// LocalSet is dropped). Then push to it if so, and if not,
// do nothing.
let mut lock = self.queue.lock();
CURRENT.with(|maybe_cx| {
let ctx = clone_rc(maybe_cx);
match ctx {
Some(cx) if cx.shared.ptr_eq(self) => {
cx.queue.push_back(task);
}
_ => {
// First check whether the queue is still there (if not, the
// LocalSet is dropped). Then push to it if so, and if not,
// do nothing.
let mut lock = self.queue.lock();
if let Some(queue) = lock.as_mut() {
queue.push_back(task);
drop(lock);
self.waker.wake();
if let Some(queue) = lock.as_mut() {
queue.push_back(task);
drop(lock);
self.waker.wake();
}
}
}
});
@ -799,9 +862,14 @@ impl Shared {
impl task::Schedule for Arc<Shared> {
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
let ctx = clone_rc(maybe_cx);
match ctx {
None => panic!("scheduler context missing"),
Some(cx) => {
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
}
}
})
}
@ -821,13 +889,15 @@ impl task::Schedule for Arc<Shared> {
// This hook is only called from within the runtime, so
// `CURRENT` should match with `&self`, i.e. there is no
// opportunity for a nested scheduler to be called.
CURRENT.with(|maybe_cx| match maybe_cx {
CURRENT.with(|maybe_cx| {
let ctx = clone_rc(maybe_cx);
match ctx {
Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
cx.unhandled_panic.set(true);
cx.owned.close_and_shutdown_all();
}
_ => unreachable!("runtime core not set in CURRENT thread-local"),
})
}})
}
}
}

View File

@ -278,8 +278,10 @@
cfg_rt! {
pub use crate::runtime::task::{JoinError, JoinHandle};
mod blocking;
pub use blocking::spawn_blocking;
cfg_not_wasi! {
mod blocking;
pub use blocking::spawn_blocking;
}
mod spawn;
pub use spawn::spawn;
@ -297,7 +299,7 @@ cfg_rt! {
}
mod local;
pub use local::{spawn_local, LocalSet};
pub use local::{spawn_local, LocalSet, LocalEnterGuard};
mod task_local;
pub use task_local::LocalKey;

View File

@ -51,6 +51,7 @@ macro_rules! task_local {
#[macro_export]
macro_rules! __task_local_inner {
($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => {
$(#[$attr])*
$vis static $name: $crate::task::LocalKey<$t> = {
std::thread_local! {
static __KEY: std::cell::RefCell<Option<$t>> = const { std::cell::RefCell::new(None) };
@ -66,6 +67,7 @@ macro_rules! __task_local_inner {
#[macro_export]
macro_rules! __task_local_inner {
($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => {
$(#[$attr])*
$vis static $name: $crate::task::LocalKey<$t> = {
std::thread_local! {
static __KEY: std::cell::RefCell<Option<$t>> = std::cell::RefCell::new(None);
@ -285,6 +287,7 @@ impl<T: Copy + 'static> LocalKey<T> {
/// # Panics
///
/// This function will panic if the task local doesn't have a value set.
#[track_caller]
pub fn get(&'static self) -> T {
self.with(|v| *v)
}
@ -423,6 +426,7 @@ enum ScopeInnerErr {
}
impl ScopeInnerErr {
#[track_caller]
fn panic(&self) -> ! {
match self {
Self::BorrowError => panic!("cannot enter a task-local scope while the task-local storage is borrowed"),

View File

@ -36,7 +36,7 @@
//! point than it was originally scheduled for.
//!
//! This is accomplished by lazily rescheduling timers. That is, we update the
//! state field field with the true expiration of the timer from the holder of
//! state field with the true expiration of the timer from the holder of
//! the [`TimerEntry`]. When the driver services timers (ie, whenever it's
//! walking lists of timers), it checks this "true when" value, and reschedules
//! based on it.
@ -326,7 +326,7 @@ pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, Ti
///
/// Note that this structure is located inside the `TimerEntry` structure.
#[derive(Debug)]
#[repr(C)] // required by `link_list::Link` impl
#[repr(C)]
pub(crate) struct TimerShared {
/// Data manipulated by the driver thread itself, only.
driver_state: CachePadded<TimerSharedPadded>,
@ -339,6 +339,14 @@ pub(crate) struct TimerShared {
_p: PhantomPinned,
}
generate_addr_of_methods! {
impl<> TimerShared {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<TimerShared>> {
&self.driver_state.0.pointers
}
}
}
impl TimerShared {
pub(super) fn new() -> Self {
Self {
@ -421,7 +429,6 @@ impl TimerShared {
/// padded. This contains the information that the driver thread accesses most
/// frequently to minimize contention. In particular, we move it away from the
/// waker, as the waker is updated on every poll.
#[repr(C)] // required by `link_list::Link` impl
struct TimerSharedPadded {
/// A link within the doubly-linked list of timers on a particular level and
/// slot. Valid only if state is equal to Registered.
@ -476,7 +483,7 @@ unsafe impl linked_list::Link for TimerShared {
unsafe fn pointers(
target: NonNull<Self::Target>,
) -> NonNull<linked_list::Pointers<Self::Target>> {
target.cast()
TimerShared::addr_of_pointers(target)
}
}

View File

@ -1,3 +1,5 @@
#![cfg(not(target_os = "wasi"))]
use std::{task::Context, time::Duration};
#[cfg(not(loom))]
@ -289,11 +291,11 @@ fn balanced_incr_and_decr() {
let incr_inner = inner.clone();
let decr_inner = inner.clone();
let incr_hndle = thread::spawn(move || incr(incr_inner));
let decr_hndle = thread::spawn(move || decr(decr_inner));
let incr_handle = thread::spawn(move || incr(incr_inner));
let decr_handle = thread::spawn(move || decr(decr_inner));
incr_hndle.join().expect("should never fail");
decr_hndle.join().expect("should never fail");
incr_handle.join().expect("should never fail");
decr_handle.join().expect("should never fail");
assert_eq!(inner.num(Ordering::SeqCst), 0);
})

View File

@ -278,7 +278,7 @@ pub enum MissedTickBehavior {
/// // 50ms after the call to `tick` up above. That is, in `tick`, when we
/// // recognize that we missed a tick, we schedule the next tick to happen
/// // 50ms (or whatever the `period` is) from right then, not from when
/// // were were *supposed* to tick
/// // were *supposed* to tick
/// interval.tick().await;
/// # }
/// ```

View File

@ -88,10 +88,6 @@ enum List {
/// move it out from this entry to prevent it from getting leaked. (Since the
/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the
/// value should not be leaked.)
///
/// This type is `#[repr(C)]` because its `linked_list::Link` implementation
/// requires that `pointers` is the first field.
#[repr(C)]
struct ListEntry<T> {
/// The linked list pointers of the list this entry is in.
pointers: linked_list::Pointers<ListEntry<T>>,
@ -105,6 +101,14 @@ struct ListEntry<T> {
_pin: PhantomPinned,
}
generate_addr_of_methods! {
impl<T> ListEntry<T> {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
&self.pointers
}
}
}
// With mutable access to the `IdleNotifiedSet`, you can get mutable access to
// the values.
unsafe impl<T: Send> Send for IdleNotifiedSet<T> {}
@ -453,11 +457,6 @@ unsafe impl<T> linked_list::Link for ListEntry<T> {
unsafe fn pointers(
target: NonNull<ListEntry<T>>,
) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
// Safety: The pointers struct is the first field and ListEntry is
// `#[repr(C)]` so this cast is safe.
//
// We do this rather than doing a field access since `std::ptr::addr_of`
// is too new for our MSRV.
target.cast()
ListEntry::addr_of_pointers(target)
}
}

View File

@ -631,6 +631,7 @@ mod tests {
}
}
#[cfg(not(target_arch = "wasm32"))]
fn run_fuzz(ops: Vec<usize>) {
use std::collections::VecDeque;

View File

@ -127,70 +127,96 @@ macro_rules! assert_value {
};
}
assert_value!(tokio::fs::DirBuilder: Send & Sync & Unpin);
assert_value!(tokio::fs::DirEntry: Send & Sync & Unpin);
assert_value!(tokio::fs::File: Send & Sync & Unpin);
assert_value!(tokio::fs::OpenOptions: Send & Sync & Unpin);
assert_value!(tokio::fs::ReadDir: Send & Sync & Unpin);
macro_rules! cfg_not_wasi {
($($item:item)*) => {
$(
#[cfg(not(target_os = "wasi"))]
$item
)*
}
}
async_assert_fn!(tokio::fs::canonicalize(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::copy(&str, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::create_dir(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::create_dir_all(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::hard_link(&str, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::metadata(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::read(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::read_dir(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::read_link(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::read_to_string(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::remove_dir(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::remove_dir_all(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::remove_file(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::rename(&str, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::set_permissions(&str, std::fs::Permissions): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::symlink_metadata(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::write(&str, Vec<u8>): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::ReadDir::next_entry(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::OpenOptions::open(_, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::DirBuilder::create(_, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::DirEntry::metadata(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::DirEntry::file_type(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::open(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::create(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::sync_all(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::sync_data(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::set_len(_, u64): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::metadata(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::try_clone(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::into_std(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::set_permissions(_, std::fs::Permissions): Send & Sync & !Unpin);
cfg_not_wasi! {
mod fs {
use super::*;
assert_value!(tokio::fs::DirBuilder: Send & Sync & Unpin);
assert_value!(tokio::fs::DirEntry: Send & Sync & Unpin);
assert_value!(tokio::fs::File: Send & Sync & Unpin);
assert_value!(tokio::fs::OpenOptions: Send & Sync & Unpin);
assert_value!(tokio::fs::ReadDir: Send & Sync & Unpin);
async_assert_fn!(tokio::fs::canonicalize(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::copy(&str, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::create_dir(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::create_dir_all(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::hard_link(&str, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::metadata(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::read(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::read_dir(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::read_link(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::read_to_string(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::remove_dir(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::remove_dir_all(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::remove_file(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::rename(&str, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::set_permissions(&str, std::fs::Permissions): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::symlink_metadata(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::write(&str, Vec<u8>): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::ReadDir::next_entry(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::OpenOptions::open(_, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::DirBuilder::create(_, &str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::DirEntry::metadata(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::DirEntry::file_type(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::open(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::create(&str): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::sync_all(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::sync_data(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::set_len(_, u64): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::metadata(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::try_clone(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::fs::File::into_std(_): Send & Sync & !Unpin);
async_assert_fn!(
tokio::fs::File::set_permissions(_, std::fs::Permissions): Send & Sync & !Unpin
);
}
}
cfg_not_wasi! {
assert_value!(tokio::net::TcpSocket: Send & Sync & Unpin);
async_assert_fn!(tokio::net::TcpListener::bind(SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::TcpStream::connect(SocketAddr): Send & Sync & !Unpin);
}
assert_value!(tokio::net::TcpListener: Send & Sync & Unpin);
assert_value!(tokio::net::TcpSocket: Send & Sync & Unpin);
assert_value!(tokio::net::TcpStream: Send & Sync & Unpin);
assert_value!(tokio::net::UdpSocket: Send & Sync & Unpin);
assert_value!(tokio::net::tcp::OwnedReadHalf: Send & Sync & Unpin);
assert_value!(tokio::net::tcp::OwnedWriteHalf: Send & Sync & Unpin);
assert_value!(tokio::net::tcp::ReadHalf<'_>: Send & Sync & Unpin);
assert_value!(tokio::net::tcp::ReuniteError: Send & Sync & Unpin);
assert_value!(tokio::net::tcp::WriteHalf<'_>: Send & Sync & Unpin);
async_assert_fn!(tokio::net::TcpListener::accept(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::TcpListener::bind(SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::TcpStream::connect(SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::TcpStream::peek(_, &mut [u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::TcpStream::readable(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::TcpStream::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::TcpStream::writable(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::bind(SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::connect(_, SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::peek_from(_, &mut [u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::readable(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::writable(_): Send & Sync & !Unpin);
// Wasi does not support UDP
cfg_not_wasi! {
mod udp_socket {
use super::*;
assert_value!(tokio::net::UdpSocket: Send & Sync & Unpin);
async_assert_fn!(tokio::net::UdpSocket::bind(SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::connect(_, SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::peek_from(_, &mut [u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::readable(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::ready(_, tokio::io::Interest): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::UdpSocket::writable(_): Send & Sync & !Unpin);
}
}
async_assert_fn!(tokio::net::lookup_host(SocketAddr): Send & Sync & !Unpin);
async_assert_fn!(tokio::net::tcp::ReadHalf::peek(_, &mut [u8]): Send & Sync & !Unpin);
@ -242,16 +268,22 @@ mod windows_named_pipe {
async_assert_fn!(NamedPipeServer::writable(_): Send & Sync & !Unpin);
}
assert_value!(tokio::process::Child: Send & Sync & Unpin);
assert_value!(tokio::process::ChildStderr: Send & Sync & Unpin);
assert_value!(tokio::process::ChildStdin: Send & Sync & Unpin);
assert_value!(tokio::process::ChildStdout: Send & Sync & Unpin);
assert_value!(tokio::process::Command: Send & Sync & Unpin);
async_assert_fn!(tokio::process::Child::kill(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::process::Child::wait(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::process::Child::wait_with_output(_): Send & Sync & !Unpin);
cfg_not_wasi! {
mod test_process {
use super::*;
assert_value!(tokio::process::Child: Send & Sync & Unpin);
assert_value!(tokio::process::ChildStderr: Send & Sync & Unpin);
assert_value!(tokio::process::ChildStdin: Send & Sync & Unpin);
assert_value!(tokio::process::ChildStdout: Send & Sync & Unpin);
assert_value!(tokio::process::Command: Send & Sync & Unpin);
async_assert_fn!(tokio::process::Child::kill(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::process::Child::wait(_): Send & Sync & !Unpin);
async_assert_fn!(tokio::process::Child::wait_with_output(_): Send & Sync & !Unpin);
}
async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync & !Unpin);
}
async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync & !Unpin);
#[cfg(unix)]
mod unix_signal {
use super::*;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support bind()
use tokio::net::TcpListener;
use tokio_test::assert_ok;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support file operations
use tokio::fs;
use tokio_test::assert_ok;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support file operations
use tempfile::tempdir;
use tokio::fs;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support directory operations
use tokio::fs;
use tokio_test::{assert_err, assert_ok};

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support file operations
use std::io::prelude::*;
use tempfile::NamedTempFile;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support file operations
use tokio::fs;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support bind()
use std::time::Duration;
use tokio::io::{self, copy_bidirectional, AsyncReadExt, AsyncWriteExt};

View File

@ -1,5 +1,6 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
// Wasi does not support panic recovery or threading
#![cfg(all(feature = "full", not(target_os = "wasi")))]
use tokio::net::TcpListener;
use tokio::runtime;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support bind
use tokio::net::TcpListener;
use tokio::runtime;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support file operations
use tempfile::NamedTempFile;
use tokio::fs::File;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support panic recovery
use std::task::{Context, Poll};
use std::{error::Error, pin::Pin};

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support panic recovery
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_test::assert_ok;

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support panic recovery
use tokio::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf};

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support panic recovery
use std::pin::Pin;
use std::task::{Context, Poll};

View File

@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support panic recovery
struct PanicsOnDrop;

Some files were not shown because too many files have changed in this diff Show More