diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index b913748f1..4c50d3b13 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -5,6 +5,7 @@ //! futures, schedule tasks, issue I/O requests, etc. use std::cell::RefCell; +use std::cmp; use std::io::{self, ErrorKind}; use std::mem; use std::rc::{Rc, Weak}; @@ -226,6 +227,18 @@ impl Core { res.expect("run should not return until future is done") } + /// Performs one iteration of the event loop, blocking on waiting for events + /// for at most `max_wait` (forever if `None`). + /// + /// It only makes sense to call this method if you've previously spawned + /// a future onto this event loop. + /// + /// `loop { lp.turn(None) }` is equivalent to calling `run` with an + /// empty future (one that never finishes). + pub fn turn(&mut self, max_wait: Option) { + self.poll_internal(max_wait, &mut || true).expect("Error in event loop turn"); + } + fn _run(&mut self, done: &mut FnMut() -> bool) { // Check to see if we're done immediately, if so we shouldn't do any // work. @@ -235,60 +248,70 @@ impl Core { let mut finished = false; while !finished { - let amt; - // On Linux, Poll::poll is epoll_wait, which may return EINTR if a - // ptracer attaches. This retry loop prevents crashing when - // attaching strace, or similar. - let start = Instant::now(); - loop { - let inner = self.inner.borrow_mut(); - let timeout = inner.timer_heap.peek().map(|t| { - if t.0 < start { - Duration::new(0, 0) - } else { - t.0 - start - } - }); - match inner.io.poll(&mut self.events, timeout) { - Ok(a) => { - amt = a; - break; - } - Err(ref e) if e.kind() == ErrorKind::Interrupted => {} - err @ Err(_) => { - err.unwrap(); - } - } - } - debug!("loop poll - {:?}", start.elapsed()); - debug!("loop time - {:?}", Instant::now()); - - // First up, process all timeouts that may have just occurred. - let start = Instant::now(); - self.consume_timeouts(start); - - // Next, process all the events that came in. - for i in 0..self.events.len() { - let event = self.events.get(i).unwrap(); - let token = event.token(); - trace!("event {:?} {:?}", event.kind(), event.token()); - - if token == TOKEN_MESSAGES { - CURRENT_LOOP.set(&self, || self.consume_queue()); - } else if token == TOKEN_FUTURE { - self.future_readiness.0.set_readiness(mio::Ready::none()).unwrap(); - if !finished && CURRENT_LOOP.set(self, || done()) { - finished = true; - } - } else { - self.dispatch(token, event.kind()); - } - } - - debug!("loop process - {} events, {:?}", amt, start.elapsed()); + finished = self.poll_internal(None, done).expect("Error in event loop") } } + fn poll_internal(&mut self, max_wait: Option, done: &mut FnMut() -> bool) -> io::Result { + let amt; + let start = Instant::now(); + { + let inner = self.inner.borrow_mut(); + let timeout = inner.timer_heap.peek().map(|t| { + if t.0 < start { + Duration::new(0, 0) + } else { + t.0 - start + } + }); + let timeout = if let (Some(d1), Some(d2)) = (max_wait, timeout) { + Some(cmp::min(d1, d2)) + } else { + max_wait.or(timeout) + }; + match inner.io.poll(&mut self.events, timeout) { + Ok(a) => { + amt = a; + } + // On Linux, Poll::poll is epoll_wait, which may return EINTR if a + // ptracer attaches. + Err(ref e) if e.kind() == ErrorKind::Interrupted => { + return Ok(false) + }, + Err(e) => { + return Err(e) + }, + } + } + debug!("loop poll - {:?}", start.elapsed()); + debug!("loop time - {:?}", Instant::now()); + + // First up, process all timeouts that may have just occurred. + let start = Instant::now(); + self.consume_timeouts(start); + + // Next, process all the events that came in. + let mut finished = false; + for i in 0..self.events.len() { + let event = self.events.get(i).unwrap(); + let token = event.token(); + trace!("event {:?} {:?}", event.kind(), event.token()); + + if token == TOKEN_MESSAGES { + CURRENT_LOOP.set(&self, || self.consume_queue()); + } else if token == TOKEN_FUTURE { + self.future_readiness.0.set_readiness(mio::Ready::none()).unwrap(); + if !finished && CURRENT_LOOP.set(self, || done()) { + finished = true; + } + } else { + self.dispatch(token, event.kind()); + } + } + debug!("loop process - {} events, {:?}", amt, start.elapsed()); + Ok(finished) + } + fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { let token = usize::from(token) - TOKEN_START; if token % 2 == 0 { diff --git a/tests/spawn.rs b/tests/spawn.rs index 1d39543d5..8fd4599d5 100644 --- a/tests/spawn.rs +++ b/tests/spawn.rs @@ -2,6 +2,9 @@ extern crate tokio_core; extern crate env_logger; extern crate futures; +use std::time::Duration; +use std::sync::mpsc; + use futures::Future; use tokio_core::reactor::Core; @@ -26,6 +29,30 @@ fn simple() { assert_eq!(lp.run(rx1.join(rx2)).unwrap(), (1, 2)); } +#[test] +fn simple_core_poll() { + drop(env_logger::init()); + let mut lp = Core::new().unwrap(); + + let (tx, rx) = mpsc::channel(); + let (tx1, tx2) = (tx.clone(), tx.clone()); + + lp.turn(Some(Duration::new(0, 0))); + lp.handle().spawn(futures::lazy(move || { + tx1.send(1).unwrap(); + Ok(()) + })); + lp.turn(Some(Duration::new(0, 0))); + lp.handle().spawn(futures::lazy(move || { + tx2.send(2).unwrap(); + Ok(()) + })); + assert_eq!(rx.try_recv().unwrap(), 1); + assert!(rx.try_recv().is_err()); + lp.turn(Some(Duration::new(0, 0))); + assert_eq!(rx.try_recv().unwrap(), 2); +} + #[test] fn spawn_in_poll() { drop(env_logger::init());