diff --git a/src/unix/mod.rs b/src/unix/mod.rs index 78c40b561..b5eebc6fd 100644 --- a/src/unix/mod.rs +++ b/src/unix/mod.rs @@ -84,7 +84,7 @@ impl OrphanQueue for GlobalOrphanQueue { #[must_use = "futures do nothing unless polled"] pub struct Child { - inner: Reaper>>, + inner: Reaper>>, } impl fmt::Debug for Child { @@ -104,7 +104,7 @@ pub(crate) fn spawn_child(cmd: &mut process::Command, handle: &Handle) -> io::Re let signal = Signal::with_handle(libc::SIGCHLD, handle).flatten_stream(); Ok(SpawnedChild { child: Child { - inner: Reaper::new(child, signal), + inner: Reaper::new(child, GlobalOrphanQueue, signal), }, stdin, stdout, diff --git a/src/unix/orphan.rs b/src/unix/orphan.rs index 3641c7828..6b6a2f287 100644 --- a/src/unix/orphan.rs +++ b/src/unix/orphan.rs @@ -12,6 +12,16 @@ pub(crate) trait Wait { fn try_wait(&mut self) -> io::Result>; } +impl<'a, T: 'a + Wait> Wait for &'a mut T { + fn id(&self) -> u32 { + (**self).id() + } + + fn try_wait(&mut self) -> io::Result> { + (**self).try_wait() + } +} + /// An interface for queueing up an orphaned process so that it can be reaped. pub(crate) trait OrphanQueue { /// Add an orphan to the queue. @@ -21,6 +31,16 @@ pub(crate) trait OrphanQueue { fn reap_orphans(&self); } +impl<'a, T, O: 'a + OrphanQueue> OrphanQueue for &'a O { + fn push_orphan(&self, orphan: T) { + (**self).push_orphan(orphan); + } + + fn reap_orphans(&self) { + (**self).reap_orphans() + } +} + /// An atomic implementation of `OrphanQueue`. #[derive(Debug)] pub(crate) struct AtomicOrphanQueue { diff --git a/src/unix/reap.rs b/src/unix/reap.rs index 831eefd5a..d24829eb7 100644 --- a/src/unix/reap.rs +++ b/src/unix/reap.rs @@ -2,7 +2,7 @@ use futures::{Async, Future, Poll, Stream}; use std::io; use std::ops::Deref; use std::process::ExitStatus; -use super::orphan::Wait; +use super::orphan::{OrphanQueue, Wait}; /// An interface for killing a running process. pub(crate) trait Kill { @@ -13,30 +13,50 @@ pub(crate) trait Kill { /// Orchestrates between registering interest for receiving signals when a /// child process has exited, and attempting to poll for process completion. #[derive(Debug)] -pub(crate) struct Reaper { - inner: W, +pub(crate) struct Reaper + where W: Wait, + Q: OrphanQueue, +{ + inner: Option, + orphan_queue: Q, signal: S, } -impl Deref for Reaper { +impl Deref for Reaper + where W: Wait, + Q: OrphanQueue, +{ type Target = W; fn deref(&self) -> &Self::Target { - &self.inner + self.inner() } } -impl Reaper { - pub(crate) fn new(inner: W, signal: S) -> Self { +impl Reaper + where W: Wait, + Q: OrphanQueue, +{ + pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { Self { - inner, + inner: Some(inner), + orphan_queue, signal, } } + + fn inner(&self) -> &W { + self.inner.as_ref().expect("inner has gone away") + } + + fn inner_mut(&mut self) -> &mut W { + self.inner.as_mut().expect("inner has gone away") + } } -impl Future for Reaper +impl Future for Reaper where W: Wait, + Q: OrphanQueue, S: Stream, { type Item = ExitStatus; @@ -65,7 +85,8 @@ impl Future for Reaper // should not cause significant issues with parent futures. let registered_interest = self.signal.poll()?.is_not_ready(); - if let Some(status) = self.inner.try_wait()? { + self.orphan_queue.reap_orphans(); + if let Some(status) = self.inner_mut().try_wait()? { return Ok(Async::Ready(status)); } @@ -83,21 +104,39 @@ impl Future for Reaper } } -impl Kill for Reaper - where W: Kill, +impl Kill for Reaper + where W: Kill + Wait, + Q: OrphanQueue, { fn kill(&mut self) -> io::Result<()> { - self.inner.kill() + self.inner_mut().kill() + } +} + + +impl Drop for Reaper + where W: Wait, + Q: OrphanQueue, +{ + fn drop(&mut self) { + if let Ok(Some(_)) = self.inner_mut().try_wait() { + return; + } + + let orphan = self.inner.take().unwrap(); + self.orphan_queue.push_orphan(orphan); } } #[cfg(test)] mod test { use futures::{Async, Poll, Stream}; + use std::cell::{Cell, RefCell}; use std::process::ExitStatus; use std::os::unix::process::ExitStatusExt; use super::*; + #[derive(Debug)] struct MockWait { total_kills: usize, total_waits: usize, @@ -167,11 +206,36 @@ mod test { } } + struct MockQueue { + all_enqueued: RefCell>, + total_reaps: Cell, + } + + impl MockQueue { + fn new() -> Self { + Self { + all_enqueued: RefCell::new(Vec::new()), + total_reaps: Cell::new(0), + } + } + } + + impl OrphanQueue for MockQueue { + fn push_orphan(&self, orphan: W) { + self.all_enqueued.borrow_mut() + .push(orphan); + } + + fn reap_orphans(&self) { + self.total_reaps.set(self.total_reaps.get() + 1); + } + } + #[test] fn reaper() { let exit = ExitStatus::from_raw(0); let mock = MockWait::new(exit, 3); - let mut grim = Reaper::new(mock, MockStream::new(vec!( + let mut grim = Reaper::new(mock, MockQueue::new(), MockStream::new(vec!( None, Some(()), None, @@ -183,17 +247,23 @@ mod test { assert_eq!(Async::NotReady, grim.poll().expect("failed to wait")); assert_eq!(1, grim.signal.total_polls); assert_eq!(1, grim.total_waits); + assert_eq!(1, grim.orphan_queue.total_reaps.get()); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Not yet exited, couldn't register interest the first time // but managed to register interest the second time around assert_eq!(Async::NotReady, grim.poll().expect("failed to wait")); assert_eq!(3, grim.signal.total_polls); assert_eq!(3, grim.total_waits); + assert_eq!(3, grim.orphan_queue.total_reaps.get()); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); // Exited assert_eq!(Async::Ready(exit), grim.poll().expect("failed to wait")); assert_eq!(4, grim.signal.total_polls); assert_eq!(4, grim.total_waits); + assert_eq!(4, grim.orphan_queue.total_reaps.get()); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); } #[test] @@ -201,10 +271,59 @@ mod test { let exit = ExitStatus::from_raw(0); let mut grim = Reaper::new( MockWait::new(exit, 0), + MockQueue::new(), MockStream::new(vec!(None)) ); grim.kill().unwrap(); assert_eq!(1, grim.total_kills); + assert_eq!(0, grim.orphan_queue.total_reaps.get()); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + } + + #[test] + fn drop_reaps_if_possible() { + let exit = ExitStatus::from_raw(0); + let mut mock = MockWait::new(exit, 0); + + { + let queue = MockQueue::new(); + + let grim = Reaper::new( + &mut mock, + &queue, + MockStream::new(vec!()) + ); + + drop(grim); + + assert_eq!(0, queue.total_reaps.get()); + assert!(queue.all_enqueued.borrow().is_empty()); + } + + assert_eq!(1, mock.total_waits); + assert_eq!(0, mock.total_kills); + } + + #[test] + fn drop_enqueues_orphan_if_wait_fails() { + let exit = ExitStatus::from_raw(0); + let mut mock = MockWait::new(exit, 2); + + { + let queue = MockQueue::<&mut MockWait>::new(); + let grim = Reaper::new( + &mut mock, + &queue, + MockStream::new(vec!()) + ); + drop(grim); + + assert_eq!(0, queue.total_reaps.get()); + assert_eq!(1, queue.all_enqueued.borrow().len()); + } + + assert_eq!(1, mock.total_waits); + assert_eq!(0, mock.total_kills); } }