mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
process: Implement a queue for repeatedly attempting to reap orphaned processes
This commit is contained in:
parent
fc15d7d4a4
commit
ecaa069f0f
@ -51,6 +51,9 @@ features = [
|
||||
]
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
crossbeam-queue = "0.1.2"
|
||||
lazy_static = "1.3"
|
||||
libc = "0.2"
|
||||
log = "0.4"
|
||||
mio = "0.6.5"
|
||||
tokio-signal = "0.2.5"
|
||||
|
@ -162,6 +162,13 @@ extern crate futures;
|
||||
extern crate tokio_io;
|
||||
extern crate tokio_reactor;
|
||||
|
||||
#[cfg(unix)]
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
#[cfg(unix)]
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
use std::process::{Command, ExitStatus, Output, Stdio};
|
||||
|
||||
|
@ -25,6 +25,7 @@ extern crate libc;
|
||||
extern crate mio;
|
||||
extern crate tokio_signal;
|
||||
|
||||
mod orphan;
|
||||
mod reap;
|
||||
|
||||
use futures::future::FlattenStream;
|
||||
@ -32,6 +33,8 @@ use futures::{Future, Poll};
|
||||
use self::mio::{Poll as MioPoll, PollOpt, Ready, Token};
|
||||
use self::mio::unix::{EventedFd, UnixReady};
|
||||
use self::mio::event::Evented;
|
||||
use self::orphan::{AtomicOrphanQueue, OrphanQueue, Wait};
|
||||
use self::reap::{Kill, Reaper};
|
||||
use self::tokio_signal::unix::Signal;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
@ -42,6 +45,10 @@ use tokio_io::IoFuture;
|
||||
use tokio_reactor::{Handle, PollEvented};
|
||||
|
||||
impl Wait for process::Child {
|
||||
fn id(&self) -> u32 {
|
||||
self.id()
|
||||
}
|
||||
|
||||
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
|
||||
self.try_wait()
|
||||
}
|
||||
@ -53,6 +60,28 @@ impl Kill for process::Child {
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref ORPHAN_QUEUE: AtomicOrphanQueue<process::Child> = AtomicOrphanQueue::new();
|
||||
}
|
||||
|
||||
struct GlobalOrphanQueue;
|
||||
|
||||
impl fmt::Debug for GlobalOrphanQueue {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
ORPHAN_QUEUE.fmt(fmt)
|
||||
}
|
||||
}
|
||||
|
||||
impl OrphanQueue<process::Child> for GlobalOrphanQueue {
|
||||
fn push_orphan(&self, orphan: process::Child) {
|
||||
ORPHAN_QUEUE.push_orphan(orphan)
|
||||
}
|
||||
|
||||
fn reap_orphans(&self) {
|
||||
ORPHAN_QUEUE.reap_orphans()
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct Child {
|
||||
inner: Reaper<process::Child, FlattenStream<IoFuture<Signal>>>,
|
||||
|
173
src/unix/orphan.rs
Normal file
173
src/unix/orphan.rs
Normal file
@ -0,0 +1,173 @@
|
||||
extern crate crossbeam_queue;
|
||||
|
||||
use self::crossbeam_queue::SegQueue;
|
||||
use std::io;
|
||||
use std::process::ExitStatus;
|
||||
|
||||
/// An interface for waiting on a process to exit.
|
||||
pub(crate) trait Wait {
|
||||
/// Get the identifier for this process or diagnostics.
|
||||
fn id(&self) -> u32;
|
||||
/// Try waiting for a process to exit in a non-blocking manner.
|
||||
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
|
||||
}
|
||||
|
||||
/// An interface for queueing up an orphaned process so that it can be reaped.
|
||||
pub(crate) trait OrphanQueue<T> {
|
||||
/// Add an orphan to the queue.
|
||||
fn push_orphan(&self, orphan: T);
|
||||
/// Attempt to reap every process in the queue, ignoring any errors and
|
||||
/// enqueueing any orphans which have not yet exited.
|
||||
fn reap_orphans(&self);
|
||||
}
|
||||
|
||||
/// An atomic implementation of `OrphanQueue`.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct AtomicOrphanQueue<T> {
|
||||
queue: SegQueue<T>,
|
||||
}
|
||||
|
||||
impl<T> AtomicOrphanQueue<T> {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
queue: SegQueue::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Wait> OrphanQueue<T> for AtomicOrphanQueue<T> {
|
||||
fn push_orphan(&self, orphan: T) {
|
||||
self.queue.push(orphan)
|
||||
}
|
||||
|
||||
fn reap_orphans(&self) {
|
||||
let len = self.queue.len();
|
||||
|
||||
if len == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut orphans = Vec::with_capacity(len);
|
||||
while let Ok(mut orphan) = self.queue.pop() {
|
||||
match orphan.try_wait() {
|
||||
Ok(Some(_)) => {},
|
||||
Err(e) => error!(
|
||||
"leaking orphaned process {} due to try_wait() error: {}",
|
||||
orphan.id(),
|
||||
e,
|
||||
),
|
||||
|
||||
// Still not done yet, we need to put it back in the queue
|
||||
// when were done draining it, so that we don't get stuck
|
||||
// in an infinite loop here
|
||||
Ok(None) => orphans.push(orphan),
|
||||
}
|
||||
}
|
||||
|
||||
for orphan in orphans {
|
||||
self.queue.push(orphan);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::cell::Cell;
|
||||
use std::io;
|
||||
use std::os::unix::process::ExitStatusExt;
|
||||
use std::process::ExitStatus;
|
||||
use std::rc::Rc;
|
||||
use super::{AtomicOrphanQueue, OrphanQueue};
|
||||
use super::Wait;
|
||||
|
||||
struct MockWait {
|
||||
total_waits: Rc<Cell<usize>>,
|
||||
num_wait_until_status: usize,
|
||||
return_err: bool,
|
||||
}
|
||||
|
||||
impl MockWait {
|
||||
fn new(num_wait_until_status: usize) -> Self {
|
||||
Self {
|
||||
total_waits: Rc::new(Cell::new(0)),
|
||||
num_wait_until_status,
|
||||
return_err: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn with_err() -> Self {
|
||||
Self {
|
||||
total_waits: Rc::new(Cell::new(0)),
|
||||
num_wait_until_status: 0,
|
||||
return_err: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Wait for MockWait {
|
||||
fn id(&self) -> u32 {
|
||||
42
|
||||
}
|
||||
|
||||
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
|
||||
let waits = self.total_waits.get();
|
||||
|
||||
let ret = if self.num_wait_until_status == waits {
|
||||
if self.return_err {
|
||||
Ok(Some(ExitStatus::from_raw(0)))
|
||||
} else {
|
||||
Err(io::Error::new(io::ErrorKind::Other, "mock err"))
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
};
|
||||
|
||||
self.total_waits.set(waits + 1);
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drain_attempts_a_single_reap_of_all_queued_orphans() {
|
||||
let first_orphan = MockWait::new(0);
|
||||
let second_orphan = MockWait::new(1);
|
||||
let third_orphan = MockWait::new(2);
|
||||
let fourth_orphan = MockWait::with_err();
|
||||
|
||||
let first_waits = first_orphan.total_waits.clone();
|
||||
let second_waits = second_orphan.total_waits.clone();
|
||||
let third_waits = third_orphan.total_waits.clone();
|
||||
let fourth_waits = fourth_orphan.total_waits.clone();
|
||||
|
||||
let orphanage = AtomicOrphanQueue::new();
|
||||
orphanage.push_orphan(first_orphan);
|
||||
orphanage.push_orphan(third_orphan);
|
||||
orphanage.push_orphan(second_orphan);
|
||||
orphanage.push_orphan(fourth_orphan);
|
||||
|
||||
assert_eq!(orphanage.queue.len(), 4);
|
||||
|
||||
orphanage.reap_orphans();
|
||||
assert_eq!(orphanage.queue.len(), 2);
|
||||
assert_eq!(first_waits.get(), 1);
|
||||
assert_eq!(second_waits.get(), 1);
|
||||
assert_eq!(third_waits.get(), 1);
|
||||
assert_eq!(fourth_waits.get(), 1);
|
||||
|
||||
orphanage.reap_orphans();
|
||||
assert_eq!(orphanage.queue.len(), 1);
|
||||
assert_eq!(first_waits.get(), 1);
|
||||
assert_eq!(second_waits.get(), 2);
|
||||
assert_eq!(third_waits.get(), 2);
|
||||
assert_eq!(fourth_waits.get(), 1);
|
||||
|
||||
orphanage.reap_orphans();
|
||||
assert_eq!(orphanage.queue.len(), 0);
|
||||
assert_eq!(first_waits.get(), 1);
|
||||
assert_eq!(second_waits.get(), 2);
|
||||
assert_eq!(third_waits.get(), 3);
|
||||
assert_eq!(fourth_waits.get(), 1);
|
||||
|
||||
orphanage.reap_orphans(); // Safe to reap when empty
|
||||
}
|
||||
}
|
@ -2,15 +2,10 @@ use futures::{Async, Future, Poll, Stream};
|
||||
use std::io;
|
||||
use std::ops::Deref;
|
||||
use std::process::ExitStatus;
|
||||
|
||||
/// An interface for waiting on a process to exit.
|
||||
pub trait Wait {
|
||||
/// Try waiting for a process to exit in a non-blocking manner.
|
||||
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
|
||||
}
|
||||
use super::orphan::Wait;
|
||||
|
||||
/// An interface for killing a running process.
|
||||
pub trait Kill {
|
||||
pub(crate) trait Kill {
|
||||
/// Forcefully kill the process.
|
||||
fn kill(&mut self) -> io::Result<()>;
|
||||
}
|
||||
@ -18,7 +13,7 @@ pub trait Kill {
|
||||
/// Orchestrates between registering interest for receiving signals when a
|
||||
/// child process has exited, and attempting to poll for process completion.
|
||||
#[derive(Debug)]
|
||||
pub struct Reaper<W, S> {
|
||||
pub(crate) struct Reaper<W, S> {
|
||||
inner: W,
|
||||
signal: S,
|
||||
}
|
||||
@ -32,7 +27,7 @@ impl<W, S> Deref for Reaper<W, S> {
|
||||
}
|
||||
|
||||
impl<W, S> Reaper<W, S> {
|
||||
pub fn new(inner: W, signal: S) -> Self {
|
||||
pub(crate) fn new(inner: W, signal: S) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
signal,
|
||||
@ -122,6 +117,10 @@ mod test {
|
||||
}
|
||||
|
||||
impl Wait for MockWait {
|
||||
fn id(&self) -> u32 {
|
||||
0
|
||||
}
|
||||
|
||||
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
|
||||
let ret = if self.num_wait_until_status == self.total_waits {
|
||||
Some(self.status)
|
||||
|
Loading…
x
Reference in New Issue
Block a user