process: Utilize a global orphan process queue to avoid leaks

This commit is contained in:
Ivan Petkov 2019-06-15 13:34:51 -07:00
parent ecaa069f0f
commit fa5da27d98
No known key found for this signature in database
GPG Key ID: 0B431E9837056942
3 changed files with 155 additions and 16 deletions

View File

@ -84,7 +84,7 @@ impl OrphanQueue<process::Child> for GlobalOrphanQueue {
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub struct Child { pub struct Child {
inner: Reaper<process::Child, FlattenStream<IoFuture<Signal>>>, inner: Reaper<process::Child, GlobalOrphanQueue, FlattenStream<IoFuture<Signal>>>,
} }
impl fmt::Debug for Child { impl fmt::Debug for Child {
@ -104,7 +104,7 @@ pub(crate) fn spawn_child(cmd: &mut process::Command, handle: &Handle) -> io::Re
let signal = Signal::with_handle(libc::SIGCHLD, handle).flatten_stream(); let signal = Signal::with_handle(libc::SIGCHLD, handle).flatten_stream();
Ok(SpawnedChild { Ok(SpawnedChild {
child: Child { child: Child {
inner: Reaper::new(child, signal), inner: Reaper::new(child, GlobalOrphanQueue, signal),
}, },
stdin, stdin,
stdout, stdout,

View File

@ -12,6 +12,16 @@ pub(crate) trait Wait {
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>; fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
} }
impl<'a, T: 'a + Wait> Wait for &'a mut T {
fn id(&self) -> u32 {
(**self).id()
}
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
(**self).try_wait()
}
}
/// An interface for queueing up an orphaned process so that it can be reaped. /// An interface for queueing up an orphaned process so that it can be reaped.
pub(crate) trait OrphanQueue<T> { pub(crate) trait OrphanQueue<T> {
/// Add an orphan to the queue. /// Add an orphan to the queue.
@ -21,6 +31,16 @@ pub(crate) trait OrphanQueue<T> {
fn reap_orphans(&self); fn reap_orphans(&self);
} }
impl<'a, T, O: 'a + OrphanQueue<T>> OrphanQueue<T> for &'a O {
fn push_orphan(&self, orphan: T) {
(**self).push_orphan(orphan);
}
fn reap_orphans(&self) {
(**self).reap_orphans()
}
}
/// An atomic implementation of `OrphanQueue`. /// An atomic implementation of `OrphanQueue`.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct AtomicOrphanQueue<T> { pub(crate) struct AtomicOrphanQueue<T> {

View File

@ -2,7 +2,7 @@ use futures::{Async, Future, Poll, Stream};
use std::io; use std::io;
use std::ops::Deref; use std::ops::Deref;
use std::process::ExitStatus; use std::process::ExitStatus;
use super::orphan::Wait; use super::orphan::{OrphanQueue, Wait};
/// An interface for killing a running process. /// An interface for killing a running process.
pub(crate) trait Kill { pub(crate) trait Kill {
@ -13,30 +13,50 @@ pub(crate) trait Kill {
/// Orchestrates between registering interest for receiving signals when a /// Orchestrates between registering interest for receiving signals when a
/// child process has exited, and attempting to poll for process completion. /// child process has exited, and attempting to poll for process completion.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Reaper<W, S> { pub(crate) struct Reaper<W, Q, S>
inner: W, where W: Wait,
Q: OrphanQueue<W>,
{
inner: Option<W>,
orphan_queue: Q,
signal: S, signal: S,
} }
impl<W, S> Deref for Reaper<W, S> { impl<W, Q, S> Deref for Reaper<W, Q, S>
where W: Wait,
Q: OrphanQueue<W>,
{
type Target = W; type Target = W;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.inner self.inner()
} }
} }
impl<W, S> Reaper<W, S> { impl<W, Q, S> Reaper<W, Q, S>
pub(crate) fn new(inner: W, signal: S) -> Self { where W: Wait,
Q: OrphanQueue<W>,
{
pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
Self { Self {
inner, inner: Some(inner),
orphan_queue,
signal, signal,
} }
} }
fn inner(&self) -> &W {
self.inner.as_ref().expect("inner has gone away")
}
fn inner_mut(&mut self) -> &mut W {
self.inner.as_mut().expect("inner has gone away")
}
} }
impl<W, S> Future for Reaper<W, S> impl<W, Q, S> Future for Reaper<W, Q, S>
where W: Wait, where W: Wait,
Q: OrphanQueue<W>,
S: Stream<Error = io::Error>, S: Stream<Error = io::Error>,
{ {
type Item = ExitStatus; type Item = ExitStatus;
@ -65,7 +85,8 @@ impl<W, S> Future for Reaper<W, S>
// should not cause significant issues with parent futures. // should not cause significant issues with parent futures.
let registered_interest = self.signal.poll()?.is_not_ready(); let registered_interest = self.signal.poll()?.is_not_ready();
if let Some(status) = self.inner.try_wait()? { self.orphan_queue.reap_orphans();
if let Some(status) = self.inner_mut().try_wait()? {
return Ok(Async::Ready(status)); return Ok(Async::Ready(status));
} }
@ -83,21 +104,39 @@ impl<W, S> Future for Reaper<W, S>
} }
} }
impl<W, S> Kill for Reaper<W, S> impl<W, Q, S> Kill for Reaper<W, Q, S>
where W: Kill, where W: Kill + Wait,
Q: OrphanQueue<W>,
{ {
fn kill(&mut self) -> io::Result<()> { fn kill(&mut self) -> io::Result<()> {
self.inner.kill() self.inner_mut().kill()
}
}
impl<W, Q, S> Drop for Reaper<W, Q, S>
where W: Wait,
Q: OrphanQueue<W>,
{
fn drop(&mut self) {
if let Ok(Some(_)) = self.inner_mut().try_wait() {
return;
}
let orphan = self.inner.take().unwrap();
self.orphan_queue.push_orphan(orphan);
} }
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use std::cell::{Cell, RefCell};
use std::process::ExitStatus; use std::process::ExitStatus;
use std::os::unix::process::ExitStatusExt; use std::os::unix::process::ExitStatusExt;
use super::*; use super::*;
#[derive(Debug)]
struct MockWait { struct MockWait {
total_kills: usize, total_kills: usize,
total_waits: usize, total_waits: usize,
@ -167,11 +206,36 @@ mod test {
} }
} }
struct MockQueue<W> {
all_enqueued: RefCell<Vec<W>>,
total_reaps: Cell<usize>,
}
impl<W> MockQueue<W> {
fn new() -> Self {
Self {
all_enqueued: RefCell::new(Vec::new()),
total_reaps: Cell::new(0),
}
}
}
impl<W: Wait> OrphanQueue<W> for MockQueue<W> {
fn push_orphan(&self, orphan: W) {
self.all_enqueued.borrow_mut()
.push(orphan);
}
fn reap_orphans(&self) {
self.total_reaps.set(self.total_reaps.get() + 1);
}
}
#[test] #[test]
fn reaper() { fn reaper() {
let exit = ExitStatus::from_raw(0); let exit = ExitStatus::from_raw(0);
let mock = MockWait::new(exit, 3); let mock = MockWait::new(exit, 3);
let mut grim = Reaper::new(mock, MockStream::new(vec!( let mut grim = Reaper::new(mock, MockQueue::new(), MockStream::new(vec!(
None, None,
Some(()), Some(()),
None, None,
@ -183,17 +247,23 @@ mod test {
assert_eq!(Async::NotReady, grim.poll().expect("failed to wait")); assert_eq!(Async::NotReady, grim.poll().expect("failed to wait"));
assert_eq!(1, grim.signal.total_polls); assert_eq!(1, grim.signal.total_polls);
assert_eq!(1, grim.total_waits); assert_eq!(1, grim.total_waits);
assert_eq!(1, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Not yet exited, couldn't register interest the first time // Not yet exited, couldn't register interest the first time
// but managed to register interest the second time around // but managed to register interest the second time around
assert_eq!(Async::NotReady, grim.poll().expect("failed to wait")); assert_eq!(Async::NotReady, grim.poll().expect("failed to wait"));
assert_eq!(3, grim.signal.total_polls); assert_eq!(3, grim.signal.total_polls);
assert_eq!(3, grim.total_waits); assert_eq!(3, grim.total_waits);
assert_eq!(3, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
// Exited // Exited
assert_eq!(Async::Ready(exit), grim.poll().expect("failed to wait")); assert_eq!(Async::Ready(exit), grim.poll().expect("failed to wait"));
assert_eq!(4, grim.signal.total_polls); assert_eq!(4, grim.signal.total_polls);
assert_eq!(4, grim.total_waits); assert_eq!(4, grim.total_waits);
assert_eq!(4, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
} }
#[test] #[test]
@ -201,10 +271,59 @@ mod test {
let exit = ExitStatus::from_raw(0); let exit = ExitStatus::from_raw(0);
let mut grim = Reaper::new( let mut grim = Reaper::new(
MockWait::new(exit, 0), MockWait::new(exit, 0),
MockQueue::new(),
MockStream::new(vec!(None)) MockStream::new(vec!(None))
); );
grim.kill().unwrap(); grim.kill().unwrap();
assert_eq!(1, grim.total_kills); assert_eq!(1, grim.total_kills);
assert_eq!(0, grim.orphan_queue.total_reaps.get());
assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
}
#[test]
fn drop_reaps_if_possible() {
let exit = ExitStatus::from_raw(0);
let mut mock = MockWait::new(exit, 0);
{
let queue = MockQueue::new();
let grim = Reaper::new(
&mut mock,
&queue,
MockStream::new(vec!())
);
drop(grim);
assert_eq!(0, queue.total_reaps.get());
assert!(queue.all_enqueued.borrow().is_empty());
}
assert_eq!(1, mock.total_waits);
assert_eq!(0, mock.total_kills);
}
#[test]
fn drop_enqueues_orphan_if_wait_fails() {
let exit = ExitStatus::from_raw(0);
let mut mock = MockWait::new(exit, 2);
{
let queue = MockQueue::<&mut MockWait>::new();
let grim = Reaper::new(
&mut mock,
&queue,
MockStream::new(vec!())
);
drop(grim);
assert_eq!(0, queue.total_reaps.get());
assert_eq!(1, queue.all_enqueued.borrow().len());
}
assert_eq!(1, mock.total_waits);
assert_eq!(0, mock.total_kills);
} }
} }