Update process to use std::future (#1343)

This commit is contained in:
andy finch 2019-07-29 21:36:11 -04:00 committed by Ivan Petkov
parent 74168ae82f
commit fbf90e6356
10 changed files with 331 additions and 206 deletions

View File

@ -9,6 +9,7 @@ members = [
"tokio-fs",
"tokio-io",
"tokio-macros",
"tokio-process",
"tokio-reactor",
"tokio-signal",
"tokio-sync",

View File

@ -37,6 +37,7 @@ jobs:
rust: $(nightly)
crates:
tokio-fs: []
tokio-process: []
tokio-reactor: []
tokio-signal: []
tokio-tcp:

View File

@ -17,18 +17,16 @@ An implementation of an asynchronous process management backed futures.
categories = ["asynchronous"]
[dependencies]
futures = "0.1.11"
tokio-io = "0.1"
tokio-reactor = "0.1"
futures-util-preview = { version = "0.3.0-alpha.17", features = ["io"] }
futures-core-preview = { version = "0.3.0-alpha.17" }
tokio-io = { version = "0.2.0", path = "../tokio-io" }
tokio-reactor = { version = "0.2.0", path = "../tokio-reactor" }
tokio-sync = { version = "0.2.0", path = "../tokio-sync" }
[dev-dependencies]
failure = "0.1"
log = "0.4"
[dev-dependencies.tokio]
version = "0.1"
default-features = false
features = ["rt-full"]
tokio = { version = "0.2.0", path = "../tokio" }
[target.'cfg(windows)'.dependencies]
mio-named-pipes = "0.1"
@ -52,4 +50,4 @@ lazy_static = "1.3"
libc = "0.2"
log = "0.4"
mio = "0.6.5"
tokio-signal = "0.2.5"
tokio-signal = { version = "0.3.0", path = "../tokio-signal" }

View File

@ -13,13 +13,12 @@
//! for it using an event loop.
//!
//! ```no_run
//! extern crate futures;
//! extern crate tokio;
//! extern crate tokio_process;
//!
//! use std::process::Command;
//!
//! use futures::Future;
//! use futures_util::future::FutureExt;
//! use tokio_process::CommandExt;
//!
//! fn main() {
@ -42,13 +41,12 @@
//! world` but we also capture its output.
//!
//! ```no_run
//! extern crate futures;
//! extern crate tokio;
//! extern crate tokio_process;
//!
//! use std::process::Command;
//!
//! use futures::Future;
//! use futures_util::future::FutureExt;
//! use tokio_process::CommandExt;
//!
//! fn main() {
@ -71,13 +69,13 @@
//!
//! ```no_run
//! extern crate failure;
//! extern crate futures;
//! extern crate tokio;
//! extern crate tokio_process;
//! extern crate tokio_io;
//!
//! use failure::Error;
//! use futures::{Future, Stream};
//! use futures_util::future::FutureExt;
//! use futures_util::stream::StreamExt;
//! use std::io::BufReader;
//! use std::process::{Command, Stdio};
//! use tokio_process::{Child, ChildStdout, CommandExt};
@ -157,8 +155,8 @@
#![warn(missing_debug_implementations)]
#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/tokio-process/0.2")]
#![feature(async_await)]
extern crate futures;
extern crate tokio_io;
extern crate tokio_reactor;
@ -172,12 +170,20 @@ extern crate log;
use std::io::{self, Read, Write};
use std::process::{Command, ExitStatus, Output, Stdio};
use crate::kill::Kill;
use futures::future::{ok, Either};
use futures::{Async, Future, IntoFuture, Poll};
use futures_core::future::TryFuture;
use futures_util::future;
use futures_util::future::FutureExt;
use futures_util::io::{AsyncRead, AsyncWrite};
use futures_util::try_future::TryFutureExt;
use kill::Kill;
use std::fmt;
use tokio_io::io::read_to_end;
use tokio_io::{AsyncRead, AsyncWrite, IoFuture};
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio_io::AsyncRead as TokioAsyncRead;
use tokio_io::AsyncWrite as TokioAsyncWrite;
use tokio_reactor::Handle;
#[path = "unix/mod.rs"]
@ -362,13 +368,11 @@ impl CommandExt for Command {
self.stdout(Stdio::piped());
self.stderr(Stdio::piped());
let inner = self
.spawn_async_with_handle(handle)
.into_future()
.and_then(Child::wait_with_output);
let inner =
future::ready(self.spawn_async_with_handle(handle)).and_then(Child::wait_with_output);
OutputAsync {
inner: Box::new(inner),
inner: inner.boxed(),
}
}
}
@ -414,16 +418,16 @@ impl<T: Kill> Drop for ChildDropGuard<T> {
}
}
impl<T: Future + Kill> Future for ChildDropGuard<T> {
type Item = T::Item;
type Error = T::Error;
impl<T: TryFuture + Kill + Unpin> Future for ChildDropGuard<T> {
type Output = Result<T::Ok, T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let ret = self.inner.poll();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = Pin::get_mut(self);
let ret = inner.inner.try_poll_unpin(cx);
if let Ok(Async::Ready(_)) = ret {
if let Poll::Ready(Ok(_)) = ret {
// Avoid the overhead of trying to kill a reaped process
self.kill_on_drop = false;
inner.kill_on_drop = false;
}
ret
@ -501,24 +505,39 @@ impl Child {
/// `stderr(Stdio::piped())`, respectively, when creating a `Command`.
pub fn wait_with_output(mut self) -> WaitWithOutput {
drop(self.stdin().take());
let stdout = match self.stdout().take() {
Some(io) => Either::A(read_to_end(io, Vec::new()).map(|p| p.1)),
None => Either::B(ok(Vec::new())),
let stdout_val = self.stdout.take();
let stderr_val = self.stderr.take();
let stdout_fut = async {
match stdout_val {
Some(mut io) => {
let mut vec = Vec::new();
futures_util::io::AsyncReadExt::read_to_end(&mut io, &mut vec).await?;
Ok(vec)
}
None => Ok(Vec::new()),
}
};
let stderr = match self.stderr().take() {
Some(io) => Either::A(read_to_end(io, Vec::new()).map(|p| p.1)),
None => Either::B(ok(Vec::new())),
let stderr_fut = async {
match stderr_val {
Some(mut io) => {
let mut vec = Vec::new();
futures_util::io::AsyncReadExt::read_to_end(&mut io, &mut vec).await?;
Ok(vec)
}
None => Ok(Vec::new()),
}
};
WaitWithOutput {
inner: Box::new(
self.join3(stdout, stderr)
.map(|(status, stdout, stderr)| Output {
inner: futures_util::try_future::try_join3(stdout_fut, stderr_fut, self)
.and_then(|(stdout, stderr, status)| {
future::ok(Output {
status,
stdout,
stderr,
}),
),
})
})
.boxed(),
}
}
@ -533,13 +552,12 @@ impl Child {
/// > `Child` instance into an event loop as an alternative to this method.
///
/// ```no_run
/// # extern crate futures;
/// # extern crate tokio;
/// # extern crate tokio_process;
/// #
/// # use std::process::Command;
/// #
/// # use futures::Future;
/// # use futures_util::future::FutureExt;
/// # use tokio_process::CommandExt;
/// #
/// # fn main() {
@ -559,11 +577,10 @@ impl Child {
}
impl Future for Child {
type Item = ExitStatus;
type Error = io::Error;
type Output = io::Result<ExitStatus>;
fn poll(&mut self) -> Poll<ExitStatus, io::Error> {
self.child.poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::get_mut(self).child.poll_unpin(cx)
}
}
@ -573,7 +590,7 @@ impl Future for Child {
/// contains the exit status, stdout, and stderr of a child process.
#[must_use = "futures do nothing unless polled"]
pub struct WaitWithOutput {
inner: IoFuture<Output>,
inner: Pin<Box<dyn Future<Output = io::Result<Output>> + Send>>,
}
impl fmt::Debug for WaitWithOutput {
@ -585,11 +602,10 @@ impl fmt::Debug for WaitWithOutput {
}
impl Future for WaitWithOutput {
type Item = Output;
type Error = io::Error;
type Output = io::Result<Output>;
fn poll(&mut self) -> Poll<Output, io::Error> {
self.inner.poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::get_mut(self).inner.poll_unpin(cx)
}
}
@ -609,11 +625,10 @@ pub struct StatusAsync {
}
impl Future for StatusAsync {
type Item = ExitStatus;
type Error = io::Error;
type Output = io::Result<ExitStatus>;
fn poll(&mut self) -> Poll<ExitStatus, io::Error> {
self.inner.poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::get_mut(self).inner.poll_unpin(cx)
}
}
@ -624,7 +639,7 @@ impl Future for StatusAsync {
/// process, collecting all of its output and its exit status.
#[must_use = "futures do nothing unless polled"]
pub struct OutputAsync {
inner: IoFuture<Output>,
inner: Pin<Box<dyn Future<Output = io::Result<Output>> + Send>>,
}
impl fmt::Debug for OutputAsync {
@ -636,11 +651,10 @@ impl fmt::Debug for OutputAsync {
}
impl Future for OutputAsync {
type Item = Output;
type Error = io::Error;
type Output = io::Result<Output>;
fn poll(&mut self) -> Poll<Output, io::Error> {
self.inner.poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::get_mut(self).inner.poll_unpin(cx)
}
}
@ -678,35 +692,59 @@ pub struct ChildStderr {
impl Write for ChildStdin {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.inner.write(bytes)
self.inner.get_mut().write(bytes)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
self.inner.get_mut().flush()
}
}
impl AsyncWrite for ChildStdin {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.inner.shutdown()
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
Pin::new(&mut Pin::get_mut(self).inner).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut Pin::get_mut(self).inner).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut Pin::get_mut(self).inner).poll_shutdown(cx)
}
}
impl Read for ChildStdout {
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
self.inner.read(bytes)
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.get_mut().read(buf)
}
}
impl AsyncRead for ChildStdout {}
impl AsyncRead for ChildStdout {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut Pin::get_mut(self).inner).poll_read(cx, buf)
}
}
impl Read for ChildStderr {
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
self.inner.read(bytes)
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.get_mut().read(buf)
}
}
impl AsyncRead for ChildStderr {}
impl AsyncRead for ChildStderr {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut Pin::get_mut(self).inner).poll_read(cx, buf)
}
}
#[cfg(unix)]
mod sys {
@ -760,21 +798,25 @@ mod sys {
mod test {
use super::ChildDropGuard;
use crate::kill::Kill;
use futures::{Async, Future, Poll};
use futures_util::future::FutureExt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
struct Mock {
num_kills: usize,
num_polls: usize,
poll_result: Poll<(), ()>,
poll_result: Poll<Result<(), ()>>,
}
impl Mock {
fn new() -> Self {
Self::with_result(Ok(Async::NotReady))
Self::with_result(Poll::Pending)
}
fn with_result(result: Poll<(), ()>) -> Self {
fn with_result(result: Poll<Result<(), ()>>) -> Self {
Self {
num_kills: 0,
num_polls: 0,
@ -791,12 +833,12 @@ mod test {
}
impl Future for Mock {
type Item = ();
type Error = ();
type Output = Result<(), ()>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.num_polls += 1;
self.poll_result
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let inner = Pin::get_mut(self);
inner.num_polls += 1;
inner.poll_result
}
}
@ -829,19 +871,21 @@ mod test {
#[test]
fn no_kill_if_reaped() {
let mut mock_pending = Mock::with_result(Ok(Async::NotReady));
let mut mock_reaped = Mock::with_result(Ok(Async::Ready(())));
let mut mock_err = Mock::with_result(Err(()));
let mut mock_pending = Mock::with_result(Poll::Pending);
let mut mock_reaped = Mock::with_result(Poll::Ready(Ok(())));
let mut mock_err = Mock::with_result(Poll::Ready(Err(())));
let waker = futures_util::task::noop_waker();
let mut context = Context::from_waker(&waker);
{
let mut guard = ChildDropGuard::new(&mut mock_pending);
let _ = guard.poll();
let _ = guard.poll_unpin(&mut context);
let mut guard = ChildDropGuard::new(&mut mock_reaped);
let _ = guard.poll();
let _ = guard.poll_unpin(&mut context);
let mut guard = ChildDropGuard::new(&mut mock_err);
let _ = guard.poll();
let _ = guard.poll_unpin(&mut context);
}
assert_eq!(1, mock_pending.num_kills);

View File

@ -33,17 +33,23 @@ use self::mio::unix::{EventedFd, UnixReady};
use self::mio::{Poll as MioPoll, PollOpt, Ready, Token};
use self::orphan::{AtomicOrphanQueue, OrphanQueue, Wait};
use self::reap::Reaper;
use self::tokio_signal::unix::Signal;
use super::SpawnedChild;
use crate::kill::Kill;
use futures::future::FlattenStream;
use futures::{Future, Poll};
use futures_core::stream::Stream;
use futures_util::future;
use futures_util::future::FutureExt;
use futures_util::stream::StreamExt;
use futures_util::try_future::TryFutureExt;
use std::fmt;
use std::future::Future;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::process::{self, ExitStatus};
use tokio_io::IoFuture;
use std::task::Context;
use std::task::Poll;
use tokio_reactor::{Handle, PollEvented};
use tokio_signal::unix::Signal;
impl Wait for process::Child {
fn id(&self) -> u32 {
@ -83,9 +89,11 @@ impl OrphanQueue<process::Child> for GlobalOrphanQueue {
}
}
type ChildReaperFuture = Pin<Box<dyn Stream<Item = io::Result<()>> + Send>>;
#[must_use = "futures do nothing unless polled"]
pub struct Child {
inner: Reaper<process::Child, GlobalOrphanQueue, FlattenStream<IoFuture<Signal>>>,
inner: Reaper<process::Child, GlobalOrphanQueue, ChildReaperFuture>,
}
impl fmt::Debug for Child {
@ -102,7 +110,10 @@ pub(crate) fn spawn_child(cmd: &mut process::Command, handle: &Handle) -> io::Re
let stdout = stdio(child.stdout.take(), handle)?;
let stderr = stdio(child.stderr.take(), handle)?;
let signal = Signal::with_handle(libc::SIGCHLD, handle).flatten_stream();
let signal = Signal::with_handle(libc::SIGCHLD, handle)
.and_then(|stream| future::ok(stream.map(Ok)))
.try_flatten_stream()
.boxed();
Ok(SpawnedChild {
child: Child {
inner: Reaper::new(child, GlobalOrphanQueue, signal),
@ -126,30 +137,37 @@ impl Kill for Child {
}
impl Future for Child {
type Item = ExitStatus;
type Error = io::Error;
type Output = io::Result<ExitStatus>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
(&mut Pin::get_mut(self).inner).poll_unpin(cx)
}
}
#[derive(Debug)]
pub struct Fd<T>(T);
pub struct Fd<T> {
inner: T,
}
impl<T: io::Read> io::Read for Fd<T> {
impl<T> io::Read for Fd<T>
where
T: io::Read,
{
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
self.0.read(bytes)
self.inner.read(bytes)
}
}
impl<T: io::Write> io::Write for Fd<T> {
impl<T> io::Write for Fd<T>
where
T: io::Write,
{
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.0.write(bytes)
self.inner.write(bytes)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
self.inner.flush()
}
}
@ -158,14 +176,10 @@ where
T: AsRawFd,
{
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
self.inner.as_raw_fd()
}
}
pub type ChildStdin = PollEvented<Fd<process::ChildStdin>>;
pub type ChildStdout = PollEvented<Fd<process::ChildStdout>>;
pub type ChildStderr = PollEvented<Fd<process::ChildStderr>>;
impl<T> Evented for Fd<T>
where
T: AsRawFd,
@ -195,6 +209,10 @@ where
}
}
pub type ChildStdin = PollEvented<Fd<process::ChildStdin>>;
pub type ChildStdout = PollEvented<Fd<process::ChildStdout>>;
pub type ChildStderr = PollEvented<Fd<process::ChildStderr>>;
fn stdio<T>(option: Option<T>, handle: &Handle) -> io::Result<Option<PollEvented<Fd<T>>>>
where
T: AsRawFd,
@ -216,6 +234,6 @@ where
return Err(io::Error::last_os_error());
}
}
let io = PollEvented::new_with_handle(Fd(io), handle)?;
let io = PollEvented::new_with_handle(Fd { inner: io }, handle)?;
Ok(Some(io))
}

View File

@ -1,16 +1,21 @@
use super::orphan::{OrphanQueue, Wait};
use crate::kill::Kill;
use futures::{Async, Future, Poll, Stream};
use futures_core::stream::TryStream;
use futures_util::try_stream::TryStreamExt;
use std::future::Future;
use std::io;
use std::ops::Deref;
use std::pin::Pin;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
/// Orchestrates between registering interest for receiving signals when a
/// child process has exited, and attempting to poll for process completion.
#[derive(Debug)]
pub(crate) struct Reaper<W, Q, S>
where
W: Wait,
W: Wait + Unpin,
Q: OrphanQueue<W>,
{
inner: Option<W>,
@ -20,7 +25,7 @@ where
impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait,
W: Wait + Unpin,
Q: OrphanQueue<W>,
{
type Target = W;
@ -32,7 +37,7 @@ where
impl<W, Q, S> Reaper<W, Q, S>
where
W: Wait,
W: Wait + Unpin,
Q: OrphanQueue<W>,
{
pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
@ -54,14 +59,14 @@ where
impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait,
Q: OrphanQueue<W>,
S: Stream<Error = io::Error>,
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
S: TryStream<Error = io::Error> + Unpin,
{
type Item = ExitStatus;
type Error = io::Error;
type Output = io::Result<ExitStatus>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = Pin::get_mut(self);
loop {
// If the child hasn't exited yet, then it's our responsibility to
// ensure the current task gets notified when it might be able to
@ -82,17 +87,21 @@ where
// this future's task will be notified/woken up again. Since the
// futures model allows for spurious wake ups this extra wakeup
// should not cause significant issues with parent futures.
let registered_interest = self.signal.poll()?.is_not_ready();
let signal_poll = inner.signal.try_poll_next_unpin(cx);
if let Poll::Ready(Some(Err(err))) = signal_poll {
return Poll::Ready(Err(err));
}
let registered_interest = signal_poll.is_pending();
self.orphan_queue.reap_orphans();
if let Some(status) = self.inner_mut().try_wait()? {
return Ok(Async::Ready(status));
inner.orphan_queue.reap_orphans();
if let Some(status) = inner.inner_mut().try_wait()? {
return Poll::Ready(Ok(status));
}
// If our attempt to poll for the next signal was not ready, then
// we've arranged for our task to get notified and we can bail out.
if registered_interest {
return Ok(Async::NotReady);
return Poll::Pending;
} else {
// Otherwise, if the signal stream delivered a signal to us, we
// won't get notified at the next signal, so we'll loop and try
@ -105,7 +114,7 @@ where
impl<W, Q, S> Kill for Reaper<W, Q, S>
where
W: Kill + Wait,
W: Kill + Wait + Unpin,
Q: OrphanQueue<W>,
{
fn kill(&mut self) -> io::Result<()> {
@ -115,7 +124,7 @@ where
impl<W, Q, S> Drop for Reaper<W, Q, S>
where
W: Wait,
W: Wait + Unpin,
Q: OrphanQueue<W>,
{
fn drop(&mut self) {
@ -131,10 +140,14 @@ where
#[cfg(test)]
mod test {
use super::*;
use futures::{Async, Poll, Stream};
use futures_core::stream::Stream;
use futures_util::future::FutureExt;
use std::cell::{Cell, RefCell};
use std::os::unix::process::ExitStatusExt;
use std::pin::Pin;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
#[derive(Debug)]
struct MockWait {
@ -194,14 +207,14 @@ mod test {
}
impl Stream for MockStream {
type Item = ();
type Error = io::Error;
type Item = io::Result<()>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.total_polls += 1;
match self.values.remove(0) {
Some(()) => Ok(Async::Ready(Some(()))),
None => Ok(Async::NotReady),
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
let inner = Pin::get_mut(self);
inner.total_polls += 1;
match inner.values.remove(0) {
Some(()) => Poll::Ready(Some(Ok(()))),
None => Poll::Pending,
}
}
}
@ -240,8 +253,11 @@ mod test {
MockStream::new(vec![None, Some(()), None, None, None]),
);
let waker = futures_util::task::noop_waker();
let mut context = Context::from_waker(&waker);
// Not yet exited, interest registered
assert_eq!(Async::NotReady, grim.poll().expect("failed to wait"));
assert!(grim.poll_unpin(&mut context).is_pending());
assert_eq!(1, grim.signal.total_polls);
assert_eq!(1, grim.total_waits);
assert_eq!(1, grim.orphan_queue.total_reaps.get());
@ -249,14 +265,20 @@ mod test {
// Not yet exited, couldn't register interest the first time
// but managed to register interest the second time around
assert_eq!(Async::NotReady, grim.poll().expect("failed to wait"));
assert!(grim.poll_unpin(&mut context).is_pending());
assert_eq!(3, grim.signal.total_polls);
assert_eq!(3, grim.total_waits);
assert_eq!(3, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Exited
assert_eq!(Async::Ready(exit), grim.poll().expect("failed to wait"));
if let Poll::Ready(r) = grim.poll_unpin(&mut context) {
assert!(r.is_ok());
let exit_code = r.unwrap();
assert_eq!(exit_code, exit);
} else {
unreachable!();
}
assert_eq!(4, grim.signal.total_polls);
assert_eq!(4, grim.total_waits);
assert_eq!(4, grim.orphan_queue.total_reaps.get());

View File

@ -18,12 +18,21 @@
extern crate mio_named_pipes;
extern crate winapi;
use crate::kill::Kill;
use std::fmt;
use std::future::Future;
use std::io;
use std::os::windows::prelude::*;
use std::os::windows::process::ExitStatusExt;
use std::pin::Pin;
use std::process::{self, ExitStatus};
use std::ptr;
use std::task::Context;
use std::task::Poll;
use futures_util::future::Fuse;
use futures_util::future::FutureExt;
use self::mio_named_pipes::NamedPipe;
use self::winapi::shared::minwindef::*;
@ -35,11 +44,8 @@ use self::winapi::um::threadpoollegacyapiset::*;
use self::winapi::um::winbase::*;
use self::winapi::um::winnt::*;
use super::SpawnedChild;
use futures::future::Fuse;
use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use kill::Kill;
use tokio_reactor::{Handle, PollEvented};
use tokio_sync::oneshot;
#[must_use = "futures do nothing unless polled"]
pub struct Child {
@ -96,22 +102,23 @@ impl Kill for Child {
}
impl Future for Child {
type Item = ExitStatus;
type Error = io::Error;
type Output = io::Result<ExitStatus>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = Pin::get_mut(self);
loop {
if let Some(ref mut w) = self.waiting {
match w.rx.poll().expect("should not be canceled") {
Async::Ready(()) => {}
Async::NotReady => return Ok(Async::NotReady),
if let Some(ref mut w) = inner.waiting {
match w.rx.poll_unpin(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(_)) => panic!("should not be canceled"),
Poll::Pending => return Poll::Pending,
}
let status = try_wait(&self.child)?.expect("not ready yet");
return Ok(status.into());
let status = try_wait(&inner.child)?.expect("not ready yet");
return Poll::Ready(Ok(status.into()));
}
if let Some(e) = try_wait(&self.child)? {
return Ok(e.into());
if let Some(e) = try_wait(&inner.child)? {
return Poll::Ready(Ok(e.into()));
}
let (tx, rx) = oneshot::channel();
let ptr = Box::into_raw(Box::new(Some(tx)));
@ -119,7 +126,7 @@ impl Future for Child {
let rc = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
self.child.as_raw_handle(),
inner.child.as_raw_handle(),
Some(callback),
ptr as *mut _,
INFINITE,
@ -129,9 +136,9 @@ impl Future for Child {
if rc == 0 {
let err = io::Error::last_os_error();
drop(unsafe { Box::from_raw(ptr) });
return Err(err);
return Poll::Ready(Err(err));
}
self.waiting = Some(Waiting {
inner.waiting = Some(Waiting {
rx: rx.fuse(),
wait_object,
tx: ptr,

View File

@ -1,35 +1,42 @@
#![cfg(unix)]
extern crate futures;
extern crate tokio_process;
use futures::{stream, Future, IntoFuture, Stream};
use std::process::{Command, Stdio};
use futures_util::future::FutureExt;
use futures_util::stream::FuturesOrdered;
use futures_util::stream::StreamExt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::process::{Command, ExitStatus, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio_process::CommandExt;
mod support;
fn run_test() {
let finished = Arc::new(AtomicBool::new(false));
let finished_clone = finished.clone();
thread::spawn(move || {
let _ = stream::iter_ok(0..2)
.map(|i| {
let mut futures: FuturesOrdered<Pin<Box<dyn Future<Output = io::Result<ExitStatus>>>>> =
FuturesOrdered::new();
for i in 0..2 {
futures.push(
Command::new("echo")
.arg(format!("I am spawned process #{}", i))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn_async()
.into_future()
.flatten()
})
.buffered(2)
.collect()
.wait();
.unwrap()
.boxed(),
)
}
support::run_with_timeout(futures.collect::<Vec<io::Result<ExitStatus>>>());
finished_clone.store(true, Ordering::SeqCst);
});

View File

@ -1,15 +1,22 @@
extern crate futures;
#![feature(async_await)]
#[macro_use]
extern crate log;
extern crate tokio_io;
extern crate tokio_process;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::process::{Command, ExitStatus, Stdio};
use futures::future::Future;
use futures::stream::{self, Stream};
use tokio_io::io::{read_until, write_all};
use futures_util::future;
use futures_util::future::FutureExt;
use futures_util::io::AsyncBufReadExt;
use futures_util::io::AsyncReadExt;
use futures_util::io::AsyncWriteExt;
use futures_util::io::BufReader;
use futures_util::stream::{self, StreamExt};
use tokio_process::{Child, CommandExt};
mod support;
@ -20,28 +27,37 @@ fn cat() -> Command {
cmd
}
fn feed_cat(mut cat: Child, n: usize) -> Box<Future<Item = ExitStatus, Error = io::Error>> {
fn feed_cat(mut cat: Child, n: usize) -> Pin<Box<dyn Future<Output = io::Result<ExitStatus>>>> {
let stdin = cat.stdin().take().unwrap();
let stdout = cat.stdout().take().unwrap();
debug!("starting to feed");
// Produce n lines on the child's stdout.
let numbers = stream::iter_ok(0..n);
let numbers = stream::iter(0..n);
let write = numbers
.fold(stdin, |stdin, i| {
debug!("sending line {} to child", i);
write_all(stdin, format!("line {}\n", i).into_bytes()).map(|p| p.0)
.fold(stdin, move |mut stdin, i| {
let fut = async move {
debug!("sending line {} to child", i);
let bytes = format!("line {}\n", i).into_bytes();
AsyncWriteExt::write_all(&mut stdin, &bytes).await.unwrap();
stdin
};
fut
})
.map(|_| ());
// Try to read `n + 1` lines, ensuring the last one is empty
// (i.e. EOF is reached after `n` lines.
let reader = io::BufReader::new(stdout);
let expected_numbers = stream::iter_ok(0..=n);
let read = expected_numbers.fold((reader, 0), move |(reader, i), _| {
let done = i >= n;
debug!("starting read from child");
read_until(reader, b'\n', Vec::new()).and_then(move |(reader, vec)| {
let reader = BufReader::new(stdout);
let expected_numbers = stream::iter(0..=n);
let read = expected_numbers.fold((reader, 0), move |(mut reader, i), _| {
let fut = async move {
let done = i >= n;
debug!("starting read from child");
let mut vec = Vec::new();
AsyncBufReadExt::read_until(&mut reader, b'\n', &mut vec)
.await
.unwrap();
debug!(
"read line {} from child ({} bytes, done: {})",
i,
@ -49,23 +65,28 @@ fn feed_cat(mut cat: Child, n: usize) -> Box<Future<Item = ExitStatus, Error = i
done
);
match (done, vec.len()) {
(false, 0) => Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")),
(true, n) if n != 0 => Err(io::Error::new(io::ErrorKind::Other, "extraneous data")),
(false, 0) => {
panic!("broken pipe");
}
(true, n) if n != 0 => {
panic!("extraneous data");
}
_ => {
let s = std::str::from_utf8(&vec).unwrap();
let expected = format!("line {}\n", i);
if done || s == expected {
Ok((reader, i + 1))
(reader, i + 1)
} else {
Err(io::Error::new(io::ErrorKind::Other, "unexpected data"))
panic!("unexpected data");
}
}
}
})
};
fut
});
// Compose reading and writing concurrently.
Box::new(write.join(read).and_then(|_| cat))
future::join(write, read).then(|_| cat).boxed()
}
/// Check for the following properties when feeding stdin and
@ -89,15 +110,22 @@ fn feed_a_lot() {
#[test]
fn wait_with_output_captures() {
let mut child = cat().spawn_async().unwrap();
let stdin = child.stdin().take().unwrap();
let out = child.wait_with_output();
let mut stdin = child.stdin().take().unwrap();
let write_bytes = b"1234";
let future = async {
AsyncWriteExt::write_all(&mut stdin, write_bytes).await?;
drop(stdin);
let out = child.wait_with_output();
out.await
};
let future = write_all(stdin, b"1234").map(|p| p.1).join(out);
let ret = support::run_with_timeout(future).unwrap();
let (written, output) = ret;
let output = ret;
assert!(output.status.success());
assert_eq!(output.stdout, written);
assert_eq!(output.stdout, write_bytes);
assert_eq!(output.stderr.len(), 0);
}

View File

@ -1,14 +1,16 @@
extern crate futures;
extern crate tokio;
use self::futures::Future;
use self::tokio::timer::Timeout;
use futures_util::future;
use futures_util::future::FutureExt;
use std::env;
use std::future::Future;
use std::process::Command;
use std::time::Duration;
use tokio::timer::Timeout;
pub use self::tokio::runtime::current_thread::Runtime as CurrentThreadRuntime;
#[allow(dead_code)]
pub fn cmd(s: &str) -> Command {
let mut me = env::current_exe().unwrap();
me.pop();
@ -19,19 +21,16 @@ pub fn cmd(s: &str) -> Command {
Command::new(me)
}
pub fn with_timeout<F: Future>(future: F) -> impl Future<Item = F::Item, Error = F::Error> {
Timeout::new(future, Duration::from_secs(3)).map_err(|e| {
if e.is_timer() {
panic!("failed to register timer");
} else if e.is_elapsed() {
panic!("timed out")
} else {
e.into_inner().expect("missing inner error")
pub fn with_timeout<F: Future>(future: F) -> impl Future<Output = F::Output> {
Timeout::new(future, Duration::from_secs(3)).then(|r| {
if r.is_err() {
panic!("timed out {:?}", r.err());
}
future::ready(r.unwrap())
})
}
pub fn run_with_timeout<F>(future: F) -> Result<F::Item, F::Error>
pub fn run_with_timeout<F>(future: F) -> F::Output
where
F: Future,
{