From 939a0dd7b0c0f0c82d335a1a6038877aa90df01a Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Sat, 30 Nov 2019 15:17:04 -0800 Subject: [PATCH] process: rewrite and simplify the issue_42 test (#1871) --- tokio/tests/process_issue_42.rs | 78 +++++++++++---------------------- 1 file changed, 26 insertions(+), 52 deletions(-) diff --git a/tokio/tests/process_issue_42.rs b/tokio/tests/process_issue_42.rs index 5b9759acf..022a109a6 100644 --- a/tokio/tests/process_issue_42.rs +++ b/tokio/tests/process_issue_42.rs @@ -2,61 +2,35 @@ #![cfg(feature = "full")] #![cfg(unix)] -use tokio::process::Command; -use tokio::runtime; - -use futures::future::FutureExt; -use futures::stream::FuturesOrdered; +use futures::future::join_all; use std::process::Stdio; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::thread; -use std::time::Duration; +use tokio::process::Command; +use tokio::task; -fn run_test() { - let finished = Arc::new(AtomicBool::new(false)); - let finished_clone = finished.clone(); +#[tokio::test] +async fn issue_42() { + // We spawn a many batches of processes which should exit at roughly the + // same time (modulo OS scheduling delays), to make sure that consuming + // a readiness event for one process doesn't inadvertently starve another. + // We then do this many times (in parallel) in an effort to stress test the + // implementation to ensure there are no race conditions. + // See alexcrichton/tokio-process#42 for background + let join_handles = (0..10usize).into_iter().map(|_| { + task::spawn(async { + let processes = (0..10usize).into_iter().map(|i| { + Command::new("echo") + .arg(format!("I am spawned process #{}", i)) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .kill_on_drop(true) + .spawn() + .unwrap() + }); - thread::spawn(move || { - let mut rt = runtime::Builder::new() - .basic_scheduler() - .enable_all() - .build() - .unwrap(); - - let mut futures = FuturesOrdered::new(); - rt.block_on(async { - for i in 0..2 { - futures.push( - Command::new("echo") - .arg(format!("I am spawned process #{}", i)) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .kill_on_drop(true) - .spawn() - .unwrap() - .boxed(), - ) - } - }); - - drop(rt); - finished_clone.store(true, Ordering::SeqCst); + join_all(processes).await; + }) }); - thread::sleep(Duration::from_millis(1000)); - assert!( - finished.load(Ordering::SeqCst), - "FINISHED flag not set, maybe we deadlocked?" - ); -} - -#[test] -fn issue_42() { - let max = 10; - for i in 0..max { - println!("running {}/{}", i, max); - run_test() - } + join_all(join_handles).await; }