sync: move into tokio crate (#1705)

A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).

The sync implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
This commit is contained in:
Carl Lerche 2019-10-29 15:11:31 -07:00 committed by GitHub
parent c62ef2d232
commit 2b909d6805
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
58 changed files with 574 additions and 1195 deletions

View File

@ -3,7 +3,6 @@
members = [
"tokio",
"tokio-macros",
"tokio-sync",
"tokio-test",
"tokio-tls",
"tokio-util",

View File

@ -47,8 +47,6 @@ jobs:
displayName: Test sub crates -
rust: beta
crates:
tokio-sync:
- async-traits
tokio-macros: []
tokio-test: []
tokio-util: []

View File

@ -3,6 +3,6 @@
[patch.crates-io]
tokio = { path = "tokio" }
tokio-macros = { path = "tokio-macros" }
tokio-sync = { path = "tokio-sync" }
tokio-test = { path = "tokio-test" }
tokio-tls = { path = "tokio-tls" }
tokio-util = { path = "tokio-util" }

View File

@ -1,76 +0,0 @@
# 0.2.0-alpha.6 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.2.0-alpha.5 (September 19, 2019)
### Changed
- rename `Lock` -> `Mutex` and make it more like `std::sync::Mutex` (#1573).
### Added
- `Barrier`, an async version of `std::sync::Barrier` (#1571).
# 0.2.0-alpha.4 (August 29, 2019)
- Track tokio release.
# 0.2.0-alpha.3 (August 23, 2019)
- Track `tokio` version number
# 0.2.0-alpha.2 (August 17, 2019)
### Changed
- Update `futures` dependency to 0.3.0-alpha.18.
# 0.2.0-alpha.1 (August 8, 2019)
### Changed
- Switch to `async`, `await`, and `std::future`.
# 0.1.6 (June 4, 2019)
### Added
- Add Sync impl for Lock (#1117).
# 0.1.5 (April 22, 2019)
### Added
- Add asynchronous mutual exclusion primitive (#964).
# 0.1.4 (March 13, 2019)
### Fixed
- Fix memory leak on channel drop (#917).
### Added
- `std::error::Error` implementation for `oneshot`, `watch` error types (#967).
# 0.1.3 (March 1, 2019)
### Added
- `Watch`, a single value broadcast channel (#922).
- `std::error::Error` implementation for more `mpsc` types (#937).
# 0.1.2 (February 20, 2019)
### Fixes
- `mpsc` and `Semaphore` when releasing permits (#904).
- `oneshot` task handle leak (#911).
### Changes
- Performance improvements in `AtomicTask` (#892).
- Improved assert message when creating a channel with bound of zero (#906).
### Adds
- `AtomicTask::take_task` (#895).
# 0.1.1 (February 1, 2019)
### Fixes
- Panic when creating a channel with bound 0 (#879).
# 0.1.0 (January 24, 2019)
- Initial Release

View File

@ -1,39 +0,0 @@
[package]
name = "tokio-sync"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "v0.2.x" git tag.
version = "0.2.0-alpha.6"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
documentation = "https://docs.rs/tokio-sync/0.2.0-alpha.6/tokio_sync"
description = """
Synchronization utilities.
"""
categories = ["asynchronous"]
[features]
async-traits = ["futures-sink-preview"]
[dependencies]
fnv = "1.0.6"
futures-core-preview = { version = "=0.3.0-alpha.19" }
futures-sink-preview = { version = "=0.3.0-alpha.19", optional = true }
futures-util-preview = { version = "=0.3.0-alpha.19" }
[dev-dependencies]
tokio = { version = "0.2.0-alpha.6", path = "../tokio" }
tokio-test = { version = "0.2.0-alpha.6", path = "../tokio-test" }
env_logger = { version = "0.6", default-features = false }
loom = { version = "0.2.1", features = ["futures"] }
[package.metadata.docs.rs]
all-features = true

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# tokio-sync
Synchronization utilities
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tokio by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,536 +0,0 @@
#![cfg(feature = "broken")]
#![feature(test)]
#![warn(rust_2018_idioms)]
extern crate test;
type Medium = [usize; 64];
type Large = [Medium; 64];
mod tokio {
use futures::{future, Async, Future, Sink, Stream};
use std::thread;
use test::{self, Bencher};
use tokio_sync::mpsc::*;
#[bench]
fn bounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<super::Medium>(1_000));
})
}
#[bench]
fn unbounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded_channel::<super::Medium>());
})
}
#[bench]
fn bounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<super::Large>(1_000));
})
}
#[bench]
fn unbounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded_channel::<super::Large>());
})
}
#[bench]
fn send_one_message(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
// Send
tx.try_send(1).unwrap();
// Receive
assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap());
})
}
#[bench]
fn send_one_message_large(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel::<super::Large>(1_000);
// Send
let _ = tx.try_send([[0; 64]; 64]);
// Receive
let _ = test::black_box(&rx.poll());
})
}
#[bench]
fn bounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = channel::<i32>(1_000);
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(1);
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_not_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(1);
tx.try_send(1).unwrap();
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = unbounded_channel::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
let (_tx, mut rx) = unbounded_channel::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_uncontended_1(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
for i in 0..1000 {
tx.try_send(i).unwrap();
// No need to create a task, because poll is not going to park.
assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
}
})
}
#[bench]
fn bounded_uncontended_1_large(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel::<super::Large>(1_000);
for i in 0..1000 {
let _ = tx.try_send([[i; 64]; 64]);
// No need to create a task, because poll is not going to park.
let _ = test::black_box(&rx.poll());
}
})
}
#[bench]
fn bounded_uncontended_2(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1000);
for i in 0..1000 {
tx.try_send(i).unwrap();
}
for i in 0..1000 {
// No need to create a task, because poll is not going to park.
assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
}
})
}
#[bench]
fn contended_unbounded_tx(b: &mut Bencher) {
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..4 {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for mut tx in rx.iter() {
for i in 0..1_000 {
tx.try_send(i).unwrap();
}
}
}));
}
b.iter(|| {
// TODO make unbounded
let (tx, rx) = channel::<i32>(1_000_000);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(4 * 1_000);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
#[bench]
fn contended_bounded_tx(b: &mut Bencher) {
const THREADS: usize = 4;
const ITERS: usize = 100;
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..THREADS {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for tx in rx.iter() {
let mut tx = tx.wait();
for i in 0..ITERS {
tx.send(i as i32).unwrap();
}
}
}));
}
b.iter(|| {
let (tx, rx) = channel::<i32>(1);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(THREADS * ITERS);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
}
mod legacy {
use futures::sync::mpsc::*;
use futures::{future, Async, Future, Sink, Stream};
use std::thread;
use test::{self, Bencher};
#[bench]
fn bounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<super::Medium>(1_000));
})
}
#[bench]
fn unbounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded::<super::Medium>());
})
}
#[bench]
fn bounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<super::Large>(1_000));
})
}
#[bench]
fn unbounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded::<super::Large>());
})
}
#[bench]
fn send_one_message(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
// Send
tx.try_send(1).unwrap();
// Receive
assert_eq!(Ok(Async::Ready(Some(1))), rx.poll());
})
}
#[bench]
fn send_one_message_large(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel::<super::Large>(1_000);
// Send
let _ = tx.try_send([[0; 64]; 64]);
// Receive
let _ = test::black_box(&rx.poll());
})
}
#[bench]
fn bounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = channel::<i32>(1_000);
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(0);
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_not_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(0);
tx.try_send(1).unwrap();
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = unbounded::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
let (_tx, mut rx) = unbounded::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_uncontended_1(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded();
for i in 0..1000 {
UnboundedSender::unbounded_send(&tx, i).expect("send");
// No need to create a task, because poll is not going to park.
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
}
})
}
#[bench]
fn unbounded_uncontended_1_large(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded::<super::Large>();
for i in 0..1000 {
let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]);
// No need to create a task, because poll is not going to park.
let _ = test::black_box(&rx.poll());
}
})
}
#[bench]
fn unbounded_uncontended_2(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded();
for i in 0..1000 {
UnboundedSender::unbounded_send(&tx, i).expect("send");
}
for i in 0..1000 {
// No need to create a task, because poll is not going to park.
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
}
})
}
#[bench]
fn multi_thread_unbounded_tx(b: &mut Bencher) {
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..4 {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for mut tx in rx.iter() {
for i in 0..1_000 {
tx.try_send(i).unwrap();
}
}
}));
}
b.iter(|| {
let (tx, rx) = channel::<i32>(1_000_000);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(4 * 1_000);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
#[bench]
fn contended_bounded_tx(b: &mut Bencher) {
const THREADS: usize = 4;
const ITERS: usize = 100;
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..THREADS {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for tx in rx.iter() {
let mut tx = tx.wait();
for i in 0..ITERS {
tx.send(i as i32).unwrap();
}
}
}));
}
b.iter(|| {
let (tx, rx) = channel::<i32>(1);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(THREADS * ITERS);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
}

View File

@ -1,239 +0,0 @@
#![cfg(feature = "broken")]
#![feature(test)]
#![warn(rust_2018_idioms)]
extern crate test;
mod tokio {
use futures::{future, Async, Future};
use test::Bencher;
use tokio_sync::oneshot;
#[bench]
fn new(b: &mut Bencher) {
b.iter(|| {
let _ = ::test::black_box(&oneshot::channel::<i32>());
})
}
#[bench]
fn same_thread_send_recv(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = oneshot::channel();
let _ = tx.send(1);
assert_eq!(Async::Ready(1), rx.poll().unwrap());
});
}
#[bench]
fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = oneshot::channel();
future::lazy(|| {
let _ = rx.poll();
let _ = rx.poll();
let _ = rx.poll();
let _ = rx.poll();
let _ = tx.send(1);
assert_eq!(Async::Ready(1), rx.poll().unwrap());
Ok::<_, ()>(())
})
.wait()
.unwrap();
});
}
#[bench]
fn multi_thread_send_recv(b: &mut Bencher) {
const MAX: usize = 10_000_000;
use std::thread;
fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
use futures::Async::Ready;
loop {
match f.poll() {
Ok(Ready(v)) => return Ok(v),
Ok(_) => {}
Err(e) => return Err(e),
}
}
}
let mut ping_txs = vec![];
let mut ping_rxs = vec![];
let mut pong_txs = vec![];
let mut pong_rxs = vec![];
for _ in 0..MAX {
let (tx, rx) = oneshot::channel::<()>();
ping_txs.push(Some(tx));
ping_rxs.push(Some(rx));
let (tx, rx) = oneshot::channel::<()>();
pong_txs.push(Some(tx));
pong_rxs.push(Some(rx));
}
thread::spawn(move || {
future::lazy(|| {
for i in 0..MAX {
let ping_rx = ping_rxs[i].take().unwrap();
let pong_tx = pong_txs[i].take().unwrap();
if spin(ping_rx).is_err() {
return Ok(());
}
pong_tx.send(()).unwrap();
}
Ok::<(), ()>(())
})
.wait()
.unwrap();
});
future::lazy(|| {
let mut i = 0;
b.iter(|| {
let ping_tx = ping_txs[i].take().unwrap();
let pong_rx = pong_rxs[i].take().unwrap();
ping_tx.send(()).unwrap();
spin(pong_rx).unwrap();
i += 1;
});
Ok::<(), ()>(())
})
.wait()
.unwrap();
}
}
mod legacy {
use futures::sync::oneshot;
use futures::{future, Async, Future};
use test::Bencher;
#[bench]
fn new(b: &mut Bencher) {
b.iter(|| {
let _ = ::test::black_box(&oneshot::channel::<i32>());
})
}
#[bench]
fn same_thread_send_recv(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = oneshot::channel();
let _ = tx.send(1);
assert_eq!(Async::Ready(1), rx.poll().unwrap());
});
}
#[bench]
fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = oneshot::channel();
future::lazy(|| {
let _ = rx.poll();
let _ = rx.poll();
let _ = rx.poll();
let _ = rx.poll();
let _ = tx.send(1);
assert_eq!(Async::Ready(1), rx.poll().unwrap());
Ok::<_, ()>(())
})
.wait()
.unwrap();
});
}
#[bench]
fn multi_thread_send_recv(b: &mut Bencher) {
const MAX: usize = 10_000_000;
use std::thread;
fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
use futures::Async::Ready;
loop {
match f.poll() {
Ok(Ready(v)) => return Ok(v),
Ok(_) => {}
Err(e) => return Err(e),
}
}
}
let mut ping_txs = vec![];
let mut ping_rxs = vec![];
let mut pong_txs = vec![];
let mut pong_rxs = vec![];
for _ in 0..MAX {
let (tx, rx) = oneshot::channel::<()>();
ping_txs.push(Some(tx));
ping_rxs.push(Some(rx));
let (tx, rx) = oneshot::channel::<()>();
pong_txs.push(Some(tx));
pong_rxs.push(Some(rx));
}
thread::spawn(move || {
future::lazy(|| {
for i in 0..MAX {
let ping_rx = ping_rxs[i].take().unwrap();
let pong_tx = pong_txs[i].take().unwrap();
if spin(ping_rx).is_err() {
return Ok(());
}
pong_tx.send(()).unwrap();
}
Ok::<(), ()>(())
})
.wait()
.unwrap();
});
future::lazy(|| {
let mut i = 0;
b.iter(|| {
let ping_tx = ping_txs[i].take().unwrap();
let pong_rx = pong_rxs[i].take().unwrap();
ping_tx.send(()).unwrap();
spin(pong_rx).unwrap();
i += 1;
});
Ok::<(), ()>(())
})
.wait()
.unwrap();
}
}

View File

@ -1,43 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tokio-sync/0.2.0-alpha.6")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![deny(intra_doc_link_resolution_failure)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Asynchronous synchronization primitives.
//!
//! This crate provides primitives for synchronizing asynchronous tasks.
macro_rules! debug {
($($t:tt)*) => {
if false {
println!($($t)*);
}
}
}
macro_rules! if_fuzz {
($($t:tt)*) => {{
if false { $($t)* }
}}
}
mod barrier;
mod loom;
pub mod mpsc;
mod mutex;
pub mod oneshot;
pub mod semaphore;
mod task;
pub mod watch;
pub use barrier::{Barrier, BarrierWaitResult};
pub use mutex::{Mutex, MutexGuard};
pub use task::AtomicWaker;

View File

@ -1,38 +0,0 @@
pub(crate) mod future {
pub(crate) use crate::task::AtomicWaker;
}
pub(crate) mod sync {
pub(crate) use std::sync::atomic;
pub(crate) use std::sync::Arc;
use std::cell::UnsafeCell;
pub(crate) struct CausalCell<T>(UnsafeCell<T>);
impl<T> CausalCell<T> {
pub(crate) fn new(data: T) -> CausalCell<T> {
CausalCell(UnsafeCell::new(data))
}
pub(crate) fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(*const T) -> R,
{
f(self.0.get())
}
pub(crate) fn with_mut<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut T) -> R,
{
f(self.0.get())
}
}
}
pub(crate) mod thread {
pub(crate) fn yield_now() {
::std::sync::atomic::spin_loop_hint();
}
}

View File

@ -21,7 +21,6 @@ categories = ["asynchronous", "testing"]
[dependencies]
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" }
bytes = "0.4"
futures-core-preview = "=0.3.0-alpha.19"

View File

@ -63,7 +63,7 @@ signal = [
"net-driver",
"signal-hook-registry"
]
sync = ["tokio-sync"]
sync = ["fnv"]
tcp = ["io", "net-driver"]
timer = ["crossbeam-utils", "slab"]
udp = ["io", "net-driver"]
@ -81,6 +81,8 @@ process = [
]
[dependencies]
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }
futures-core-preview = "=0.3.0-alpha.19"
futures-sink-preview = "=0.3.0-alpha.19"
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "channel"] }
@ -89,6 +91,7 @@ futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "chann
bytes = { version = "0.4", optional = true }
crossbeam-channel = { version = "0.3.8", optional = true }
crossbeam-utils = { version = "0.6.0", optional = true }
fnv = { version = "1.0.6", optional = true }
iovec = { version = "0.1", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
@ -97,8 +100,6 @@ num_cpus = { version = "1.8.0", optional = true }
pin-project = { version = "0.4", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }
tokio-sync = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-sync", features = ["async-traits"] }
[target.'cfg(unix)'.dependencies]
crossbeam-queue = { version = "0.1.2", optional = true }
@ -124,7 +125,7 @@ flate2 = { version = "1", features = ["tokio"] }
http = "0.1"
httparse = "1.0"
libc = "0.2"
loom = { version = "0.2.11", features = ["futures", "checkpoint"] }
loom = { version = "0.2.12", features = ["futures", "checkpoint"] }
num_cpus = "1.0"
rand = "0.7.2"
serde = { version = "1.0", features = ["derive"] }

270
tokio/benches/mpsc.rs Normal file
View File

@ -0,0 +1,270 @@
#![feature(test)]
#![warn(rust_2018_idioms)]
extern crate test;
use tokio::sync::mpsc::*;
use futures::{future, Async, Future, Sink, Stream};
use std::thread;
use test::Bencher;
type Medium = [usize; 64];
type Large = [Medium; 64];
#[bench]
fn bounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<Medium>(1_000));
})
}
#[bench]
fn unbounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded_channel::<Medium>());
})
}
#[bench]
fn bounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<Large>(1_000));
})
}
#[bench]
fn unbounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded_channel::<Large>());
})
}
#[bench]
fn send_one_message(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
// Send
tx.try_send(1).unwrap();
// Receive
assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap());
})
}
#[bench]
fn send_one_message_large(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel::<Large>(1_000);
// Send
let _ = tx.try_send([[0; 64]; 64]);
// Receive
let _ = test::black_box(&rx.poll());
})
}
#[bench]
fn bounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = channel::<i32>(1_000);
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(1);
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_not_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(1);
tx.try_send(1).unwrap();
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = unbounded_channel::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
let (_tx, mut rx) = unbounded_channel::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_uncontended_1(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
for i in 0..1000 {
tx.try_send(i).unwrap();
// No need to create a task, because poll is not going to park.
assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
}
})
}
#[bench]
fn bounded_uncontended_1_large(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel::<Large>(1_000);
for i in 0..1000 {
let _ = tx.try_send([[i; 64]; 64]);
// No need to create a task, because poll is not going to park.
let _ = test::black_box(&rx.poll());
}
})
}
#[bench]
fn bounded_uncontended_2(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1000);
for i in 0..1000 {
tx.try_send(i).unwrap();
}
for i in 0..1000 {
// No need to create a task, because poll is not going to park.
assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
}
})
}
#[bench]
fn contended_unbounded_tx(b: &mut Bencher) {
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..4 {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for mut tx in rx.iter() {
for i in 0..1_000 {
tx.try_send(i).unwrap();
}
}
}));
}
b.iter(|| {
// TODO make unbounded
let (tx, rx) = channel::<i32>(1_000_000);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(4 * 1_000);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
#[bench]
fn contended_bounded_tx(b: &mut Bencher) {
const THREADS: usize = 4;
const ITERS: usize = 100;
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..THREADS {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for tx in rx.iter() {
let mut tx = tx.wait();
for i in 0..ITERS {
tx.send(i as i32).unwrap();
}
}
}));
}
b.iter(|| {
let (tx, rx) = channel::<i32>(1);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(THREADS * ITERS);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}

120
tokio/benches/oneshot.rs Normal file
View File

@ -0,0 +1,120 @@
#![feature(test)]
#![warn(rust_2018_idioms)]
extern crate test;
use tokio::sync::oneshot;
use futures::{future, Async, Future};
use test::Bencher;
#[bench]
fn new(b: &mut Bencher) {
b.iter(|| {
let _ = ::test::black_box(&oneshot::channel::<i32>());
})
}
#[bench]
fn same_thread_send_recv(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = oneshot::channel();
let _ = tx.send(1);
assert_eq!(Async::Ready(1), rx.poll().unwrap());
});
}
#[bench]
fn same_thread_recv_multi_send_recv(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = oneshot::channel();
future::lazy(|| {
let _ = rx.poll();
let _ = rx.poll();
let _ = rx.poll();
let _ = rx.poll();
let _ = tx.send(1);
assert_eq!(Async::Ready(1), rx.poll().unwrap());
Ok::<_, ()>(())
})
.wait()
.unwrap();
});
}
#[bench]
fn multi_thread_send_recv(b: &mut Bencher) {
const MAX: usize = 10_000_000;
use std::thread;
fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
use futures::Async::Ready;
loop {
match f.poll() {
Ok(Ready(v)) => return Ok(v),
Ok(_) => {}
Err(e) => return Err(e),
}
}
}
let mut ping_txs = vec![];
let mut ping_rxs = vec![];
let mut pong_txs = vec![];
let mut pong_rxs = vec![];
for _ in 0..MAX {
let (tx, rx) = oneshot::channel::<()>();
ping_txs.push(Some(tx));
ping_rxs.push(Some(rx));
let (tx, rx) = oneshot::channel::<()>();
pong_txs.push(Some(tx));
pong_rxs.push(Some(rx));
}
thread::spawn(move || {
future::lazy(|| {
for i in 0..MAX {
let ping_rx = ping_rxs[i].take().unwrap();
let pong_tx = pong_txs[i].take().unwrap();
if spin(ping_rx).is_err() {
return Ok(());
}
pong_tx.send(()).unwrap();
}
Ok::<(), ()>(())
})
.wait()
.unwrap();
});
future::lazy(|| {
let mut i = 0;
b.iter(|| {
let ping_tx = ping_txs[i].take().unwrap();
let pong_rx = pong_rxs[i].take().unwrap();
ping_tx.send(()).unwrap();
spin(pong_rx).unwrap();
i += 1;
});
Ok::<(), ()>(())
})
.wait()
.unwrap();
}

View File

@ -3,7 +3,7 @@
extern crate test;
use tokio::executor::thread_pool::{Builder, Spawner};
use tokio_sync::oneshot;
use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;

View File

@ -3,7 +3,7 @@
use crate::executor::loom::sync::{Arc, Condvar, Mutex};
use crate::executor::loom::thread;
#[cfg(feature = "blocking")]
use tokio_sync::oneshot;
use crate::sync::oneshot;
use std::cell::Cell;
use std::collections::VecDeque;

View File

@ -4,8 +4,7 @@
//! dropped, the `Receiver` receives a notification.
use crate::executor::loom::sync::Arc;
use tokio_sync::oneshot;
use crate::sync::oneshot;
#[derive(Debug, Clone)]
pub(super) struct Sender {

View File

@ -3,8 +3,7 @@ use crate::loom::{
atomic::{AtomicUsize, Ordering},
CausalCell,
};
use tokio_sync::AtomicWaker;
use crate::sync::AtomicWaker;
#[derive(Debug)]
pub(crate) struct ScheduledIo {

View File

@ -1,5 +1,3 @@
#![cfg(feature = "async-traits")]
use super::{UnixListener, UnixStream};
use futures_core::ready;

View File

@ -90,7 +90,6 @@ impl UnixListener {
///
/// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener.
#[cfg(feature = "async-traits")]
pub fn incoming(self) -> super::Incoming {
super::Incoming::new(self)
}

View File

@ -6,7 +6,6 @@ mod datagram;
pub use self::datagram::UnixDatagram;
mod incoming;
#[cfg(feature = "async-traits")]
pub use self::incoming::Incoming;
mod listener;

View File

@ -1,6 +1,6 @@
use crate::signal::os::{OsExtraData, OsStorage};
use tokio_sync::mpsc::Sender;
use crate::sync::mpsc::Sender;
use lazy_static::lazy_static;
use std::ops;

View File

@ -8,8 +8,7 @@
use crate::io::AsyncRead;
use crate::net::util::PollEvented;
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
use tokio_sync::mpsc::{channel, Receiver};
use crate::sync::mpsc::{channel, Receiver};
use futures_core::stream::Stream;
use libc::c_int;

View File

@ -7,9 +7,8 @@
#![cfg(windows)]
use super::registry::{globals, EventId, EventInfo, Init, Storage};
use tokio_sync::mpsc::{channel, Receiver};
use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
use futures_core::stream::Stream;
use std::convert::TryFrom;

View File

@ -1,4 +1,5 @@
use crate::watch;
use crate::sync::watch;
use std::sync::Mutex;
/// A barrier enables multiple threads to synchronize the beginning of some computation.
@ -6,8 +7,8 @@ use std::sync::Mutex;
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::sync::Barrier;
/// use std::sync::Arc;
/// use tokio_sync::Barrier;
/// use futures_util::future::join_all;
///
/// let mut handles = Vec::with_capacity(10);
@ -49,7 +50,7 @@ impl Barrier {
/// A barrier will block `n`-1 threads which call [`Barrier::wait`] and then wake up all
/// threads at once when the `n`th thread calls `wait`.
pub fn new(mut n: usize) -> Barrier {
let (waker, wait) = crate::watch::channel(0);
let (waker, wait) = crate::sync::watch::channel(0);
if n == 0 {
// if n is 0, it's not clear what behavior the user wants.

48
tokio/src/sync/loom.rs Normal file
View File

@ -0,0 +1,48 @@
#[cfg(not(all(test, loom)))]
mod imp {
pub(crate) mod future {
pub(crate) use crate::sync::task::AtomicWaker;
}
pub(crate) mod sync {
pub(crate) use std::sync::atomic;
pub(crate) use std::sync::Arc;
use std::cell::UnsafeCell;
pub(crate) struct CausalCell<T>(UnsafeCell<T>);
impl<T> CausalCell<T> {
pub(crate) fn new(data: T) -> CausalCell<T> {
CausalCell(UnsafeCell::new(data))
}
pub(crate) fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(*const T) -> R,
{
f(self.0.get())
}
pub(crate) fn with_mut<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut T) -> R,
{
f(self.0.get())
}
}
}
pub(crate) mod thread {
pub(crate) fn yield_now() {
::std::sync::atomic::spin_loop_hint();
}
}
}
#[cfg(all(test, loom))]
mod imp {
pub(crate) use loom::*;
}
pub(crate) use self::imp::*;

View File

@ -13,6 +13,46 @@
//! - [watch](watch/index.html), a single-producer, multi-consumer channel that
//! only stores the **most recently** sent value.
pub use tokio_sync::Barrier;
pub use tokio_sync::{mpsc, oneshot, watch};
pub use tokio_sync::{Mutex, MutexGuard};
macro_rules! debug {
($($t:tt)*) => {
if false {
println!($($t)*);
}
}
}
macro_rules! if_loom {
($($t:tt)*) => {{
#[cfg(loom)]
const LOOM: bool = true;
#[cfg(not(loom))]
const LOOM: bool = false;
if LOOM {
$($t)*
}
}}
}
mod barrier;
pub use barrier::{Barrier, BarrierWaitResult};
mod loom;
pub mod mpsc;
mod mutex;
pub use mutex::{Mutex, MutexGuard};
pub mod oneshot;
pub mod semaphore;
mod task;
pub use task::AtomicWaker;
pub mod watch;
/// Unit tests
#[cfg(test)]
mod tests;

View File

@ -1,8 +1,9 @@
use crate::loom::{
use crate::sync::loom::{
sync::atomic::{AtomicPtr, AtomicUsize},
sync::CausalCell,
thread,
};
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
@ -365,7 +366,7 @@ impl<T> Values<T> {
let mut vals = MaybeUninit::uninit();
// When fuzzing, `CausalCell` needs to be initialized.
if_fuzz! {
if_loom! {
let p = vals.as_mut_ptr() as *mut CausalCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)

View File

@ -1,10 +1,9 @@
use super::chan;
use crate::sync::mpsc::chan;
use crate::sync::semaphore;
use std::fmt;
use std::task::{Context, Poll};
#[cfg(feature = "async-traits")]
use std::pin::Pin;
use std::task::{Context, Poll};
/// Send values to the associated `Receiver`.
///
@ -104,7 +103,7 @@ pub struct RecvError(());
/// ```
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
let semaphore = (crate::semaphore::Semaphore::new(buffer), buffer);
let semaphore = (semaphore::Semaphore::new(buffer), buffer);
let (tx, rx) = chan::channel(semaphore);
let tx = Sender::new(tx);
@ -115,7 +114,7 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
/// representing the channel bound.
type Semaphore = (crate::semaphore::Semaphore, usize);
type Semaphore = (semaphore::Semaphore, usize);
impl<T> Receiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
@ -181,7 +180,6 @@ impl<T> Receiver<T> {
}
}
#[cfg(feature = "async-traits")]
impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
@ -244,7 +242,6 @@ impl<T> Sender<T> {
}
}
#[cfg(feature = "async-traits")]
impl<T> futures_sink::Sink<T> for Sender<T> {
type Error = SendError;

View File

@ -1,9 +1,10 @@
use super::list;
use crate::loom::{
use crate::sync::loom::{
future::AtomicWaker,
sync::atomic::AtomicUsize,
sync::{Arc, CausalCell},
};
use crate::sync::mpsc::list;
use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
@ -335,7 +336,7 @@ impl<T, S> Drop for Chan<T, S> {
}
}
use crate::semaphore::TryAcquireError;
use crate::sync::semaphore::TryAcquireError;
impl From<TryAcquireError> for TrySendError {
fn from(src: TryAcquireError) -> TrySendError {
@ -351,9 +352,9 @@ impl From<TryAcquireError> for TrySendError {
// ===== impl Semaphore for (::Semaphore, capacity) =====
use crate::semaphore::Permit;
use crate::sync::semaphore::Permit;
impl Semaphore for (crate::semaphore::Semaphore, usize) {
impl Semaphore for (crate::sync::semaphore::Semaphore, usize) {
type Permit = Permit;
fn new_permit() -> Permit {

View File

@ -1,10 +1,11 @@
//! A concurrent, lock-free, FIFO list.
use super::block::{self, Block};
use crate::loom::{
use crate::sync::loom::{
sync::atomic::{AtomicPtr, AtomicUsize},
thread,
};
use crate::sync::mpsc::block::{self, Block};
use std::fmt;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

View File

@ -34,13 +34,16 @@
//! [`Sender`]: struct.Sender.html
//! [`Receiver`]: struct.Receiver.html
mod block;
mod bounded;
mod chan;
mod list;
mod unbounded;
pub(super) mod block;
mod bounded;
pub use self::bounded::{channel, Receiver, Sender};
mod chan;
pub(super) mod list;
mod unbounded;
pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender};
pub mod error {
@ -54,8 +57,11 @@ pub mod error {
///
/// This value must be a power of 2. It also must be smaller than the number of
/// bits in `usize`.
#[cfg(target_pointer_width = "64")]
#[cfg(all(target_pointer_width = "64", not(loom)))]
const BLOCK_CAP: usize = 32;
#[cfg(not(target_pointer_width = "64"))]
#[cfg(all(not(target_pointer_width = "64"), not(loom)))]
const BLOCK_CAP: usize = 16;
#[cfg(loom)]
const BLOCK_CAP: usize = 2;

View File

@ -1,10 +1,9 @@
use super::chan;
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
use std::fmt;
use std::task::{Context, Poll};
#[cfg(feature = "async-traits")]
use std::pin::Pin;
/// Send values to the associated `UnboundedReceiver`.
@ -146,7 +145,6 @@ impl<T> UnboundedReceiver<T> {
}
}
#[cfg(feature = "async-traits")]
impl<T> futures_core::Stream for UnboundedReceiver<T> {
type Item = T;
@ -167,7 +165,6 @@ impl<T> UnboundedSender<T> {
}
}
#[cfg(feature = "async-traits")]
impl<T> futures_sink::Sink<T> for UnboundedSender<T> {
type Error = UnboundedSendError;

View File

@ -29,7 +29,7 @@
//! [`Mutex`]: struct.Mutex.html
//! [`MutexGuard`]: struct.MutexGuard.html
use crate::semaphore;
use crate::sync::semaphore;
use futures_util::future::poll_fn;
use std::cell::UnsafeCell;
@ -69,6 +69,7 @@ unsafe impl<T> Sync for Mutex<T> where T: Send {}
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}
#[test]
#[cfg(not(loom))]
fn bounds() {
fn check<T: Send>() {}
check::<MutexGuard<'_, u32>>();

View File

@ -1,6 +1,6 @@
//! A channel for sending a single message between asynchronous tasks.
use crate::loom::sync::{atomic::AtomicUsize, Arc, CausalCell};
use crate::sync::loom::sync::{atomic::AtomicUsize, Arc, CausalCell};
use futures_core::ready;
use std::fmt;

View File

@ -8,7 +8,7 @@
//! section. If no permits are available, then acquiring the semaphore returns
//! `Pending`. The task is woken once a permit becomes available.
use crate::loom::{
use crate::sync::loom::{
future::AtomicWaker,
sync::{
atomic::{AtomicPtr, AtomicUsize},
@ -555,7 +555,7 @@ impl Permit {
/// # Examples
///
/// ```
/// use tokio_sync::semaphore::Permit;
/// use tokio::sync::semaphore::Permit;
///
/// let permit = Permit::new();
/// assert!(!permit.is_acquired());

View File

@ -1,5 +1,5 @@
use crate::loom::sync::atomic::{self, AtomicUsize};
use crate::loom::sync::CausalCell;
use crate::sync::loom::sync::atomic::{self, AtomicUsize};
use crate::sync::loom::sync::CausalCell;
use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

View File

@ -1,5 +1,4 @@
//! Thread-safe task notification primitives.
mod atomic_waker;
pub use self::atomic_waker::AtomicWaker;

View File

@ -1,12 +1,4 @@
#![warn(rust_2018_idioms)]
#[macro_use]
extern crate loom;
#[allow(dead_code)]
#[path = "../src/task/atomic_waker.rs"]
mod atomic_waker;
use crate::atomic_waker::AtomicWaker;
use crate::sync::task::AtomicWaker;
use futures_util::future::poll_fn;
use loom::future::block_on;

View File

@ -1,30 +1,11 @@
#![warn(rust_2018_idioms)]
#[macro_use]
extern crate loom;
macro_rules! if_fuzz {
($($t:tt)*) => {
$($t)*
}
}
#[path = "../src/mpsc/list.rs"]
#[allow(warnings)]
mod list;
#[path = "../src/mpsc/block.rs"]
#[allow(warnings)]
mod block;
const BLOCK_CAP: usize = 2;
use crate::sync::mpsc::list;
use loom::thread;
use std::sync::Arc;
#[test]
fn smoke() {
use crate::block::Read::*;
use crate::sync::mpsc::block::Read::*;
const NUM_TX: usize = 2;
const NUM_MSG: usize = 2;

View File

@ -1,21 +1,4 @@
#![warn(rust_2018_idioms)]
#[macro_use]
extern crate loom;
macro_rules! if_fuzz {
($($t:tt)*) => {
$($t)*
}
}
#[path = "../src/mpsc/mod.rs"]
#[allow(warnings)]
mod mpsc;
#[path = "../src/semaphore.rs"]
#[allow(warnings)]
mod semaphore;
use crate::sync::mpsc;
use futures_util::future::poll_fn;
use loom::future::block_on;

View File

@ -1,14 +1,8 @@
#![warn(rust_2018_idioms)]
#[path = "../src/oneshot.rs"]
#[allow(warnings)]
mod oneshot;
use loom;
use loom::future::block_on;
use loom::thread;
use crate::sync::oneshot;
use futures_util::future::poll_fn;
use loom::future::block_on;
use loom::thread;
use std::task::Poll::{Pending, Ready};
#[test]

View File

@ -1,13 +1,4 @@
#![warn(rust_2018_idioms)]
#[macro_use]
extern crate loom;
#[path = "../src/semaphore.rs"]
#[allow(warnings)]
mod semaphore;
use crate::semaphore::*;
use crate::sync::semaphore::*;
use futures_core::ready;
use futures_util::future::poll_fn;

View File

@ -0,0 +1,7 @@
#![cfg(loom)]
mod loom_atomic_waker;
mod loom_list;
mod loom_mpsc;
mod loom_oneshot;
mod loom_semaphore;

View File

@ -51,7 +51,7 @@
//! [`Sender::closed`]: struct.Sender.html#method.closed
//! [`Receiver::get_ref`]: struct.Receiver.html#method.get_ref
use crate::task::AtomicWaker;
use crate::sync::task::AtomicWaker;
use core::task::Poll::{Pending, Ready};
use core::task::{Context, Poll};
@ -62,11 +62,8 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
#[cfg(feature = "async-traits")]
use futures_core::ready;
#[cfg(feature = "async-traits")]
use futures_util::pin_mut;
#[cfg(feature = "async-traits")]
use std::pin::Pin;
/// Receives values from the associated [`Sender`](struct.Sender.html).
@ -298,7 +295,6 @@ impl<T: Clone> Receiver<T> {
}
}
#[cfg(feature = "async-traits")]
impl<T: Clone> futures_core::Stream for Receiver<T> {
type Item = T;
@ -399,7 +395,6 @@ impl<T> Sender<T> {
}
}
#[cfg(feature = "async-traits")]
impl<T> futures_sink::Sink<T> for Sender<T> {
type Error = error::SendError<T>;

View File

@ -1,9 +1,8 @@
use crate::sync::AtomicWaker;
use crate::timer::atomic::AtomicU64;
use crate::timer::timer::{HandlePriv, Inner};
use crate::timer::Error;
use tokio_sync::AtomicWaker;
use crossbeam_utils::CachePadded;
use std::cell::UnsafeCell;
use std::ptr;

View File

@ -1,4 +1,4 @@
use tokio_sync::oneshot;
use tokio::sync::oneshot;
use std::cell::RefCell;
use std::collections::VecDeque;

View File

@ -1,9 +1,10 @@
#![warn(rust_2018_idioms)]
use std::task::Waker;
use tokio_sync::AtomicWaker;
use tokio::sync::AtomicWaker;
use tokio_test::task::MockTask;
use std::task::Waker;
trait AssertSend: Send {}
trait AssertSync: Send {}

View File

@ -1,6 +1,7 @@
#![warn(rust_2018_idioms)]
use tokio_sync::Barrier;
use tokio::sync::Barrier;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};

View File

@ -4,7 +4,7 @@ fn is_error<T: ::std::error::Error + Send + Sync>() {}
#[test]
fn mpsc_error_bound() {
use tokio_sync::mpsc::error;
use tokio::sync::mpsc::error;
is_error::<error::SendError>();
is_error::<error::TrySendError<()>>();
@ -15,7 +15,7 @@ fn mpsc_error_bound() {
#[test]
fn oneshot_error_bound() {
use tokio_sync::oneshot::error;
use tokio::sync::oneshot::error;
is_error::<error::RecvError>();
is_error::<error::TryRecvError>();
@ -23,7 +23,7 @@ fn oneshot_error_bound() {
#[test]
fn watch_error_bound() {
use tokio_sync::watch::error;
use tokio::sync::watch::error;
is_error::<error::SendError<()>>();
}

View File

@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)]
use tokio_sync::mpsc;
use tokio::sync::mpsc;
use tokio_test::task::MockTask;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@ -53,7 +53,6 @@ async fn async_send_recv_with_buffer() {
}
#[test]
#[cfg(feature = "async-traits")]
fn send_sink_recv_with_buffer() {
use futures_core::Stream;
use futures_sink::Sink;
@ -166,7 +165,6 @@ async fn async_send_recv_unbounded() {
}
#[test]
#[cfg(feature = "async-traits")]
fn sink_send_recv_unbounded() {
use futures_core::Stream;
use futures_sink::Sink;

View File

@ -1,10 +1,11 @@
#![warn(rust_2018_idioms)]
use std::sync::Arc;
use tokio_sync::Mutex;
use tokio::sync::Mutex;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
use std::sync::Arc;
#[test]
fn straight_execution() {
let l = Mutex::new(100);

View File

@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)]
use tokio_sync::oneshot;
use tokio::sync::oneshot;
use tokio_test::task::MockTask;
use tokio_test::*;

View File

@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)]
use tokio_sync::semaphore::{Permit, Semaphore};
use tokio::sync::semaphore::{Permit, Semaphore};
use tokio_test::task::MockTask;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok};

View File

@ -1,31 +1,9 @@
#![warn(rust_2018_idioms)]
use tokio_sync::watch;
use tokio::sync::watch;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
/*
macro_rules! assert_ready {
($e:expr) => {{
match $e {
Ok(futures::Async::Ready(v)) => v,
Ok(_) => panic!("not ready"),
Err(e) => panic!("error = {:?}", e),
}
}};
}
macro_rules! assert_not_ready {
($e:expr) => {{
match $e {
Ok(futures::Async::NotReady) => {}
Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v),
Err(e) => panic!("error = {:?}", e),
}
}};
}
*/
#[test]
fn single_rx_recv_ref() {
let (tx, mut rx) = watch::channel("one");
@ -97,7 +75,6 @@ fn single_rx_recv() {
}
#[test]
#[cfg(feature = "async-traits")]
fn stream_impl() {
use tokio::prelude::*;

View File

@ -274,7 +274,7 @@ fn panic_in_task() {
#[test]
fn multi_threadpool() {
use tokio_sync::oneshot;
use tokio::sync::oneshot;
let pool1 = new_pool();
let pool2 = new_pool();

View File

@ -129,7 +129,6 @@ fn deadline_future_elapses() {
});
}
#[cfg(feature = "async-traits")]
macro_rules! poll {
($task:ident, $stream:ident) => {{
use futures_core::Stream;
@ -138,9 +137,8 @@ macro_rules! poll {
}
#[test]
#[cfg(feature = "async-traits")]
fn stream_and_timeout_in_future() {
use tokio_sync::mpsc;
use tokio::sync::mpsc;
let mut t = MockTask::new();
@ -168,9 +166,8 @@ fn stream_and_timeout_in_future() {
}
#[test]
#[cfg(feature = "async-traits")]
fn idle_stream_timesout_periodically() {
use tokio_sync::mpsc;
use tokio::sync::mpsc;
let mut t = MockTask::new();