mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00

This fixes #4801. As a result of https://github.com/rust-lang/rust/pull/95469, our implementation of `cat` used for this test no longer works, because stdio functions on Windows can now abort the process if the pipe is set to nonblocking mode. Unfortunately, on Windows, setting one end of a pipe to nonblocking mode makes the whole pipe nonblocking, so when `tokio::process` sets the child pipes to nonblocking mode, it causes serious problems for any Rust program at the other end. Fixing the underlying issue is for another day, but fixing the tests is for today.
203 lines
5.8 KiB
Rust
203 lines
5.8 KiB
Rust
#![warn(rust_2018_idioms)]
|
|
#![cfg(feature = "full")]
|
|
|
|
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
|
|
use tokio::join;
|
|
use tokio::process::{Child, Command};
|
|
use tokio_test::assert_ok;
|
|
|
|
use futures::future::{self, FutureExt};
|
|
use std::convert::TryInto;
|
|
use std::io;
|
|
use std::process::{ExitStatus, Stdio};
|
|
|
|
// These tests should go back to using the `test-cat` binary (see the
// commented-out `cat` below), but for now that does not work because of:
// https://github.com/rust-lang/rust/pull/95469
//
// Undo this once the following issue is closed:
// https://github.com/tokio-rs/tokio/issues/4802
|
|
|
|
// fn cat() -> Command {
|
|
// let mut cmd = Command::new(std::env!("CARGO_BIN_EXE_test-cat"));
|
|
// cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
|
|
// cmd
|
|
// }
|
|
|
|
fn cat() -> Command {
|
|
let mut cmd = Command::new("cat");
|
|
cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
|
|
cmd
|
|
}
|
|
|
|
async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
|
|
let mut stdin = cat.stdin.take().unwrap();
|
|
let stdout = cat.stdout.take().unwrap();
|
|
|
|
// Produce n lines on the child's stdout.
|
|
let write = async {
|
|
for i in 0..n {
|
|
let bytes = format!("line {}\n", i).into_bytes();
|
|
stdin.write_all(&bytes).await.unwrap();
|
|
}
|
|
|
|
drop(stdin);
|
|
};
|
|
|
|
let read = async {
|
|
let mut reader = BufReader::new(stdout).lines();
|
|
let mut num_lines = 0;
|
|
|
|
// Try to read `n + 1` lines, ensuring the last one is empty
|
|
// (i.e. EOF is reached after `n` lines.
|
|
loop {
|
|
let data = reader
|
|
.next_line()
|
|
.await
|
|
.unwrap_or_else(|_| Some(String::new()))
|
|
.expect("failed to read line");
|
|
|
|
let num_read = data.len();
|
|
let done = num_lines >= n;
|
|
|
|
match (done, num_read) {
|
|
(false, 0) => panic!("broken pipe"),
|
|
(true, n) if n != 0 => panic!("extraneous data"),
|
|
_ => {
|
|
let expected = format!("line {}", num_lines);
|
|
assert_eq!(expected, data);
|
|
}
|
|
};
|
|
|
|
num_lines += 1;
|
|
if num_lines >= n {
|
|
break;
|
|
}
|
|
}
|
|
};
|
|
|
|
// Compose reading and writing concurrently.
|
|
future::join3(write, read, cat.wait())
|
|
.map(|(_, _, status)| status)
|
|
.await
|
|
}
|
|
|
|
/// Check for the following properties when feeding stdin and
|
|
/// consuming stdout of a cat-like process:
|
|
///
|
|
/// - A number of lines that amounts to a number of bytes exceeding a
|
|
/// typical OS buffer size can be fed to the child without
|
|
/// deadlock. This tests that we also consume the stdout
|
|
/// concurrently; otherwise this would deadlock.
|
|
///
|
|
/// - We read the same lines from the child that we fed it.
|
|
///
|
|
/// - The child does produce EOF on stdout after the last line.
|
|
#[tokio::test]
|
|
async fn feed_a_lot() {
|
|
let child = cat().spawn().unwrap();
|
|
let status = feed_cat(child, 10000).await.unwrap();
|
|
assert_eq!(status.code(), Some(0));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn wait_with_output_captures() {
|
|
let mut child = cat().spawn().unwrap();
|
|
let mut stdin = child.stdin.take().unwrap();
|
|
|
|
let write_bytes = b"1234";
|
|
|
|
let future = async {
|
|
stdin.write_all(write_bytes).await?;
|
|
drop(stdin);
|
|
let out = child.wait_with_output();
|
|
out.await
|
|
};
|
|
|
|
let output = future.await.unwrap();
|
|
|
|
assert!(output.status.success());
|
|
assert_eq!(output.stdout, write_bytes);
|
|
assert_eq!(output.stderr.len(), 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn status_closes_any_pipes() {
|
|
// Cat will open a pipe between the parent and child.
|
|
// If `status_async` doesn't ensure the handles are closed,
|
|
// we would end up blocking forever (and time out).
|
|
let child = cat().status();
|
|
|
|
assert_ok!(child.await);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn try_wait() {
|
|
let mut child = cat().spawn().unwrap();
|
|
|
|
let id = child.id().expect("missing id");
|
|
assert!(id > 0);
|
|
|
|
assert_eq!(None, assert_ok!(child.try_wait()));
|
|
|
|
// Drop the child's stdio handles so it can terminate
|
|
drop(child.stdin.take());
|
|
drop(child.stderr.take());
|
|
drop(child.stdout.take());
|
|
|
|
assert_ok!(child.wait().await);
|
|
|
|
// test that the `.try_wait()` method is fused just like the stdlib
|
|
assert!(assert_ok!(child.try_wait()).unwrap().success());
|
|
|
|
// Can't get id after process has exited
|
|
assert_eq!(child.id(), None);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn pipe_from_one_command_to_another() {
|
|
let mut first = cat().spawn().expect("first cmd");
|
|
let mut third = cat().spawn().expect("third cmd");
|
|
|
|
// Convert ChildStdout to Stdio
|
|
let second_stdin: Stdio = first
|
|
.stdout
|
|
.take()
|
|
.expect("first.stdout")
|
|
.try_into()
|
|
.expect("first.stdout into Stdio");
|
|
|
|
// Convert ChildStdin to Stdio
|
|
let second_stdout: Stdio = third
|
|
.stdin
|
|
.take()
|
|
.expect("third.stdin")
|
|
.try_into()
|
|
.expect("third.stdin into Stdio");
|
|
|
|
let mut second = cat()
|
|
.stdin(second_stdin)
|
|
.stdout(second_stdout)
|
|
.spawn()
|
|
.expect("first cmd");
|
|
|
|
let msg = "hello world! please pipe this message through";
|
|
|
|
let mut stdin = first.stdin.take().expect("first.stdin");
|
|
let write = async move { stdin.write_all(msg.as_bytes()).await };
|
|
|
|
let mut stdout = third.stdout.take().expect("third.stdout");
|
|
let read = async move {
|
|
let mut data = String::new();
|
|
stdout.read_to_string(&mut data).await.map(|_| data)
|
|
};
|
|
|
|
let (read, write, first_status, second_status, third_status) =
|
|
join!(read, write, first.wait(), second.wait(), third.wait());
|
|
|
|
assert_eq!(msg, read.expect("read result"));
|
|
write.expect("write result");
|
|
|
|
assert!(first_status.expect("first status").success());
|
|
assert!(second_status.expect("second status").success());
|
|
assert!(third_status.expect("third status").success());
|
|
}
|