From 5a5dde70b349ca761d2900f4c7602df4b5a79c3e Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Fri, 16 Nov 2018 22:56:43 +0000 Subject: [PATCH] signal: miscellaneous tweaks and improvements (#751) * Minimize allocation needed for channels * Use a newtype for signal ids * We can just cast the raw pointer to a `usize` and still perform a simple identity check, without incurring any implications of storing a raw pointer (e.g. previously Signal was !Sync and had an unsafe impl of Send, and now it is naturally Sync+Send) * Broadcast with `try_send` instead of `start_send` The `Stream::start_send` method uses backpressure and schedules the current task to be notified whenever the channel has additional room, which means we'll generate a lot of unnecessary wakeups whenever a channel gets full By changing to `try_send` and handling any errors, we ensure the Driver's task won't get woken up when a Signal finally consumes its notification, since we're coalescing things anyway --- tokio-signal/CHANGELOG.md | 6 ++ tokio-signal/src/unix.rs | 122 ++++++++++++++++++++++++++++---------- 2 files changed, 96 insertions(+), 32 deletions(-) diff --git a/tokio-signal/CHANGELOG.md b/tokio-signal/CHANGELOG.md index acfc30a68..b95b1d153 100644 --- a/tokio-signal/CHANGELOG.md +++ b/tokio-signal/CHANGELOG.md @@ -1,4 +1,10 @@ ## Unreleased +### Changed +* `unix::Signal` now implements `Sync` + +### Fixes +* `unix::Signal` now avoids extraneous wakeups generated as a result of +dropping other instances ## 0.2.6 - (October 26, 2018) ### Changed diff --git a/tokio-signal/src/unix.rs b/tokio-signal/src/unix.rs index fa2cba543..582f62337 100644 --- a/tokio-signal/src/unix.rs +++ b/tokio-signal/src/unix.rs @@ -19,8 +19,8 @@ use self::libc::c_int; use self::mio_uds::UnixStream; use futures::future; use futures::sync::mpsc::{channel, Receiver, Sender}; -use futures::{Async, AsyncSink, Future}; -use futures::{Poll, Sink, Stream}; +use futures::{Async, Future}; +use futures::{Poll, Stream}; use tokio_reactor::{Handle, PollEvented}; use tokio_io::IoFuture; @@ -46,10 +46,12 @@ pub mod bsd { // (FreeBSD has 33) const SIGNUM: usize = 33; +type SignalSender = Sender; + struct SignalInfo { pending: AtomicBool, // The ones interested in this signal - recipients: Mutex>>>, + recipients: Mutex>>, init: Once, initialized: AtomicBool, @@ -61,6 +63,40 @@ struct Globals { signals: Vec, } +impl Globals { + /// Register a new `Signal` instance's channel sender. + /// Returns a `SignalId` which should be later used for deregistering + /// this sender. + fn register_signal_sender(signal: c_int, tx: SignalSender) -> SignalId { + let tx = Box::new(tx); + let id = SignalId::from(&tx); + + let idx = signal as usize; + globals().signals[idx].recipients.lock().unwrap().push(tx); + id + } + + /// Deregister a `Signal` instance's channel sender because the `Signal` + /// is no longer interested in receiving events (e.g. dropped). + fn deregister_signal_receiver(signal: c_int, id: SignalId) { + let idx = signal as usize; + let mut list = globals().signals[idx].recipients.lock().unwrap(); + list.retain(|sender| SignalId::from(sender) != id); + } +} + +/// A newtype which represents a unique identifier for each `Signal` instance. +/// The id is derived by boxing the channel `Sender` associated with this instance +/// and using its address in memory. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +struct SignalId(usize); + +impl<'a> From<&'a Box> for SignalId { + fn from(tx: &'a Box) -> Self { + SignalId(&**tx as *const _ as usize) + } +} + impl Default for SignalInfo { fn default() -> SignalInfo { SignalInfo { @@ -220,17 +256,19 @@ impl Driver { // worry about it as everything is coalesced anyway. If the channel // has gone away then we can remove that slot. for i in (0..recipients.len()).rev() { - // TODO: This thing probably generates unnecessary wakups of - // this task when `NotReady` is received because we don't - // actually want to get woken up to continue sending a - // message. Let's optimise it later on though, as we know - // this works. - match recipients[i].start_send(signum) { - Ok(AsyncSink::Ready) => {} - Ok(AsyncSink::NotReady(_)) => {} - Err(_) => { + match recipients[i].try_send(signum) { + Ok(()) => {}, + Err(ref e) if e.is_disconnected() => { recipients.swap_remove(i); - } + }, + + // Channel is full, ignore the error since the + // receiver has already been woken up + Err(e) => { + // Sanity check in case this error type ever gets + // additional variants we have not considered. + debug_assert!(e.is_full()); + }, } } } @@ -274,19 +312,10 @@ impl Driver { pub struct Signal { driver: Driver, signal: c_int, - // Used only as an identifier. We place the real sender into a Box, so it - // stays on the same address forever. That gives us a unique pointer, so we - // can use this to identify the sender in a Vec and delete it when we are - // dropped. - id: *const Sender, + id: SignalId, rx: Receiver, } -// The raw pointer prevents the compiler from determining it as Send -// automatically. But the only thing we use the raw pointer for is to identify -// the correct Box to delete, not manipulate any data through that. -unsafe impl Send for Signal {} - impl Signal { /// Creates a new stream which will receive notifications when the current /// process receives the signal `signal`. @@ -347,12 +376,10 @@ impl Signal { let driver = try!(Driver::new(&handle)); // One wakeup in a queue is enough, no need for us to buffer up any - // more. - let (tx, rx) = channel(1); - let tx = Box::new(tx); - let id: *const _ = &*tx; - let idx = signal as usize; - globals().signals[idx].recipients.lock().unwrap().push(tx); + // more. NB: channels always guarantee at least one slot per sender, + // so we don't need additional slots + let (tx, rx) = channel(0); + let id = Globals::register_signal_sender(signal, tx); Ok(Signal { driver: driver, rx: rx, @@ -378,8 +405,39 @@ impl Stream for Signal { impl Drop for Signal { fn drop(&mut self) { - let idx = self.signal as usize; - let mut list = globals().signals[idx].recipients.lock().unwrap(); - list.retain(|sender| &**sender as *const _ != self.id); + Globals::deregister_signal_receiver(self.signal, self.id); + } +} + +#[cfg(test)] +mod tests { + extern crate tokio; + + use super::*; + + #[test] + fn dropped_signal_senders_are_cleaned_up() { + let mut rt = self::tokio::runtime::current_thread::Runtime::new() + .expect("failed to init runtime"); + + let signum = libc::SIGUSR1; + let signal = rt.block_on(Signal::new(signum)) + .expect("failed to create signal"); + + { + let recipients = globals().signals[signum as usize].recipients.lock().unwrap(); + assert!(!recipients.is_empty()); + } + + drop(signal); + + unsafe { + assert_eq!(libc::kill(libc::getpid(), signum), 0); + } + + { + let recipients = globals().signals[signum as usize].recipients.lock().unwrap(); + assert!(recipients.is_empty()); + } } }