From fbf90e63567d57520901369d780e94669e216f98 Mon Sep 17 00:00:00 2001 From: andy finch Date: Mon, 29 Jul 2019 21:36:11 -0400 Subject: [PATCH] Update process to use std::future (#1343) --- Cargo.toml | 1 + azure-pipelines.yml | 1 + tokio-process/Cargo.toml | 16 +-- tokio-process/src/lib.rs | 204 ++++++++++++++++++----------- tokio-process/src/unix/mod.rs | 62 +++++---- tokio-process/src/unix/reap.rs | 78 +++++++---- tokio-process/src/windows.rs | 43 +++--- tokio-process/tests/issue_42.rs | 29 ++-- tokio-process/tests/stdio.rs | 80 +++++++---- tokio-process/tests/support/mod.rs | 23 ++-- 10 files changed, 331 insertions(+), 206 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cb6d0ce2d..d5b7026d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "tokio-fs", "tokio-io", "tokio-macros", + "tokio-process", "tokio-reactor", "tokio-signal", "tokio-sync", diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 9c583a086..58dcb595e 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -37,6 +37,7 @@ jobs: rust: $(nightly) crates: tokio-fs: [] + tokio-process: [] tokio-reactor: [] tokio-signal: [] tokio-tcp: diff --git a/tokio-process/Cargo.toml b/tokio-process/Cargo.toml index 2c3112b84..0b2fcd636 100644 --- a/tokio-process/Cargo.toml +++ b/tokio-process/Cargo.toml @@ -17,18 +17,16 @@ An implementation of an asynchronous process management backed futures. categories = ["asynchronous"] [dependencies] -futures = "0.1.11" -tokio-io = "0.1" -tokio-reactor = "0.1" +futures-util-preview = { version = "0.3.0-alpha.17", features = ["io"] } +futures-core-preview = { version = "0.3.0-alpha.17" } +tokio-io = { version = "0.2.0", path = "../tokio-io" } +tokio-reactor = { version = "0.2.0", path = "../tokio-reactor" } +tokio-sync = { version = "0.2.0", path = "../tokio-sync" } [dev-dependencies] failure = "0.1" log = "0.4" - -[dev-dependencies.tokio] -version = "0.1" -default-features = false -features = ["rt-full"] +tokio = { version = "0.2.0", path = "../tokio" } [target.'cfg(windows)'.dependencies] mio-named-pipes = "0.1" @@ -52,4 +50,4 @@ lazy_static = "1.3" libc = "0.2" log = "0.4" mio = "0.6.5" -tokio-signal = "0.2.5" +tokio-signal = { version = "0.3.0", path = "../tokio-signal" } diff --git a/tokio-process/src/lib.rs b/tokio-process/src/lib.rs index e9e963689..6bc94ef07 100644 --- a/tokio-process/src/lib.rs +++ b/tokio-process/src/lib.rs @@ -13,13 +13,12 @@ //! for it using an event loop. //! //! ```no_run -//! extern crate futures; //! extern crate tokio; //! extern crate tokio_process; //! //! use std::process::Command; //! -//! use futures::Future; +//! use futures_util::future::FutureExt; //! use tokio_process::CommandExt; //! //! fn main() { @@ -42,13 +41,12 @@ //! world` but we also capture its output. //! //! ```no_run -//! extern crate futures; //! extern crate tokio; //! extern crate tokio_process; //! //! use std::process::Command; //! -//! use futures::Future; +//! use futures_util::future::FutureExt; //! use tokio_process::CommandExt; //! //! fn main() { @@ -71,13 +69,13 @@ //! //! ```no_run //! extern crate failure; -//! extern crate futures; //! extern crate tokio; //! extern crate tokio_process; //! extern crate tokio_io; //! //! use failure::Error; -//! use futures::{Future, Stream}; +//! use futures_util::future::FutureExt; +//! use futures_util::stream::StreamExt; //! use std::io::BufReader; //! use std::process::{Command, Stdio}; //! use tokio_process::{Child, ChildStdout, CommandExt}; @@ -157,8 +155,8 @@ #![warn(missing_debug_implementations)] #![deny(missing_docs)] #![doc(html_root_url = "https://docs.rs/tokio-process/0.2")] +#![feature(async_await)] -extern crate futures; extern crate tokio_io; extern crate tokio_reactor; @@ -172,12 +170,20 @@ extern crate log; use std::io::{self, Read, Write}; use std::process::{Command, ExitStatus, Output, Stdio}; -use crate::kill::Kill; -use futures::future::{ok, Either}; -use futures::{Async, Future, IntoFuture, Poll}; +use futures_core::future::TryFuture; +use futures_util::future; +use futures_util::future::FutureExt; +use futures_util::io::{AsyncRead, AsyncWrite}; +use futures_util::try_future::TryFutureExt; + +use kill::Kill; use std::fmt; -use tokio_io::io::read_to_end; -use tokio_io::{AsyncRead, AsyncWrite, IoFuture}; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use tokio_io::AsyncRead as TokioAsyncRead; +use tokio_io::AsyncWrite as TokioAsyncWrite; use tokio_reactor::Handle; #[path = "unix/mod.rs"] @@ -362,13 +368,11 @@ impl CommandExt for Command { self.stdout(Stdio::piped()); self.stderr(Stdio::piped()); - let inner = self - .spawn_async_with_handle(handle) - .into_future() - .and_then(Child::wait_with_output); + let inner = + future::ready(self.spawn_async_with_handle(handle)).and_then(Child::wait_with_output); OutputAsync { - inner: Box::new(inner), + inner: inner.boxed(), } } } @@ -414,16 +418,16 @@ impl Drop for ChildDropGuard { } } -impl Future for ChildDropGuard { - type Item = T::Item; - type Error = T::Error; +impl Future for ChildDropGuard { + type Output = Result; - fn poll(&mut self) -> Poll { - let ret = self.inner.poll(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = Pin::get_mut(self); + let ret = inner.inner.try_poll_unpin(cx); - if let Ok(Async::Ready(_)) = ret { + if let Poll::Ready(Ok(_)) = ret { // Avoid the overhead of trying to kill a reaped process - self.kill_on_drop = false; + inner.kill_on_drop = false; } ret @@ -501,24 +505,39 @@ impl Child { /// `stderr(Stdio::piped())`, respectively, when creating a `Command`. pub fn wait_with_output(mut self) -> WaitWithOutput { drop(self.stdin().take()); - let stdout = match self.stdout().take() { - Some(io) => Either::A(read_to_end(io, Vec::new()).map(|p| p.1)), - None => Either::B(ok(Vec::new())), + let stdout_val = self.stdout.take(); + let stderr_val = self.stderr.take(); + let stdout_fut = async { + match stdout_val { + Some(mut io) => { + let mut vec = Vec::new(); + futures_util::io::AsyncReadExt::read_to_end(&mut io, &mut vec).await?; + Ok(vec) + } + None => Ok(Vec::new()), + } }; - let stderr = match self.stderr().take() { - Some(io) => Either::A(read_to_end(io, Vec::new()).map(|p| p.1)), - None => Either::B(ok(Vec::new())), + let stderr_fut = async { + match stderr_val { + Some(mut io) => { + let mut vec = Vec::new(); + futures_util::io::AsyncReadExt::read_to_end(&mut io, &mut vec).await?; + Ok(vec) + } + None => Ok(Vec::new()), + } }; WaitWithOutput { - inner: Box::new( - self.join3(stdout, stderr) - .map(|(status, stdout, stderr)| Output { + inner: futures_util::try_future::try_join3(stdout_fut, stderr_fut, self) + .and_then(|(stdout, stderr, status)| { + future::ok(Output { status, stdout, stderr, - }), - ), + }) + }) + .boxed(), } } @@ -533,13 +552,12 @@ impl Child { /// > `Child` instance into an event loop as an alternative to this method. /// /// ```no_run - /// # extern crate futures; /// # extern crate tokio; /// # extern crate tokio_process; /// # /// # use std::process::Command; /// # - /// # use futures::Future; + /// # use futures_util::future::FutureExt; /// # use tokio_process::CommandExt; /// # /// # fn main() { @@ -559,11 +577,10 @@ impl Child { } impl Future for Child { - type Item = ExitStatus; - type Error = io::Error; + type Output = io::Result; - fn poll(&mut self) -> Poll { - self.child.poll() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + Pin::get_mut(self).child.poll_unpin(cx) } } @@ -573,7 +590,7 @@ impl Future for Child { /// contains the exit status, stdout, and stderr of a child process. #[must_use = "futures do nothing unless polled"] pub struct WaitWithOutput { - inner: IoFuture, + inner: Pin> + Send>>, } impl fmt::Debug for WaitWithOutput { @@ -585,11 +602,10 @@ impl fmt::Debug for WaitWithOutput { } impl Future for WaitWithOutput { - type Item = Output; - type Error = io::Error; + type Output = io::Result; - fn poll(&mut self) -> Poll { - self.inner.poll() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + Pin::get_mut(self).inner.poll_unpin(cx) } } @@ -609,11 +625,10 @@ pub struct StatusAsync { } impl Future for StatusAsync { - type Item = ExitStatus; - type Error = io::Error; + type Output = io::Result; - fn poll(&mut self) -> Poll { - self.inner.poll() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + Pin::get_mut(self).inner.poll_unpin(cx) } } @@ -624,7 +639,7 @@ impl Future for StatusAsync { /// process, collecting all of its output and its exit status. #[must_use = "futures do nothing unless polled"] pub struct OutputAsync { - inner: IoFuture, + inner: Pin> + Send>>, } impl fmt::Debug for OutputAsync { @@ -636,11 +651,10 @@ impl fmt::Debug for OutputAsync { } impl Future for OutputAsync { - type Item = Output; - type Error = io::Error; + type Output = io::Result; - fn poll(&mut self) -> Poll { - self.inner.poll() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + Pin::get_mut(self).inner.poll_unpin(cx) } } @@ -678,35 +692,59 @@ pub struct ChildStderr { impl Write for ChildStdin { fn write(&mut self, bytes: &[u8]) -> io::Result { - self.inner.write(bytes) + self.inner.get_mut().write(bytes) } fn flush(&mut self) -> io::Result<()> { - self.inner.flush() + self.inner.get_mut().flush() } } impl AsyncWrite for ChildStdin { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.inner.shutdown() + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + Pin::new(&mut Pin::get_mut(self).inner).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut Pin::get_mut(self).inner).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut Pin::get_mut(self).inner).poll_shutdown(cx) } } impl Read for ChildStdout { - fn read(&mut self, bytes: &mut [u8]) -> io::Result { - self.inner.read(bytes) + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.get_mut().read(buf) } } -impl AsyncRead for ChildStdout {} +impl AsyncRead for ChildStdout { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut Pin::get_mut(self).inner).poll_read(cx, buf) + } +} impl Read for ChildStderr { - fn read(&mut self, bytes: &mut [u8]) -> io::Result { - self.inner.read(bytes) + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.get_mut().read(buf) } } -impl AsyncRead for ChildStderr {} +impl AsyncRead for ChildStderr { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut Pin::get_mut(self).inner).poll_read(cx, buf) + } +} #[cfg(unix)] mod sys { @@ -760,21 +798,25 @@ mod sys { mod test { use super::ChildDropGuard; use crate::kill::Kill; - use futures::{Async, Future, Poll}; + use futures_util::future::FutureExt; + use std::future::Future; use std::io; + use std::pin::Pin; + use std::task::Context; + use std::task::Poll; struct Mock { num_kills: usize, num_polls: usize, - poll_result: Poll<(), ()>, + poll_result: Poll>, } impl Mock { fn new() -> Self { - Self::with_result(Ok(Async::NotReady)) + Self::with_result(Poll::Pending) } - fn with_result(result: Poll<(), ()>) -> Self { + fn with_result(result: Poll>) -> Self { Self { num_kills: 0, num_polls: 0, @@ -791,12 +833,12 @@ mod test { } impl Future for Mock { - type Item = (); - type Error = (); + type Output = Result<(), ()>; - fn poll(&mut self) -> Poll { - self.num_polls += 1; - self.poll_result + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { + let inner = Pin::get_mut(self); + inner.num_polls += 1; + inner.poll_result } } @@ -829,19 +871,21 @@ mod test { #[test] fn no_kill_if_reaped() { - let mut mock_pending = Mock::with_result(Ok(Async::NotReady)); - let mut mock_reaped = Mock::with_result(Ok(Async::Ready(()))); - let mut mock_err = Mock::with_result(Err(())); + let mut mock_pending = Mock::with_result(Poll::Pending); + let mut mock_reaped = Mock::with_result(Poll::Ready(Ok(()))); + let mut mock_err = Mock::with_result(Poll::Ready(Err(()))); + let waker = futures_util::task::noop_waker(); + let mut context = Context::from_waker(&waker); { let mut guard = ChildDropGuard::new(&mut mock_pending); - let _ = guard.poll(); + let _ = guard.poll_unpin(&mut context); let mut guard = ChildDropGuard::new(&mut mock_reaped); - let _ = guard.poll(); + let _ = guard.poll_unpin(&mut context); let mut guard = ChildDropGuard::new(&mut mock_err); - let _ = guard.poll(); + let _ = guard.poll_unpin(&mut context); } assert_eq!(1, mock_pending.num_kills); diff --git a/tokio-process/src/unix/mod.rs b/tokio-process/src/unix/mod.rs index adf0f4ba2..585b43673 100644 --- a/tokio-process/src/unix/mod.rs +++ b/tokio-process/src/unix/mod.rs @@ -33,17 +33,23 @@ use self::mio::unix::{EventedFd, UnixReady}; use self::mio::{Poll as MioPoll, PollOpt, Ready, Token}; use self::orphan::{AtomicOrphanQueue, OrphanQueue, Wait}; use self::reap::Reaper; -use self::tokio_signal::unix::Signal; use super::SpawnedChild; use crate::kill::Kill; -use futures::future::FlattenStream; -use futures::{Future, Poll}; +use futures_core::stream::Stream; +use futures_util::future; +use futures_util::future::FutureExt; +use futures_util::stream::StreamExt; +use futures_util::try_future::TryFutureExt; use std::fmt; +use std::future::Future; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; +use std::pin::Pin; use std::process::{self, ExitStatus}; -use tokio_io::IoFuture; +use std::task::Context; +use std::task::Poll; use tokio_reactor::{Handle, PollEvented}; +use tokio_signal::unix::Signal; impl Wait for process::Child { fn id(&self) -> u32 { @@ -83,9 +89,11 @@ impl OrphanQueue for GlobalOrphanQueue { } } +type ChildReaperFuture = Pin> + Send>>; + #[must_use = "futures do nothing unless polled"] pub struct Child { - inner: Reaper>>, + inner: Reaper, } impl fmt::Debug for Child { @@ -102,7 +110,10 @@ pub(crate) fn spawn_child(cmd: &mut process::Command, handle: &Handle) -> io::Re let stdout = stdio(child.stdout.take(), handle)?; let stderr = stdio(child.stderr.take(), handle)?; - let signal = Signal::with_handle(libc::SIGCHLD, handle).flatten_stream(); + let signal = Signal::with_handle(libc::SIGCHLD, handle) + .and_then(|stream| future::ok(stream.map(Ok))) + .try_flatten_stream() + .boxed(); Ok(SpawnedChild { child: Child { inner: Reaper::new(child, GlobalOrphanQueue, signal), @@ -126,30 +137,37 @@ impl Kill for Child { } impl Future for Child { - type Item = ExitStatus; - type Error = io::Error; + type Output = io::Result; - fn poll(&mut self) -> Poll { - self.inner.poll() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + (&mut Pin::get_mut(self).inner).poll_unpin(cx) } } #[derive(Debug)] -pub struct Fd(T); +pub struct Fd { + inner: T, +} -impl io::Read for Fd { +impl io::Read for Fd +where + T: io::Read, +{ fn read(&mut self, bytes: &mut [u8]) -> io::Result { - self.0.read(bytes) + self.inner.read(bytes) } } -impl io::Write for Fd { +impl io::Write for Fd +where + T: io::Write, +{ fn write(&mut self, bytes: &[u8]) -> io::Result { - self.0.write(bytes) + self.inner.write(bytes) } fn flush(&mut self) -> io::Result<()> { - self.0.flush() + self.inner.flush() } } @@ -158,14 +176,10 @@ where T: AsRawFd, { fn as_raw_fd(&self) -> RawFd { - self.0.as_raw_fd() + self.inner.as_raw_fd() } } -pub type ChildStdin = PollEvented>; -pub type ChildStdout = PollEvented>; -pub type ChildStderr = PollEvented>; - impl Evented for Fd where T: AsRawFd, @@ -195,6 +209,10 @@ where } } +pub type ChildStdin = PollEvented>; +pub type ChildStdout = PollEvented>; +pub type ChildStderr = PollEvented>; + fn stdio(option: Option, handle: &Handle) -> io::Result>>> where T: AsRawFd, @@ -216,6 +234,6 @@ where return Err(io::Error::last_os_error()); } } - let io = PollEvented::new_with_handle(Fd(io), handle)?; + let io = PollEvented::new_with_handle(Fd { inner: io }, handle)?; Ok(Some(io)) } diff --git a/tokio-process/src/unix/reap.rs b/tokio-process/src/unix/reap.rs index 125aaf30f..8a70715f4 100644 --- a/tokio-process/src/unix/reap.rs +++ b/tokio-process/src/unix/reap.rs @@ -1,16 +1,21 @@ use super::orphan::{OrphanQueue, Wait}; use crate::kill::Kill; -use futures::{Async, Future, Poll, Stream}; +use futures_core::stream::TryStream; +use futures_util::try_stream::TryStreamExt; +use std::future::Future; use std::io; use std::ops::Deref; +use std::pin::Pin; use std::process::ExitStatus; +use std::task::Context; +use std::task::Poll; /// Orchestrates between registering interest for receiving signals when a /// child process has exited, and attempting to poll for process completion. #[derive(Debug)] pub(crate) struct Reaper where - W: Wait, + W: Wait + Unpin, Q: OrphanQueue, { inner: Option, @@ -20,7 +25,7 @@ where impl Deref for Reaper where - W: Wait, + W: Wait + Unpin, Q: OrphanQueue, { type Target = W; @@ -32,7 +37,7 @@ where impl Reaper where - W: Wait, + W: Wait + Unpin, Q: OrphanQueue, { pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { @@ -54,14 +59,14 @@ where impl Future for Reaper where - W: Wait, - Q: OrphanQueue, - S: Stream, + W: Wait + Unpin, + Q: OrphanQueue + Unpin, + S: TryStream + Unpin, { - type Item = ExitStatus; - type Error = io::Error; + type Output = io::Result; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = Pin::get_mut(self); loop { // If the child hasn't exited yet, then it's our responsibility to // ensure the current task gets notified when it might be able to @@ -82,17 +87,21 @@ where // this future's task will be notified/woken up again. Since the // futures model allows for spurious wake ups this extra wakeup // should not cause significant issues with parent futures. - let registered_interest = self.signal.poll()?.is_not_ready(); + let signal_poll = inner.signal.try_poll_next_unpin(cx); + if let Poll::Ready(Some(Err(err))) = signal_poll { + return Poll::Ready(Err(err)); + } + let registered_interest = signal_poll.is_pending(); - self.orphan_queue.reap_orphans(); - if let Some(status) = self.inner_mut().try_wait()? { - return Ok(Async::Ready(status)); + inner.orphan_queue.reap_orphans(); + if let Some(status) = inner.inner_mut().try_wait()? { + return Poll::Ready(Ok(status)); } // If our attempt to poll for the next signal was not ready, then // we've arranged for our task to get notified and we can bail out. if registered_interest { - return Ok(Async::NotReady); + return Poll::Pending; } else { // Otherwise, if the signal stream delivered a signal to us, we // won't get notified at the next signal, so we'll loop and try @@ -105,7 +114,7 @@ where impl Kill for Reaper where - W: Kill + Wait, + W: Kill + Wait + Unpin, Q: OrphanQueue, { fn kill(&mut self) -> io::Result<()> { @@ -115,7 +124,7 @@ where impl Drop for Reaper where - W: Wait, + W: Wait + Unpin, Q: OrphanQueue, { fn drop(&mut self) { @@ -131,10 +140,14 @@ where #[cfg(test)] mod test { use super::*; - use futures::{Async, Poll, Stream}; + use futures_core::stream::Stream; + use futures_util::future::FutureExt; use std::cell::{Cell, RefCell}; use std::os::unix::process::ExitStatusExt; + use std::pin::Pin; use std::process::ExitStatus; + use std::task::Context; + use std::task::Poll; #[derive(Debug)] struct MockWait { @@ -194,14 +207,14 @@ mod test { } impl Stream for MockStream { - type Item = (); - type Error = io::Error; + type Item = io::Result<()>; - fn poll(&mut self) -> Poll, Self::Error> { - self.total_polls += 1; - match self.values.remove(0) { - Some(()) => Ok(Async::Ready(Some(()))), - None => Ok(Async::NotReady), + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let inner = Pin::get_mut(self); + inner.total_polls += 1; + match inner.values.remove(0) { + Some(()) => Poll::Ready(Some(Ok(()))), + None => Poll::Pending, } } } @@ -240,8 +253,11 @@ mod test { MockStream::new(vec![None, Some(()), None, None, None]), ); + let waker = futures_util::task::noop_waker(); + let mut context = Context::from_waker(&waker); + // Not yet exited, interest registered - assert_eq!(Async::NotReady, grim.poll().expect("failed to wait")); + assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(1, grim.signal.total_polls); assert_eq!(1, grim.total_waits); assert_eq!(1, grim.orphan_queue.total_reaps.get()); @@ -249,14 +265,20 @@ mod test { // Not yet exited, couldn't register interest the first time // but managed to register interest the second time around - assert_eq!(Async::NotReady, grim.poll().expect("failed to wait")); + assert!(grim.poll_unpin(&mut context).is_pending()); assert_eq!(3, grim.signal.total_polls); assert_eq!(3, grim.total_waits); assert_eq!(3, grim.orphan_queue.total_reaps.get()); assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Exited - assert_eq!(Async::Ready(exit), grim.poll().expect("failed to wait")); + if let Poll::Ready(r) = grim.poll_unpin(&mut context) { + assert!(r.is_ok()); + let exit_code = r.unwrap(); + assert_eq!(exit_code, exit); + } else { + unreachable!(); + } assert_eq!(4, grim.signal.total_polls); assert_eq!(4, grim.total_waits); assert_eq!(4, grim.orphan_queue.total_reaps.get()); diff --git a/tokio-process/src/windows.rs b/tokio-process/src/windows.rs index 0d43f29fa..56aab82b7 100644 --- a/tokio-process/src/windows.rs +++ b/tokio-process/src/windows.rs @@ -18,12 +18,21 @@ extern crate mio_named_pipes; extern crate winapi; +use crate::kill::Kill; + use std::fmt; +use std::future::Future; use std::io; use std::os::windows::prelude::*; use std::os::windows::process::ExitStatusExt; +use std::pin::Pin; use std::process::{self, ExitStatus}; use std::ptr; +use std::task::Context; +use std::task::Poll; + +use futures_util::future::Fuse; +use futures_util::future::FutureExt; use self::mio_named_pipes::NamedPipe; use self::winapi::shared::minwindef::*; @@ -35,11 +44,8 @@ use self::winapi::um::threadpoollegacyapiset::*; use self::winapi::um::winbase::*; use self::winapi::um::winnt::*; use super::SpawnedChild; -use futures::future::Fuse; -use futures::sync::oneshot; -use futures::{Async, Future, Poll}; -use kill::Kill; use tokio_reactor::{Handle, PollEvented}; +use tokio_sync::oneshot; #[must_use = "futures do nothing unless polled"] pub struct Child { @@ -96,22 +102,23 @@ impl Kill for Child { } impl Future for Child { - type Item = ExitStatus; - type Error = io::Error; + type Output = io::Result; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = Pin::get_mut(self); loop { - if let Some(ref mut w) = self.waiting { - match w.rx.poll().expect("should not be canceled") { - Async::Ready(()) => {} - Async::NotReady => return Ok(Async::NotReady), + if let Some(ref mut w) = inner.waiting { + match w.rx.poll_unpin(cx) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(_)) => panic!("should not be canceled"), + Poll::Pending => return Poll::Pending, } - let status = try_wait(&self.child)?.expect("not ready yet"); - return Ok(status.into()); + let status = try_wait(&inner.child)?.expect("not ready yet"); + return Poll::Ready(Ok(status.into())); } - if let Some(e) = try_wait(&self.child)? { - return Ok(e.into()); + if let Some(e) = try_wait(&inner.child)? { + return Poll::Ready(Ok(e.into())); } let (tx, rx) = oneshot::channel(); let ptr = Box::into_raw(Box::new(Some(tx))); @@ -119,7 +126,7 @@ impl Future for Child { let rc = unsafe { RegisterWaitForSingleObject( &mut wait_object, - self.child.as_raw_handle(), + inner.child.as_raw_handle(), Some(callback), ptr as *mut _, INFINITE, @@ -129,9 +136,9 @@ impl Future for Child { if rc == 0 { let err = io::Error::last_os_error(); drop(unsafe { Box::from_raw(ptr) }); - return Err(err); + return Poll::Ready(Err(err)); } - self.waiting = Some(Waiting { + inner.waiting = Some(Waiting { rx: rx.fuse(), wait_object, tx: ptr, diff --git a/tokio-process/tests/issue_42.rs b/tokio-process/tests/issue_42.rs index 0b34efcf1..465f612bb 100644 --- a/tokio-process/tests/issue_42.rs +++ b/tokio-process/tests/issue_42.rs @@ -1,35 +1,42 @@ #![cfg(unix)] -extern crate futures; extern crate tokio_process; -use futures::{stream, Future, IntoFuture, Stream}; -use std::process::{Command, Stdio}; +use futures_util::future::FutureExt; +use futures_util::stream::FuturesOrdered; +use futures_util::stream::StreamExt; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::process::{Command, ExitStatus, Stdio}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::time::Duration; use tokio_process::CommandExt; +mod support; + fn run_test() { let finished = Arc::new(AtomicBool::new(false)); let finished_clone = finished.clone(); thread::spawn(move || { - let _ = stream::iter_ok(0..2) - .map(|i| { + let mut futures: FuturesOrdered>>>> = + FuturesOrdered::new(); + for i in 0..2 { + futures.push( Command::new("echo") .arg(format!("I am spawned process #{}", i)) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) .spawn_async() - .into_future() - .flatten() - }) - .buffered(2) - .collect() - .wait(); + .unwrap() + .boxed(), + ) + } + support::run_with_timeout(futures.collect::>>()); finished_clone.store(true, Ordering::SeqCst); }); diff --git a/tokio-process/tests/stdio.rs b/tokio-process/tests/stdio.rs index 647350f72..5a3b64a83 100644 --- a/tokio-process/tests/stdio.rs +++ b/tokio-process/tests/stdio.rs @@ -1,15 +1,22 @@ -extern crate futures; +#![feature(async_await)] + #[macro_use] extern crate log; extern crate tokio_io; extern crate tokio_process; +use std::future::Future; use std::io; +use std::pin::Pin; use std::process::{Command, ExitStatus, Stdio}; -use futures::future::Future; -use futures::stream::{self, Stream}; -use tokio_io::io::{read_until, write_all}; +use futures_util::future; +use futures_util::future::FutureExt; +use futures_util::io::AsyncBufReadExt; +use futures_util::io::AsyncReadExt; +use futures_util::io::AsyncWriteExt; +use futures_util::io::BufReader; +use futures_util::stream::{self, StreamExt}; use tokio_process::{Child, CommandExt}; mod support; @@ -20,28 +27,37 @@ fn cat() -> Command { cmd } -fn feed_cat(mut cat: Child, n: usize) -> Box> { +fn feed_cat(mut cat: Child, n: usize) -> Pin>>> { let stdin = cat.stdin().take().unwrap(); let stdout = cat.stdout().take().unwrap(); debug!("starting to feed"); // Produce n lines on the child's stdout. - let numbers = stream::iter_ok(0..n); + let numbers = stream::iter(0..n); let write = numbers - .fold(stdin, |stdin, i| { - debug!("sending line {} to child", i); - write_all(stdin, format!("line {}\n", i).into_bytes()).map(|p| p.0) + .fold(stdin, move |mut stdin, i| { + let fut = async move { + debug!("sending line {} to child", i); + let bytes = format!("line {}\n", i).into_bytes(); + AsyncWriteExt::write_all(&mut stdin, &bytes).await.unwrap(); + stdin + }; + fut }) .map(|_| ()); // Try to read `n + 1` lines, ensuring the last one is empty // (i.e. EOF is reached after `n` lines. - let reader = io::BufReader::new(stdout); - let expected_numbers = stream::iter_ok(0..=n); - let read = expected_numbers.fold((reader, 0), move |(reader, i), _| { - let done = i >= n; - debug!("starting read from child"); - read_until(reader, b'\n', Vec::new()).and_then(move |(reader, vec)| { + let reader = BufReader::new(stdout); + let expected_numbers = stream::iter(0..=n); + let read = expected_numbers.fold((reader, 0), move |(mut reader, i), _| { + let fut = async move { + let done = i >= n; + debug!("starting read from child"); + let mut vec = Vec::new(); + AsyncBufReadExt::read_until(&mut reader, b'\n', &mut vec) + .await + .unwrap(); debug!( "read line {} from child ({} bytes, done: {})", i, @@ -49,23 +65,28 @@ fn feed_cat(mut cat: Child, n: usize) -> Box Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")), - (true, n) if n != 0 => Err(io::Error::new(io::ErrorKind::Other, "extraneous data")), + (false, 0) => { + panic!("broken pipe"); + } + (true, n) if n != 0 => { + panic!("extraneous data"); + } _ => { let s = std::str::from_utf8(&vec).unwrap(); let expected = format!("line {}\n", i); if done || s == expected { - Ok((reader, i + 1)) + (reader, i + 1) } else { - Err(io::Error::new(io::ErrorKind::Other, "unexpected data")) + panic!("unexpected data"); } } } - }) + }; + fut }); // Compose reading and writing concurrently. - Box::new(write.join(read).and_then(|_| cat)) + future::join(write, read).then(|_| cat).boxed() } /// Check for the following properties when feeding stdin and @@ -89,15 +110,22 @@ fn feed_a_lot() { #[test] fn wait_with_output_captures() { let mut child = cat().spawn_async().unwrap(); - let stdin = child.stdin().take().unwrap(); - let out = child.wait_with_output(); + let mut stdin = child.stdin().take().unwrap(); + + let write_bytes = b"1234"; + + let future = async { + AsyncWriteExt::write_all(&mut stdin, write_bytes).await?; + drop(stdin); + let out = child.wait_with_output(); + out.await + }; - let future = write_all(stdin, b"1234").map(|p| p.1).join(out); let ret = support::run_with_timeout(future).unwrap(); - let (written, output) = ret; + let output = ret; assert!(output.status.success()); - assert_eq!(output.stdout, written); + assert_eq!(output.stdout, write_bytes); assert_eq!(output.stderr.len(), 0); } diff --git a/tokio-process/tests/support/mod.rs b/tokio-process/tests/support/mod.rs index 0e9ce6845..44f7630fb 100644 --- a/tokio-process/tests/support/mod.rs +++ b/tokio-process/tests/support/mod.rs @@ -1,14 +1,16 @@ -extern crate futures; extern crate tokio; -use self::futures::Future; -use self::tokio::timer::Timeout; +use futures_util::future; +use futures_util::future::FutureExt; use std::env; +use std::future::Future; use std::process::Command; use std::time::Duration; +use tokio::timer::Timeout; pub use self::tokio::runtime::current_thread::Runtime as CurrentThreadRuntime; +#[allow(dead_code)] pub fn cmd(s: &str) -> Command { let mut me = env::current_exe().unwrap(); me.pop(); @@ -19,19 +21,16 @@ pub fn cmd(s: &str) -> Command { Command::new(me) } -pub fn with_timeout(future: F) -> impl Future { - Timeout::new(future, Duration::from_secs(3)).map_err(|e| { - if e.is_timer() { - panic!("failed to register timer"); - } else if e.is_elapsed() { - panic!("timed out") - } else { - e.into_inner().expect("missing inner error") +pub fn with_timeout(future: F) -> impl Future { + Timeout::new(future, Duration::from_secs(3)).then(|r| { + if r.is_err() { + panic!("timed out {:?}", r.err()); } + future::ready(r.unwrap()) }) } -pub fn run_with_timeout(future: F) -> Result +pub fn run_with_timeout(future: F) -> F::Output where F: Future, {