From e97e8cb7fe42e7b07f3b220e082a7fdbad38bc1c Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Thu, 3 May 2018 21:19:58 +0200 Subject: [PATCH] signal: Update the windows implementation to work with tokio BREAKING CHANGE `ctrl_c` now takes a `tokio_reactor::Handle` --- Cargo.toml | 6 +++++- examples/ctrl-c.rs | 2 +- src/lib.rs | 18 +++++++++++------- src/windows.rs | 30 ++++++++++++++++-------------- 4 files changed, 33 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index da8c81272..b7829fe4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,13 +18,17 @@ appveyor = { repository = "alexcrichton/tokio-signal" } [dependencies] futures = "0.1.11" mio = "0.6.5" -tokio-core = "0.1.6" +tokio-reactor = "0.1.0" +tokio-executor = "0.1.0" tokio-io = "0.1" [target.'cfg(unix)'.dependencies] libc = "0.2" mio-uds = "0.6" +[dev-dependencies] +tokio-core = "0.1.17" + [target.'cfg(windows)'.dependencies.winapi] version = "0.3" features = ["minwindef", "wincon"] diff --git a/examples/ctrl-c.rs b/examples/ctrl-c.rs index ff2786f0e..e650506dd 100644 --- a/examples/ctrl-c.rs +++ b/examples/ctrl-c.rs @@ -19,7 +19,7 @@ fn main() { // the `flatten_stream()` convenience method lazily defers that // initialisation, allowing us to use it 'as if' it is already the // stream we want, reducing boilerplate Future-handling. - let endless_stream = tokio_signal::ctrl_c(&core.handle()).flatten_stream(); + let endless_stream = tokio_signal::ctrl_c(&core.handle().new_tokio_handle()).flatten_stream(); // don't keep going forever: convert the endless stream to a bounded one. let limited_stream = endless_stream.take(STOP_AFTER); diff --git a/src/lib.rs b/src/lib.rs index 93a284f95..a3423b0d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,14 +79,16 @@ #![deny(missing_docs)] extern crate futures; -extern crate tokio_core; +extern crate mio; +extern crate tokio_executor; extern crate tokio_io; +extern crate tokio_reactor; use std::io; use futures::stream::Stream; -use futures::Future; -use tokio_core::reactor::Handle; +use futures::{future, Future}; +use tokio_reactor::Handle; pub mod unix; pub mod windows; @@ -121,9 +123,11 @@ pub fn ctrl_c(handle: &Handle) -> IoFuture> { #[cfg(windows)] fn ctrl_c_imp(handle: &Handle) -> IoFuture> { - Box::new( - windows::Event::ctrl_c(handle) - .map(|x| Box::new(x) as Box + Send>), - ) + let handle = handle.clone(); + // Use lazy to ensure that `ctrl_c` gets called while on an event loop + Box::new(future::lazy(move || { + windows::Event::ctrl_c(&handle) + .map(|x| Box::new(x) as Box + Send>) + })) } } diff --git a/src/windows.rs b/src/windows.rs index 436dda519..c3cf505ab 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -15,14 +15,15 @@ use std::io; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Once, ONCE_INIT}; -use self::winapi::shared::minwindef::*; -use self::winapi::um::wincon::*; use futures::future; use futures::stream::Fuse; use futures::sync::mpsc; use futures::sync::oneshot; use futures::{Async, Future, IntoFuture, Poll, Stream}; -use tokio_core::reactor::{Handle, PollEvented}; +use tokio_reactor::{Handle, PollEvented}; +use mio::Ready; +use self::winapi::shared::minwindef::*; +use self::winapi::um::wincon::*; use IoFuture; @@ -122,10 +123,10 @@ impl Stream for Event { type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { - if !self.reg.poll_read().is_ready() { + if !self.reg.poll_read_ready(Ready::readable())?.is_ready() { return Ok(Async::NotReady); } - self.reg.need_read(); + self.reg.clear_read_ready(Ready::readable())?; self.reg .get_ref() .inner @@ -144,7 +145,7 @@ fn global_init(handle: &Handle) -> io::Result<()> { let reg = MyRegistration { inner: RefCell::new(None), }; - let reg = try!(PollEvented::new(reg, handle)); + let reg = try!(PollEvented::new_with_handle(reg, handle)); let ready = reg.get_ref().inner.borrow().as_ref().unwrap().1.clone(); unsafe { let state = Box::new(GlobalState { @@ -166,13 +167,13 @@ fn global_init(handle: &Handle) -> io::Result<()> { return Err(io::Error::last_os_error()); } - handle.spawn(DriverTask { + ::tokio_executor::spawn(Box::new(DriverTask { handle: handle.clone(), rx: rx.fuse(), reg: reg, ctrl_c: EventState { tasks: Vec::new() }, ctrl_break: EventState { tasks: Vec::new() }, - }); + })); Ok(()) } @@ -185,7 +186,7 @@ impl Future for DriverTask { fn poll(&mut self) -> Poll<(), ()> { self.check_event_drops(); self.check_messages(); - self.check_events(); + self.check_events().unwrap(); // TODO: when to finish this task? Ok(Async::NotReady) @@ -224,7 +225,7 @@ impl DriverTask { let reg = MyRegistration { inner: RefCell::new(None), }; - let reg = match PollEvented::new(reg, &self.handle) { + let reg = match PollEvented::new_with_handle(reg, &self.handle) { Ok(reg) => reg, Err(e) => { drop(complete.send(Err(e))); @@ -244,11 +245,11 @@ impl DriverTask { } } - fn check_events(&mut self) { - if self.reg.poll_read().is_not_ready() { - return; + fn check_events(&mut self) -> io::Result<()> { + if self.reg.poll_read_ready(Ready::readable())?.is_not_ready() { + return Ok(()); } - self.reg.need_read(); + self.reg.clear_read_ready(Ready::readable())?; self.reg .get_ref() .inner @@ -274,6 +275,7 @@ impl DriverTask { task.1.set_readiness(mio::Ready::readable()).unwrap(); } } + Ok(()) } }