diff --git a/Cargo.toml b/Cargo.toml index 4a9e88a05..4a2a71ba8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ members = [ "tokio", "tokio-macros", - "tokio-sync", "tokio-test", "tokio-tls", "tokio-util", diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 4fb217612..ae2bd1bb1 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -47,8 +47,6 @@ jobs: displayName: Test sub crates - rust: beta crates: - tokio-sync: - - async-traits tokio-macros: [] tokio-test: [] tokio-util: [] diff --git a/ci/patch.toml b/ci/patch.toml index 333c4dfff..22311cf9a 100644 --- a/ci/patch.toml +++ b/ci/patch.toml @@ -3,6 +3,6 @@ [patch.crates-io] tokio = { path = "tokio" } tokio-macros = { path = "tokio-macros" } -tokio-sync = { path = "tokio-sync" } +tokio-test = { path = "tokio-test" } tokio-tls = { path = "tokio-tls" } tokio-util = { path = "tokio-util" } diff --git a/tokio-sync/CHANGELOG.md b/tokio-sync/CHANGELOG.md deleted file mode 100644 index 440335031..000000000 --- a/tokio-sync/CHANGELOG.md +++ /dev/null @@ -1,76 +0,0 @@ -# 0.2.0-alpha.6 (September 30, 2019) - -- Move to `futures-*-preview 0.3.0-alpha.19` -- Move to `pin-project 0.4` - -# 0.2.0-alpha.5 (September 19, 2019) - -### Changed -- rename `Lock` -> `Mutex` and make it more like `std::sync::Mutex` (#1573). - -### Added -- `Barrier`, an async version of `std::sync::Barrier` (#1571). - -# 0.2.0-alpha.4 (August 29, 2019) - -- Track tokio release. - -# 0.2.0-alpha.3 (August 23, 2019) - -- Track `tokio` version number - -# 0.2.0-alpha.2 (August 17, 2019) - -### Changed -- Update `futures` dependency to 0.3.0-alpha.18. - -# 0.2.0-alpha.1 (August 8, 2019) - -### Changed -- Switch to `async`, `await`, and `std::future`. - -# 0.1.6 (June 4, 2019) - -### Added -- Add Sync impl for Lock (#1117). - -# 0.1.5 (April 22, 2019) - -### Added -- Add asynchronous mutual exclusion primitive (#964). - -# 0.1.4 (March 13, 2019) - -### Fixed -- Fix memory leak on channel drop (#917). - -### Added -- `std::error::Error` implementation for `oneshot`, `watch` error types (#967). - -# 0.1.3 (March 1, 2019) - -### Added -- `Watch`, a single value broadcast channel (#922). -- `std::error::Error` implementation for more `mpsc` types (#937). - -# 0.1.2 (February 20, 2019) - -### Fixes -- `mpsc` and `Semaphore` when releasing permits (#904). -- `oneshot` task handle leak (#911). - -### Changes -- Performance improvements in `AtomicTask` (#892). -- Improved assert message when creating a channel with bound of zero (#906). - -### Adds -- `AtomicTask::take_task` (#895). - -# 0.1.1 (February 1, 2019) - -### Fixes -- Panic when creating a channel with bound 0 (#879). - -# 0.1.0 (January 24, 2019) - -- Initial Release diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml deleted file mode 100644 index bb2fd6073..000000000 --- a/tokio-sync/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -[package] -name = "tokio-sync" -# When releasing to crates.io: -# - Remove path dependencies -# - Update html_root_url. -# - Update doc url -# - Cargo.toml -# - Update CHANGELOG.md. -# - Create "v0.2.x" git tag. -version = "0.2.0-alpha.6" -edition = "2018" -authors = ["Tokio Contributors "] -license = "MIT" -repository = "https://github.com/tokio-rs/tokio" -homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-sync/0.2.0-alpha.6/tokio_sync" -description = """ -Synchronization utilities. -""" -categories = ["asynchronous"] - -[features] -async-traits = ["futures-sink-preview"] - -[dependencies] -fnv = "1.0.6" -futures-core-preview = { version = "=0.3.0-alpha.19" } -futures-sink-preview = { version = "=0.3.0-alpha.19", optional = true } -futures-util-preview = { version = "=0.3.0-alpha.19" } - -[dev-dependencies] -tokio = { version = "0.2.0-alpha.6", path = "../tokio" } -tokio-test = { version = "0.2.0-alpha.6", path = "../tokio-test" } - -env_logger = { version = "0.6", default-features = false } -loom = { version = "0.2.1", features = ["futures"] } - -[package.metadata.docs.rs] -all-features = true diff --git a/tokio-sync/LICENSE b/tokio-sync/LICENSE deleted file mode 100644 index cdb28b4b5..000000000 --- a/tokio-sync/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2019 Tokio Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/tokio-sync/README.md b/tokio-sync/README.md deleted file mode 100644 index 6e209f69a..000000000 --- a/tokio-sync/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# tokio-sync - -Synchronization utilities - -## License - -This project is licensed under the [MIT license](LICENSE). - -### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in Tokio by you, shall be licensed as MIT, without any additional -terms or conditions. diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs deleted file mode 100644 index 95d8e5c7c..000000000 --- a/tokio-sync/benches/mpsc.rs +++ /dev/null @@ -1,536 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -type Medium = [usize; 64]; -type Large = [Medium; 64]; - -mod tokio { - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - use tokio_sync::mpsc::*; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::()); - }) - } - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::(1); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::(1); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn bounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::(1_000); - - for i in 0..1000 { - let _ = tx.try_send([[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn bounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn contended_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - // TODO make unbounded - let (tx, rx) = channel::(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} - -mod legacy { - use futures::sync::mpsc::*; - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::()); - }) - } - - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Ok(Async::Ready(Some(1))), rx.poll()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::(0); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::(0); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded::(); - - for i in 0..1000 { - let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn multi_thread_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} diff --git a/tokio-sync/benches/oneshot.rs b/tokio-sync/benches/oneshot.rs deleted file mode 100644 index b2f37805a..000000000 --- a/tokio-sync/benches/oneshot.rs +++ /dev/null @@ -1,239 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -mod tokio { - use futures::{future, Async, Future}; - use test::Bencher; - use tokio_sync::oneshot; - - #[bench] - fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::()); - }) - } - - #[bench] - fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); - } - - #[bench] - fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); - } - - #[bench] - fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin(mut f: F) -> Result { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - } -} - -mod legacy { - use futures::sync::oneshot; - use futures::{future, Async, Future}; - use test::Bencher; - - #[bench] - fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::()); - }) - } - - #[bench] - fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); - } - - #[bench] - fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); - } - - #[bench] - fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin(mut f: F) -> Result { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - } -} diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs deleted file mode 100644 index 6055f024b..000000000 --- a/tokio-sync/src/lib.rs +++ /dev/null @@ -1,43 +0,0 @@ -#![doc(html_root_url = "https://docs.rs/tokio-sync/0.2.0-alpha.6")] -#![warn( - missing_debug_implementations, - missing_docs, - rust_2018_idioms, - unreachable_pub -)] -#![deny(intra_doc_link_resolution_failure)] -#![doc(test( - no_crate_inject, - attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) -))] - -//! Asynchronous synchronization primitives. -//! -//! This crate provides primitives for synchronizing asynchronous tasks. - -macro_rules! debug { - ($($t:tt)*) => { - if false { - println!($($t)*); - } - } -} - -macro_rules! if_fuzz { - ($($t:tt)*) => {{ - if false { $($t)* } - }} -} - -mod barrier; -mod loom; -pub mod mpsc; -mod mutex; -pub mod oneshot; -pub mod semaphore; -mod task; -pub mod watch; - -pub use barrier::{Barrier, BarrierWaitResult}; -pub use mutex::{Mutex, MutexGuard}; -pub use task::AtomicWaker; diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs deleted file mode 100644 index 564efc4fa..000000000 --- a/tokio-sync/src/loom.rs +++ /dev/null @@ -1,38 +0,0 @@ -pub(crate) mod future { - pub(crate) use crate::task::AtomicWaker; -} - -pub(crate) mod sync { - pub(crate) use std::sync::atomic; - pub(crate) use std::sync::Arc; - - use std::cell::UnsafeCell; - - pub(crate) struct CausalCell(UnsafeCell); - - impl CausalCell { - pub(crate) fn new(data: T) -> CausalCell { - CausalCell(UnsafeCell::new(data)) - } - - pub(crate) fn with(&self, f: F) -> R - where - F: FnOnce(*const T) -> R, - { - f(self.0.get()) - } - - pub(crate) fn with_mut(&self, f: F) -> R - where - F: FnOnce(*mut T) -> R, - { - f(self.0.get()) - } - } -} - -pub(crate) mod thread { - pub(crate) fn yield_now() { - ::std::sync::atomic::spin_loop_hint(); - } -} diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index d5f637230..95dc5d7f5 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -21,7 +21,6 @@ categories = ["asynchronous", "testing"] [dependencies] tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } -tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" } bytes = "0.4" futures-core-preview = "=0.3.0-alpha.19" diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 569a3fe71..a85f0c7a5 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -63,7 +63,7 @@ signal = [ "net-driver", "signal-hook-registry" ] -sync = ["tokio-sync"] +sync = ["fnv"] tcp = ["io", "net-driver"] timer = ["crossbeam-utils", "slab"] udp = ["io", "net-driver"] @@ -81,6 +81,8 @@ process = [ ] [dependencies] +tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } + futures-core-preview = "=0.3.0-alpha.19" futures-sink-preview = "=0.3.0-alpha.19" futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "channel"] } @@ -89,6 +91,7 @@ futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "chann bytes = { version = "0.4", optional = true } crossbeam-channel = { version = "0.3.8", optional = true } crossbeam-utils = { version = "0.6.0", optional = true } +fnv = { version = "1.0.6", optional = true } iovec = { version = "0.1", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } @@ -97,8 +100,6 @@ num_cpus = { version = "1.8.0", optional = true } pin-project = { version = "0.4", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } -tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" } -tokio-sync = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-sync", features = ["async-traits"] } [target.'cfg(unix)'.dependencies] crossbeam-queue = { version = "0.1.2", optional = true } @@ -124,7 +125,7 @@ flate2 = { version = "1", features = ["tokio"] } http = "0.1" httparse = "1.0" libc = "0.2" -loom = { version = "0.2.11", features = ["futures", "checkpoint"] } +loom = { version = "0.2.12", features = ["futures", "checkpoint"] } num_cpus = "1.0" rand = "0.7.2" serde = { version = "1.0", features = ["derive"] } diff --git a/tokio/benches/mpsc.rs b/tokio/benches/mpsc.rs new file mode 100644 index 000000000..0b97d55d3 --- /dev/null +++ b/tokio/benches/mpsc.rs @@ -0,0 +1,270 @@ +#![feature(test)] +#![warn(rust_2018_idioms)] + +extern crate test; + +use tokio::sync::mpsc::*; + +use futures::{future, Async, Future, Sink, Stream}; +use std::thread; +use test::Bencher; + +type Medium = [usize; 64]; +type Large = [Medium; 64]; + +#[bench] +fn bounded_new_medium(b: &mut Bencher) { + b.iter(|| { + let _ = test::black_box(&channel::(1_000)); + }) +} + +#[bench] +fn unbounded_new_medium(b: &mut Bencher) { + b.iter(|| { + let _ = test::black_box(&unbounded_channel::()); + }) +} +#[bench] +fn bounded_new_large(b: &mut Bencher) { + b.iter(|| { + let _ = test::black_box(&channel::(1_000)); + }) +} + +#[bench] +fn unbounded_new_large(b: &mut Bencher) { + b.iter(|| { + let _ = test::black_box(&unbounded_channel::()); + }) +} + +#[bench] +fn send_one_message(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel(1_000); + + // Send + tx.try_send(1).unwrap(); + + // Receive + assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap()); + }) +} + +#[bench] +fn send_one_message_large(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel::(1_000); + + // Send + let _ = tx.try_send([[0; 64]; 64]); + + // Receive + let _ = test::black_box(&rx.poll()); + }) +} + +#[bench] +fn bounded_rx_not_ready(b: &mut Bencher) { + let (_tx, mut rx) = channel::(1_000); + b.iter(|| { + future::lazy(|| { + assert!(rx.poll().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn bounded_tx_poll_ready(b: &mut Bencher) { + let (mut tx, _rx) = channel::(1); + b.iter(|| { + future::lazy(|| { + assert!(tx.poll_ready().unwrap().is_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn bounded_tx_poll_not_ready(b: &mut Bencher) { + let (mut tx, _rx) = channel::(1); + tx.try_send(1).unwrap(); + b.iter(|| { + future::lazy(|| { + assert!(tx.poll_ready().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn unbounded_rx_not_ready(b: &mut Bencher) { + let (_tx, mut rx) = unbounded_channel::(); + b.iter(|| { + future::lazy(|| { + assert!(rx.poll().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn unbounded_rx_not_ready_x5(b: &mut Bencher) { + let (_tx, mut rx) = unbounded_channel::(); + b.iter(|| { + future::lazy(|| { + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + assert!(rx.poll().unwrap().is_not_ready()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }) +} + +#[bench] +fn bounded_uncontended_1(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel(1_000); + + for i in 0..1000 { + tx.try_send(i).unwrap(); + // No need to create a task, because poll is not going to park. + assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); + } + }) +} + +#[bench] +fn bounded_uncontended_1_large(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel::(1_000); + + for i in 0..1000 { + let _ = tx.try_send([[i; 64]; 64]); + // No need to create a task, because poll is not going to park. + let _ = test::black_box(&rx.poll()); + } + }) +} + +#[bench] +fn bounded_uncontended_2(b: &mut Bencher) { + b.iter(|| { + let (mut tx, mut rx) = channel(1000); + + for i in 0..1000 { + tx.try_send(i).unwrap(); + } + + for i in 0..1000 { + // No need to create a task, because poll is not going to park. + assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); + } + }) +} + +#[bench] +fn contended_unbounded_tx(b: &mut Bencher) { + let mut threads = vec![]; + let mut txs = vec![]; + + for _ in 0..4 { + let (tx, rx) = ::std::sync::mpsc::channel::>(); + txs.push(tx); + + threads.push(thread::spawn(move || { + for mut tx in rx.iter() { + for i in 0..1_000 { + tx.try_send(i).unwrap(); + } + } + })); + } + + b.iter(|| { + // TODO make unbounded + let (tx, rx) = channel::(1_000_000); + + for th in &txs { + th.send(tx.clone()).unwrap(); + } + + drop(tx); + + let rx = rx.wait().take(4 * 1_000); + + for v in rx { + let _ = test::black_box(v); + } + }); + + drop(txs); + + for th in threads { + th.join().unwrap(); + } +} + +#[bench] +fn contended_bounded_tx(b: &mut Bencher) { + const THREADS: usize = 4; + const ITERS: usize = 100; + + let mut threads = vec![]; + let mut txs = vec![]; + + for _ in 0..THREADS { + let (tx, rx) = ::std::sync::mpsc::channel::>(); + txs.push(tx); + + threads.push(thread::spawn(move || { + for tx in rx.iter() { + let mut tx = tx.wait(); + for i in 0..ITERS { + tx.send(i as i32).unwrap(); + } + } + })); + } + + b.iter(|| { + let (tx, rx) = channel::(1); + + for th in &txs { + th.send(tx.clone()).unwrap(); + } + + drop(tx); + + let rx = rx.wait().take(THREADS * ITERS); + + for v in rx { + let _ = test::black_box(v); + } + }); + + drop(txs); + + for th in threads { + th.join().unwrap(); + } +} diff --git a/tokio/benches/oneshot.rs b/tokio/benches/oneshot.rs new file mode 100644 index 000000000..a7f43c2f6 --- /dev/null +++ b/tokio/benches/oneshot.rs @@ -0,0 +1,120 @@ +#![feature(test)] +#![warn(rust_2018_idioms)] + +extern crate test; + +use tokio::sync::oneshot; + +use futures::{future, Async, Future}; +use test::Bencher; + +#[bench] +fn new(b: &mut Bencher) { + b.iter(|| { + let _ = ::test::black_box(&oneshot::channel::()); + }) +} + +#[bench] +fn same_thread_send_recv(b: &mut Bencher) { + b.iter(|| { + let (tx, mut rx) = oneshot::channel(); + + let _ = tx.send(1); + + assert_eq!(Async::Ready(1), rx.poll().unwrap()); + }); +} + +#[bench] +fn same_thread_recv_multi_send_recv(b: &mut Bencher) { + b.iter(|| { + let (tx, mut rx) = oneshot::channel(); + + future::lazy(|| { + let _ = rx.poll(); + let _ = rx.poll(); + let _ = rx.poll(); + let _ = rx.poll(); + + let _ = tx.send(1); + assert_eq!(Async::Ready(1), rx.poll().unwrap()); + + Ok::<_, ()>(()) + }) + .wait() + .unwrap(); + }); +} + +#[bench] +fn multi_thread_send_recv(b: &mut Bencher) { + const MAX: usize = 10_000_000; + + use std::thread; + + fn spin(mut f: F) -> Result { + use futures::Async::Ready; + loop { + match f.poll() { + Ok(Ready(v)) => return Ok(v), + Ok(_) => {} + Err(e) => return Err(e), + } + } + } + + let mut ping_txs = vec![]; + let mut ping_rxs = vec![]; + let mut pong_txs = vec![]; + let mut pong_rxs = vec![]; + + for _ in 0..MAX { + let (tx, rx) = oneshot::channel::<()>(); + + ping_txs.push(Some(tx)); + ping_rxs.push(Some(rx)); + + let (tx, rx) = oneshot::channel::<()>(); + + pong_txs.push(Some(tx)); + pong_rxs.push(Some(rx)); + } + + thread::spawn(move || { + future::lazy(|| { + for i in 0..MAX { + let ping_rx = ping_rxs[i].take().unwrap(); + let pong_tx = pong_txs[i].take().unwrap(); + + if spin(ping_rx).is_err() { + return Ok(()); + } + + pong_tx.send(()).unwrap(); + } + + Ok::<(), ()>(()) + }) + .wait() + .unwrap(); + }); + + future::lazy(|| { + let mut i = 0; + + b.iter(|| { + let ping_tx = ping_txs[i].take().unwrap(); + let pong_rx = pong_rxs[i].take().unwrap(); + + ping_tx.send(()).unwrap(); + spin(pong_rx).unwrap(); + + i += 1; + }); + + Ok::<(), ()>(()) + }) + .wait() + .unwrap(); +} diff --git a/tokio/benches/thread_pool.rs b/tokio/benches/thread_pool.rs index 3e1462f3f..97b25f18c 100644 --- a/tokio/benches/thread_pool.rs +++ b/tokio/benches/thread_pool.rs @@ -3,7 +3,7 @@ extern crate test; use tokio::executor::thread_pool::{Builder, Spawner}; -use tokio_sync::oneshot; +use tokio::sync::oneshot; use std::future::Future; use std::pin::Pin; diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs index 16faa03e4..2ad573d8f 100644 --- a/tokio/src/executor/blocking/mod.rs +++ b/tokio/src/executor/blocking/mod.rs @@ -3,7 +3,7 @@ use crate::executor::loom::sync::{Arc, Condvar, Mutex}; use crate::executor::loom::thread; #[cfg(feature = "blocking")] -use tokio_sync::oneshot; +use crate::sync::oneshot; use std::cell::Cell; use std::collections::VecDeque; diff --git a/tokio/src/executor/thread_pool/shutdown.rs b/tokio/src/executor/thread_pool/shutdown.rs index 40d8f04ac..b7c4177f0 100644 --- a/tokio/src/executor/thread_pool/shutdown.rs +++ b/tokio/src/executor/thread_pool/shutdown.rs @@ -4,8 +4,7 @@ //! dropped, the `Receiver` receives a notification. use crate::executor::loom::sync::Arc; - -use tokio_sync::oneshot; +use crate::sync::oneshot; #[derive(Debug, Clone)] pub(super) struct Sender { diff --git a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs index 34a07ea87..9cd99a869 100644 --- a/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs +++ b/tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs @@ -3,8 +3,7 @@ use crate::loom::{ atomic::{AtomicUsize, Ordering}, CausalCell, }; - -use tokio_sync::AtomicWaker; +use crate::sync::AtomicWaker; #[derive(Debug)] pub(crate) struct ScheduledIo { diff --git a/tokio/src/net/unix/incoming.rs b/tokio/src/net/unix/incoming.rs index 542b5e1d3..a66f21da0 100644 --- a/tokio/src/net/unix/incoming.rs +++ b/tokio/src/net/unix/incoming.rs @@ -1,5 +1,3 @@ -#![cfg(feature = "async-traits")] - use super::{UnixListener, UnixStream}; use futures_core::ready; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 3a36dc907..3cf8eff32 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -90,7 +90,6 @@ impl UnixListener { /// /// This method returns an implementation of the `Stream` trait which /// resolves to the sockets the are accepted on this listener. - #[cfg(feature = "async-traits")] pub fn incoming(self) -> super::Incoming { super::Incoming::new(self) } diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index 4447ca5ce..977e3a0f8 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -6,7 +6,6 @@ mod datagram; pub use self::datagram::UnixDatagram; mod incoming; -#[cfg(feature = "async-traits")] pub use self::incoming::Incoming; mod listener; diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 7d2fd7a91..e70d0495d 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -1,6 +1,6 @@ use crate::signal::os::{OsExtraData, OsStorage}; -use tokio_sync::mpsc::Sender; +use crate::sync::mpsc::Sender; use lazy_static::lazy_static; use std::ops; diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 075e788c0..87871503d 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -8,8 +8,7 @@ use crate::io::AsyncRead; use crate::net::util::PollEvented; use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; - -use tokio_sync::mpsc::{channel, Receiver}; +use crate::sync::mpsc::{channel, Receiver}; use futures_core::stream::Stream; use libc::c_int; diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index abde334b1..1e68d6287 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -7,9 +7,8 @@ #![cfg(windows)] -use super::registry::{globals, EventId, EventInfo, Init, Storage}; - -use tokio_sync::mpsc::{channel, Receiver}; +use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage}; +use crate::sync::mpsc::{channel, Receiver}; use futures_core::stream::Stream; use std::convert::TryFrom; diff --git a/tokio-sync/src/barrier.rs b/tokio/src/sync/barrier.rs similarity index 97% rename from tokio-sync/src/barrier.rs rename to tokio/src/sync/barrier.rs index 6a409e26f..1582120e0 100644 --- a/tokio-sync/src/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -1,4 +1,5 @@ -use crate::watch; +use crate::sync::watch; + use std::sync::Mutex; /// A barrier enables multiple threads to synchronize the beginning of some computation. @@ -6,8 +7,8 @@ use std::sync::Mutex; /// ``` /// # #[tokio::main] /// # async fn main() { +/// use tokio::sync::Barrier; /// use std::sync::Arc; -/// use tokio_sync::Barrier; /// use futures_util::future::join_all; /// /// let mut handles = Vec::with_capacity(10); @@ -49,7 +50,7 @@ impl Barrier { /// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all /// threads at once when the `n`th thread calls `wait`. pub fn new(mut n: usize) -> Barrier { - let (waker, wait) = crate::watch::channel(0); + let (waker, wait) = crate::sync::watch::channel(0); if n == 0 { // if n is 0, it's not clear what behavior the user wants. diff --git a/tokio/src/sync/loom.rs b/tokio/src/sync/loom.rs new file mode 100644 index 000000000..1b5a5c9d7 --- /dev/null +++ b/tokio/src/sync/loom.rs @@ -0,0 +1,48 @@ +#[cfg(not(all(test, loom)))] +mod imp { + pub(crate) mod future { + pub(crate) use crate::sync::task::AtomicWaker; + } + + pub(crate) mod sync { + pub(crate) use std::sync::atomic; + pub(crate) use std::sync::Arc; + + use std::cell::UnsafeCell; + + pub(crate) struct CausalCell(UnsafeCell); + + impl CausalCell { + pub(crate) fn new(data: T) -> CausalCell { + CausalCell(UnsafeCell::new(data)) + } + + pub(crate) fn with(&self, f: F) -> R + where + F: FnOnce(*const T) -> R, + { + f(self.0.get()) + } + + pub(crate) fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut T) -> R, + { + f(self.0.get()) + } + } + } + + pub(crate) mod thread { + pub(crate) fn yield_now() { + ::std::sync::atomic::spin_loop_hint(); + } + } +} + +#[cfg(all(test, loom))] +mod imp { + pub(crate) use loom::*; +} + +pub(crate) use self::imp::*; diff --git a/tokio/src/sync.rs b/tokio/src/sync/mod.rs similarity index 52% rename from tokio/src/sync.rs rename to tokio/src/sync/mod.rs index 368472355..84f6bd98b 100644 --- a/tokio/src/sync.rs +++ b/tokio/src/sync/mod.rs @@ -13,6 +13,46 @@ //! - [watch](watch/index.html), a single-producer, multi-consumer channel that //! only stores the **most recently** sent value. -pub use tokio_sync::Barrier; -pub use tokio_sync::{mpsc, oneshot, watch}; -pub use tokio_sync::{Mutex, MutexGuard}; +macro_rules! debug { + ($($t:tt)*) => { + if false { + println!($($t)*); + } + } +} + +macro_rules! if_loom { + ($($t:tt)*) => {{ + #[cfg(loom)] + const LOOM: bool = true; + #[cfg(not(loom))] + const LOOM: bool = false; + + if LOOM { + $($t)* + } + }} +} + +mod barrier; +pub use barrier::{Barrier, BarrierWaitResult}; + +mod loom; + +pub mod mpsc; + +mod mutex; +pub use mutex::{Mutex, MutexGuard}; + +pub mod oneshot; + +pub mod semaphore; + +mod task; +pub use task::AtomicWaker; + +pub mod watch; + +/// Unit tests +#[cfg(test)] +mod tests; diff --git a/tokio-sync/src/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs similarity index 99% rename from tokio-sync/src/mpsc/block.rs rename to tokio/src/sync/mpsc/block.rs index 7d7f2e539..aea693849 100644 --- a/tokio-sync/src/mpsc/block.rs +++ b/tokio/src/sync/mpsc/block.rs @@ -1,8 +1,9 @@ -use crate::loom::{ +use crate::sync::loom::{ sync::atomic::{AtomicPtr, AtomicUsize}, sync::CausalCell, thread, }; + use std::mem::MaybeUninit; use std::ops; use std::ptr::{self, NonNull}; @@ -365,7 +366,7 @@ impl Values { let mut vals = MaybeUninit::uninit(); // When fuzzing, `CausalCell` needs to be initialized. - if_fuzz! { + if_loom! { let p = vals.as_mut_ptr() as *mut CausalCell>; for i in 0..BLOCK_CAP { p.add(i) diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs similarity index 97% rename from tokio-sync/src/mpsc/bounded.rs rename to tokio/src/sync/mpsc/bounded.rs index 711173aee..787dd5077 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,10 +1,9 @@ -use super::chan; +use crate::sync::mpsc::chan; +use crate::sync::semaphore; use std::fmt; -use std::task::{Context, Poll}; - -#[cfg(feature = "async-traits")] use std::pin::Pin; +use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. /// @@ -104,7 +103,7 @@ pub struct RecvError(()); /// ``` pub fn channel(buffer: usize) -> (Sender, Receiver) { assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); - let semaphore = (crate::semaphore::Semaphore::new(buffer), buffer); + let semaphore = (semaphore::Semaphore::new(buffer), buffer); let (tx, rx) = chan::channel(semaphore); let tx = Sender::new(tx); @@ -115,7 +114,7 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { /// Channel semaphore is a tuple of the semaphore implementation and a `usize` /// representing the channel bound. -type Semaphore = (crate::semaphore::Semaphore, usize); +type Semaphore = (semaphore::Semaphore, usize); impl Receiver { pub(crate) fn new(chan: chan::Rx) -> Receiver { @@ -181,7 +180,6 @@ impl Receiver { } } -#[cfg(feature = "async-traits")] impl futures_core::Stream for Receiver { type Item = T; @@ -244,7 +242,6 @@ impl Sender { } } -#[cfg(feature = "async-traits")] impl futures_sink::Sink for Sender { type Error = SendError; diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs similarity index 98% rename from tokio-sync/src/mpsc/chan.rs rename to tokio/src/sync/mpsc/chan.rs index 122537fc1..fe5ff9044 100644 --- a/tokio-sync/src/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -1,9 +1,10 @@ -use super::list; -use crate::loom::{ +use crate::sync::loom::{ future::AtomicWaker, sync::atomic::AtomicUsize, sync::{Arc, CausalCell}, }; +use crate::sync::mpsc::list; + use std::fmt; use std::process; use std::sync::atomic::Ordering::{AcqRel, Relaxed}; @@ -335,7 +336,7 @@ impl Drop for Chan { } } -use crate::semaphore::TryAcquireError; +use crate::sync::semaphore::TryAcquireError; impl From for TrySendError { fn from(src: TryAcquireError) -> TrySendError { @@ -351,9 +352,9 @@ impl From for TrySendError { // ===== impl Semaphore for (::Semaphore, capacity) ===== -use crate::semaphore::Permit; +use crate::sync::semaphore::Permit; -impl Semaphore for (crate::semaphore::Semaphore, usize) { +impl Semaphore for (crate::sync::semaphore::Semaphore, usize) { type Permit = Permit; fn new_permit() -> Permit { diff --git a/tokio-sync/src/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs similarity index 99% rename from tokio-sync/src/mpsc/list.rs rename to tokio/src/sync/mpsc/list.rs index b3019a51e..a1295bb05 100644 --- a/tokio-sync/src/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -1,10 +1,11 @@ //! A concurrent, lock-free, FIFO list. -use super::block::{self, Block}; -use crate::loom::{ +use crate::sync::loom::{ sync::atomic::{AtomicPtr, AtomicUsize}, thread, }; +use crate::sync::mpsc::block::{self, Block}; + use std::fmt; use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; diff --git a/tokio-sync/src/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs similarity index 92% rename from tokio-sync/src/mpsc/mod.rs rename to tokio/src/sync/mpsc/mod.rs index 320980c75..3b95b9544 100644 --- a/tokio-sync/src/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -34,13 +34,16 @@ //! [`Sender`]: struct.Sender.html //! [`Receiver`]: struct.Receiver.html -mod block; -mod bounded; -mod chan; -mod list; -mod unbounded; +pub(super) mod block; +mod bounded; pub use self::bounded::{channel, Receiver, Sender}; + +mod chan; + +pub(super) mod list; + +mod unbounded; pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender}; pub mod error { @@ -54,8 +57,11 @@ pub mod error { /// /// This value must be a power of 2. It also must be smaller than the number of /// bits in `usize`. -#[cfg(target_pointer_width = "64")] +#[cfg(all(target_pointer_width = "64", not(loom)))] const BLOCK_CAP: usize = 32; -#[cfg(not(target_pointer_width = "64"))] +#[cfg(all(not(target_pointer_width = "64"), not(loom)))] const BLOCK_CAP: usize = 16; + +#[cfg(loom)] +const BLOCK_CAP: usize = 2; diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs similarity index 97% rename from tokio-sync/src/mpsc/unbounded.rs rename to tokio/src/sync/mpsc/unbounded.rs index e4b99a759..5a73771e8 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -1,10 +1,9 @@ -use super::chan; -use crate::loom::sync::atomic::AtomicUsize; +use crate::sync::loom::sync::atomic::AtomicUsize; +use crate::sync::mpsc::chan; use std::fmt; use std::task::{Context, Poll}; -#[cfg(feature = "async-traits")] use std::pin::Pin; /// Send values to the associated `UnboundedReceiver`. @@ -146,7 +145,6 @@ impl UnboundedReceiver { } } -#[cfg(feature = "async-traits")] impl futures_core::Stream for UnboundedReceiver { type Item = T; @@ -167,7 +165,6 @@ impl UnboundedSender { } } -#[cfg(feature = "async-traits")] impl futures_sink::Sink for UnboundedSender { type Error = UnboundedSendError; diff --git a/tokio-sync/src/mutex.rs b/tokio/src/sync/mutex.rs similarity index 98% rename from tokio-sync/src/mutex.rs rename to tokio/src/sync/mutex.rs index c0c75776d..ae45c666c 100644 --- a/tokio-sync/src/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -29,7 +29,7 @@ //! [`Mutex`]: struct.Mutex.html //! [`MutexGuard`]: struct.MutexGuard.html -use crate::semaphore; +use crate::sync::semaphore; use futures_util::future::poll_fn; use std::cell::UnsafeCell; @@ -69,6 +69,7 @@ unsafe impl Sync for Mutex where T: Send {} unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {} #[test] +#[cfg(not(loom))] fn bounds() { fn check() {} check::>(); diff --git a/tokio-sync/src/oneshot.rs b/tokio/src/sync/oneshot.rs similarity index 99% rename from tokio-sync/src/oneshot.rs rename to tokio/src/sync/oneshot.rs index 8ed1672fc..7cf4eb8f1 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -1,6 +1,6 @@ //! A channel for sending a single message between asynchronous tasks. -use crate::loom::sync::{atomic::AtomicUsize, Arc, CausalCell}; +use crate::sync::loom::sync::{atomic::AtomicUsize, Arc, CausalCell}; use futures_core::ready; use std::fmt; diff --git a/tokio-sync/src/semaphore.rs b/tokio/src/sync/semaphore.rs similarity index 99% rename from tokio-sync/src/semaphore.rs rename to tokio/src/sync/semaphore.rs index a1085c6c2..1120be077 100644 --- a/tokio-sync/src/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -8,7 +8,7 @@ //! section. If no permits are available, then acquiring the semaphore returns //! `Pending`. The task is woken once a permit becomes available. -use crate::loom::{ +use crate::sync::loom::{ future::AtomicWaker, sync::{ atomic::{AtomicPtr, AtomicUsize}, @@ -555,7 +555,7 @@ impl Permit { /// # Examples /// /// ``` - /// use tokio_sync::semaphore::Permit; + /// use tokio::sync::semaphore::Permit; /// /// let permit = Permit::new(); /// assert!(!permit.is_acquired()); diff --git a/tokio-sync/src/task/atomic_waker.rs b/tokio/src/sync/task/atomic_waker.rs similarity index 99% rename from tokio-sync/src/task/atomic_waker.rs rename to tokio/src/sync/task/atomic_waker.rs index 1cd891660..ff952a186 100644 --- a/tokio-sync/src/task/atomic_waker.rs +++ b/tokio/src/sync/task/atomic_waker.rs @@ -1,5 +1,5 @@ -use crate::loom::sync::atomic::{self, AtomicUsize}; -use crate::loom::sync::CausalCell; +use crate::sync::loom::sync::atomic::{self, AtomicUsize}; +use crate::sync::loom::sync::CausalCell; use std::fmt; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; diff --git a/tokio-sync/src/task/mod.rs b/tokio/src/sync/task/mod.rs similarity index 99% rename from tokio-sync/src/task/mod.rs rename to tokio/src/sync/task/mod.rs index cff96656b..446ff7126 100644 --- a/tokio-sync/src/task/mod.rs +++ b/tokio/src/sync/task/mod.rs @@ -1,5 +1,4 @@ //! Thread-safe task notification primitives. mod atomic_waker; - pub use self::atomic_waker::AtomicWaker; diff --git a/tokio-sync/tests/fuzz_atomic_waker.rs b/tokio/src/sync/tests/loom_atomic_waker.rs similarity index 84% rename from tokio-sync/tests/fuzz_atomic_waker.rs rename to tokio/src/sync/tests/loom_atomic_waker.rs index de274d32c..81e200ff9 100644 --- a/tokio-sync/tests/fuzz_atomic_waker.rs +++ b/tokio/src/sync/tests/loom_atomic_waker.rs @@ -1,12 +1,4 @@ -#![warn(rust_2018_idioms)] - -#[macro_use] -extern crate loom; - -#[allow(dead_code)] -#[path = "../src/task/atomic_waker.rs"] -mod atomic_waker; -use crate::atomic_waker::AtomicWaker; +use crate::sync::task::AtomicWaker; use futures_util::future::poll_fn; use loom::future::block_on; diff --git a/tokio-sync/tests/fuzz_list.rs b/tokio/src/sync/tests/loom_list.rs similarity index 78% rename from tokio-sync/tests/fuzz_list.rs rename to tokio/src/sync/tests/loom_list.rs index fa1e9f5e1..4f7746d58 100644 --- a/tokio-sync/tests/fuzz_list.rs +++ b/tokio/src/sync/tests/loom_list.rs @@ -1,30 +1,11 @@ -#![warn(rust_2018_idioms)] - -#[macro_use] -extern crate loom; - -macro_rules! if_fuzz { - ($($t:tt)*) => { - $($t)* - } -} - -#[path = "../src/mpsc/list.rs"] -#[allow(warnings)] -mod list; - -#[path = "../src/mpsc/block.rs"] -#[allow(warnings)] -mod block; - -const BLOCK_CAP: usize = 2; +use crate::sync::mpsc::list; use loom::thread; use std::sync::Arc; #[test] fn smoke() { - use crate::block::Read::*; + use crate::sync::mpsc::block::Read::*; const NUM_TX: usize = 2; const NUM_MSG: usize = 2; diff --git a/tokio-sync/tests/fuzz_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs similarity index 64% rename from tokio-sync/tests/fuzz_mpsc.rs rename to tokio/src/sync/tests/loom_mpsc.rs index 657bc8ffd..748ae9e1c 100644 --- a/tokio-sync/tests/fuzz_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -1,21 +1,4 @@ -#![warn(rust_2018_idioms)] - -#[macro_use] -extern crate loom; - -macro_rules! if_fuzz { - ($($t:tt)*) => { - $($t)* - } -} - -#[path = "../src/mpsc/mod.rs"] -#[allow(warnings)] -mod mpsc; - -#[path = "../src/semaphore.rs"] -#[allow(warnings)] -mod semaphore; +use crate::sync::mpsc; use futures_util::future::poll_fn; use loom::future::block_on; diff --git a/tokio-sync/tests/fuzz_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs similarity index 95% rename from tokio-sync/tests/fuzz_oneshot.rs rename to tokio/src/sync/tests/loom_oneshot.rs index 8d4bea7d8..521047368 100644 --- a/tokio-sync/tests/fuzz_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -1,14 +1,8 @@ -#![warn(rust_2018_idioms)] - -#[path = "../src/oneshot.rs"] -#[allow(warnings)] -mod oneshot; - -use loom; -use loom::future::block_on; -use loom::thread; +use crate::sync::oneshot; use futures_util::future::poll_fn; +use loom::future::block_on; +use loom::thread; use std::task::Poll::{Pending, Ready}; #[test] diff --git a/tokio-sync/tests/fuzz_semaphore.rs b/tokio/src/sync/tests/loom_semaphore.rs similarity index 95% rename from tokio-sync/tests/fuzz_semaphore.rs rename to tokio/src/sync/tests/loom_semaphore.rs index 018fa0a98..d14c76686 100644 --- a/tokio-sync/tests/fuzz_semaphore.rs +++ b/tokio/src/sync/tests/loom_semaphore.rs @@ -1,13 +1,4 @@ -#![warn(rust_2018_idioms)] - -#[macro_use] -extern crate loom; - -#[path = "../src/semaphore.rs"] -#[allow(warnings)] -mod semaphore; - -use crate::semaphore::*; +use crate::sync::semaphore::*; use futures_core::ready; use futures_util::future::poll_fn; diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs new file mode 100644 index 000000000..8e627cb84 --- /dev/null +++ b/tokio/src/sync/tests/mod.rs @@ -0,0 +1,7 @@ +#![cfg(loom)] + +mod loom_atomic_waker; +mod loom_list; +mod loom_mpsc; +mod loom_oneshot; +mod loom_semaphore; diff --git a/tokio-sync/src/watch.rs b/tokio/src/sync/watch.rs similarity index 98% rename from tokio-sync/src/watch.rs rename to tokio/src/sync/watch.rs index e6b33136f..30f1603f6 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio/src/sync/watch.rs @@ -51,7 +51,7 @@ //! [`Sender::closed`]: struct.Sender.html#method.closed //! [`Receiver::get_ref`]: struct.Receiver.html#method.get_ref -use crate::task::AtomicWaker; +use crate::sync::task::AtomicWaker; use core::task::Poll::{Pending, Ready}; use core::task::{Context, Poll}; @@ -62,11 +62,8 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; -#[cfg(feature = "async-traits")] use futures_core::ready; -#[cfg(feature = "async-traits")] use futures_util::pin_mut; -#[cfg(feature = "async-traits")] use std::pin::Pin; /// Receives values from the associated [`Sender`](struct.Sender.html). @@ -298,7 +295,6 @@ impl Receiver { } } -#[cfg(feature = "async-traits")] impl futures_core::Stream for Receiver { type Item = T; @@ -399,7 +395,6 @@ impl Sender { } } -#[cfg(feature = "async-traits")] impl futures_sink::Sink for Sender { type Error = error::SendError; diff --git a/tokio/src/timer/timer/entry.rs b/tokio/src/timer/timer/entry.rs index a31dda4a7..f4f43e6bf 100644 --- a/tokio/src/timer/timer/entry.rs +++ b/tokio/src/timer/timer/entry.rs @@ -1,9 +1,8 @@ +use crate::sync::AtomicWaker; use crate::timer::atomic::AtomicU64; use crate::timer::timer::{HandlePriv, Inner}; use crate::timer::Error; -use tokio_sync::AtomicWaker; - use crossbeam_utils::CachePadded; use std::cell::UnsafeCell; use std::ptr; diff --git a/tokio/tests/support/mock_pool.rs b/tokio/tests/support/mock_pool.rs index 501540f15..acdb8dbc4 100644 --- a/tokio/tests/support/mock_pool.rs +++ b/tokio/tests/support/mock_pool.rs @@ -1,4 +1,4 @@ -use tokio_sync::oneshot; +use tokio::sync::oneshot; use std::cell::RefCell; use std::collections::VecDeque; diff --git a/tokio-sync/tests/atomic_waker.rs b/tokio/tests/sync_atomic_waker.rs similarity index 96% rename from tokio-sync/tests/atomic_waker.rs rename to tokio/tests/sync_atomic_waker.rs index 5d3863309..77d6a73df 100644 --- a/tokio-sync/tests/atomic_waker.rs +++ b/tokio/tests/sync_atomic_waker.rs @@ -1,9 +1,10 @@ #![warn(rust_2018_idioms)] -use std::task::Waker; -use tokio_sync::AtomicWaker; +use tokio::sync::AtomicWaker; use tokio_test::task::MockTask; +use std::task::Waker; + trait AssertSend: Send {} trait AssertSync: Send {} diff --git a/tokio-sync/tests/barrier.rs b/tokio/tests/sync_barrier.rs similarity index 98% rename from tokio-sync/tests/barrier.rs rename to tokio/tests/sync_barrier.rs index d50522242..170e8c2b3 100644 --- a/tokio-sync/tests/barrier.rs +++ b/tokio/tests/sync_barrier.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] -use tokio_sync::Barrier; +use tokio::sync::Barrier; + use tokio_test::task::spawn; use tokio_test::{assert_pending, assert_ready}; diff --git a/tokio-sync/tests/errors.rs b/tokio/tests/sync_errors.rs similarity index 83% rename from tokio-sync/tests/errors.rs rename to tokio/tests/sync_errors.rs index 38ac1bf68..e68fe0819 100644 --- a/tokio-sync/tests/errors.rs +++ b/tokio/tests/sync_errors.rs @@ -4,7 +4,7 @@ fn is_error() {} #[test] fn mpsc_error_bound() { - use tokio_sync::mpsc::error; + use tokio::sync::mpsc::error; is_error::(); is_error::>(); @@ -15,7 +15,7 @@ fn mpsc_error_bound() { #[test] fn oneshot_error_bound() { - use tokio_sync::oneshot::error; + use tokio::sync::oneshot::error; is_error::(); is_error::(); @@ -23,7 +23,7 @@ fn oneshot_error_bound() { #[test] fn watch_error_bound() { - use tokio_sync::watch::error; + use tokio::sync::watch::error; is_error::>(); } diff --git a/tokio-sync/tests/mpsc.rs b/tokio/tests/sync_mpsc.rs similarity index 99% rename from tokio-sync/tests/mpsc.rs rename to tokio/tests/sync_mpsc.rs index 5b40dacdc..891e23618 100644 --- a/tokio-sync/tests/mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1,6 +1,6 @@ #![warn(rust_2018_idioms)] -use tokio_sync::mpsc; +use tokio::sync::mpsc; use tokio_test::task::MockTask; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -53,7 +53,6 @@ async fn async_send_recv_with_buffer() { } #[test] -#[cfg(feature = "async-traits")] fn send_sink_recv_with_buffer() { use futures_core::Stream; use futures_sink::Sink; @@ -166,7 +165,6 @@ async fn async_send_recv_unbounded() { } #[test] -#[cfg(feature = "async-traits")] fn sink_send_recv_unbounded() { use futures_core::Stream; use futures_sink::Sink; diff --git a/tokio-sync/tests/mutex.rs b/tokio/tests/sync_mutex.rs similarity index 98% rename from tokio-sync/tests/mutex.rs rename to tokio/tests/sync_mutex.rs index 47fcb449d..b100f270b 100644 --- a/tokio-sync/tests/mutex.rs +++ b/tokio/tests/sync_mutex.rs @@ -1,10 +1,11 @@ #![warn(rust_2018_idioms)] -use std::sync::Arc; -use tokio_sync::Mutex; +use tokio::sync::Mutex; use tokio_test::task::spawn; use tokio_test::{assert_pending, assert_ready}; +use std::sync::Arc; + #[test] fn straight_execution() { let l = Mutex::new(100); diff --git a/tokio-sync/tests/oneshot.rs b/tokio/tests/sync_oneshot.rs similarity index 99% rename from tokio-sync/tests/oneshot.rs rename to tokio/tests/sync_oneshot.rs index 8439f2942..1bc4443a1 100644 --- a/tokio-sync/tests/oneshot.rs +++ b/tokio/tests/sync_oneshot.rs @@ -1,6 +1,6 @@ #![warn(rust_2018_idioms)] -use tokio_sync::oneshot; +use tokio::sync::oneshot; use tokio_test::task::MockTask; use tokio_test::*; diff --git a/tokio-sync/tests/semaphore.rs b/tokio/tests/sync_semaphore.rs similarity index 98% rename from tokio-sync/tests/semaphore.rs rename to tokio/tests/sync_semaphore.rs index c1280f36c..1b421248b 100644 --- a/tokio-sync/tests/semaphore.rs +++ b/tokio/tests/sync_semaphore.rs @@ -1,6 +1,6 @@ #![warn(rust_2018_idioms)] -use tokio_sync::semaphore::{Permit, Semaphore}; +use tokio::sync::semaphore::{Permit, Semaphore}; use tokio_test::task::MockTask; use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok}; diff --git a/tokio-sync/tests/watch.rs b/tokio/tests/sync_watch.rs similarity index 90% rename from tokio-sync/tests/watch.rs rename to tokio/tests/sync_watch.rs index 573a3af0b..4d73bc81f 100644 --- a/tokio-sync/tests/watch.rs +++ b/tokio/tests/sync_watch.rs @@ -1,31 +1,9 @@ #![warn(rust_2018_idioms)] -use tokio_sync::watch; +use tokio::sync::watch; use tokio_test::task::spawn; use tokio_test::{assert_pending, assert_ready}; -/* -macro_rules! assert_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::Ready(v)) => v, - Ok(_) => panic!("not ready"), - Err(e) => panic!("error = {:?}", e), - } - }}; -} - -macro_rules! assert_not_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::NotReady) => {} - Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v), - Err(e) => panic!("error = {:?}", e), - } - }}; -} -*/ - #[test] fn single_rx_recv_ref() { let (tx, mut rx) = watch::channel("one"); @@ -97,7 +75,6 @@ fn single_rx_recv() { } #[test] -#[cfg(feature = "async-traits")] fn stream_impl() { use tokio::prelude::*; diff --git a/tokio/tests/thread_pool.rs b/tokio/tests/thread_pool.rs index 9ef4b3336..ef8fcf068 100644 --- a/tokio/tests/thread_pool.rs +++ b/tokio/tests/thread_pool.rs @@ -274,7 +274,7 @@ fn panic_in_task() { #[test] fn multi_threadpool() { - use tokio_sync::oneshot; + use tokio::sync::oneshot; let pool1 = new_pool(); let pool2 = new_pool(); diff --git a/tokio/tests/timer_timeout.rs b/tokio/tests/timer_timeout.rs index 9d704c6e2..db3b46bd0 100644 --- a/tokio/tests/timer_timeout.rs +++ b/tokio/tests/timer_timeout.rs @@ -129,7 +129,6 @@ fn deadline_future_elapses() { }); } -#[cfg(feature = "async-traits")] macro_rules! poll { ($task:ident, $stream:ident) => {{ use futures_core::Stream; @@ -138,9 +137,8 @@ macro_rules! poll { } #[test] -#[cfg(feature = "async-traits")] fn stream_and_timeout_in_future() { - use tokio_sync::mpsc; + use tokio::sync::mpsc; let mut t = MockTask::new(); @@ -168,9 +166,8 @@ fn stream_and_timeout_in_future() { } #[test] -#[cfg(feature = "async-traits")] fn idle_stream_timesout_periodically() { - use tokio_sync::mpsc; + use tokio::sync::mpsc; let mut t = MockTask::new();