diff --git a/Cargo.toml b/Cargo.toml index a2b8997c0..3c5d131ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,9 @@ features = [ ] [target.'cfg(unix)'.dependencies] +crossbeam-queue = "0.1.2" +lazy_static = "1.3" libc = "0.2" +log = "0.4" mio = "0.6.5" tokio-signal = "0.2.5" diff --git a/src/lib.rs b/src/lib.rs index ec42c7ca2..2d0116fc4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -162,6 +162,13 @@ extern crate futures; extern crate tokio_io; extern crate tokio_reactor; +#[cfg(unix)] +#[macro_use] +extern crate lazy_static; +#[cfg(unix)] +#[macro_use] +extern crate log; + use std::io::{self, Read, Write}; use std::process::{Command, ExitStatus, Output, Stdio}; diff --git a/src/unix/mod.rs b/src/unix/mod.rs index 6b023de3d..78c40b561 100644 --- a/src/unix/mod.rs +++ b/src/unix/mod.rs @@ -25,6 +25,7 @@ extern crate libc; extern crate mio; extern crate tokio_signal; +mod orphan; mod reap; use futures::future::FlattenStream; @@ -32,6 +33,8 @@ use futures::{Future, Poll}; use self::mio::{Poll as MioPoll, PollOpt, Ready, Token}; use self::mio::unix::{EventedFd, UnixReady}; use self::mio::event::Evented; +use self::orphan::{AtomicOrphanQueue, OrphanQueue, Wait}; +use self::reap::{Kill, Reaper}; use self::tokio_signal::unix::Signal; use std::fmt; use std::io; @@ -42,6 +45,10 @@ use tokio_io::IoFuture; use tokio_reactor::{Handle, PollEvented}; impl Wait for process::Child { + fn id(&self) -> u32 { + self.id() + } + fn try_wait(&mut self) -> io::Result> { self.try_wait() } @@ -53,6 +60,28 @@ impl Kill for process::Child { } } +lazy_static! { + static ref ORPHAN_QUEUE: AtomicOrphanQueue = AtomicOrphanQueue::new(); +} + +struct GlobalOrphanQueue; + +impl fmt::Debug for GlobalOrphanQueue { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + ORPHAN_QUEUE.fmt(fmt) + } +} + +impl OrphanQueue for GlobalOrphanQueue { + fn push_orphan(&self, orphan: process::Child) { + ORPHAN_QUEUE.push_orphan(orphan) + } + + fn reap_orphans(&self) { + ORPHAN_QUEUE.reap_orphans() + } +} + #[must_use = "futures do nothing unless polled"] pub struct Child { inner: Reaper>>, diff --git a/src/unix/orphan.rs b/src/unix/orphan.rs new file mode 100644 index 000000000..3641c7828 --- /dev/null +++ b/src/unix/orphan.rs @@ -0,0 +1,173 @@ +extern crate crossbeam_queue; + +use self::crossbeam_queue::SegQueue; +use std::io; +use std::process::ExitStatus; + +/// An interface for waiting on a process to exit. +pub(crate) trait Wait { + /// Get the identifier for this process or diagnostics. + fn id(&self) -> u32; + /// Try waiting for a process to exit in a non-blocking manner. + fn try_wait(&mut self) -> io::Result>; +} + +/// An interface for queueing up an orphaned process so that it can be reaped. +pub(crate) trait OrphanQueue { + /// Add an orphan to the queue. + fn push_orphan(&self, orphan: T); + /// Attempt to reap every process in the queue, ignoring any errors and + /// enqueueing any orphans which have not yet exited. + fn reap_orphans(&self); +} + +/// An atomic implementation of `OrphanQueue`. +#[derive(Debug)] +pub(crate) struct AtomicOrphanQueue { + queue: SegQueue, +} + +impl AtomicOrphanQueue { + pub(crate) fn new() -> Self { + Self { + queue: SegQueue::new(), + } + } +} + +impl OrphanQueue for AtomicOrphanQueue { + fn push_orphan(&self, orphan: T) { + self.queue.push(orphan) + } + + fn reap_orphans(&self) { + let len = self.queue.len(); + + if len == 0 { + return; + } + + let mut orphans = Vec::with_capacity(len); + while let Ok(mut orphan) = self.queue.pop() { + match orphan.try_wait() { + Ok(Some(_)) => {}, + Err(e) => error!( + "leaking orphaned process {} due to try_wait() error: {}", + orphan.id(), + e, + ), + + // Still not done yet, we need to put it back in the queue + // when were done draining it, so that we don't get stuck + // in an infinite loop here + Ok(None) => orphans.push(orphan), + } + } + + for orphan in orphans { + self.queue.push(orphan); + } + } +} + +#[cfg(test)] +mod test { + use std::cell::Cell; + use std::io; + use std::os::unix::process::ExitStatusExt; + use std::process::ExitStatus; + use std::rc::Rc; + use super::{AtomicOrphanQueue, OrphanQueue}; + use super::Wait; + + struct MockWait { + total_waits: Rc>, + num_wait_until_status: usize, + return_err: bool, + } + + impl MockWait { + fn new(num_wait_until_status: usize) -> Self { + Self { + total_waits: Rc::new(Cell::new(0)), + num_wait_until_status, + return_err: false, + } + } + + fn with_err() -> Self { + Self { + total_waits: Rc::new(Cell::new(0)), + num_wait_until_status: 0, + return_err: true, + } + } + } + + impl Wait for MockWait { + fn id(&self) -> u32 { + 42 + } + + fn try_wait(&mut self) -> io::Result> { + let waits = self.total_waits.get(); + + let ret = if self.num_wait_until_status == waits { + if self.return_err { + Ok(Some(ExitStatus::from_raw(0))) + } else { + Err(io::Error::new(io::ErrorKind::Other, "mock err")) + } + } else { + Ok(None) + }; + + self.total_waits.set(waits + 1); + ret + } + } + + #[test] + fn drain_attempts_a_single_reap_of_all_queued_orphans() { + let first_orphan = MockWait::new(0); + let second_orphan = MockWait::new(1); + let third_orphan = MockWait::new(2); + let fourth_orphan = MockWait::with_err(); + + let first_waits = first_orphan.total_waits.clone(); + let second_waits = second_orphan.total_waits.clone(); + let third_waits = third_orphan.total_waits.clone(); + let fourth_waits = fourth_orphan.total_waits.clone(); + + let orphanage = AtomicOrphanQueue::new(); + orphanage.push_orphan(first_orphan); + orphanage.push_orphan(third_orphan); + orphanage.push_orphan(second_orphan); + orphanage.push_orphan(fourth_orphan); + + assert_eq!(orphanage.queue.len(), 4); + + orphanage.reap_orphans(); + assert_eq!(orphanage.queue.len(), 2); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 1); + assert_eq!(third_waits.get(), 1); + assert_eq!(fourth_waits.get(), 1); + + orphanage.reap_orphans(); + assert_eq!(orphanage.queue.len(), 1); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 2); + assert_eq!(third_waits.get(), 2); + assert_eq!(fourth_waits.get(), 1); + + orphanage.reap_orphans(); + assert_eq!(orphanage.queue.len(), 0); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 2); + assert_eq!(third_waits.get(), 3); + assert_eq!(fourth_waits.get(), 1); + + orphanage.reap_orphans(); // Safe to reap when empty + } +} diff --git a/src/unix/reap.rs b/src/unix/reap.rs index c17ac21ab..831eefd5a 100644 --- a/src/unix/reap.rs +++ b/src/unix/reap.rs @@ -2,15 +2,10 @@ use futures::{Async, Future, Poll, Stream}; use std::io; use std::ops::Deref; use std::process::ExitStatus; - -/// An interface for waiting on a process to exit. -pub trait Wait { - /// Try waiting for a process to exit in a non-blocking manner. - fn try_wait(&mut self) -> io::Result>; -} +use super::orphan::Wait; /// An interface for killing a running process. -pub trait Kill { +pub(crate) trait Kill { /// Forcefully kill the process. fn kill(&mut self) -> io::Result<()>; } @@ -18,7 +13,7 @@ pub trait Kill { /// Orchestrates between registering interest for receiving signals when a /// child process has exited, and attempting to poll for process completion. #[derive(Debug)] -pub struct Reaper { +pub(crate) struct Reaper { inner: W, signal: S, } @@ -32,7 +27,7 @@ impl Deref for Reaper { } impl Reaper { - pub fn new(inner: W, signal: S) -> Self { + pub(crate) fn new(inner: W, signal: S) -> Self { Self { inner, signal, @@ -122,6 +117,10 @@ mod test { } impl Wait for MockWait { + fn id(&self) -> u32 { + 0 + } + fn try_wait(&mut self) -> io::Result> { let ret = if self.num_wait_until_status == self.total_waits { Some(self.status)