mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00

A first pass at updating Tokio to use `std::future`. Implementations of `Future` from the futures crate are updated to implement `Future` from std. Implementations of `Stream` are moved to a feature flag. This commits disables a number of crates that have not yet been updated.
173 lines
3.7 KiB
Rust
173 lines
3.7 KiB
Rust
#![deny(warnings, rust_2018_idioms)]
|
|
|
|
#[macro_use]
|
|
extern crate loom;
|
|
|
|
#[path = "../src/semaphore.rs"]
|
|
#[allow(warnings)]
|
|
mod semaphore;
|
|
|
|
use crate::semaphore::*;
|
|
|
|
use async_util::future::poll_fn;
|
|
use loom::futures::block_on;
|
|
use loom::thread;
|
|
use std::future::Future;
|
|
use std::pin::Pin;
|
|
use std::sync::atomic::AtomicUsize;
|
|
use std::sync::atomic::Ordering::SeqCst;
|
|
use std::sync::Arc;
|
|
use std::task::Poll::Ready;
|
|
use std::task::{Context, Poll};
|
|
|
|
/// Unwrap a ready value or propagate `Poll::Pending`.
|
|
#[macro_export]
|
|
macro_rules! ready {
|
|
($e:expr) => {{
|
|
use std::task::Poll::{Pending, Ready};
|
|
|
|
match $e {
|
|
Ready(v) => v,
|
|
Pending => return Pending,
|
|
}
|
|
}};
|
|
}
|
|
|
|
#[test]
|
|
fn basic_usage() {
|
|
const NUM: usize = 2;
|
|
|
|
struct Actor {
|
|
waiter: Permit,
|
|
shared: Arc<Shared>,
|
|
}
|
|
|
|
struct Shared {
|
|
semaphore: Semaphore,
|
|
active: AtomicUsize,
|
|
}
|
|
|
|
impl Future for Actor {
|
|
type Output = ();
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
|
let me = &mut *self;
|
|
|
|
ready!(me.waiter.poll_acquire(cx, &me.shared.semaphore)).unwrap();
|
|
|
|
let actual = me.shared.active.fetch_add(1, SeqCst);
|
|
assert!(actual <= NUM - 1);
|
|
|
|
let actual = me.shared.active.fetch_sub(1, SeqCst);
|
|
assert!(actual <= NUM);
|
|
|
|
me.waiter.release(&me.shared.semaphore);
|
|
|
|
Ready(())
|
|
}
|
|
}
|
|
|
|
loom::fuzz(|| {
|
|
let shared = Arc::new(Shared {
|
|
semaphore: Semaphore::new(NUM),
|
|
active: AtomicUsize::new(0),
|
|
});
|
|
|
|
for _ in 0..NUM {
|
|
let shared = shared.clone();
|
|
|
|
thread::spawn(move || {
|
|
block_on(Actor {
|
|
waiter: Permit::new(),
|
|
shared,
|
|
});
|
|
});
|
|
}
|
|
|
|
block_on(Actor {
|
|
waiter: Permit::new(),
|
|
shared,
|
|
});
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn release() {
|
|
loom::fuzz(|| {
|
|
let semaphore = Arc::new(Semaphore::new(1));
|
|
|
|
{
|
|
let semaphore = semaphore.clone();
|
|
thread::spawn(move || {
|
|
let mut permit = Permit::new();
|
|
|
|
block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap();
|
|
|
|
permit.release(&semaphore);
|
|
});
|
|
}
|
|
|
|
let mut permit = Permit::new();
|
|
|
|
block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap();
|
|
|
|
permit.release(&semaphore);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn basic_closing() {
|
|
const NUM: usize = 2;
|
|
|
|
loom::fuzz(|| {
|
|
let semaphore = Arc::new(Semaphore::new(1));
|
|
|
|
for _ in 0..NUM {
|
|
let semaphore = semaphore.clone();
|
|
|
|
thread::spawn(move || {
|
|
let mut permit = Permit::new();
|
|
|
|
for _ in 0..2 {
|
|
block_on(poll_fn(|cx| {
|
|
permit.poll_acquire(cx, &semaphore).map_err(|_| ())
|
|
}))?;
|
|
|
|
permit.release(&semaphore);
|
|
}
|
|
|
|
Ok::<(), ()>(())
|
|
});
|
|
}
|
|
|
|
semaphore.close();
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn concurrent_close() {
|
|
const NUM: usize = 3;
|
|
|
|
loom::fuzz(|| {
|
|
let semaphore = Arc::new(Semaphore::new(1));
|
|
|
|
for _ in 0..NUM {
|
|
let semaphore = semaphore.clone();
|
|
|
|
thread::spawn(move || {
|
|
let mut permit = Permit::new();
|
|
|
|
block_on(poll_fn(|cx| {
|
|
permit.poll_acquire(cx, &semaphore).map_err(|_| ())
|
|
}))?;
|
|
|
|
permit.release(&semaphore);
|
|
|
|
semaphore.close();
|
|
|
|
Ok::<(), ()>(())
|
|
});
|
|
}
|
|
});
|
|
}
|