tests: fix bug + reorganize tests. (#1726)

Fixes a bug in the thread-pool executor related to shutdown
concurrent with a task that is self-notifying. A `loom` test is
added to validate the fix.

Additionally, in anticipation of the `thread_pool` module being
switched to private, tests are updated to use `Runtime` directly
instead of `thread_pool`. Those tests that cannot be updated
are switched to unit tests.
This commit is contained in:
Carl Lerche 2019-11-02 17:03:06 -07:00 committed by GitHub
parent c8fdbed27a
commit e19bd77ef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 936 additions and 964 deletions

View File

@ -18,6 +18,8 @@ impl<T: 'static> OwnedList<T> {
} }
pub(crate) fn insert(&mut self, task: &Task<T>) { pub(crate) fn insert(&mut self, task: &Task<T>) {
debug_assert!(!self.contains(task));
unsafe { unsafe {
debug_assert!((*task.header().owned_next.get()).is_none()); debug_assert!((*task.header().owned_next.get()).is_none());
debug_assert!((*task.header().owned_prev.get()).is_none()); debug_assert!((*task.header().owned_prev.get()).is_none());
@ -35,6 +37,8 @@ impl<T: 'static> OwnedList<T> {
} }
pub(crate) fn remove(&mut self, task: &Task<T>) { pub(crate) fn remove(&mut self, task: &Task<T>) {
debug_assert!(self.head.is_some());
unsafe { unsafe {
if let Some(next) = *task.header().owned_next.get() { if let Some(next) = *task.header().owned_next.get() {
*next.as_ref().owned_prev.get() = *task.header().owned_prev.get(); *next.as_ref().owned_prev.get() = *task.header().owned_prev.get();
@ -66,6 +70,23 @@ impl<T: 'static> OwnedList<T> {
} }
} }
} }
/// Only used by debug assertions
fn contains(&self, task: &Task<T>) -> bool {
let mut curr = self.head;
while let Some(p) = curr {
if p == task.header().into() {
return true;
}
unsafe {
curr = *p.as_ref().owned_next.get();
}
}
false
}
} }
impl<T: 'static> fmt::Debug for OwnedList<T> { impl<T: 'static> fmt::Debug for OwnedList<T> {

View File

@ -199,8 +199,10 @@ impl State {
// Use the running flag to signal cancellation // Use the running flag to signal cancellation
if prev.is_running() { if prev.is_running() {
next.0 -= RUNNING; next.0 -= RUNNING;
next.0 |= NOTIFIED;
} else if prev.is_notified() { } else if prev.is_notified() {
next.0 += RUNNING; next.0 += RUNNING;
next.0 |= NOTIFIED;
} else { } else {
next.0 |= CANCELLED; next.0 |= CANCELLED;
} }

View File

@ -146,6 +146,41 @@ fn pool_shutdown() {
}); });
} }
#[test]
fn complete_block_on_under_load() {
loom::model(|| {
let pool = ThreadPool::new();
pool.block_on(async {
// Spin hard
crate::spawn(async {
for _ in 0..2 {
yield_once().await;
}
});
gated2(true).await
});
});
}
use futures::future::poll_fn;
use std::task::Poll;
async fn yield_once() {
let mut yielded = false;
poll_fn(|cx| {
if yielded {
Poll::Ready(())
} else {
loom::thread::yield_now();
yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
})
.await
}
fn gated() -> impl Future<Output = &'static str> { fn gated() -> impl Future<Output = &'static str> {
gated2(false) gated2(false)
} }

View File

@ -4,6 +4,9 @@ mod loom_pool;
#[cfg(loom)] #[cfg(loom)]
mod loom_queue; mod loom_queue;
#[cfg(not(loom))]
mod pool;
#[cfg(not(loom))] #[cfg(not(loom))]
mod queue; mod queue;

View File

@ -0,0 +1,196 @@
#![warn(rust_2018_idioms)]
use crate::executor::park::{Park, Unpark};
use crate::executor::thread_pool;
use futures_util::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::*;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
#[test]
fn eagerly_drops_futures() {
use std::sync::{mpsc, Mutex};
struct MyPark {
rx: mpsc::Receiver<()>,
tx: Mutex<mpsc::Sender<()>>,
#[allow(dead_code)]
park_tx: mpsc::SyncSender<()>,
unpark_tx: mpsc::SyncSender<()>,
}
impl Park for MyPark {
type Unpark = MyUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
MyUnpark {
tx: Mutex::new(self.tx.lock().unwrap().clone()),
unpark_tx: self.unpark_tx.clone(),
}
}
fn park(&mut self) -> Result<(), Self::Error> {
let _ = self.rx.recv();
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
let _ = self.rx.recv_timeout(duration);
Ok(())
}
}
struct MyUnpark {
tx: Mutex<mpsc::Sender<()>>,
#[allow(dead_code)]
unpark_tx: mpsc::SyncSender<()>,
}
impl Unpark for MyUnpark {
fn unpark(&self) {
let _ = self.tx.lock().unwrap().send(());
}
}
let (task_tx, task_rx) = mpsc::channel();
let (drop_tx, drop_rx) = mpsc::channel();
let (park_tx, park_rx) = mpsc::sync_channel(0);
let (unpark_tx, unpark_rx) = mpsc::sync_channel(0);
let pool = thread_pool::Builder::new()
.num_threads(4)
.build_with_park(move |_| {
let (tx, rx) = mpsc::channel();
MyPark {
tx: Mutex::new(tx),
rx,
park_tx: park_tx.clone(),
unpark_tx: unpark_tx.clone(),
}
});
struct MyTask {
task_tx: Option<mpsc::Sender<Waker>>,
drop_tx: mpsc::Sender<()>,
}
impl Future for MyTask {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Some(tx) = self.get_mut().task_tx.take() {
tx.send(cx.waker().clone()).unwrap();
}
Poll::Pending
}
}
impl Drop for MyTask {
fn drop(&mut self) {
self.drop_tx.send(()).unwrap();
}
}
pool.spawn(MyTask {
task_tx: Some(task_tx),
drop_tx,
});
// Wait until we get the task handle.
let task = task_rx.recv().unwrap();
// Drop the pool, this should result in futures being forcefully dropped.
drop(pool);
// Make sure `MyPark` and `MyUnpark` were dropped during shutdown.
assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
// If the future is forcefully dropped, then we will get a signal here.
drop_rx.recv().unwrap();
// Ensure `task` lives until after the test completes.
drop(task);
}
#[test]
fn park_called_at_interval() {
struct MyPark {
park_light: Arc<AtomicBool>,
}
struct MyUnpark {}
impl Park for MyPark {
type Unpark = MyUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
MyUnpark {}
}
fn park(&mut self) -> Result<(), Self::Error> {
use std::thread;
use std::time::Duration;
thread::sleep(Duration::from_millis(1));
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
if duration == Duration::from_millis(0) {
self.park_light.store(true, Relaxed);
Ok(())
} else {
self.park()
}
}
}
impl Unpark for MyUnpark {
fn unpark(&self) {}
}
let park_light_1 = Arc::new(AtomicBool::new(false));
let park_light_2 = park_light_1.clone();
let (done_tx, done_rx) = mpsc::channel();
// Use 1 thread to ensure the worker stays busy.
let pool = thread_pool::Builder::new()
.num_threads(1)
.build_with_park(move |idx| {
assert_eq!(idx, 0);
MyPark {
park_light: park_light_2.clone(),
}
});
let mut cnt = 0;
pool.spawn(poll_fn(move |cx| {
let did_park_light = park_light_1.load(Relaxed);
if did_park_light {
// There is a bit of a race where the worker can tick a few times
// before seeing the task
assert!(cnt > 50);
done_tx.send(()).unwrap();
return Poll::Ready(());
}
cnt += 1;
cx.waker().wake_by_ref();
Poll::Pending
}));
done_rx.recv().unwrap();
}

421
tokio/tests/rt_common.rs Normal file
View File

@ -0,0 +1,421 @@
// Tests to run on both current-thread & therad-pool runtime variants.
#![warn(rust_2018_idioms)]
macro_rules! rt_test {
($($t:tt)*) => {
mod current_thread {
$($t)*
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.current_thread()
.build()
.unwrap()
}
}
mod thread_pool {
$($t)*
fn rt() -> Runtime {
Runtime::new().unwrap()
}
}
}
}
#[test]
fn send_sync_bound() {
use tokio::runtime::Runtime;
fn is_send<T: Send + Sync>() {}
is_send::<Runtime>();
}
rt_test! {
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::timer;
use tokio_test::{assert_err, assert_ok};
use futures_util::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};
#[test]
fn block_on_sync() {
let mut rt = rt();
let mut win = false;
rt.block_on(async {
win = true;
});
assert!(win);
}
#[test]
fn block_on_async() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_one() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_two() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async move {
assert_ok!(tx1.send("ZOMG"));
});
tokio::spawn(async move {
let msg = assert_ok!(rx1.await);
assert_ok!(tx2.send(msg));
});
assert_ok!(rx2.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_many() {
use tokio::sync::mpsc;
const ITER: usize = 20;
let mut rt = rt();
let out = rt.block_on(async {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
let mut txs = (0..ITER)
.map(|i| {
let (tx, rx) = oneshot::channel();
let mut done_tx = done_tx.clone();
tokio::spawn(async move {
let msg = assert_ok!(rx.await);
assert_eq!(i, msg);
assert_ok!(done_tx.try_send(msg));
});
tx
})
.collect::<Vec<_>>();
drop(done_tx);
thread::spawn(move || {
for (i, tx) in txs.drain(..).enumerate() {
assert_ok!(tx.send(i));
}
});
let mut out = vec![];
while let Some(i) = done_rx.recv().await {
out.push(i);
}
out.sort();
out
});
assert_eq!(ITER, out.len());
for i in 0..ITER {
assert_eq!(i, out[i]);
}
}
#[test]
fn outstanding_tasks_dropped() {
let mut rt = rt();
let cnt = Arc::new(());
rt.block_on(async {
let cnt = cnt.clone();
tokio::spawn(poll_fn(move |_| {
assert_eq!(2, Arc::strong_count(&cnt));
Poll::Pending
}));
});
assert_eq!(2, Arc::strong_count(&cnt));
drop(rt);
assert_eq!(1, Arc::strong_count(&cnt));
}
#[test]
#[should_panic]
fn nested_rt() {
let mut rt1 = rt();
let mut rt2 = rt();
rt1.block_on(async { rt2.block_on(async { "hello" }) });
}
#[test]
fn create_rt_in_block_on() {
let mut rt1 = rt();
let mut rt2 = rt1.block_on(async { rt() });
let out = rt2.block_on(async { "ZOMG" });
assert_eq!(out, "ZOMG");
}
#[test]
fn complete_block_on_under_load() {
let mut rt = rt();
rt.block_on(async {
let (tx, rx) = oneshot::channel();
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
assert_ok!(tx.send(()));
});
assert_ok!(rx.await);
});
}
#[test]
fn complete_task_under_load() {
let mut rt = rt();
rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
assert_ok!(tx1.send(()));
});
tokio::spawn(async move {
assert_ok!(rx1.await);
assert_ok!(tx2.send(()));
});
assert_ok!(rx2.await);
});
}
#[test]
fn spawn_from_other_thread() {
let mut rt = rt();
let sp = rt.spawner();
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
sp.spawn(async move {
assert_ok!(tx.send(()));
});
});
rt.block_on(async move {
assert_ok!(rx.await);
});
}
#[test]
fn delay_at_root() {
let mut rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
timer::delay_for(dur).await;
});
assert!(now.elapsed() >= dur);
}
#[test]
fn delay_in_spawn() {
let mut rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
timer::delay_for(dur).await;
assert_ok!(tx.send(()));
});
assert_ok!(rx.await);
});
assert!(now.elapsed() >= dur);
}
#[test]
fn block_on_socket() {
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let _ = listener.accept().await;
tx.send(()).unwrap();
});
TcpStream::connect(&addr).await.unwrap();
rx.await.unwrap();
});
}
#[test]
fn client_server_block_on() {
let mut rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async move { client_server(tx).await });
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv());
}
#[test]
#[ignore]
fn panic_in_task() {
let rt = rt();
let (tx, rx) = mpsc::channel();
struct Boom(mpsc::Sender<()>);
impl Future for Boom {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
panic!();
}
}
impl Drop for Boom {
fn drop(&mut self) {
assert!(::std::thread::panicking());
self.0.send(()).unwrap();
}
}
rt.spawn(Boom(tx));
rx.recv().unwrap();
}
#[test]
#[should_panic]
fn panic_in_block_on() {
let mut rt = rt();
rt.block_on(async { panic!() });
}
async fn yield_once() {
let mut yielded = false;
poll_fn(|cx| {
if yielded {
Poll::Ready(())
} else {
yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
})
.await
}
async fn client_server(tx: mpsc::Sender<()>) {
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
// Spawn the server
tokio::spawn(async move {
// Accept a socket
let (mut socket, _) = server.accept().await.unwrap();
// Write some data
socket.write_all(b"hello").await.unwrap();
});
let mut client = TcpStream::connect(&addr).await.unwrap();
let mut buf = vec![];
client.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
tx.send(()).unwrap();
}
}

View File

@ -1,332 +1,29 @@
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::timer;
use tokio_test::{assert_err, assert_ok}; use tokio_test::{assert_err, assert_ok};
use futures_util::future::poll_fn;
use std::sync::{mpsc, Arc};
use std::task::Poll;
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::Duration;
#[test] #[test]
fn block_on_sync() { fn spawned_task_does_not_progress_without_block_on() {
let mut rt = rt(); let (tx, mut rx) = oneshot::channel();
let mut win = false;
rt.block_on(async {
win = true;
});
assert!(win);
}
#[test]
fn block_on_async() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_one() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_two() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async move {
assert_ok!(tx1.send("ZOMG"));
});
tokio::spawn(async move {
let msg = assert_ok!(rx1.await);
assert_ok!(tx2.send(msg));
});
assert_ok!(rx2.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_many() {
use tokio::sync::mpsc;
const ITER: usize = 10;
let mut rt = rt(); let mut rt = rt();
let out = rt.block_on(async { rt.spawn(async move {
let (done_tx, mut done_rx) = mpsc::unbounded_channel(); assert_ok!(tx.send("hello"));
let mut txs = (0..ITER)
.map(|i| {
let (tx, rx) = oneshot::channel();
let mut done_tx = done_tx.clone();
tokio::spawn(async move {
let msg = assert_ok!(rx.await);
assert_eq!(i, msg);
assert_ok!(done_tx.try_send(msg));
});
tx
})
.collect::<Vec<_>>();
drop(done_tx);
thread::spawn(move || {
for (i, tx) in txs.drain(..).enumerate() {
assert_ok!(tx.send(i));
}
});
let mut out = vec![];
while let Some(i) = done_rx.recv().await {
out.push(i);
}
out.sort();
out
}); });
assert_eq!(ITER, out.len()); thread::sleep(Duration::from_millis(50));
for i in 0..ITER {
assert_eq!(i, out[i]);
}
}
#[test]
fn outstanding_tasks_dropped() {
let mut rt = rt();
let cnt = Arc::new(());
rt.block_on(async {
let cnt = cnt.clone();
tokio::spawn(poll_fn(move |_| {
assert_eq!(2, Arc::strong_count(&cnt));
Poll::Pending
}));
});
assert_eq!(2, Arc::strong_count(&cnt));
drop(rt);
assert_eq!(1, Arc::strong_count(&cnt));
}
#[test]
#[should_panic]
fn nested_rt() {
let mut rt1 = rt();
let mut rt2 = rt();
rt1.block_on(async { rt2.block_on(async { "hello" }) });
}
#[test]
fn create_rt_in_block_on() {
let mut rt1 = rt();
let mut rt2 = rt1.block_on(async { rt() });
let out = rt2.block_on(async { "ZOMG" });
assert_eq!(out, "ZOMG");
}
#[test]
fn complete_block_on_under_load() {
let mut rt = rt();
rt.block_on(async {
let (tx, rx) = oneshot::channel();
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
assert_ok!(tx.send(()));
});
assert_ok!(rx.await);
});
}
#[test]
fn complete_task_under_load() {
let mut rt = rt();
rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
assert_ok!(tx1.send(()));
});
tokio::spawn(async move {
assert_ok!(rx1.await);
assert_ok!(tx2.send(()));
});
assert_ok!(rx2.await);
});
}
#[test]
fn spawn_from_other_thread() {
let mut rt = rt();
let sp = rt.spawner();
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
sp.spawn(async move {
assert_ok!(tx.send(()));
});
});
rt.block_on(async move {
assert_ok!(rx.await);
});
}
#[test]
fn delay_at_root() {
let mut rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
timer::delay_for(dur).await;
});
assert!(now.elapsed() >= dur);
}
#[test]
fn delay_in_spawn() {
let mut rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
timer::delay_for(dur).await;
assert_ok!(tx.send(()));
});
assert_ok!(rx.await);
});
assert!(now.elapsed() >= dur);
}
#[test]
fn client_server_block_on() {
let mut rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async move { client_server(tx).await });
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv()); assert_err!(rx.try_recv());
}
async fn yield_once() { let out = rt.block_on(async { assert_ok!(rx.await) });
let mut yielded = false;
poll_fn(|cx| {
if yielded {
Poll::Ready(())
} else {
yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
})
.await
}
async fn client_server(tx: mpsc::Sender<()>) { assert_eq!(out, "hello");
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
// Spawn the server
tokio::spawn(async move {
// Accept a socket
let (mut socket, _) = server.accept().await.unwrap();
// Write some data
socket.write_all(b"hello").await.unwrap();
});
let mut client = TcpStream::connect(&addr).await.unwrap();
let mut buf = vec![];
client.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
tx.send(()).unwrap();
} }
fn rt() -> Runtime { fn rt() -> Runtime {

View File

@ -2,14 +2,139 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Runtime; use tokio::runtime::{self, Runtime};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::timer::delay;
use tokio_test::{assert_err, assert_ok}; use tokio_test::{assert_err, assert_ok};
use std::sync::{mpsc, Arc, Mutex}; use std::future::Future;
use std::thread; use std::pin::Pin;
use std::time::{Duration, Instant}; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll};
#[test]
fn single_thread() {
// No panic when starting a runtime w/ a single thread
let _ = runtime::Builder::new().num_threads(1).build();
}
#[test]
fn many_oneshot_futures() {
// used for notifying the main thread
const NUM: usize = 1_000;
for _ in 0..5 {
let (tx, rx) = mpsc::channel();
let rt = rt();
let cnt = Arc::new(AtomicUsize::new(0));
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
// Wait for the pool to shutdown
drop(rt);
}
}
#[test]
fn many_multishot_futures() {
use tokio::sync::mpsc;
const CHAIN: usize = 200;
const CYCLES: usize = 5;
const TRACKS: usize = 50;
for _ in 0..50 {
let rt = rt();
let mut start_txs = Vec::with_capacity(TRACKS);
let mut final_rxs = Vec::with_capacity(TRACKS);
for _ in 0..TRACKS {
let (start_tx, mut chain_rx) = mpsc::channel(10);
for _ in 0..CHAIN {
let (mut next_tx, next_rx) = mpsc::channel(10);
// Forward all the messages
rt.spawn(async move {
while let Some(v) = chain_rx.recv().await {
next_tx.send(v).await.unwrap();
}
});
chain_rx = next_rx;
}
// This final task cycles if needed
let (mut final_tx, final_rx) = mpsc::channel(10);
let mut cycle_tx = start_tx.clone();
let mut rem = CYCLES;
rt.spawn(async move {
for _ in 0..CYCLES {
let msg = chain_rx.recv().await.unwrap();
rem -= 1;
if rem == 0 {
final_tx.send(msg).await.unwrap();
} else {
cycle_tx.send(msg).await.unwrap();
}
}
});
start_txs.push(start_tx);
final_rxs.push(final_rx);
}
{
let mut e = tokio::executor::enter().unwrap();
e.block_on(async move {
for mut start_tx in start_txs {
start_tx.send("ping").await.unwrap();
}
for mut final_rx in final_rxs {
final_rx.recv().await.unwrap();
}
});
}
}
}
#[test]
fn spawn_shutdown() {
let mut rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async {
tokio::spawn(client_server(tx.clone()));
});
// Use spawner
rt.spawn(client_server(tx));
assert_ok!(rx.recv());
assert_ok!(rx.recv());
drop(rt);
assert_err!(rx.try_recv());
}
async fn client_server(tx: mpsc::Sender<()>) { async fn client_server(tx: mpsc::Sender<()>) {
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@ -36,129 +161,58 @@ async fn client_server(tx: mpsc::Sender<()>) {
} }
#[test] #[test]
fn send_sync_bound() { fn drop_threadpool_drops_futures() {
fn is_send<T: Send + Sync>() {} for _ in 0..1_000 {
let num_inc = Arc::new(AtomicUsize::new(0));
let num_dec = Arc::new(AtomicUsize::new(0));
let num_drop = Arc::new(AtomicUsize::new(0));
is_send::<Runtime>(); struct Never(Arc<AtomicUsize>);
}
#[test] impl Future for Never {
fn spawn_shutdown() { type Output = ();
let mut rt = Runtime::new().unwrap();
let (tx, rx) = mpsc::channel();
rt.block_on(async { fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
tokio::spawn(client_server(tx.clone())); Poll::Pending
}); }
// Use spawner
rt.spawner().spawn(client_server(tx));
assert_ok!(rx.recv());
assert_ok!(rx.recv());
drop(rt);
assert_err!(rx.try_recv());
}
#[test]
fn block_on_timer() {
let mut rt = Runtime::new().unwrap();
let v = rt.block_on(async move {
delay(Instant::now() + Duration::from_millis(100)).await;
42
});
assert_eq!(v, 42);
}
#[test]
fn block_on_socket() {
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let _ = listener.accept().await;
tx.send(()).unwrap();
});
TcpStream::connect(&addr).await.unwrap();
rx.await.unwrap();
});
}
#[test]
fn block_waits() {
let (a_tx, a_rx) = oneshot::channel();
let (b_tx, b_rx) = mpsc::channel();
thread::spawn(|| {
use std::time::Duration;
thread::sleep(Duration::from_millis(1000));
a_tx.send(()).unwrap();
});
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
a_rx.await.unwrap();
b_tx.send(()).unwrap();
});
assert_ok!(b_rx.try_recv());
}
#[test]
fn spawn_many() {
const ITER: usize = 200;
let mut rt = Runtime::new().unwrap();
let cnt = Arc::new(Mutex::new(0));
let (tx, rx) = mpsc::channel();
let tx = Arc::new(Mutex::new(tx));
let c = cnt.clone();
rt.block_on(async move {
for _ in 0..ITER {
let c = c.clone();
let tx = tx.clone();
tokio::spawn(async move {
let mut x = c.lock().unwrap();
*x = 1 + *x;
if *x == ITER {
tx.lock().unwrap().send(()).unwrap();
}
});
} }
});
rx.recv().unwrap(); impl Drop for Never {
assert_eq!(ITER, *cnt.lock().unwrap()); fn drop(&mut self) {
} self.0.fetch_add(1, Relaxed);
}
}
#[test] let a = num_inc.clone();
fn nested_enter() { let b = num_dec.clone();
use std::panic;
let mut rt = Runtime::new().unwrap(); let rt = runtime::Builder::new()
rt.block_on(async { .after_start(move || {
assert_err!(tokio::executor::enter()); a.fetch_add(1, Relaxed);
})
.before_stop(move || {
b.fetch_add(1, Relaxed);
})
.build()
.unwrap();
let res = panic::catch_unwind(move || { rt.spawn(Never(num_drop.clone()));
let mut rt = Runtime::new().unwrap();
rt.block_on(async {});
});
assert_err!(res); // Wait for the pool to shutdown
}); drop(rt);
// Assert that only a single thread was spawned.
let a = num_inc.load(Relaxed);
assert!(a >= 1);
// Assert that all threads shutdown
let b = num_dec.load(Relaxed);
assert_eq!(a, b);
// Assert that the future was dropped
let c = num_drop.load(Relaxed);
assert_eq!(c, 1);
}
} }
#[test] #[test]
@ -180,14 +234,87 @@ fn after_start_and_before_stop_is_called() {
.build() .build()
.unwrap(); .unwrap();
let (tx, rx) = mpsc::channel(); let (tx, rx) = oneshot::channel();
rt.block_on(client_server(tx)); rt.spawn(async move {
assert_ok!(tx.send(()));
});
assert_ok!(rt.block_on(rx));
drop(rt); drop(rt);
assert_ok!(rx.try_recv());
assert!(after_start.load(Ordering::Relaxed) > 0); assert!(after_start.load(Ordering::Relaxed) > 0);
assert!(before_stop.load(Ordering::Relaxed) > 0); assert!(before_stop.load(Ordering::Relaxed) > 0);
} }
#[test]
fn blocking() {
// used for notifying the main thread
const NUM: usize = 1_000;
for _ in 0..10 {
let (tx, rx) = mpsc::channel();
let rt = rt();
let cnt = Arc::new(AtomicUsize::new(0));
// there are four workers in the pool
// so, if we run 4 blocking tasks, we know that handoff must have happened
let block = Arc::new(std::sync::Barrier::new(5));
for _ in 0..4 {
let block = block.clone();
rt.spawn(async move {
tokio::executor::thread_pool::blocking(move || {
block.wait();
block.wait();
})
});
}
block.wait();
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
// Wait for the pool to shutdown
block.wait();
}
}
#[test]
fn multi_threadpool() {
use tokio::sync::oneshot;
let rt1 = rt();
let rt2 = rt();
let (tx, rx) = oneshot::channel();
let (done_tx, done_rx) = mpsc::channel();
rt2.spawn(async move {
rx.await.unwrap();
done_tx.send(()).unwrap();
});
rt1.spawn(async move {
tx.send(()).unwrap();
});
done_rx.recv().unwrap();
}
fn rt() -> Runtime {
Runtime::new().unwrap()
}

View File

@ -1,530 +0,0 @@
#![warn(rust_2018_idioms)]
use tokio::executor::park::{Park, Unpark};
use tokio::executor::thread_pool::{self, *};
use futures_util::future::poll_fn;
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::*;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
thread_local!(static FOO: Cell<u32> = Cell::new(0));
#[test]
fn single_thread() {
// No panic when starting a runtime w/ a single thread
let _ = Builder::new().num_threads(1).build();
}
#[test]
fn shutdown_drops_futures() {
for _ in 0..1_000 {
let num_inc = Arc::new(AtomicUsize::new(0));
let num_dec = Arc::new(AtomicUsize::new(0));
let num_drop = Arc::new(AtomicUsize::new(0));
struct Never(Arc<AtomicUsize>);
impl Future for Never {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
}
}
impl Drop for Never {
fn drop(&mut self) {
self.0.fetch_add(1, Relaxed);
}
}
let a = num_inc.clone();
let b = num_dec.clone();
let mut pool = Builder::new()
.around_worker(move |_, work| {
a.fetch_add(1, Relaxed);
work();
b.fetch_add(1, Relaxed);
})
.build();
// let tx = pool.sender().clone();
pool.spawn(Never(num_drop.clone()));
// Wait for the pool to shutdown
pool.shutdown_now();
// Assert that only a single thread was spawned.
let a = num_inc.load(Relaxed);
assert!(a >= 1);
// Assert that all threads shutdown
let b = num_dec.load(Relaxed);
assert_eq!(a, b);
// Assert that the future was dropped
let c = num_drop.load(Relaxed);
assert_eq!(c, 1);
}
}
#[test]
fn drop_threadpool_drops_futures() {
const NUM_THREADS: usize = 10;
for _ in 0..1_000 {
let num_inc = Arc::new(AtomicUsize::new(0));
let num_dec = Arc::new(AtomicUsize::new(0));
let num_drop = Arc::new(AtomicUsize::new(0));
struct Never(Arc<AtomicUsize>);
impl Future for Never {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
}
}
impl Drop for Never {
fn drop(&mut self) {
self.0.fetch_add(1, Relaxed);
}
}
let a = num_inc.clone();
let b = num_dec.clone();
let pool = Builder::new()
.num_threads(NUM_THREADS)
.around_worker(move |_, work| {
a.fetch_add(1, Relaxed);
work();
b.fetch_add(1, Relaxed);
})
.build();
pool.spawn(Never(num_drop.clone()));
// Wait for the pool to shutdown
drop(pool);
// Assert that all the threads spawned
let a = num_inc.load(Relaxed);
assert_eq!(a, NUM_THREADS);
// Assert that all threads shutdown
let b = num_dec.load(Relaxed);
assert_eq!(a, b);
// Assert that the future was dropped
let c = num_drop.load(Relaxed);
assert_eq!(c, 1);
}
}
#[test]
fn blocking() {
// used for notifying the main thread
const NUM: usize = 10_000;
for _ in 0..50 {
let (tx, rx) = mpsc::channel();
let mut pool = new_pool();
let cnt = Arc::new(AtomicUsize::new(0));
// there are four workers in the pool
// so, if we run 4 blocking tasks, we know that handoff must have happened
let block = Arc::new(std::sync::Barrier::new(5));
for _ in 0..4 {
let block = block.clone();
pool.spawn(async move {
thread_pool::blocking(move || {
block.wait();
block.wait();
})
});
}
block.wait();
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
pool.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
// Wait for the pool to shutdown
block.wait();
pool.shutdown_now();
}
}
#[test]
fn many_oneshot_futures() {
// used for notifying the main thread
const NUM: usize = 10_000;
for _ in 0..50 {
let (tx, rx) = mpsc::channel();
let mut pool = new_pool();
let cnt = Arc::new(AtomicUsize::new(0));
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
pool.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
// Wait for the pool to shutdown
pool.shutdown_now();
}
}
#[test]
fn many_multishot_futures() {
use tokio::sync::mpsc;
const CHAIN: usize = 200;
const CYCLES: usize = 5;
const TRACKS: usize = 50;
for _ in 0..50 {
let pool = new_pool();
let mut start_txs = Vec::with_capacity(TRACKS);
let mut final_rxs = Vec::with_capacity(TRACKS);
for _ in 0..TRACKS {
let (start_tx, mut chain_rx) = mpsc::channel(10);
for _ in 0..CHAIN {
let (mut next_tx, next_rx) = mpsc::channel(10);
// Forward all the messages
pool.spawn(async move {
while let Some(v) = chain_rx.recv().await {
next_tx.send(v).await.unwrap();
}
});
chain_rx = next_rx;
}
// This final task cycles if needed
let (mut final_tx, final_rx) = mpsc::channel(10);
let mut cycle_tx = start_tx.clone();
let mut rem = CYCLES;
pool.spawn(async move {
for _ in 0..CYCLES {
let msg = chain_rx.recv().await.unwrap();
rem -= 1;
if rem == 0 {
final_tx.send(msg).await.unwrap();
} else {
cycle_tx.send(msg).await.unwrap();
}
}
});
start_txs.push(start_tx);
final_rxs.push(final_rx);
}
{
let mut e = tokio::executor::enter().unwrap();
e.block_on(async move {
for mut start_tx in start_txs {
start_tx.send("ping").await.unwrap();
}
for mut final_rx in final_rxs {
final_rx.recv().await.unwrap();
}
});
}
}
}
#[test]
fn global_executor_is_configured() {
let pool = new_pool();
let (signal_tx, signal_rx) = mpsc::channel();
pool.spawn(async move {
tokio::executor::spawn(async move {
signal_tx.send(()).unwrap();
});
});
signal_rx.recv().unwrap();
}
#[test]
fn new_threadpool_is_idle() {
let mut pool = new_pool();
pool.shutdown_now();
}
#[test]
fn panic_in_task() {
let pool = new_pool();
let (tx, rx) = mpsc::channel();
struct Boom(mpsc::Sender<()>);
impl Future for Boom {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
panic!();
}
}
impl Drop for Boom {
fn drop(&mut self) {
assert!(::std::thread::panicking());
self.0.send(()).unwrap();
}
}
pool.spawn(Boom(tx));
rx.recv().unwrap();
}
#[test]
fn multi_threadpool() {
use tokio::sync::oneshot;
let pool1 = new_pool();
let pool2 = new_pool();
let (tx, rx) = oneshot::channel();
let (done_tx, done_rx) = mpsc::channel();
pool2.spawn(async move {
rx.await.unwrap();
done_tx.send(()).unwrap();
});
pool1.spawn(async move {
tx.send(()).unwrap();
});
done_rx.recv().unwrap();
}
#[test]
fn eagerly_drops_futures() {
use std::sync::{mpsc, Mutex};
struct MyPark {
rx: mpsc::Receiver<()>,
tx: Mutex<mpsc::Sender<()>>,
#[allow(dead_code)]
park_tx: mpsc::SyncSender<()>,
unpark_tx: mpsc::SyncSender<()>,
}
impl Park for MyPark {
type Unpark = MyUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
MyUnpark {
tx: Mutex::new(self.tx.lock().unwrap().clone()),
unpark_tx: self.unpark_tx.clone(),
}
}
fn park(&mut self) -> Result<(), Self::Error> {
let _ = self.rx.recv();
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
let _ = self.rx.recv_timeout(duration);
Ok(())
}
}
struct MyUnpark {
tx: Mutex<mpsc::Sender<()>>,
#[allow(dead_code)]
unpark_tx: mpsc::SyncSender<()>,
}
impl Unpark for MyUnpark {
fn unpark(&self) {
let _ = self.tx.lock().unwrap().send(());
}
}
let (task_tx, task_rx) = mpsc::channel();
let (drop_tx, drop_rx) = mpsc::channel();
let (park_tx, park_rx) = mpsc::sync_channel(0);
let (unpark_tx, unpark_rx) = mpsc::sync_channel(0);
let pool = Builder::new().num_threads(4).build_with_park(move |_| {
let (tx, rx) = mpsc::channel();
MyPark {
tx: Mutex::new(tx),
rx,
park_tx: park_tx.clone(),
unpark_tx: unpark_tx.clone(),
}
});
struct MyTask {
task_tx: Option<mpsc::Sender<Waker>>,
drop_tx: mpsc::Sender<()>,
}
impl Future for MyTask {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Some(tx) = self.get_mut().task_tx.take() {
tx.send(cx.waker().clone()).unwrap();
}
Poll::Pending
}
}
impl Drop for MyTask {
fn drop(&mut self) {
self.drop_tx.send(()).unwrap();
}
}
pool.spawn(MyTask {
task_tx: Some(task_tx),
drop_tx,
});
// Wait until we get the task handle.
let task = task_rx.recv().unwrap();
// Drop the pool, this should result in futures being forcefully dropped.
drop(pool);
// Make sure `MyPark` and `MyUnpark` were dropped during shutdown.
assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
// If the future is forcefully dropped, then we will get a signal here.
drop_rx.recv().unwrap();
// Ensure `task` lives until after the test completes.
drop(task);
}
#[test]
fn park_called_at_interval() {
struct MyPark {
park_light: Arc<AtomicBool>,
}
struct MyUnpark {}
impl Park for MyPark {
type Unpark = MyUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
MyUnpark {}
}
fn park(&mut self) -> Result<(), Self::Error> {
use std::thread;
use std::time::Duration;
thread::sleep(Duration::from_millis(1));
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
if duration == Duration::from_millis(0) {
self.park_light.store(true, Relaxed);
Ok(())
} else {
self.park()
}
}
}
impl Unpark for MyUnpark {
fn unpark(&self) {}
}
let park_light_1 = Arc::new(AtomicBool::new(false));
let park_light_2 = park_light_1.clone();
let (done_tx, done_rx) = mpsc::channel();
// Use 1 thread to ensure the worker stays busy.
let pool = Builder::new().num_threads(1).build_with_park(move |idx| {
assert_eq!(idx, 0);
MyPark {
park_light: park_light_2.clone(),
}
});
let mut cnt = 0;
pool.spawn(poll_fn(move |cx| {
let did_park_light = park_light_1.load(Relaxed);
if did_park_light {
// There is a bit of a race where the worker can tick a few times
// before seeing the task
assert!(cnt > 50);
done_tx.send(()).unwrap();
return Poll::Ready(());
}
cnt += 1;
cx.waker().wake_by_ref();
Poll::Pending
}));
done_rx.recv().unwrap();
}
fn new_pool() -> ThreadPool {
Builder::new().num_threads(4).build()
}