Merge tokio with tokio-signal

This commit is contained in:
Michal 'vorner' Vaner 2018-09-14 23:27:22 +02:00
commit 462882b356
No known key found for this signature in database
GPG Key ID: F700D0C019E4C66F
22 changed files with 1718 additions and 0 deletions

29
tokio-signal/.travis.yml Normal file
View File

@ -0,0 +1,29 @@
language: rust
sudo: false
matrix:
include:
- rust: stable
- os: osx
- rust: beta
- rust: nightly
- rust: nightly
before_script:
- pip install 'travis-cargo<0.2' --user && export PATH=$HOME/.local/bin:$PATH
script:
- cargo doc --no-deps --all-features
after_success:
- travis-cargo --only nightly doc-upload
script:
- cargo test --no-fail-fast
- rustdoc --test README.md -L target/debug/deps
env:
global:
- secure: "SXXK7Znvm1s5WWQ94l9IP25mXA0uGIQ7ghBuumZz3nSAfxhhJQnYi5hCAbl2/cOfSbpgEtE137dgk6Nd9UDx7rIkLCSe8TWyYjzraX/vvX3xNtLh/fjsayYYRK9a6qU2HIJegZdxPgyF5h2DeBgeLks0Ue8drrFQ1s9bYZVUO0yeuZ3aLkL1FkIG6RXGItUFpb6srEYL1NLizYLxXFEG3cL+kKoFIWc2qPx3EwOqv/eii134nQsuObhWZvPqfTo7zfNP8W/6TnoiggpRH1nrZc3DI3CynTICIOJ2Ogn9gFX9LftYKuJysSwUNVN3WF5aOuLP/XjRSBLYc+PW3v0iqiGzMX3n1VpcyhcbsSNA7ZckGn1HZsWYwspAxkN3idSuVie9Mezm7IV4005juiYKEWEr6hlkv1lzd49QZkWOvLCFCMRiwOOGp4NyzilG1Q1Zs3G1wrcvstmasNpK+QUFNdOFvT2sm34rI4x2rQUvjC/OyqbAK+PjYmTHL47YKON5ymfUL3mAcwgUfBUSd4Wpx8G3VKg3gMcmQm27ah1knOGJWH6XulYTnfGfx6bLo5t2NGx+vZk0naqajD3auWnseobMDsFjhUIRrt6GlnfPqeFoJSm0unu3riAX+RDF/iqZdDfjhX4evETIw3SaTl8EQtVLwz7kJTnxSbTU4XTi+0M="
notifications:
email:
on_success: never

41
tokio-signal/CHANGELOG.md Normal file
View File

@ -0,0 +1,41 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.2.5] - 2018-08-29
### Fixes
* Fix a possible starvation when polling multiple `Signal` instances outside of
a tokio reactor (e.g. by using `Future::wait`)
## [0.2.4] - 2018-08-25
### Fixes
* Actually make `unix::bsd` public
## [0.2.3] - 2018-08-25
### Features
* Exposes `SIGINFO` on BSD-based operating systems. (#46)
## [0.2.2] - 2018-08-14
### Fixes
* Fix starvation of `Signal`s whenever a `Signal` instance is dropped
* Fix starvation of individual `Signal`s based on their creation order
## [0.2.1] - 2018-05-27
### Fixes
* Bump minimum supported version of `mio` to 0.6.14
## 0.2.0 - 2018-05-07
#### Features
* Uses `tokio` instead of `tokio_core` (#24)
* Supports all 33 signals on FreeBSD (#27)
[Unreleased]: https://github.com/alexcrichton/tokio-process/compare/0.2.5...HEAD
[0.2.5]: https://github.com/alexcrichton/tokio-signal/compare/0.2.4...0.2.5
[0.2.4]: https://github.com/alexcrichton/tokio-signal/compare/0.2.3...0.2.4
[0.2.3]: https://github.com/alexcrichton/tokio-signal/compare/0.2.2...0.2.3
[0.2.2]: https://github.com/alexcrichton/tokio-signal/compare/0.2.1...0.2.2
[0.2.1]: https://github.com/alexcrichton/tokio-signal/compare/0.2.0...0.2.1

36
tokio-signal/Cargo.toml Normal file
View File

@ -0,0 +1,36 @@
[package]
name = "tokio-signal"
version = "0.2.5"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT/Apache-2.0"
repository = "https://github.com/alexcrichton/tokio-signal"
homepage = "https://github.com/alexcrichton/tokio-signal"
documentation = "https://docs.rs/tokio-signal/0.2"
description = """
An implementation of an asynchronous Unix signal handling backed futures.
"""
categories = ["asynchronous"]
[badges]
travis-ci = { repository = "alexcrichton/tokio-signal" }
appveyor = { repository = "alexcrichton/tokio-signal" }
[dependencies]
futures = "0.1.11"
mio = "0.6.14"
tokio-reactor = "0.1.0"
tokio-executor = "0.1.0"
tokio-io = "0.1"
[target.'cfg(unix)'.dependencies]
libc = "0.2"
mio-uds = "0.6"
signal-hook = "0.1"
[dev-dependencies]
tokio-core = "0.1.17"
tokio = "0.1.6"
[target.'cfg(windows)'.dependencies.winapi]
version = "0.3"
features = ["minwindef", "wincon"]

201
tokio-signal/LICENSE-APACHE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

25
tokio-signal/LICENSE-MIT Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2016 Alex Crichton
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

58
tokio-signal/README.md Normal file
View File

@ -0,0 +1,58 @@
# tokio-signal
An implementation of Unix signal handling for Tokio
[![Build Status](https://travis-ci.org/alexcrichton/tokio-signal.svg?branch=master)](https://travis-ci.org/alexcrichton/tokio-signal)
[Documentation](https://docs.rs/tokio-signal)
## Usage
First, add this to your `Cargo.toml`:
```toml
[dependencies]
tokio-signal = "0.2"
```
Next you can use this in conjunction with the `tokio` and `futures` crates:
```rust,no_run
extern crate futures;
extern crate tokio;
extern crate tokio_signal;
use futures::{Future, Stream};
fn main() {
// Create an infinite stream of "Ctrl+C" notifications. Each item received
// on this stream may represent multiple ctrl-c signals.
let ctrl_c = tokio_signal::ctrl_c().flatten_stream();
// Process each ctrl-c as it comes in
let prog = ctrl_c.for_each(|()| {
println!("ctrl-c received!");
Ok(())
});
tokio::run(prog.map_err(|err| panic!("{}", err)));
}
```
# License
This project is licensed under either of
* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or
http://opensource.org/licenses/MIT)
at your option.
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in tokio-signal by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.

35
tokio-signal/appveyor.yml Normal file
View File

@ -0,0 +1,35 @@
environment:
matrix:
# Stable channel
- TARGET: x86_64-pc-windows-gnu
CHANNEL: stable
- TARGET: x86_64-pc-windows-msvc
CHANNEL: stable
# Beta channel
- TARGET: x86_64-pc-windows-msvc
CHANNEL: beta
# Nightly channel
- TARGET: x86_64-pc-windows-msvc
CHANNEL: nightly
# Install Rust and Cargo
# (Based on from https://github.com/rust-lang/libc/blob/master/appveyor.yml)
install:
- curl -sSf -o rustup-init.exe https://win.rustup.rs
- rustup-init.exe --default-host %TARGET% --default-toolchain %CHANNEL% -y
- set PATH=%PATH%;C:\Users\appveyor\.cargo\bin
- rustc -Vv
- cargo -V
# 'cargo test' takes care of building for us, so disable Appveyor's build stage. This prevents
# the "directory does not contain a project or solution file" error.
# source: https://github.com/starkat99/appveyor-rust/blob/master/appveyor.yml#L113
build: false
test_script:
- cargo build --example ctrl-c
- rustdoc --test README.md -L target/debug/deps
branches:
only:
- master

View File

@ -0,0 +1,63 @@
extern crate futures;
extern crate tokio_core;
extern crate tokio_signal;
use futures::{Future, Stream};
use tokio_core::reactor::Core;
/// how many signals to handle before exiting
const STOP_AFTER: u64 = 10;
fn main() {
// set up a Tokio event loop
let mut core = Core::new().unwrap();
// tokio_signal provides a convenience builder for Ctrl+C
// this even works cross-platform: linux and windows!
//
// `fn ctrl_c()` produces a `Future` of the actual stream-initialisation
// the `flatten_stream()` convenience method lazily defers that
// initialisation, allowing us to use it 'as if' it is already the
// stream we want, reducing boilerplate Future-handling.
let endless_stream = tokio_signal::ctrl_c().flatten_stream();
// don't keep going forever: convert the endless stream to a bounded one.
let limited_stream = endless_stream.take(STOP_AFTER);
// how many Ctrl+C have we received so far?
let mut counter = 0;
println!(
"This program is now waiting for you to press Ctrl+C {0} times.
* If running via `cargo run --example ctrl-c`, Ctrl+C also kills it, \
due to https://github.com/rust-lang-nursery/rustup.rs/issues/806
* If running the binary directly, the Ctrl+C is properly trapped.
Terminate by repeating Ctrl+C {0} times, or ahead of time by \
opening a second terminal and issuing `pkill -sigkil ctrl-c`",
STOP_AFTER
);
// Stream::for_each is a powerful primitive provided by the Futures crate.
// It turns a Stream into a Future that completes after all stream-items
// have been completed, or the first time the closure returns an error
let future = limited_stream.for_each(|()| {
// Note how we manipulate the counter without any fancy synchronisation.
// The borrowchecker realises there can't be any conflicts, so the closure
// can just capture it.
counter += 1;
println!(
"Ctrl+C received {} times! {} more before exit",
counter,
STOP_AFTER - counter
);
// return Ok-result to continue handling the stream
Ok(())
});
// Up until now, we haven't really DONE anything, just prepared
// now it's time to actually schedule, and thus execute, the stream
// on our event loop
core.run(future).unwrap();
println!("Stream ended, quiting the program.");
}

View File

@ -0,0 +1,38 @@
//! A small example of how to listen for two signals at the same time
extern crate futures;
extern crate tokio_core;
extern crate tokio_signal;
use futures::{Future, Stream};
use tokio_core::reactor::Core;
use tokio_signal::unix::{Signal, SIGINT, SIGTERM};
fn main() {
let mut core = Core::new().unwrap();
// Create a stream for each of the signals we'd like to handle.
let sigint = Signal::new(SIGINT).flatten_stream();
let sigterm = Signal::new(SIGTERM).flatten_stream();
// Use the `select` combinator to merge these two streams into one
let stream = sigint.select(sigterm);
// Wait for a signal to arrive
println!("Waiting for SIGINT or SIGTERM");
println!(
" TIP: use `pkill -sigint multiple` from a second terminal \
to send a SIGINT to all processes named 'multiple' \
(i.e. this binary)"
);
let (item, _rest) = core.run(stream.into_future()).ok().unwrap();
// Figure out which signal we received
let item = item.unwrap();
if item == SIGINT {
println!("received SIGINT");
} else {
assert_eq!(item, SIGTERM);
println!("received SIGTERM");
}
}

View File

@ -0,0 +1,39 @@
extern crate futures;
extern crate tokio_core;
extern crate tokio_signal;
use futures::{Future, Stream};
use tokio_core::reactor::Core;
use tokio_signal::unix::{Signal, SIGHUP};
fn main() {
// set up a Tokio event loop
let mut core = Core::new().unwrap();
// on Unix, we can listen to whatever signal we want, in this case: SIGHUP
let stream = Signal::new(SIGHUP).flatten_stream();
println!("Waiting for SIGHUPS (Ctrl+C to quit)");
println!(
" TIP: use `pkill -sighup sighup-example` from a second terminal \
to send a SIGHUP to all processes named 'sighup-example' \
(i.e. this binary)"
);
// for_each is a powerful primitive provided by the Futures crate
// it turns a Stream into a Future that completes after all stream-items
// have been completed.
let future = stream.for_each(|the_signal| {
println!(
"*Got signal {:#x}* I should probably reload my config \
or something",
the_signal
);
Ok(())
});
// Up until now, we haven't really DONE anything, just prepared
// now it's time to actually schedule, and thus execute, the stream
// on our event loop, and loop forever
core.run(future).unwrap();
}

147
tokio-signal/src/lib.rs Normal file
View File

@ -0,0 +1,147 @@
//! Asynchronous signal handling for Tokio
//!
//! This crate implements asynchronous signal handling for Tokio, an
//! asynchronous I/O framework in Rust. The primary type exported from this
//! crate, `unix::Signal`, allows listening for arbitrary signals on Unix
//! platforms, receiving them in an asynchronous fashion.
//!
//! Note that signal handling is in general a very tricky topic and should be
//! used with great care. This crate attempts to implement 'best practice' for
//! signal handling, but it should be evaluated for your own applications' needs
//! to see if it's suitable.
//!
//! The are some fundamental limitations of this crate documented on the
//! `Signal` structure as well.
//!
//! # Examples
//!
//! Print out all ctrl-C notifications received
//!
//! ```rust,no_run
//! extern crate futures;
//! extern crate tokio_core;
//! extern crate tokio_signal;
//!
//! use tokio_core::reactor::Core;
//! use futures::{Future, Stream};
//!
//! fn main() {
//! let mut core = Core::new().unwrap();
//!
//! // Create an infinite stream of "Ctrl+C" notifications. Each item received
//! // on this stream may represent multiple ctrl-c signals.
//! let ctrl_c = tokio_signal::ctrl_c().flatten_stream();
//!
//! // Process each ctrl-c as it comes in
//! let prog = ctrl_c.for_each(|()| {
//! println!("ctrl-c received!");
//! Ok(())
//! });
//!
//! core.run(prog).unwrap();
//! }
//! ```
//!
//! Wait for SIGHUP on Unix
//!
//! ```rust,no_run
//! # extern crate futures;
//! # extern crate tokio_core;
//! # extern crate tokio_signal;
//! # #[cfg(unix)]
//! # mod foo {
//! #
//! extern crate futures;
//! extern crate tokio_core;
//! extern crate tokio_signal;
//!
//! use tokio_core::reactor::Core;
//! use futures::{Future, Stream};
//! use tokio_signal::unix::{Signal, SIGHUP};
//!
//! fn main() {
//! let mut core = Core::new().unwrap();
//!
//! // Like the previous example, this is an infinite stream of signals
//! // being received, and signals may be coalesced while pending.
//! let stream = Signal::new(SIGHUP).flatten_stream();
//!
//! // Convert out stream into a future and block the program
//! core.run(stream.into_future()).ok().unwrap();
//! }
//! # }
//! # fn main() {}
//! ```
#![doc(html_root_url = "https://docs.rs/tokio-signal/0.2")]
#![deny(missing_docs)]
extern crate futures;
extern crate mio;
extern crate tokio_executor;
extern crate tokio_io;
extern crate tokio_reactor;
use std::io;
use futures::stream::Stream;
use futures::{future, Future};
use tokio_reactor::Handle;
pub mod unix;
pub mod windows;
/// A future whose error is `io::Error`
pub type IoFuture<T> = Box<Future<Item = T, Error = io::Error> + Send>;
/// A stream whose error is `io::Error`
pub type IoStream<T> = Box<Stream<Item = T, Error = io::Error> + Send>;
/// Creates a stream which receives "ctrl-c" notifications sent to a process.
///
/// In general signals are handled very differently across Unix and Windows, but
/// this is somewhat cross platform in terms of how it can be handled. A ctrl-c
/// event to a console process can be represented as a stream for both Windows
/// and Unix.
///
/// This function binds to the default event loop. Note that
/// there are a number of caveats listening for signals, and you may wish to
/// read up on the documentation in the `unix` or `windows` module to take a
/// peek.
pub fn ctrl_c() -> IoFuture<IoStream<()>> {
ctrl_c_handle(&Handle::current())
}
/// Creates a stream which receives "ctrl-c" notifications sent to a process.
///
/// In general signals are handled very differently across Unix and Windows, but
/// this is somewhat cross platform in terms of how it can be handled. A ctrl-c
/// event to a console process can be represented as a stream for both Windows
/// and Unix.
///
/// This function receives a `Handle` to an event loop and returns a future
/// which when resolves yields a stream receiving all signal events. Note that
/// there are a number of caveats listening for signals, and you may wish to
/// read up on the documentation in the `unix` or `windows` module to take a
/// peek.
pub fn ctrl_c_handle(handle: &Handle) -> IoFuture<IoStream<()>> {
return ctrl_c_imp(handle);
#[cfg(unix)]
fn ctrl_c_imp(handle: &Handle) -> IoFuture<IoStream<()>> {
let handle = handle.clone();
Box::new(future::lazy(move || {
unix::Signal::with_handle(unix::libc::SIGINT, &handle)
.map(|x| Box::new(x.map(|_| ())) as Box<Stream<Item = _, Error = _> + Send>)
}))
}
#[cfg(windows)]
fn ctrl_c_imp(handle: &Handle) -> IoFuture<IoStream<()>> {
let handle = handle.clone();
// Use lazy to ensure that `ctrl_c` gets called while on an event loop
Box::new(future::lazy(move || {
windows::Event::ctrl_c_handle(&handle)
.map(|x| Box::new(x) as Box<Stream<Item = _, Error = _> + Send>)
}))
}
}

385
tokio-signal/src/unix.rs Normal file
View File

@ -0,0 +1,385 @@
//! Unix-specific types for signal handling.
//!
//! This module is only defined on Unix platforms and contains the primary
//! `Signal` type for receiving notifications of signals.
#![cfg(unix)]
pub extern crate libc;
extern crate mio;
extern crate mio_uds;
extern crate signal_hook;
use std::io::{self, Error, ErrorKind};
use std::io::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, Once, ONCE_INIT};
use self::libc::c_int;
use self::mio_uds::UnixStream;
use futures::future;
use futures::sync::mpsc::{channel, Receiver, Sender};
use futures::{Async, AsyncSink, Future};
use futures::{Poll, Sink, Stream};
use tokio_reactor::{Handle, PollEvented};
use tokio_io::IoFuture;
pub use self::libc::{SIGUSR1, SIGUSR2, SIGINT, SIGTERM};
pub use self::libc::{SIGALRM, SIGHUP, SIGPIPE, SIGQUIT, SIGTRAP};
/// BSD-specific definitions
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd",
))]
pub mod bsd {
#[cfg(any(target_os = "dragonfly", target_os = "freebsd",
target_os = "macos", target_os = "netbsd",
target_os = "openbsd"))]
pub use super::libc::SIGINFO;
}
// Number of different unix signals
// (FreeBSD has 33)
const SIGNUM: usize = 33;
struct SignalInfo {
pending: AtomicBool,
// The ones interested in this signal
recipients: Mutex<Vec<Box<Sender<c_int>>>>,
init: Once,
initialized: AtomicBool,
}
struct Globals {
sender: UnixStream,
receiver: UnixStream,
signals: Vec<SignalInfo>,
}
impl Default for SignalInfo {
fn default() -> SignalInfo {
SignalInfo {
pending: AtomicBool::new(false),
init: ONCE_INIT,
initialized: AtomicBool::new(false),
recipients: Mutex::new(Vec::new()),
}
}
}
static mut GLOBALS: *mut Globals = 0 as *mut Globals;
fn globals() -> &'static Globals {
static INIT: Once = ONCE_INIT;
unsafe {
INIT.call_once(|| {
let (receiver, sender) = UnixStream::pair().unwrap();
let globals = Globals {
sender: sender,
receiver: receiver,
signals: (0..SIGNUM).map(|_| Default::default()).collect(),
};
GLOBALS = Box::into_raw(Box::new(globals));
});
&*GLOBALS
}
}
/// Our global signal handler for all signals registered by this module.
///
/// The purpose of this signal handler is to primarily:
///
/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
/// 2. Wake up driver tasks by writing a byte to a pipe
///
/// Those two operations shoudl both be async-signal safe.
fn action(slot: &SignalInfo, mut sender: &UnixStream) {
slot.pending.store(true, Ordering::SeqCst);
// Send a wakeup, ignore any errors (anything reasonably possible is
// full pipe and then it will wake up anyway).
drop(sender.write(&[1]));
}
/// Enable this module to receive signal notifications for the `signal`
/// provided.
///
/// This will register the signal handler if it hasn't already been registered,
/// returning any error along the way if that fails.
fn signal_enable(signal: c_int) -> io::Result<()> {
if signal_hook::FORBIDDEN.contains(&signal) {
return Err(Error::new(ErrorKind::Other, format!("Refusing to register signal {}", signal)));
}
let globals = globals();
let siginfo = match globals.signals.get(signal as usize) {
Some(slot) => slot,
None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")),
};
let mut registered = Ok(());
siginfo.init.call_once(|| {
registered = unsafe {
signal_hook::register(signal, move || action(siginfo, &globals.sender)).map(|_| ())
};
if registered.is_ok() {
siginfo.initialized.store(true, Ordering::Relaxed);
}
});
registered?;
// If the call_once failed, it won't be retried on the next attempt to register the signal. In
// such case it is not run, registered is still `Ok(())`, initialized is still false.
if siginfo.initialized.load(Ordering::Relaxed) {
Ok(())
} else {
Err(Error::new(ErrorKind::Other, "Failed to register signal handler"))
}
}
struct Driver {
wakeup: PollEvented<UnixStream>,
}
impl Future for Driver {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
// Drain the data from the pipe and maintain interest in getting more
self.drain();
// Broadcast any signals which were received
self.broadcast();
// This task just lives until the end of the event loop
Ok(Async::NotReady)
}
}
impl Driver {
fn new(handle: &Handle) -> io::Result<Driver> {
// NB: We give each driver a "fresh" reciever file descriptor to avoid
// the issues described in alexcrichton/tokio-process#42.
//
// In the past we would reuse the actual receiver file descriptor and
// swallow any errors around double registration of the same descriptor.
// I'm not sure if the second (failed) registration simply doesn't end up
// receiving wake up notifications, or there could be some race condition
// when consuming readiness events, but having distinct descriptors for
// distinct PollEvented instances appears to mitigate this.
//
// Unfortunately we cannot just use a single global PollEvented instance
// either, since we can't compare Handles or assume they will always
// point to the exact same reactor.
let stream = globals().receiver.try_clone()?;
let wakeup = PollEvented::new_with_handle(stream, handle)?;
Ok(Driver {
wakeup: wakeup,
})
}
/// Drain all data in the global receiver, ensuring we'll get woken up when
/// there is a write on the other end.
///
/// We do *NOT* use the existence of any read bytes as evidence a sigal was
/// received since the `pending` flags would have already been set if that
/// was the case. See #38 for more info.
fn drain(&mut self) {
loop {
match self.wakeup.read(&mut [0; 128]) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => {},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {}", e),
}
}
}
/// Go through all the signals and broadcast everything.
///
/// Driver tasks wake up for *any* signal and simply process all globally
/// registered signal streams, so each task is sort of cooperatively working
/// for all the rest as well.
fn broadcast(&self) {
for (sig, slot) in globals().signals.iter().enumerate() {
// Any signal of this kind arrived since we checked last?
if !slot.pending.swap(false, Ordering::SeqCst) {
continue;
}
let signum = sig as c_int;
let mut recipients = slot.recipients.lock().unwrap();
// Notify all waiters on this signal that the signal has been
// received. If we can't push a message into the queue then we don't
// worry about it as everything is coalesced anyway. If the channel
// has gone away then we can remove that slot.
for i in (0..recipients.len()).rev() {
// TODO: This thing probably generates unnecessary wakups of
// this task when `NotReady` is received because we don't
// actually want to get woken up to continue sending a
// message. Let's optimise it later on though, as we know
// this works.
match recipients[i].start_send(signum) {
Ok(AsyncSink::Ready) => {}
Ok(AsyncSink::NotReady(_)) => {}
Err(_) => {
recipients.swap_remove(i);
}
}
}
}
}
}
/// An implementation of `Stream` for receiving a particular type of signal.
///
/// This structure implements the `Stream` trait and represents notifications
/// of the current process receiving a particular signal. The signal being
/// listened for is passed to `Signal::new`, and the same signal number is then
/// yielded as each element for the stream.
///
/// In general signal handling on Unix is a pretty tricky topic, and this
/// structure is no exception! There are some important limitations to keep in
/// mind when using `Signal` streams:
///
/// * Signals handling in Unix already necessitates coalescing signals
/// together sometimes. This `Signal` stream is also no exception here in
/// that it will also coalesce signals. That is, even if the signal handler
/// for this process runs multiple times, the `Signal` stream may only return
/// one signal notification. Specifically, before `poll` is called, all
/// signal notifications are coalesced into one item returned from `poll`.
/// Once `poll` has been called, however, a further signal is guaranteed to
/// be yielded as an item.
///
/// Put another way, any element pulled off the returned stream corresponds to
/// *at least one* signal, but possibly more.
///
/// * Signal handling in general is relatively inefficient. Although some
/// improvements are possible in this crate, it's recommended to not plan on
/// having millions of signal channels open.
///
/// * Currently the "driver task" to process incoming signals never exits. This
/// driver task runs in the background of the event loop provided, and
/// in general you shouldn't need to worry about it.
///
/// If you've got any questions about this feel free to open an issue on the
/// repo, though, as I'd love to chat about this! In other words, I'd love to
/// alleviate some of these limitations if possible!
pub struct Signal {
driver: Driver,
signal: c_int,
// Used only as an identifier. We place the real sender into a Box, so it
// stays on the same address forever. That gives us a unique pointer, so we
// can use this to identify the sender in a Vec and delete it when we are
// dropped.
id: *const Sender<c_int>,
rx: Receiver<c_int>,
}
// The raw pointer prevents the compiler from determining it as Send
// automatically. But the only thing we use the raw pointer for is to identify
// the correct Box to delete, not manipulate any data through that.
unsafe impl Send for Signal {}
impl Signal {
/// Creates a new stream which will receive notifications when the current
/// process receives the signal `signal`.
///
/// This function will create a new stream which binds to the default event
/// loop. This function returns a future which will
/// then resolve to the signal stream, if successful.
///
/// The `Signal` stream is an infinite stream which will receive
/// notifications whenever a signal is received. More documentation can be
/// found on `Signal` itself, but to reiterate:
///
/// * Signals may be coalesced beyond what the kernel already does.
/// * Once a signal handler is registered with the process the underlying
/// libc signal handler is never unregistered.
///
/// A `Signal` stream can be created for a particular signal number
/// multiple times. When a signal is received then all the associated
/// channels will receive the signal notification.
///
/// # Errors
///
/// * If the lower-level C functions fail for some reason.
/// * If the previous initialization of this specific signal failed.
/// * If the signal is one of
/// [`signal_hook::FORBIDDEN`](https://docs.rs/signal-hook/*/signal_hook/fn.register.html#panics)
pub fn new(signal: c_int) -> IoFuture<Signal> {
Signal::with_handle(signal, &Handle::current())
}
/// Creates a new stream which will receive notifications when the current
/// process receives the signal `signal`.
///
/// This function will create a new stream which may be based on the
/// event loop handle provided. This function returns a future which will
/// then resolve to the signal stream, if successful.
///
/// The `Signal` stream is an infinite stream which will receive
/// notifications whenever a signal is received. More documentation can be
/// found on `Signal` itself, but to reiterate:
///
/// * Signals may be coalesced beyond what the kernel already does.
/// * Once a signal handler is registered with the process the underlying
/// libc signal handler is never unregistered.
///
/// A `Signal` stream can be created for a particular signal number
/// multiple times. When a signal is received then all the associated
/// channels will receive the signal notification.
pub fn with_handle(signal: c_int, handle: &Handle) -> IoFuture<Signal> {
let handle = handle.clone();
Box::new(future::lazy(move || {
let result = (|| {
// Turn the signal delivery on once we are ready for it
try!(signal_enable(signal));
// Ensure there's a driver for our associated event loop processing
// signals.
let driver = try!(Driver::new(&handle));
// One wakeup in a queue is enough, no need for us to buffer up any
// more.
let (tx, rx) = channel(1);
let tx = Box::new(tx);
let id: *const _ = &*tx;
let idx = signal as usize;
globals().signals[idx].recipients.lock().unwrap().push(tx);
Ok(Signal {
driver: driver,
rx: rx,
id: id,
signal: signal,
})
})();
future::result(result)
}))
}
}
impl Stream for Signal {
type Item = c_int;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<c_int>, io::Error> {
self.driver.poll().unwrap();
// receivers don't generate errors
self.rx.poll().map_err(|_| panic!())
}
}
impl Drop for Signal {
fn drop(&mut self) {
let idx = self.signal as usize;
let mut list = globals().signals[idx].recipients.lock().unwrap();
list.retain(|sender| &**sender as *const _ != self.id);
}
}

346
tokio-signal/src/windows.rs Normal file
View File

@ -0,0 +1,346 @@
//! Windows-specific types for signal handling.
//!
//! This module is only defined on Windows and contains the primary `Event` type
//! for receiving notifications of events. These events are listened for via the
//! `SetConsoleCtrlHandler` function which receives events of the type
//! `CTRL_C_EVENT` and `CTRL_BREAK_EVENT`
#![cfg(windows)]
extern crate mio;
extern crate winapi;
use std::cell::RefCell;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Once, ONCE_INIT};
use futures::future;
use futures::stream::Fuse;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use tokio_reactor::{Handle, PollEvented};
use mio::Ready;
use self::winapi::shared::minwindef::*;
use self::winapi::um::wincon::*;
use IoFuture;
extern "system" {
fn SetConsoleCtrlHandler(HandlerRoutine: usize, Add: BOOL) -> BOOL;
}
static INIT: Once = ONCE_INIT;
static mut GLOBAL_STATE: *mut GlobalState = 0 as *mut _;
/// Stream of events discovered via `SetConsoleCtrlHandler`.
///
/// This structure can be used to listen for events of the type `CTRL_C_EVENT`
/// and `CTRL_BREAK_EVENT`. The `Stream` trait is implemented for this struct
/// and will resolve for each notification received by the process. Note that
/// there are few limitations with this as well:
///
/// * A notification to this process notifies *all* `Event` streams for that
/// event type.
/// * Notifications to an `Event` stream **are coalesced** if they aren't
/// processed quickly enough. This means that if two notifications are
/// received back-to-back, then the stream may only receive one item about the
/// two notifications.
pub struct Event {
reg: PollEvented<MyRegistration>,
_finished: oneshot::Sender<()>,
}
struct GlobalState {
ready: mio::SetReadiness,
tx: mpsc::UnboundedSender<Message>,
ctrl_c: GlobalEventState,
ctrl_break: GlobalEventState,
}
struct GlobalEventState {
ready: AtomicBool,
}
enum Message {
NewEvent(DWORD, oneshot::Sender<io::Result<Event>>),
}
struct DriverTask {
handle: Handle,
reg: PollEvented<MyRegistration>,
rx: Fuse<mpsc::UnboundedReceiver<Message>>,
ctrl_c: EventState,
ctrl_break: EventState,
}
struct EventState {
tasks: Vec<(RefCell<oneshot::Receiver<()>>, mio::SetReadiness)>,
}
impl Event {
/// Creates a new stream listening for the `CTRL_C_EVENT` events.
///
/// This function will register a handler via `SetConsoleCtrlHandler` and
/// deliver notifications to the returned stream.
pub fn ctrl_c() -> IoFuture<Event> {
Event::ctrl_c_handle(&Handle::current())
}
/// Creates a new stream listening for the `CTRL_C_EVENT` events.
///
/// This function will register a handler via `SetConsoleCtrlHandler` and
/// deliver notifications to the returned stream.
pub fn ctrl_c_handle(handle: &Handle) -> IoFuture<Event> {
Event::new(CTRL_C_EVENT, handle)
}
/// Creates a new stream listening for the `CTRL_BREAK_EVENT` events.
///
/// This function will register a handler via `SetConsoleCtrlHandler` and
/// deliver notifications to the returned stream.
pub fn ctrl_break() -> IoFuture<Event> {
Event::ctrl_break_handle(&Handle::current())
}
/// Creates a new stream listening for the `CTRL_BREAK_EVENT` events.
///
/// This function will register a handler via `SetConsoleCtrlHandler` and
/// deliver notifications to the returned stream.
pub fn ctrl_break_handle(handle: &Handle) -> IoFuture<Event> {
Event::new(CTRL_BREAK_EVENT, handle)
}
fn new(signum: DWORD, handle: &Handle) -> IoFuture<Event> {
let mut init = None;
INIT.call_once(|| {
init = Some(global_init(handle));
});
let new_signal = future::lazy(move || {
let (tx, rx) = oneshot::channel();
let msg = Message::NewEvent(signum, tx);
let res = unsafe { (*GLOBAL_STATE).tx.clone().unbounded_send(msg) };
res.expect(
"failed to request a new signal stream, did the \
first event loop go away?",
);
rx.then(|r| r.unwrap())
});
match init {
Some(init) => Box::new(init.into_future().and_then(|()| new_signal)),
None => Box::new(new_signal),
}
}
}
impl Stream for Event {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<()>, io::Error> {
if !self.reg.poll_read_ready(Ready::readable())?.is_ready() {
return Ok(Async::NotReady);
}
self.reg.clear_read_ready(Ready::readable())?;
self.reg
.get_ref()
.inner
.borrow()
.as_ref()
.unwrap()
.1
.set_readiness(mio::Ready::empty())
.expect("failed to set readiness");
Ok(Async::Ready(Some(())))
}
}
fn global_init(handle: &Handle) -> io::Result<()> {
let (tx, rx) = mpsc::unbounded();
let reg = MyRegistration {
inner: RefCell::new(None),
};
let reg = try!(PollEvented::new_with_handle(reg, handle));
let ready = reg.get_ref().inner.borrow().as_ref().unwrap().1.clone();
unsafe {
let state = Box::new(GlobalState {
ready: ready,
ctrl_c: GlobalEventState {
ready: AtomicBool::new(false),
},
ctrl_break: GlobalEventState {
ready: AtomicBool::new(false),
},
tx: tx,
});
GLOBAL_STATE = Box::into_raw(state);
let rc = SetConsoleCtrlHandler(handler as usize, TRUE);
if rc == 0 {
Box::from_raw(GLOBAL_STATE);
GLOBAL_STATE = 0 as *mut _;
return Err(io::Error::last_os_error());
}
::tokio_executor::spawn(Box::new(DriverTask {
handle: handle.clone(),
rx: rx.fuse(),
reg: reg,
ctrl_c: EventState { tasks: Vec::new() },
ctrl_break: EventState { tasks: Vec::new() },
}));
Ok(())
}
}
impl Future for DriverTask {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
self.check_event_drops();
self.check_messages();
self.check_events().unwrap();
// TODO: when to finish this task?
Ok(Async::NotReady)
}
}
impl DriverTask {
fn check_event_drops(&mut self) {
self.ctrl_c
.tasks
.retain(|task| !task.0.borrow_mut().poll().is_err());
self.ctrl_break
.tasks
.retain(|task| !task.0.borrow_mut().poll().is_err());
}
fn check_messages(&mut self) {
loop {
// Acquire the next message
let message = match self.rx.poll().unwrap() {
Async::Ready(Some(e)) => e,
Async::Ready(None) | Async::NotReady => break,
};
let (sig, complete) = match message {
Message::NewEvent(sig, complete) => (sig, complete),
};
let event = if sig == CTRL_C_EVENT {
&mut self.ctrl_c
} else {
&mut self.ctrl_break
};
// Acquire the (registration, set_readiness) pair by... assuming
// we're on the event loop (true because of the spawn above).
let reg = MyRegistration {
inner: RefCell::new(None),
};
let reg = match PollEvented::new_with_handle(reg, &self.handle) {
Ok(reg) => reg,
Err(e) => {
drop(complete.send(Err(e)));
continue;
}
};
// Create the `Event` to pass back and then also keep a handle to
// the `SetReadiness` for ourselves internally.
let (tx, rx) = oneshot::channel();
let ready = reg.get_ref().inner.borrow_mut().as_mut().unwrap().1.clone();
drop(complete.send(Ok(Event {
reg: reg,
_finished: tx,
})));
event.tasks.push((RefCell::new(rx), ready));
}
}
fn check_events(&mut self) -> io::Result<()> {
if self.reg.poll_read_ready(Ready::readable())?.is_not_ready() {
return Ok(());
}
self.reg.clear_read_ready(Ready::readable())?;
self.reg
.get_ref()
.inner
.borrow()
.as_ref()
.unwrap()
.1
.set_readiness(mio::Ready::empty())
.unwrap();
if unsafe { (*GLOBAL_STATE).ctrl_c.ready.swap(false, Ordering::SeqCst) } {
for task in self.ctrl_c.tasks.iter() {
task.1.set_readiness(mio::Ready::readable()).unwrap();
}
}
if unsafe {
(*GLOBAL_STATE)
.ctrl_break
.ready
.swap(false, Ordering::SeqCst)
} {
for task in self.ctrl_break.tasks.iter() {
task.1.set_readiness(mio::Ready::readable()).unwrap();
}
}
Ok(())
}
}
unsafe extern "system" fn handler(ty: DWORD) -> BOOL {
let event = match ty {
CTRL_C_EVENT => &(*GLOBAL_STATE).ctrl_c,
CTRL_BREAK_EVENT => &(*GLOBAL_STATE).ctrl_break,
_ => return FALSE,
};
if event.ready.swap(true, Ordering::SeqCst) {
FALSE
} else {
drop((*GLOBAL_STATE).ready.set_readiness(mio::Ready::readable()));
// TODO: this will report that we handled a CTRL_BREAK_EVENT when in
// fact we may not have any streams actually created for that
// event.
TRUE
}
}
struct MyRegistration {
inner: RefCell<Option<(mio::Registration, mio::SetReadiness)>>,
}
impl mio::Evented for MyRegistration {
fn register(
&self,
poll: &mio::Poll,
token: mio::Token,
events: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
let reg = mio::Registration::new2();
reg.0.register(poll, token, events, opts)?;
*self.inner.borrow_mut() = Some(reg);
Ok(())
}
fn reregister(
&self,
_poll: &mio::Poll,
_token: mio::Token,
_events: mio::Ready,
_opts: mio::PollOpt,
) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _poll: &mio::Poll) -> io::Result<()> {
Ok(())
}
}

View File

@ -0,0 +1,39 @@
#![cfg(unix)]
extern crate libc;
pub mod support;
use support::*;
const TEST_SIGNAL: libc::c_int = libc::SIGUSR1;
#[test]
fn dropping_loops_does_not_cause_starvation() {
let (mut rt, signal) = {
let mut first_rt = CurrentThreadRuntime::new()
.expect("failed to init first runtime");
let first_signal = run_with_timeout(&mut first_rt, Signal::new(TEST_SIGNAL))
.expect("failed to register first signal");
let mut second_rt = CurrentThreadRuntime::new()
.expect("failed to init second runtime");
let second_signal = run_with_timeout(&mut second_rt, Signal::new(TEST_SIGNAL))
.expect("failed to register second signal");
drop(first_rt);
drop(first_signal);
(second_rt, second_signal)
};
send_signal(TEST_SIGNAL);
let signal_future = signal.into_future()
.map_err(|(e, _)| e);
run_with_timeout(&mut rt, signal_future)
.expect("failed to get signal");
}

View File

@ -0,0 +1,28 @@
#![cfg(unix)]
extern crate libc;
pub mod support;
use support::*;
#[test]
fn drop_then_get_a_signal() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let signal = run_core_with_timeout(&mut lp, Signal::with_handle(
libc::SIGUSR1,
&handle.new_tokio_handle(),
)).expect("failed to create first signal");
drop(signal);
send_signal(libc::SIGUSR1);
let signal = lp.run(Signal::with_handle(libc::SIGUSR1, &handle.new_tokio_handle()))
.expect("failed to create signal")
.into_future()
.map(|_| ())
.map_err(|(e, _)| panic!("{}", e));
run_core_with_timeout(&mut lp, signal)
.expect("failed to get signal");
}

View File

@ -0,0 +1,32 @@
#![cfg(unix)]
extern crate libc;
pub mod support;
use support::*;
const TEST_SIGNAL: libc::c_int = libc::SIGUSR1;
#[test]
fn dropping_signal_does_not_deregister_any_other_instances() {
// NB: Deadline requires a timer registration which is provided by
// tokio's `current_thread::Runtime`, but isn't available by just using
// tokio's default CurrentThread executor which powers `current_thread::block_on_all`.
let mut rt = CurrentThreadRuntime::new()
.expect("failed to init runtime");
// NB: Testing for issue #38: signals should not starve based on ordering
let first_duplicate_signal = run_with_timeout(&mut rt, Signal::new(TEST_SIGNAL))
.expect("failed to register first duplicate signal");
let signal = run_with_timeout(&mut rt, Signal::new(TEST_SIGNAL))
.expect("failed to register signal");
let second_duplicate_signal = run_with_timeout(&mut rt, Signal::new(TEST_SIGNAL))
.expect("failed to register second duplicate signal");
drop(first_duplicate_signal);
drop(second_duplicate_signal);
send_signal(TEST_SIGNAL);
run_with_timeout(&mut rt, signal.into_future().map_err(|(e, _)| e))
.expect("failed to get signal");
}

View File

@ -0,0 +1,40 @@
#![cfg(unix)]
extern crate libc;
use std::sync::mpsc::channel;
use std::thread;
pub mod support;
use support::*;
#[test]
fn multi_loop() {
// An "ordinary" (non-future) channel
let (sender, receiver) = channel();
// Run multiple times, to make sure there are no race conditions
for _ in 0..10 {
// Run multiple event loops, each one in its own thread
let threads: Vec<_> = (0..4)
.map(|_| {
let sender = sender.clone();
thread::spawn(move || {
let mut lp = Core::new().unwrap();
let signal = lp.run(Signal::new(libc::SIGHUP)).unwrap();
sender.send(()).unwrap();
run_core_with_timeout(&mut lp, signal.into_future()).ok().unwrap();
})
})
.collect();
// Wait for them to declare they're ready
for &_ in threads.iter() {
receiver.recv().unwrap();
}
// Send a signal
send_signal(libc::SIGHUP);
// Make sure the threads terminated correctly
for t in threads {
t.join().unwrap();
}
}
}

View File

@ -0,0 +1,27 @@
#![cfg(unix)]
extern crate libc;
pub mod support;
use support::*;
#[test]
fn notify_both() {
let mut lp = Core::new().unwrap();
let handle = lp.handle();
let signal1 = run_core_with_timeout(&mut lp, Signal::with_handle(
libc::SIGUSR2,
&handle.new_tokio_handle(),
)).expect("failed to create signal1");
let signal2 = run_core_with_timeout(&mut lp, Signal::with_handle(
libc::SIGUSR2,
&handle.new_tokio_handle(),
)).expect("failed to create signal2");
send_signal(libc::SIGUSR2);
run_core_with_timeout(&mut lp, signal1.into_future().join(signal2.into_future()))
.ok()
.expect("failed to create signal2");
}

View File

@ -0,0 +1,20 @@
#![cfg(unix)]
extern crate libc;
pub mod support;
use support::*;
#[test]
fn tokio_simple() {
let signal_future = Signal::new(libc::SIGUSR1)
.and_then(|signal| {
send_signal(libc::SIGUSR1);
signal.into_future().map(|_| ()).map_err(|(err, _)| err)
});
let mut rt = CurrentThreadRuntime::new()
.expect("failed to init runtime");
run_with_timeout(&mut rt, signal_future)
.expect("failed");
}

View File

@ -0,0 +1,20 @@
#![cfg(unix)]
extern crate libc;
pub mod support;
use support::*;
#[test]
fn simple() {
let mut lp = Core::new().unwrap();
let signal = run_core_with_timeout(&mut lp, Signal::new(libc::SIGUSR1))
.expect("failed to create signal");
send_signal(libc::SIGUSR1);
run_core_with_timeout(&mut lp, signal.into_future())
.ok()
.expect("failed to get signal");
}

View File

@ -0,0 +1,50 @@
extern crate libc;
extern crate futures;
extern crate tokio;
extern crate tokio_core;
extern crate tokio_signal;
use self::libc::{c_int, getpid, kill};
use std::time::{Duration, Instant};
use self::tokio::timer::Deadline;
use self::tokio_core::reactor::Timeout;
pub use self::futures::{Future, Stream};
pub use self::tokio_core::reactor::Core;
pub use self::tokio::runtime::current_thread::Runtime as CurrentThreadRuntime;
pub use self::tokio_signal::unix::Signal;
pub fn run_core_with_timeout<F>(lp: &mut Core, future: F) -> Result<F::Item, F::Error>
where F: Future
{
let timeout = Timeout::new(Duration::from_secs(1), &lp.handle())
.expect("failed to register timeout")
.map(|()| panic!("timeout exceeded"))
.map_err(|e| panic!("timeout error: {}", e));
lp.run(future.select(timeout))
.map(|(r, _)| r)
.map_err(|(e, _)| e)
}
pub fn run_with_timeout<F>(rt: &mut CurrentThreadRuntime, future: F) -> Result<F::Item, F::Error>
where F: Future
{
let deadline = Deadline::new(future, Instant::now() + Duration::from_secs(1))
.map_err(|e| if e.is_timer() {
panic!("failed to register timer");
} else if e.is_elapsed() {
panic!("timed out")
} else {
e.into_inner().expect("missing inner error")
});
rt.block_on(deadline)
}
#[cfg(unix)]
pub fn send_signal(signal: c_int) {
unsafe {
assert_eq!(kill(getpid(), signal), 0);
}
}

View File

@ -0,0 +1,19 @@
#![cfg(unix)]
extern crate libc;
pub mod support;
use support::*;
#[test]
fn twice() {
let mut lp = Core::new().unwrap();
let signal = run_core_with_timeout(&mut lp, Signal::new(libc::SIGUSR1)).unwrap();
send_signal(libc::SIGUSR1);
let (num, signal) = run_core_with_timeout(&mut lp, signal.into_future()).ok().unwrap();
assert_eq!(num, Some(libc::SIGUSR1));
send_signal(libc::SIGUSR1);
run_core_with_timeout(&mut lp, signal.into_future()).ok().unwrap();
}