signal: refactor: Prefer the implicit handle passing used by tokio

This commit is contained in:
Markus Westerlind 2018-05-05 20:09:22 +02:00 committed by Carl Lerche
parent 209232befd
commit e73b8a0cc9
8 changed files with 93 additions and 50 deletions

View File

@ -19,19 +19,16 @@ Next you can use this in conjunction with the `tokio-core` and `futures` crates:
```rust,no_run
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_signal;
use tokio_core::reactor::Core;
use futures::{Future, Stream};
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
// Create an infinite stream of "Ctrl+C" notifications. Each item received
// on this stream may represent multiple ctrl-c signals.
let ctrl_c = tokio_signal::ctrl_c(&handle).flatten_stream();
let ctrl_c = tokio_signal::ctrl_c().flatten_stream();
// Process each ctrl-c as it comes in
let prog = ctrl_c.for_each(|()| {
@ -39,7 +36,7 @@ fn main() {
Ok(())
});
core.run(prog).unwrap();
tokio::run(prog.map_err(|err| panic!("{}", err)));
}
```

View File

@ -19,7 +19,7 @@ fn main() {
// the `flatten_stream()` convenience method lazily defers that
// initialisation, allowing us to use it 'as if' it is already the
// stream we want, reducing boilerplate Future-handling.
let endless_stream = tokio_signal::ctrl_c(&core.handle().new_tokio_handle()).flatten_stream();
let endless_stream = tokio_signal::ctrl_c().flatten_stream();
// don't keep going forever: convert the endless stream to a bounded one.
let limited_stream = endless_stream.take(STOP_AFTER);

View File

@ -10,12 +10,10 @@ use tokio_signal::unix::{Signal, SIGINT, SIGTERM};
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let handle = handle.new_tokio_handle();
// Create a stream for each of the signals we'd like to handle.
let sigint = Signal::new(SIGINT, &handle).flatten_stream();
let sigterm = Signal::new(SIGTERM, &handle).flatten_stream();
let sigint = Signal::new(SIGINT).flatten_stream();
let sigterm = Signal::new(SIGTERM).flatten_stream();
// Use the `select` combinator to merge these two streams into one
let stream = sigint.select(sigterm);

View File

@ -11,7 +11,7 @@ fn main() {
let mut core = Core::new().unwrap();
// on Unix, we can listen to whatever signal we want, in this case: SIGHUP
let stream = Signal::new(SIGHUP, &core.handle().new_tokio_handle()).flatten_stream();
let stream = Signal::new(SIGHUP).flatten_stream();
println!("Waiting for SIGHUPS (Ctrl+C to quit)");
println!(

View File

@ -27,12 +27,10 @@
//!
//! fn main() {
//! let mut core = Core::new().unwrap();
//! let handle = core.handle();
//! let handle = handle.new_tokio_handle();
//!
//! // Create an infinite stream of "Ctrl+C" notifications. Each item received
//! // on this stream may represent multiple ctrl-c signals.
//! let ctrl_c = tokio_signal::ctrl_c(&handle).flatten_stream();
//! let ctrl_c = tokio_signal::ctrl_c().flatten_stream();
//!
//! // Process each ctrl-c as it comes in
//! let prog = ctrl_c.for_each(|()| {
@ -63,12 +61,10 @@
//!
//! fn main() {
//! let mut core = Core::new().unwrap();
//! let handle = core.handle();
//! let handle = handle.new_tokio_handle();
//!
//! // Like the previous example, this is an infinite stream of signals
//! // being received, and signals may be coalesced while pending.
//! let stream = Signal::new(SIGHUP, &handle).flatten_stream();
//! let stream = Signal::new(SIGHUP).flatten_stream();
//!
//! // Convert out stream into a future and block the program
//! core.run(stream.into_future()).ok().unwrap();
@ -100,6 +96,21 @@ pub type IoFuture<T> = Box<Future<Item = T, Error = io::Error> + Send>;
/// A stream whose error is `io::Error`
pub type IoStream<T> = Box<Stream<Item = T, Error = io::Error> + Send>;
/// Creates a stream which receives "ctrl-c" notifications sent to a process.
///
/// In general signals are handled very differently across Unix and Windows, but
/// this is somewhat cross platform in terms of how it can be handled. A ctrl-c
/// event to a console process can be represented as a stream for both Windows
/// and Unix.
///
/// This function binds to the default event loop. Note that
/// there are a number of caveats listening for signals, and you may wish to
/// read up on the documentation in the `unix` or `windows` module to take a
/// peek.
pub fn ctrl_c() -> IoFuture<IoStream<()>> {
ctrl_c_handle(&Handle::current())
}
/// Creates a stream which receives "ctrl-c" notifications sent to a process.
///
/// In general signals are handled very differently across Unix and Windows, but
@ -112,14 +123,14 @@ pub type IoStream<T> = Box<Stream<Item = T, Error = io::Error> + Send>;
/// there are a number of caveats listening for signals, and you may wish to
/// read up on the documentation in the `unix` or `windows` module to take a
/// peek.
pub fn ctrl_c(handle: &Handle) -> IoFuture<IoStream<()>> {
pub fn ctrl_c_handle(handle: &Handle) -> IoFuture<IoStream<()>> {
return ctrl_c_imp(handle);
#[cfg(unix)]
fn ctrl_c_imp(handle: &Handle) -> IoFuture<IoStream<()>> {
let handle = handle.clone();
Box::new(future::lazy(move || {
unix::Signal::new(unix::libc::SIGINT, &handle)
unix::Signal::with_handle(unix::libc::SIGINT, &handle)
.map(|x| Box::new(x.map(|_| ())) as Box<Stream<Item = _, Error = _> + Send>)
}))
}
@ -129,7 +140,7 @@ pub fn ctrl_c(handle: &Handle) -> IoFuture<IoStream<()>> {
let handle = handle.clone();
// Use lazy to ensure that `ctrl_c` gets called while on an event loop
Box::new(future::lazy(move || {
windows::Event::ctrl_c(&handle)
windows::Event::ctrl_c_handle(&handle)
.map(|x| Box::new(x) as Box<Stream<Item = _, Error = _> + Send>)
}))
}

View File

@ -344,6 +344,28 @@ pub struct Signal {
unsafe impl Send for Signal {}
impl Signal {
/// Creates a new stream which will receive notifications when the current
/// process receives the signal `signal`.
///
/// This function will create a new stream which binds to the default event
/// loop. This function returns a future which will
/// then resolve to the signal stream, if successful.
///
/// The `Signal` stream is an infinite stream which will receive
/// notifications whenever a signal is received. More documentation can be
/// found on `Signal` itself, but to reiterate:
///
/// * Signals may be coalesced beyond what the kernel already does.
/// * Once a signal handler is registered with the process the underlying
/// libc signal handler is never unregistered.
///
/// A `Signal` stream can be created for a particular signal number
/// multiple times. When a signal is received then all the associated
/// channels will receive the signal notification.
pub fn new(signal: c_int) -> IoFuture<Signal> {
Signal::with_handle(signal, &Handle::current())
}
/// Creates a new stream which will receive notifications when the current
/// process receives the signal `signal`.
///
@ -362,7 +384,7 @@ impl Signal {
/// A `Signal` stream can be created for a particular signal number
/// multiple times. When a signal is received then all the associated
/// channels will receive the signal notification.
pub fn new(signal: c_int, handle: &Handle) -> IoFuture<Signal> {
pub fn with_handle(signal: c_int, handle: &Handle) -> IoFuture<Signal> {
let handle = handle.clone();
Box::new(future::lazy(move || {
let result = (|| {

View File

@ -84,7 +84,15 @@ impl Event {
///
/// This function will register a handler via `SetConsoleCtrlHandler` and
/// deliver notifications to the returned stream.
pub fn ctrl_c(handle: &Handle) -> IoFuture<Event> {
pub fn ctrl_c() -> IoFuture<Event> {
Event::ctrl_c_handle(&Handle::current())
}
/// Creates a new stream listening for the `CTRL_C_EVENT` events.
///
/// This function will register a handler via `SetConsoleCtrlHandler` and
/// deliver notifications to the returned stream.
pub fn ctrl_c_handle(handle: &Handle) -> IoFuture<Event> {
Event::new(CTRL_C_EVENT, handle)
}
@ -92,7 +100,15 @@ impl Event {
///
/// This function will register a handler via `SetConsoleCtrlHandler` and
/// deliver notifications to the returned stream.
pub fn ctrl_break(handle: &Handle) -> IoFuture<Event> {
pub fn ctrl_break() -> IoFuture<Event> {
Event::ctrl_break_handle(&Handle::current())
}
/// Creates a new stream listening for the `CTRL_BREAK_EVENT` events.
///
/// This function will register a handler via `SetConsoleCtrlHandler` and
/// deliver notifications to the returned stream.
pub fn ctrl_break_handle(handle: &Handle) -> IoFuture<Event> {
Event::new(CTRL_BREAK_EVENT, handle)
}

View File

@ -11,16 +11,14 @@ use std::thread;
use std::time::Duration;
use futures::stream::Stream;
use futures::{future, Future, IntoFuture};
use futures::{Future, IntoFuture};
use tokio_core::reactor::{Core, Timeout};
use tokio_signal::unix::Signal;
#[test]
fn simple() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let signal = lp.run(Signal::new(libc::SIGUSR1, &handle.new_tokio_handle()))
.unwrap();
let signal = lp.run(Signal::new(libc::SIGUSR1)).unwrap();
unsafe {
assert_eq!(libc::kill(libc::getpid(), libc::SIGUSR1), 0);
}
@ -30,8 +28,7 @@ fn simple() {
#[test]
fn tokio_simple() {
tokio::run(
future::lazy(|| {
Signal::new(libc::SIGUSR1, &tokio::reactor::Handle::default())
Signal::new(libc::SIGUSR1)
.into_future()
.and_then(|signal| {
unsafe {
@ -39,7 +36,7 @@ fn tokio_simple() {
}
signal.into_future().map(|_| ()).map_err(|(err, _)| err)
})
}).map_err(|err| panic!("{}", err)),
.map_err(|err| panic!("{}", err)),
)
}
@ -47,10 +44,14 @@ fn tokio_simple() {
fn notify_both() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let signal1 = lp.run(Signal::new(libc::SIGUSR2, &handle.new_tokio_handle()))
.unwrap();
let signal2 = lp.run(Signal::new(libc::SIGUSR2, &handle.new_tokio_handle()))
.unwrap();
let signal1 = lp.run(Signal::with_handle(
libc::SIGUSR2,
&handle.new_tokio_handle(),
)).unwrap();
let signal2 = lp.run(Signal::with_handle(
libc::SIGUSR2,
&handle.new_tokio_handle(),
)).unwrap();
unsafe {
assert_eq!(libc::kill(libc::getpid(), libc::SIGUSR2), 0);
}
@ -63,8 +64,10 @@ fn notify_both() {
fn drop_then_get_a_signal() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let signal = lp.run(Signal::new(libc::SIGUSR1, &handle.new_tokio_handle()))
.unwrap();
let signal = lp.run(Signal::with_handle(
libc::SIGUSR1,
&handle.new_tokio_handle(),
)).unwrap();
drop(signal);
unsafe {
assert_eq!(libc::kill(libc::getpid(), libc::SIGUSR1), 0);
@ -76,9 +79,7 @@ fn drop_then_get_a_signal() {
#[test]
fn twice() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let signal = lp.run(Signal::new(libc::SIGUSR1, &handle.new_tokio_handle()))
.unwrap();
let signal = lp.run(Signal::new(libc::SIGUSR1)).unwrap();
unsafe {
assert_eq!(libc::kill(libc::getpid(), libc::SIGUSR1), 0);
}
@ -102,9 +103,7 @@ fn multi_loop() {
let sender = sender.clone();
thread::spawn(move || {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let signal = lp.run(Signal::new(libc::SIGHUP, &handle.new_tokio_handle()))
.unwrap();
let signal = lp.run(Signal::new(libc::SIGHUP)).unwrap();
sender.send(()).unwrap();
lp.run(signal.into_future()).ok().unwrap();
})