signal: miscellaneous tweaks and improvements (#751)

* Minimize allocation needed for channels

* Use a newtype for signal ids

* We can just cast the raw pointer to a `usize` and still perform a
simple identity check, without incurring any implications of storing a
raw pointer (e.g. previously Signal was !Sync and had an unsafe impl of
Send, and now it is naturally Sync+Send)

* Broadcast with `try_send` instead of `start_send`

The `Stream::start_send` method uses backpressure and schedules the
current task to be notified whenever the channel has additional room,
which means we'll generate a lot of unnecessary wakeups whenever a
channel gets full

By changing to `try_send` and handling any errors, we ensure the
Driver's task won't get woken up when a Signal finally consumes its
notification, since we're coalescing things anyway
This commit is contained in:
Ivan Petkov 2018-11-16 22:56:43 +00:00 committed by Carl Lerche
parent d0963774a3
commit 5a5dde70b3
2 changed files with 96 additions and 32 deletions

View File

@ -1,4 +1,10 @@
## Unreleased
### Changed
* `unix::Signal` now implements `Sync`
### Fixes
* `unix::Signal` now avoids extraneous wakeups generated as a result of
dropping other instances
## 0.2.6 - (October 26, 2018)
### Changed

View File

@ -19,8 +19,8 @@ use self::libc::c_int;
use self::mio_uds::UnixStream;
use futures::future;
use futures::sync::mpsc::{channel, Receiver, Sender};
use futures::{Async, AsyncSink, Future};
use futures::{Poll, Sink, Stream};
use futures::{Async, Future};
use futures::{Poll, Stream};
use tokio_reactor::{Handle, PollEvented};
use tokio_io::IoFuture;
@ -46,10 +46,12 @@ pub mod bsd {
// (FreeBSD has 33)
const SIGNUM: usize = 33;
type SignalSender = Sender<c_int>;
struct SignalInfo {
pending: AtomicBool,
// The ones interested in this signal
recipients: Mutex<Vec<Box<Sender<c_int>>>>,
recipients: Mutex<Vec<Box<SignalSender>>>,
init: Once,
initialized: AtomicBool,
@ -61,6 +63,40 @@ struct Globals {
signals: Vec<SignalInfo>,
}
impl Globals {
/// Register a new `Signal` instance's channel sender.
/// Returns a `SignalId` which should be later used for deregistering
/// this sender.
fn register_signal_sender(signal: c_int, tx: SignalSender) -> SignalId {
let tx = Box::new(tx);
let id = SignalId::from(&tx);
let idx = signal as usize;
globals().signals[idx].recipients.lock().unwrap().push(tx);
id
}
/// Deregister a `Signal` instance's channel sender because the `Signal`
/// is no longer interested in receiving events (e.g. dropped).
fn deregister_signal_receiver(signal: c_int, id: SignalId) {
let idx = signal as usize;
let mut list = globals().signals[idx].recipients.lock().unwrap();
list.retain(|sender| SignalId::from(sender) != id);
}
}
/// A newtype which represents a unique identifier for each `Signal` instance.
/// The id is derived by boxing the channel `Sender` associated with this instance
/// and using its address in memory.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct SignalId(usize);
impl<'a> From<&'a Box<SignalSender>> for SignalId {
fn from(tx: &'a Box<SignalSender>) -> Self {
SignalId(&**tx as *const _ as usize)
}
}
impl Default for SignalInfo {
fn default() -> SignalInfo {
SignalInfo {
@ -220,17 +256,19 @@ impl Driver {
// worry about it as everything is coalesced anyway. If the channel
// has gone away then we can remove that slot.
for i in (0..recipients.len()).rev() {
// TODO: This thing probably generates unnecessary wakups of
// this task when `NotReady` is received because we don't
// actually want to get woken up to continue sending a
// message. Let's optimise it later on though, as we know
// this works.
match recipients[i].start_send(signum) {
Ok(AsyncSink::Ready) => {}
Ok(AsyncSink::NotReady(_)) => {}
Err(_) => {
match recipients[i].try_send(signum) {
Ok(()) => {},
Err(ref e) if e.is_disconnected() => {
recipients.swap_remove(i);
}
},
// Channel is full, ignore the error since the
// receiver has already been woken up
Err(e) => {
// Sanity check in case this error type ever gets
// additional variants we have not considered.
debug_assert!(e.is_full());
},
}
}
}
@ -274,19 +312,10 @@ impl Driver {
pub struct Signal {
driver: Driver,
signal: c_int,
// Used only as an identifier. We place the real sender into a Box, so it
// stays on the same address forever. That gives us a unique pointer, so we
// can use this to identify the sender in a Vec and delete it when we are
// dropped.
id: *const Sender<c_int>,
id: SignalId,
rx: Receiver<c_int>,
}
// The raw pointer prevents the compiler from determining it as Send
// automatically. But the only thing we use the raw pointer for is to identify
// the correct Box to delete, not manipulate any data through that.
unsafe impl Send for Signal {}
impl Signal {
/// Creates a new stream which will receive notifications when the current
/// process receives the signal `signal`.
@ -347,12 +376,10 @@ impl Signal {
let driver = try!(Driver::new(&handle));
// One wakeup in a queue is enough, no need for us to buffer up any
// more.
let (tx, rx) = channel(1);
let tx = Box::new(tx);
let id: *const _ = &*tx;
let idx = signal as usize;
globals().signals[idx].recipients.lock().unwrap().push(tx);
// more. NB: channels always guarantee at least one slot per sender,
// so we don't need additional slots
let (tx, rx) = channel(0);
let id = Globals::register_signal_sender(signal, tx);
Ok(Signal {
driver: driver,
rx: rx,
@ -378,8 +405,39 @@ impl Stream for Signal {
impl Drop for Signal {
fn drop(&mut self) {
let idx = self.signal as usize;
let mut list = globals().signals[idx].recipients.lock().unwrap();
list.retain(|sender| &**sender as *const _ != self.id);
Globals::deregister_signal_receiver(self.signal, self.id);
}
}
#[cfg(test)]
mod tests {
extern crate tokio;
use super::*;
#[test]
fn dropped_signal_senders_are_cleaned_up() {
let mut rt = self::tokio::runtime::current_thread::Runtime::new()
.expect("failed to init runtime");
let signum = libc::SIGUSR1;
let signal = rt.block_on(Signal::new(signum))
.expect("failed to create signal");
{
let recipients = globals().signals[signum as usize].recipients.lock().unwrap();
assert!(!recipients.is_empty());
}
drop(signal);
unsafe {
assert_eq!(libc::kill(libc::getpid(), signum), 0);
}
{
let recipients = globals().signals[signum as usize].recipients.lock().unwrap();
assert!(recipients.is_empty());
}
}
}