signal: Fix starvation of signal streams on drop of another instance

* We introduce a new global structure which keeps track of how many
signal streams have been registered with a given event loop (the event
loop is identified by its OS file descriptor)
* We only attempt to deregister our global evented pipe from any event
loop if and only if we are the last signal that was registered with it
This commit is contained in:
Ivan Petkov 2018-07-14 16:22:05 -07:00 committed by Carl Lerche
parent b6ecfa251c
commit 2d4bfa1485

View File

@ -9,6 +9,7 @@ pub extern crate libc;
extern crate mio;
extern crate mio_uds;
use std::collections::hash_map::HashMap;
use std::cell::UnsafeCell;
use std::io;
use std::io::prelude::*;
@ -50,6 +51,9 @@ struct Globals {
sender: UnixStream,
receiver: UnixStream,
signals: Vec<SignalInfo>,
// A count of how many signal streams are registered on the same event loop instance
// so that we can avoid deregistering our receiver stream prematurely
pollfd_register_count: Mutex<HashMap<RawFd, usize>>,
}
impl Default for SignalInfo {
@ -76,6 +80,7 @@ fn globals() -> &'static Globals {
sender: sender,
receiver: receiver,
signals: (0..SIGNUM).map(|_| Default::default()).collect(),
pollfd_register_count: Mutex::new(HashMap::new()),
};
GLOBALS = Box::into_raw(Box::new(globals));
});
@ -191,14 +196,30 @@ impl Evented for EventedReceiver {
events: Ready,
opts: PollOpt,
) -> io::Result<()> {
let fd = globals().receiver.as_raw_fd();
let globals = globals();
let fd = globals.receiver.as_raw_fd();
// NB: we must try registering with the event loop regardless of our
// reference count. If the event loop descriptor is closed, we won't
// get any notifications to clean up our map, so if a new event loop
// descriptor collides with an entry in our map, we want to make sure
// we don't accidentally skip the registration.
match EventedFd(&fd).register(poll, token, events, opts) {
Ok(()) => Ok(()),
Ok(()) => {}
// Due to tokio-rs/tokio-core#307
Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok(()),
Err(e) => Err(e),
Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => {},
Err(e) => return Err(e),
}
*globals.pollfd_register_count
.lock()
.expect("poisoned")
.entry(poll.as_raw_fd())
.or_insert(0) += 1;
Ok(())
}
fn reregister(
&self,
poll: &MioPoll,
@ -210,8 +231,23 @@ impl Evented for EventedReceiver {
EventedFd(&fd).reregister(poll, token, events, opts)
}
fn deregister(&self, poll: &MioPoll) -> io::Result<()> {
let fd = globals().receiver.as_raw_fd();
EventedFd(&fd).deregister(poll)
let globals = globals();
let mut guard = globals.pollfd_register_count
.lock()
.expect("poisoned");
let ref_count = guard.entry(poll.as_raw_fd()).or_insert(1);
*ref_count -= 1;
// Only deregister if we're the last stream listening for this signal
// on this event loop. Otherwise, if multiple streams are opened for
// the same signal and one of them is closed, the remainder will starve.
if *ref_count == 0 {
let fd = globals.receiver.as_raw_fd();
EventedFd(&fd).deregister(poll)
} else {
Ok(())
}
}
}