signal: Update the windows implementation to work with tokio

BREAKING CHANGE

`ctrl_c` now takes a `tokio_reactor::Handle`
This commit is contained in:
Markus Westerlind 2018-05-03 21:19:58 +02:00 committed by Carl Lerche
parent 2848df9b6c
commit e97e8cb7fe
4 changed files with 33 additions and 23 deletions

View File

@ -18,13 +18,17 @@ appveyor = { repository = "alexcrichton/tokio-signal" }
[dependencies]
futures = "0.1.11"
mio = "0.6.5"
tokio-core = "0.1.6"
tokio-reactor = "0.1.0"
tokio-executor = "0.1.0"
tokio-io = "0.1"
[target.'cfg(unix)'.dependencies]
libc = "0.2"
mio-uds = "0.6"
[dev-dependencies]
tokio-core = "0.1.17"
[target.'cfg(windows)'.dependencies.winapi]
version = "0.3"
features = ["minwindef", "wincon"]

View File

@ -19,7 +19,7 @@ fn main() {
// the `flatten_stream()` convenience method lazily defers that
// initialisation, allowing us to use it 'as if' it is already the
// stream we want, reducing boilerplate Future-handling.
let endless_stream = tokio_signal::ctrl_c(&core.handle()).flatten_stream();
let endless_stream = tokio_signal::ctrl_c(&core.handle().new_tokio_handle()).flatten_stream();
// don't keep going forever: convert the endless stream to a bounded one.
let limited_stream = endless_stream.take(STOP_AFTER);

View File

@ -79,14 +79,16 @@
#![deny(missing_docs)]
extern crate futures;
extern crate tokio_core;
extern crate mio;
extern crate tokio_executor;
extern crate tokio_io;
extern crate tokio_reactor;
use std::io;
use futures::stream::Stream;
use futures::Future;
use tokio_core::reactor::Handle;
use futures::{future, Future};
use tokio_reactor::Handle;
pub mod unix;
pub mod windows;
@ -121,9 +123,11 @@ pub fn ctrl_c(handle: &Handle) -> IoFuture<IoStream<()>> {
#[cfg(windows)]
fn ctrl_c_imp(handle: &Handle) -> IoFuture<IoStream<()>> {
Box::new(
windows::Event::ctrl_c(handle)
.map(|x| Box::new(x) as Box<Stream<Item = _, Error = _> + Send>),
)
let handle = handle.clone();
// Use lazy to ensure that `ctrl_c` gets called while on an event loop
Box::new(future::lazy(move || {
windows::Event::ctrl_c(&handle)
.map(|x| Box::new(x) as Box<Stream<Item = _, Error = _> + Send>)
}))
}
}

View File

@ -15,14 +15,15 @@ use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Once, ONCE_INIT};
use self::winapi::shared::minwindef::*;
use self::winapi::um::wincon::*;
use futures::future;
use futures::stream::Fuse;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use tokio_core::reactor::{Handle, PollEvented};
use tokio_reactor::{Handle, PollEvented};
use mio::Ready;
use self::winapi::shared::minwindef::*;
use self::winapi::um::wincon::*;
use IoFuture;
@ -122,10 +123,10 @@ impl Stream for Event {
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<()>, io::Error> {
if !self.reg.poll_read().is_ready() {
if !self.reg.poll_read_ready(Ready::readable())?.is_ready() {
return Ok(Async::NotReady);
}
self.reg.need_read();
self.reg.clear_read_ready(Ready::readable())?;
self.reg
.get_ref()
.inner
@ -144,7 +145,7 @@ fn global_init(handle: &Handle) -> io::Result<()> {
let reg = MyRegistration {
inner: RefCell::new(None),
};
let reg = try!(PollEvented::new(reg, handle));
let reg = try!(PollEvented::new_with_handle(reg, handle));
let ready = reg.get_ref().inner.borrow().as_ref().unwrap().1.clone();
unsafe {
let state = Box::new(GlobalState {
@ -166,13 +167,13 @@ fn global_init(handle: &Handle) -> io::Result<()> {
return Err(io::Error::last_os_error());
}
handle.spawn(DriverTask {
::tokio_executor::spawn(Box::new(DriverTask {
handle: handle.clone(),
rx: rx.fuse(),
reg: reg,
ctrl_c: EventState { tasks: Vec::new() },
ctrl_break: EventState { tasks: Vec::new() },
});
}));
Ok(())
}
@ -185,7 +186,7 @@ impl Future for DriverTask {
fn poll(&mut self) -> Poll<(), ()> {
self.check_event_drops();
self.check_messages();
self.check_events();
self.check_events().unwrap();
// TODO: when to finish this task?
Ok(Async::NotReady)
@ -224,7 +225,7 @@ impl DriverTask {
let reg = MyRegistration {
inner: RefCell::new(None),
};
let reg = match PollEvented::new(reg, &self.handle) {
let reg = match PollEvented::new_with_handle(reg, &self.handle) {
Ok(reg) => reg,
Err(e) => {
drop(complete.send(Err(e)));
@ -244,11 +245,11 @@ impl DriverTask {
}
}
fn check_events(&mut self) {
if self.reg.poll_read().is_not_ready() {
return;
fn check_events(&mut self) -> io::Result<()> {
if self.reg.poll_read_ready(Ready::readable())?.is_not_ready() {
return Ok(());
}
self.reg.need_read();
self.reg.clear_read_ready(Ready::readable())?;
self.reg
.get_ref()
.inner
@ -274,6 +275,7 @@ impl DriverTask {
task.1.set_readiness(mio::Ready::readable()).unwrap();
}
}
Ok(())
}
}