diff --git a/src/unix.rs b/src/unix.rs index 85cbc8fe0..b4467e264 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -9,6 +9,7 @@ pub extern crate libc; extern crate mio; extern crate mio_uds; +use std::collections::hash_map::HashMap; use std::cell::UnsafeCell; use std::io; use std::io::prelude::*; @@ -50,6 +51,9 @@ struct Globals { sender: UnixStream, receiver: UnixStream, signals: Vec, + // A count of how many signal streams are registered on the same event loop instance + // so that we can avoid deregistering our receiver stream prematurely + pollfd_register_count: Mutex>, } impl Default for SignalInfo { @@ -76,6 +80,7 @@ fn globals() -> &'static Globals { sender: sender, receiver: receiver, signals: (0..SIGNUM).map(|_| Default::default()).collect(), + pollfd_register_count: Mutex::new(HashMap::new()), }; GLOBALS = Box::into_raw(Box::new(globals)); }); @@ -191,14 +196,30 @@ impl Evented for EventedReceiver { events: Ready, opts: PollOpt, ) -> io::Result<()> { - let fd = globals().receiver.as_raw_fd(); + let globals = globals(); + let fd = globals.receiver.as_raw_fd(); + + // NB: we must try registering with the event loop regardless of our + // reference count. If the event loop descriptor is closed, we won't + // get any notifications to clean up our map, so if a new event loop + // descriptor collides with an entry in our map, we want to make sure + // we don't accidentally skip the registration. match EventedFd(&fd).register(poll, token, events, opts) { - Ok(()) => Ok(()), + Ok(()) => {} // Due to tokio-rs/tokio-core#307 - Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok(()), - Err(e) => Err(e), + Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => {}, + Err(e) => return Err(e), } + + *globals.pollfd_register_count + .lock() + .expect("poisoned") + .entry(poll.as_raw_fd()) + .or_insert(0) += 1; + + Ok(()) } + fn reregister( &self, poll: &MioPoll, @@ -210,8 +231,23 @@ impl Evented for EventedReceiver { EventedFd(&fd).reregister(poll, token, events, opts) } fn deregister(&self, poll: &MioPoll) -> io::Result<()> { - let fd = globals().receiver.as_raw_fd(); - EventedFd(&fd).deregister(poll) + let globals = globals(); + + let mut guard = globals.pollfd_register_count + .lock() + .expect("poisoned"); + let ref_count = guard.entry(poll.as_raw_fd()).or_insert(1); + *ref_count -= 1; + + // Only deregister if we're the last stream listening for this signal + // on this event loop. Otherwise, if multiple streams are opened for + // the same signal and one of them is closed, the remainder will starve. + if *ref_count == 0 { + let fd = globals.receiver.as_raw_fd(); + EventedFd(&fd).deregister(poll) + } else { + Ok(()) + } } }