diff --git a/Cargo.toml b/Cargo.toml index 35f0d7f88..3015dfdc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,19 +11,15 @@ An implementation of an asynchronous Unix signal handling backed futures. """ [dependencies] -tokio-core = "0.1" +tokio-core = "0.1.4" futures = "0.1.7" -lazy_static = "0.2" -nix = "0.7" [target.'cfg(unix)'.dependencies] libc = "0.2" mio = "0.6" +mio-uds = "0.6" [target.'cfg(windows)'.dependencies] winapi = "0.2" kernel32-sys = "0.2" mio = "0.6" - -[replace] -"tokio-core:0.1.3" = { git = "https://github.com/tokio-rs/tokio-core" } diff --git a/src/lib.rs b/src/lib.rs index 0af09609a..780d7f0bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,8 +25,6 @@ #[macro_use] extern crate futures; extern crate tokio_core; -#[macro_use] -extern crate lazy_static; use futures::Future; use futures::stream::Stream; diff --git a/src/unix.rs b/src/unix.rs index 1239559bc..7aa1618eb 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -7,28 +7,28 @@ pub extern crate libc; extern crate mio; -extern crate nix; +extern crate mio_uds; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Mutex; -use std::os::unix::io::RawFd; +use std::cell::UnsafeCell; use std::collections::HashSet; +use std::io::prelude::*; use std::io; +use std::mem; +use std::os::unix::prelude::*; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Mutex, Once, ONCE_INIT}; +use futures::future; +use futures::sync::mpsc::{Receiver, Sender, channel}; +use futures::{Async, AsyncSink, Future, IntoFuture}; +use futures::{Sink, Stream, Poll}; use self::libc::c_int; -use self::nix::sys::signal::{sigaction, SigAction, SigHandler, SigSet, SA_NOCLDSTOP, SA_RESTART}; -use self::nix::sys::signal::Signal as NixSignal; -use self::nix::Error as NixError; -use self::nix::Errno; -use self::nix::sys::socket::{recv, send, socketpair, AddressFamily, SockType, SockFlag, MSG_DONTWAIT}; -use self::mio::{Evented, Token, Ready, PollOpt}; use self::mio::Poll as MioPoll; use self::mio::unix::EventedFd; -use futures::{Async, AsyncSink, Future, IntoFuture}; -use futures::sync::mpsc::{Receiver, Sender, channel}; -use futures::{Sink, Stream, Poll}; -use tokio_core::reactor::{Handle, CoreId, PollEvented}; +use self::mio::{Evented, Token, Ready, PollOpt}; +use self::mio_uds::UnixStream; use tokio_core::io::IoFuture; +use tokio_core::reactor::{Handle, CoreId, PollEvented}; pub use self::libc::{SIGINT, SIGTERM, SIGUSR1, SIGUSR2}; pub use self::libc::{SIGHUP, SIGQUIT, SIGPIPE, SIGALRM, SIGTRAP}; @@ -36,82 +36,133 @@ pub use self::libc::{SIGHUP, SIGQUIT, SIGPIPE, SIGALRM, SIGTRAP}; // Number of different unix signals const SIGNUM: usize = 32; -#[derive(Default)] struct SignalInfo { - initialized: bool, + pending: AtomicBool, // The ones interested in this signal - recipients: Vec>, - // TODO: Other stuff, like the previous sigaction to call + recipients: Mutex>>, + + init: Once, + initialized: UnsafeCell, + prev: UnsafeCell, } struct Globals { - pending: [AtomicBool; SIGNUM], - sender: RawFd, - receiver: RawFd, - signals: [Mutex; SIGNUM], + sender: UnixStream, + receiver: UnixStream, + signals: [SignalInfo; SIGNUM], drivers: Mutex>, } -impl Globals { - fn new() -> Self { - // TODO: Better error handling - // We use socket pair instead of pipe, as it allows send() and recv(). - let (receiver, sender) = socketpair(AddressFamily::Unix, SockType::Stream, 0, SockFlag::empty()).unwrap(); - Globals { - // Bunch of false values - pending: Default::default(), - sender: sender, - receiver: receiver, - signals: Default::default(), - drivers: Mutex::new(HashSet::new()), +impl Default for SignalInfo { + fn default() -> SignalInfo { + SignalInfo { + pending: AtomicBool::new(false), + init: ONCE_INIT, + initialized: UnsafeCell::new(false), + recipients: Mutex::new(Vec::new()), + prev: UnsafeCell::new(unsafe { mem::zeroed() }), } } } -lazy_static! { - // TODO: Get rid of lazy_static once the prototype is done ‒ get rid of the dependency as well - // as the possible lock in there, which *might* be problematic in signals - static ref GLOBALS: Globals = Globals::new(); +static mut GLOBALS: *mut Globals = 0 as *mut Globals; + +fn globals() -> &'static Globals { + static INIT: Once = ONCE_INIT; + + unsafe { + INIT.call_once(|| { + let (receiver, sender) = UnixStream::pair().unwrap(); + let globals = Globals { + sender: sender, + receiver: receiver, + signals: Default::default(), + drivers: Mutex::new(HashSet::new()), + }; + GLOBALS = Box::into_raw(Box::new(globals)); + }); + &*GLOBALS + } } // Flag the relevant signal and wake up through a self-pipe -extern "C" fn pipe_wakeup(signal: c_int) { - let index = signal as usize; - // TODO: Handle the old signal handler - // It might be good enough to use some lesser ordering than this, but how to prove it? - GLOBALS.pending[index].store(true, Ordering::SeqCst); - // Send a wakeup, ignore any errors (anything reasonably possible is full pipe and then it will - // wake up anyway). - let _ = send(GLOBALS.sender, &[0u8], MSG_DONTWAIT); +extern fn handler(signum: c_int, + info: *mut libc::siginfo_t, + ptr: *mut libc::c_void) { + type FnSigaction = extern fn(c_int, *mut libc::siginfo_t, *mut libc::c_void); + type FnHandler = extern fn(c_int); + unsafe { + let slot = match (*GLOBALS).signals.get(signum as usize) { + Some(slot) => slot, + None => return, + }; + slot.pending.store(true, Ordering::SeqCst); + + // Send a wakeup, ignore any errors (anything reasonably possible is + // full pipe and then it will wake up anyway). + drop((*GLOBALS).sender.write(&[1])); + + let fnptr = (*slot.prev.get()).sa_sigaction; + if fnptr == 0 || fnptr == libc::SIG_DFL || fnptr == libc::SIG_IGN { + return + } + if (*slot.prev.get()).sa_flags & libc::SA_SIGINFO == 0 { + let action = mem::transmute::(fnptr); + action(signum) + } else { + let action = mem::transmute::(fnptr); + action(signum, info, ptr) + } + } } -// Make sure we listen to the given signal and provide the recipient end of the self-pipe -fn signal_enable(signal: c_int) { - let index = signal as usize; - let mut siginfo = GLOBALS.signals[index].lock().unwrap(); - if !siginfo.initialized { - let action = SigAction::new(SigHandler::Handler(pipe_wakeup), SA_NOCLDSTOP | SA_RESTART, SigSet::empty()); - unsafe { sigaction(NixSignal::from_c_int(signal).unwrap(), &action).unwrap() }; - // TODO: Handle the old signal handler - siginfo.initialized = true; - } +// Make sure we listen to the given signal and provide the recipient end of the +// self-pipe +fn signal_enable(signal: c_int) -> io::Result<()> { + let siginfo = &globals().signals[signal as usize]; + unsafe { + let mut err = None; + siginfo.init.call_once(|| { + let mut new: libc::sigaction = mem::zeroed(); + new.sa_sigaction = handler as usize; + new.sa_flags = libc::SA_RESTART | + libc::SA_SIGINFO | + libc::SA_NOCLDSTOP; + if libc::sigaction(signal, &new, &mut *siginfo.prev.get()) != 0 { + err = Some(io::Error::last_os_error()); + } else { + *siginfo.initialized.get() = true; + } + }); + if let Some(err) = err { + return Err(err) + } + if *siginfo.initialized.get() { + Ok(()) + } else { + Err(io::Error::new(io::ErrorKind::Other, + "failed to register signal handler")) + } + } } struct EventedReceiver; impl Evented for EventedReceiver { fn register(&self, poll: &MioPoll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> { - EventedFd(&GLOBALS.receiver).register(poll, token, events, opts) + let fd = globals().receiver.as_raw_fd(); + EventedFd(&fd).register(poll, token, events, opts) } fn reregister(&self, poll: &MioPoll, token: Token, events: Ready, opts: PollOpt) -> io::Result<()> { - EventedFd(&GLOBALS.receiver).reregister(poll, token, events, opts) + let fd = globals().receiver.as_raw_fd(); + EventedFd(&fd).reregister(poll, token, events, opts) } fn deregister(&self, poll: &MioPoll) -> io::Result<()> { - EventedFd(&GLOBALS.receiver).deregister(poll) + let fd = globals().receiver.as_raw_fd(); + EventedFd(&fd).deregister(poll) } } -// There'll be stuff inside struct Driver { id: CoreId, wakeup: PollEvented, @@ -134,7 +185,7 @@ impl Future for Driver { impl Drop for Driver { fn drop(&mut self) { - let mut drivers = GLOBALS.drivers.lock().unwrap(); + let mut drivers = globals().drivers.lock().unwrap(); drivers.remove(&self.id); } } @@ -156,13 +207,11 @@ impl Driver { } // Read all available data (until EAGAIN) let mut received = false; - let mut buffer = [0; 1024]; loop { - match recv(GLOBALS.receiver, &mut buffer, MSG_DONTWAIT) { + match (&globals().receiver).read(&mut [0; 128]) { Ok(0) => panic!("EOF on self-pipe"), Ok(_) => received = true, - Err(NixError::Sys(Errno::EAGAIN)) => break, - Err(NixError::Sys(Errno::EINTR)) => (), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, Err(e) => panic!("Bad read on self-pipe: {}", e), } } @@ -174,30 +223,26 @@ impl Driver { } // Go through all the signals and broadcast everything fn broadcast(&self) { - for (sig, value) in GLOBALS.pending.iter().enumerate() { + for (sig, slot) in globals().signals.iter().enumerate() { // Any signal of this kind arrived since we checked last? - if value.swap(false, Ordering::SeqCst) { - let signum = sig as c_int; - let mut siginfo = GLOBALS.signals[sig].lock().unwrap(); - // It doesn't seem to be possible to do this through the iterators for now. - // This trick is copied from https://github.com/rust-lang/rfcs/pull/1353. - for i in (0 .. siginfo.recipients.len()).rev() { - // TODO: This thing probably generates unnecessary wakups of this task. - // But let's optimise it later on, when we know this works. - match siginfo.recipients[i].start_send(signum) { - // We don't care if it was full or not ‒ we just want to wake up the other - // side. - Ok(AsyncSink::Ready) => { - // We are required to call this if we push something inside - let _ = siginfo.recipients[i].poll_complete(); - }, - // The channel is full -> it'll get woken up anyway - Ok(AsyncSink::NotReady(_)) => (), - // The other side disappeared, drop this end. - Err(_) => { - siginfo.recipients.swap_remove(i); - }, - } + if !slot.pending.swap(false, Ordering::SeqCst) { + continue + } + + let signum = sig as c_int; + let mut recipients = slot.recipients.lock().unwrap(); + + // Notify all waiters on this signal that the signal has been + // received. If we can't push a message into the queue then we don't + // worry about it as everything is coalesced anyway. + for i in (0..recipients.len()).rev() { + // TODO: This thing probably generates unnecessary wakups of + // this task. But let's optimise it later on, when we + // know this works. + match recipients[i].start_send(signum) { + Ok(AsyncSink::Ready) => {} + Ok(AsyncSink::NotReady(_)) => {} + Err(_) => { recipients.swap_remove(i); } } } } @@ -268,25 +313,24 @@ impl Signal { /// multiple times. When a signal is received then all the associated /// channels will receive the signal notification. pub fn new(signal: c_int, handle: &Handle) -> IoFuture { - let index = signal as usize; - // One wakeup in a queue is enough - let (sender, receiver) = channel(1); - { - let mut siginfo = GLOBALS.signals[index].lock().unwrap(); - siginfo.recipients.push(sender); - } // Turn the signal delivery on once we are ready for it - signal_enable(signal); + if let Err(e) = signal_enable(signal) { + return future::err(e).boxed() + } + + // One wakeup in a queue is enough + let (tx, rx) = channel(1); + globals().signals[signal as usize].recipients.lock().unwrap().push(tx); let id = handle.id(); { - let mut drivers = GLOBALS.drivers.lock().unwrap(); + let mut drivers = globals().drivers.lock().unwrap(); if !drivers.contains(&id) { handle.spawn(Driver::new(handle)); drivers.insert(id); } } // TODO: Init the driving task for this handle - Ok(Signal(receiver)).into_future().boxed() + Ok(Signal(rx)).into_future().boxed() } }