signal: Provide the new kind of Signal stream

Which is just a wrapper around the futures::sync::mpsc. The sender is in
a global registry.

The part that connects the wakeups to the senders in the registry
doesn't yet exist.
This commit is contained in:
Michal 'vorner' Vaner 2017-01-22 12:58:48 +01:00 committed by Carl Lerche
parent ed4359bb26
commit 04d949c380

View File

@ -10,16 +10,24 @@ extern crate mio;
extern crate tokio_uds;
extern crate nix;
use std::sync::{Once, ONCE_INIT};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::os::unix::io::RawFd;
use std::io;
use self::libc::c_int;
use self::nix::unistd::pipe;
use self::nix::sys::signal::{SigAction, SigHandler, SigSet, SA_NOCLDSTOP, SA_RESTART, sigaction};
use self::nix::sys::signal::Signal as NixSignal;
use self::nix::sys::socket::{send, MSG_DONTWAIT};
use futures::{Future, IntoFuture};
use futures::sync::mpsc::{Receiver, Sender, channel};
use futures::{Stream, Poll};
use tokio_core::reactor::Handle;
use tokio_core::io::IoFuture;
pub use self::libc::{SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
pub use self::libc::{SIGHUP, SIGQUIT, SIGPIPE, SIGALRM, SIGTRAP};
// Number of different unix signals
const SIGNUM: usize = 32;
@ -27,6 +35,8 @@ const SIGNUM: usize = 32;
#[derive(Default)]
struct SignalInfo {
initialized: bool,
// The ones interested in this signal
recipients: Vec<Sender<c_int>>,
// TODO: Other stuff, like the previous sigaction to call
}
@ -81,22 +91,7 @@ fn signal_enable(signal: c_int) -> RawFd {
globals.receiver
}
use std::cell::RefCell;
use std::io::{self, Write, Read};
use std::mem;
use futures::future;
use futures::stream::Fuse;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::{Future, Stream, IntoFuture, Poll, Async};
use self::tokio_uds::UnixStream;
use tokio_core::io::IoFuture;
use tokio_core::reactor::{PollEvented, Handle};
static INIT: Once = ONCE_INIT;
static mut GLOBAL_STATE: *mut GlobalState = 0 as *mut _;
// TODO: Go through the docs, they are a copy-paste from the previous version
/// An implementation of `Stream` for receiving a particular type of signal.
///
/// This structure implements the `Stream` trait and represents notifications
@ -134,6 +129,74 @@ static mut GLOBAL_STATE: *mut GlobalState = 0 as *mut _;
/// If you've got any questions about this feel free to open an issue on the
/// repo, though, as I'd love to chat about this! In other words, I'd love to
/// alleviate some of these limitations if possible!
pub struct Signal(Receiver<c_int>);
impl Signal {
// TODO: Revisit the docs, they are from the previous version
/// Creates a new stream which will receive notifications when the current
/// process receives the signal `signum`.
///
/// This function will create a new stream which may be based on the
/// event loop handle provided. This function returns a future which will
/// then resolve to the signal stream, if successful.
///
/// The `Signal` stream is an infinite stream which will receive
/// notifications whenever a signal is received. More documentation can be
/// found on `Signal` itself, but to reiterate:
///
/// * Signals may be coalesced beyond what the kernel already does.
/// * While multiple event loops are supported, the first event loop to
/// register a signal handler must be active to deliver signal
/// notifications
/// * Once a signal handle is registered with the process the underlying
/// libc signal handler is never unregistered.
///
/// A `Signal` stream can be created for a particular signal number
/// multiple times. When a signal is received then all the associated
/// channels will receive the signal notification.
pub fn new(signal: c_int, handle: &Handle) -> IoFuture<Signal> {
let index = signal as usize;
// One wakeup in a queue is enough
let (sender, receiver) = channel(1);
{ // Hold the mutex only a short while
let mut siginfo = globals.signals[index].lock().unwrap();
siginfo.recipients.push(sender);
}
// Turn the signal delivery on once we are ready for it
let wakeup = signal_enable(signal);
// TODO: Init the driving task for this handle
Ok(Signal(receiver)).into_future().boxed()
}
}
impl Stream for Signal {
type Item = c_int;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<c_int>, io::Error> {
// It seems the channel doesn't generate any errors anyway
self.0.poll().map_err(|_| io::Error::new(io::ErrorKind::Other, "Unknown futures::sync::mpsc error"))
}
}
// TODO: Drop for Signal and remove the other end proactively?
/*
use std::cell::RefCell;
use std::io::{self, Write, Read};
use std::mem;
use futures::future;
use futures::stream::Fuse;
use futures::sync::mpsc;
use futures::sync::oneshot;
use self::tokio_uds::UnixStream;
use tokio_core::reactor::{PollEvented, Handle};
static INIT: Once = ONCE_INIT;
static mut GLOBAL_STATE: *mut GlobalState = 0 as *mut _;
pub struct Signal {
signum: c_int,
reg: PollEvented<MyRegistration>,
@ -167,31 +230,7 @@ struct SignalState {
tasks: Vec<(RefCell<oneshot::Receiver<()>>, mio::SetReadiness)>,
}
pub use self::libc::{SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
pub use self::libc::{SIGHUP, SIGQUIT, SIGPIPE, SIGALRM, SIGTRAP};
impl Signal {
/// Creates a new stream which will receive notifications when the current
/// process receives the signal `signum`.
///
/// This function will create a new stream which may be based on the
/// event loop handle provided. This function returns a future which will
/// then resolve to the signal stream, if successful.
///
/// The `Signal` stream is an infinite stream which will receive
/// notifications whenever a signal is received. More documentation can be
/// found on `Signal` itself, but to reiterate:
///
/// * Signals may be coalesced beyond what the kernel already does.
/// * While multiple event loops are supported, the first event loop to
/// register a signal handler must be active to deliver signal
/// notifications
/// * Once a signal handle is registered with the process the underlying
/// libc signal handler is never unregistered.
///
/// A `Signal` stream can be created for a particular signal number
/// multiple times. When a signal is received then all the associated
/// channels will receive the signal notification.
pub fn new(signum: c_int, handle: &Handle) -> IoFuture<Signal> {
let mut init = None;
INIT.call_once(|| {
@ -464,3 +503,5 @@ impl mio::Evented for MyRegistration {
Ok(())
}
}
*/