signal: Various cleanups:

* Drop nix/lazy_static
* Use previously registered handlers
* Handle some more errors
This commit is contained in:
Alex Crichton 2017-01-24 10:02:42 -08:00 committed by Carl Lerche
parent 367cb56e02
commit ba0921a01d
3 changed files with 144 additions and 106 deletions

View File

@ -11,19 +11,15 @@ An implementation of an asynchronous Unix signal handling backed futures.
"""
[dependencies]
tokio-core = "0.1"
tokio-core = "0.1.4"
futures = "0.1.7"
lazy_static = "0.2"
nix = "0.7"
[target.'cfg(unix)'.dependencies]
libc = "0.2"
mio = "0.6"
mio-uds = "0.6"
[target.'cfg(windows)'.dependencies]
winapi = "0.2"
kernel32-sys = "0.2"
mio = "0.6"
[replace]
"tokio-core:0.1.3" = { git = "https://github.com/tokio-rs/tokio-core" }

View File

@ -25,8 +25,6 @@
#[macro_use]
extern crate futures;
extern crate tokio_core;
#[macro_use]
extern crate lazy_static;
use futures::Future;
use futures::stream::Stream;

View File

@ -7,28 +7,28 @@
pub extern crate libc;
extern crate mio;
extern crate nix;
extern crate mio_uds;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::os::unix::io::RawFd;
use std::cell::UnsafeCell;
use std::collections::HashSet;
use std::io::prelude::*;
use std::io;
use std::mem;
use std::os::unix::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, Once, ONCE_INIT};
use futures::future;
use futures::sync::mpsc::{Receiver, Sender, channel};
use futures::{Async, AsyncSink, Future, IntoFuture};
use futures::{Sink, Stream, Poll};
use self::libc::c_int;
use self::nix::sys::signal::{sigaction, SigAction, SigHandler, SigSet, SA_NOCLDSTOP, SA_RESTART};
use self::nix::sys::signal::Signal as NixSignal;
use self::nix::Error as NixError;
use self::nix::Errno;
use self::nix::sys::socket::{recv, send, socketpair, AddressFamily, SockType, SockFlag, MSG_DONTWAIT};
use self::mio::{Evented, Token, Ready, PollOpt};
use self::mio::Poll as MioPoll;
use self::mio::unix::EventedFd;
use futures::{Async, AsyncSink, Future, IntoFuture};
use futures::sync::mpsc::{Receiver, Sender, channel};
use futures::{Sink, Stream, Poll};
use tokio_core::reactor::{Handle, CoreId, PollEvented};
use self::mio::{Evented, Token, Ready, PollOpt};
use self::mio_uds::UnixStream;
use tokio_core::io::IoFuture;
use tokio_core::reactor::{Handle, CoreId, PollEvented};
pub use self::libc::{SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
pub use self::libc::{SIGHUP, SIGQUIT, SIGPIPE, SIGALRM, SIGTRAP};
@ -36,82 +36,133 @@ pub use self::libc::{SIGHUP, SIGQUIT, SIGPIPE, SIGALRM, SIGTRAP};
// Number of different unix signals
const SIGNUM: usize = 32;
#[derive(Default)]
struct SignalInfo {
initialized: bool,
pending: AtomicBool,
// The ones interested in this signal
recipients: Vec<Sender<c_int>>,
// TODO: Other stuff, like the previous sigaction to call
recipients: Mutex<Vec<Sender<c_int>>>,
init: Once,
initialized: UnsafeCell<bool>,
prev: UnsafeCell<libc::sigaction>,
}
struct Globals {
pending: [AtomicBool; SIGNUM],
sender: RawFd,
receiver: RawFd,
signals: [Mutex<SignalInfo>; SIGNUM],
sender: UnixStream,
receiver: UnixStream,
signals: [SignalInfo; SIGNUM],
drivers: Mutex<HashSet<CoreId>>,
}
impl Globals {
fn new() -> Self {
// TODO: Better error handling
// We use socket pair instead of pipe, as it allows send() and recv().
let (receiver, sender) = socketpair(AddressFamily::Unix, SockType::Stream, 0, SockFlag::empty()).unwrap();
Globals {
// Bunch of false values
pending: Default::default(),
sender: sender,
receiver: receiver,
signals: Default::default(),
drivers: Mutex::new(HashSet::new()),
impl Default for SignalInfo {
fn default() -> SignalInfo {
SignalInfo {
pending: AtomicBool::new(false),
init: ONCE_INIT,
initialized: UnsafeCell::new(false),
recipients: Mutex::new(Vec::new()),
prev: UnsafeCell::new(unsafe { mem::zeroed() }),
}
}
}
lazy_static! {
// TODO: Get rid of lazy_static once the prototype is done get rid of the dependency as well
// as the possible lock in there, which *might* be problematic in signals
static ref GLOBALS: Globals = Globals::new();
static mut GLOBALS: *mut Globals = 0 as *mut Globals;
fn globals() -> &'static Globals {
static INIT: Once = ONCE_INIT;
unsafe {
INIT.call_once(|| {
let (receiver, sender) = UnixStream::pair().unwrap();
let globals = Globals {
sender: sender,
receiver: receiver,
signals: Default::default(),
drivers: Mutex::new(HashSet::new()),
};
GLOBALS = Box::into_raw(Box::new(globals));
});
&*GLOBALS
}
}
// Flag the relevant signal and wake up through a self-pipe
extern "C" fn pipe_wakeup(signal: c_int) {
let index = signal as usize;
// TODO: Handle the old signal handler
// It might be good enough to use some lesser ordering than this, but how to prove it?
GLOBALS.pending[index].store(true, Ordering::SeqCst);
// Send a wakeup, ignore any errors (anything reasonably possible is full pipe and then it will
// wake up anyway).
let _ = send(GLOBALS.sender, &[0u8], MSG_DONTWAIT);
extern fn handler(signum: c_int,
info: *mut libc::siginfo_t,
ptr: *mut libc::c_void) {
type FnSigaction = extern fn(c_int, *mut libc::siginfo_t, *mut libc::c_void);
type FnHandler = extern fn(c_int);
unsafe {
let slot = match (*GLOBALS).signals.get(signum as usize) {
Some(slot) => slot,
None => return,
};
slot.pending.store(true, Ordering::SeqCst);
// Send a wakeup, ignore any errors (anything reasonably possible is
// full pipe and then it will wake up anyway).
drop((*GLOBALS).sender.write(&[1]));
let fnptr = (*slot.prev.get()).sa_sigaction;
if fnptr == 0 || fnptr == libc::SIG_DFL || fnptr == libc::SIG_IGN {
return
}
if (*slot.prev.get()).sa_flags & libc::SA_SIGINFO == 0 {
let action = mem::transmute::<usize, FnHandler>(fnptr);
action(signum)
} else {
let action = mem::transmute::<usize, FnSigaction>(fnptr);
action(signum, info, ptr)
}
}
}
// Make sure we listen to the given signal and provide the recipient end of the self-pipe
fn signal_enable(signal: c_int) {
let index = signal as usize;
let mut siginfo = GLOBALS.signals[index].lock().unwrap();
if !siginfo.initialized {
let action = SigAction::new(SigHandler::Handler(pipe_wakeup), SA_NOCLDSTOP | SA_RESTART, SigSet::empty());
unsafe { sigaction(NixSignal::from_c_int(signal).unwrap(), &action).unwrap() };
// TODO: Handle the old signal handler
siginfo.initialized = true;
}
// Make sure we listen to the given signal and provide the recipient end of the
// self-pipe
fn signal_enable(signal: c_int) -> io::Result<()> {
let siginfo = &globals().signals[signal as usize];
unsafe {
let mut err = None;
siginfo.init.call_once(|| {
let mut new: libc::sigaction = mem::zeroed();
new.sa_sigaction = handler as usize;
new.sa_flags = libc::SA_RESTART |
libc::SA_SIGINFO |
libc::SA_NOCLDSTOP;
if libc::sigaction(signal, &new, &mut *siginfo.prev.get()) != 0 {
err = Some(io::Error::last_os_error());
} else {
*siginfo.initialized.get() = true;
}
});
if let Some(err) = err {
return Err(err)
}
if *siginfo.initialized.get() {
Ok(())
} else {
Err(io::Error::new(io::ErrorKind::Other,
"failed to register signal handler"))
}
}
}
struct EventedReceiver;
impl Evented for EventedReceiver {
fn register(&self, poll: &MioPoll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
EventedFd(&GLOBALS.receiver).register(poll, token, events, opts)
let fd = globals().receiver.as_raw_fd();
EventedFd(&fd).register(poll, token, events, opts)
}
fn reregister(&self, poll: &MioPoll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> {
EventedFd(&GLOBALS.receiver).reregister(poll, token, events, opts)
let fd = globals().receiver.as_raw_fd();
EventedFd(&fd).reregister(poll, token, events, opts)
}
fn deregister(&self, poll: &MioPoll) -> io::Result<()> {
EventedFd(&GLOBALS.receiver).deregister(poll)
let fd = globals().receiver.as_raw_fd();
EventedFd(&fd).deregister(poll)
}
}
// There'll be stuff inside
struct Driver {
id: CoreId,
wakeup: PollEvented<EventedReceiver>,
@ -134,7 +185,7 @@ impl Future for Driver {
impl Drop for Driver {
fn drop(&mut self) {
let mut drivers = GLOBALS.drivers.lock().unwrap();
let mut drivers = globals().drivers.lock().unwrap();
drivers.remove(&self.id);
}
}
@ -156,13 +207,11 @@ impl Driver {
}
// Read all available data (until EAGAIN)
let mut received = false;
let mut buffer = [0; 1024];
loop {
match recv(GLOBALS.receiver, &mut buffer, MSG_DONTWAIT) {
match (&globals().receiver).read(&mut [0; 128]) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => received = true,
Err(NixError::Sys(Errno::EAGAIN)) => break,
Err(NixError::Sys(Errno::EINTR)) => (),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {}", e),
}
}
@ -174,30 +223,26 @@ impl Driver {
}
// Go through all the signals and broadcast everything
fn broadcast(&self) {
for (sig, value) in GLOBALS.pending.iter().enumerate() {
for (sig, slot) in globals().signals.iter().enumerate() {
// Any signal of this kind arrived since we checked last?
if value.swap(false, Ordering::SeqCst) {
let signum = sig as c_int;
let mut siginfo = GLOBALS.signals[sig].lock().unwrap();
// It doesn't seem to be possible to do this through the iterators for now.
// This trick is copied from https://github.com/rust-lang/rfcs/pull/1353.
for i in (0 .. siginfo.recipients.len()).rev() {
// TODO: This thing probably generates unnecessary wakups of this task.
// But let's optimise it later on, when we know this works.
match siginfo.recipients[i].start_send(signum) {
// We don't care if it was full or not we just want to wake up the other
// side.
Ok(AsyncSink::Ready) => {
// We are required to call this if we push something inside
let _ = siginfo.recipients[i].poll_complete();
},
// The channel is full -> it'll get woken up anyway
Ok(AsyncSink::NotReady(_)) => (),
// The other side disappeared, drop this end.
Err(_) => {
siginfo.recipients.swap_remove(i);
},
}
if !slot.pending.swap(false, Ordering::SeqCst) {
continue
}
let signum = sig as c_int;
let mut recipients = slot.recipients.lock().unwrap();
// Notify all waiters on this signal that the signal has been
// received. If we can't push a message into the queue then we don't
// worry about it as everything is coalesced anyway.
for i in (0..recipients.len()).rev() {
// TODO: This thing probably generates unnecessary wakups of
// this task. But let's optimise it later on, when we
// know this works.
match recipients[i].start_send(signum) {
Ok(AsyncSink::Ready) => {}
Ok(AsyncSink::NotReady(_)) => {}
Err(_) => { recipients.swap_remove(i); }
}
}
}
@ -268,25 +313,24 @@ impl Signal {
/// multiple times. When a signal is received then all the associated
/// channels will receive the signal notification.
pub fn new(signal: c_int, handle: &Handle) -> IoFuture<Signal> {
let index = signal as usize;
// One wakeup in a queue is enough
let (sender, receiver) = channel(1);
{
let mut siginfo = GLOBALS.signals[index].lock().unwrap();
siginfo.recipients.push(sender);
}
// Turn the signal delivery on once we are ready for it
signal_enable(signal);
if let Err(e) = signal_enable(signal) {
return future::err(e).boxed()
}
// One wakeup in a queue is enough
let (tx, rx) = channel(1);
globals().signals[signal as usize].recipients.lock().unwrap().push(tx);
let id = handle.id();
{
let mut drivers = GLOBALS.drivers.lock().unwrap();
let mut drivers = globals().drivers.lock().unwrap();
if !drivers.contains(&id) {
handle.spawn(Driver::new(handle));
drivers.insert(id);
}
}
// TODO: Init the driving task for this handle
Ok(Signal(receiver)).into_future().boxed()
Ok(Signal(rx)).into_future().boxed()
}
}