diff --git a/src/event_loop.rs b/src/event_loop.rs index 70f2cfd92..d04a3e4d2 100644 --- a/src/event_loop.rs +++ b/src/event_loop.rs @@ -201,7 +201,26 @@ impl Loop { /// Runs a future until completion, driving the event loop while we're /// otherwise waiting for the future to complete. /// - /// Returns the value that the future resolves to. + /// This function will begin executing the event loop and will finish once + /// the provided future is resolve. Note that the future argument here + /// crucially does not require the `'static` nor `Send` bounds. As a result + /// the future will be "pinned" to not only this thread but also this stack + /// frame. + /// + /// This function will returns the value that the future resolves to once + /// the future has finished. If the future never resolves then this function + /// will never return. + /// + /// # Panics + /// + /// This method will **not** catch panics from polling the future `f`. If + /// the future panics then it's the responsibility of the caller to catch + /// that panic and handle it as appropriate. + /// + /// Similarly, becuase the provided future will be pinned not only to this + /// thread but also to this task, any attempt to poll the future on a + /// separate thread will result in a panic. That is, calls to + /// `task::poll_on` must be avoided. pub fn run(&mut self, mut f: F) -> Result where F: Future, { @@ -214,9 +233,19 @@ impl Loop { } } + // First up, create the task that will drive this future. The task here + // isn't a "normal task" but rather one where we define what to do when + // a readiness notification comes in. + // + // We translate readiness notifications to a `set_readiness` of our + // `future_readiness` structure we have stored internally. let mut task = Task::new_notify(MyNotify(self.future_readiness.clone())); let ready = self.future_readiness.clone(); + // Next, move all that data into a dynamically dispatched closure to cut + // down on monomorphization costs. Inside this closure we unset the + // readiness of the future (as we're about to poll it) and then we check + // to see if it's done. If it's not then the event loop will turn again. let mut res = None; self._run(&mut || { ready.set_readiness(mio::EventSet::none()) @@ -233,7 +262,13 @@ impl Loop { } fn _run(&mut self, done: &mut FnMut() -> bool) { - while CURRENT_LOOP.set(self, || !done()) { + // Check to see if we're done immediately, if so we shouldn't do any + // work. + if CURRENT_LOOP.set(self, || done()) { + return + } + + loop { let amt; // On Linux, Poll::poll is epoll_wait, which may return EINTR if a // ptracer attaches. This retry loop prevents crashing when @@ -282,7 +317,9 @@ impl Loop { }); continue } else if token == 1 { - debug!("ZOMG IT'S HERE"); + if CURRENT_LOOP.set(self, || done()) { + return + } continue } @@ -320,8 +357,6 @@ impl Loop { debug!("loop process - {} events, {:?}", amt, start.elapsed()); } - - debug!("loop is done!"); } fn consume_timeouts(&mut self, now: Instant) { @@ -1100,6 +1135,7 @@ impl Source { /// The event loop will fill in this information and then inform futures /// that they're ready to go with the `schedule` method, and then the `poll` /// method can use this to figure out what happened. + // TODO: shouldn't return a usize here, but rather some kind of newtype pub fn take_readiness(&self) -> usize { self.readiness.swap(0, Ordering::SeqCst) } diff --git a/src/readiness_stream.rs b/src/readiness_stream.rs index 93e1eb09c..3b48f3131 100644 --- a/src/readiness_stream.rs +++ b/src/readiness_stream.rs @@ -12,12 +12,15 @@ use event_loop::{IoSource, LoopHandle, AddSource}; /// associated with a specific event loop and source of events that will be /// registered with an event loop. /// -/// Currently readiness streams have "edge" semantics. That is, if a stream -/// receives a readable notification it will not receive another readable -/// notification until all bytes have been read from the stream. +/// Each readiness stream has a number of methods to test whether the underlying +/// object is readable or writable. Once the methods return that an object is +/// readable/writable, then it will continue to do so until the `need_read` or +/// `need_write` methods are called. /// -/// Note that the precise semantics of when notifications are received will -/// likely be configurable in the future. +/// That is, this object is typically wrapped in another form of I/O object. +/// It's the responsibility of the wrapper to inform the readiness stream when a +/// "would block" I/O event is seen. The readiness stream will then take care of +/// any scheduling necessary to get notified when the event is ready again. pub struct ReadinessStream { io_token: usize, loop_handle: LoopHandle, @@ -47,6 +50,12 @@ impl ReadinessStream { } /// Tests to see if this source is ready to be read from or not. + /// + /// If this stream is not ready for a read then `NotReady` will be returned + /// and the current task will be scheduled to receive a notification when + /// the stream is readable again. In other words, this method is only safe + /// to call from within the context of a future's task, typically done in a + /// `Future::poll` method. pub fn poll_read(&self) -> Poll<(), io::Error> { if self.readiness.load(Ordering::SeqCst) & 1 != 0 { return Poll::Ok(()) @@ -61,6 +70,12 @@ impl ReadinessStream { } /// Tests to see if this source is ready to be written to or not. + /// + /// If this stream is not ready for a write then `NotReady` will be returned + /// and the current task will be scheduled to receive a notification when + /// the stream is writable again. In other words, this method is only safe + /// to call from within the context of a future's task, typically done in a + /// `Future::poll` method. pub fn poll_write(&self) -> Poll<(), io::Error> { if self.readiness.load(Ordering::SeqCst) & 2 != 0 { return Poll::Ok(()) @@ -74,13 +89,33 @@ impl ReadinessStream { } } - /// Tests to see if this source is ready to be read from or not. + /// Indicates to this source of events that the corresponding I/O object is + /// no longer readable, but it needs to be. + /// + /// This function, like `poll_read`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// readable, typically because a "would block" error was seen. + /// + /// The flag indicating that this stream is readable is unset and the + /// current task is scheduled to receive a notification when the stream is + /// then again readable. pub fn need_read(&self) { self.readiness.fetch_and(!1, Ordering::SeqCst); self.loop_handle.schedule_read(self.io_token); } - /// Tests to see if this source is ready to be written to or not. + /// Indicates to this source of events that the corresponding I/O object is + /// no longer writable, but it needs to be. + /// + /// This function, like `poll_write`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// writable, typically because a "would block" error was seen. + /// + /// The flag indicating that this stream is writable is unset and the + /// current task is scheduled to receive a notification when the stream is + /// then again writable. pub fn need_write(&self) { self.readiness.fetch_and(!2, Ordering::SeqCst); self.loop_handle.schedule_write(self.io_token); diff --git a/src/tcp.rs b/src/tcp.rs index e07063351..5993ef7f0 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -227,11 +227,21 @@ impl TcpStream { } /// Test whether this socket is ready to be read or not. + /// + /// If the socket is *not* readable then the current task is scheduled to + /// get a notification when the socket does become readable. That is, this + /// is only suitable for calling in a `Future::poll` method and will + /// automatically handle ensuring a retry once the socket is readable again. pub fn poll_read(&self) -> Poll<(), io::Error> { self.ready.poll_read() } /// Test whether this socket is writey to be written to or not. + /// + /// If the socket is *not* writable then the current task is scheduled to + /// get a notification when the socket does become writable. That is, this + /// is only suitable for calling in a `Future::poll` method and will + /// automatically handle ensuring a retry once the socket is writable again. pub fn poll_write(&self) -> Poll<(), io::Error> { self.ready.poll_write() } @@ -303,7 +313,6 @@ impl Read for TcpStream { if is_wouldblock(&r) { self.ready.need_read(); } - trace!("read[{:p}] {:?} on {:?}", self, r, self.source.io()); return r } } @@ -314,7 +323,6 @@ impl Write for TcpStream { if is_wouldblock(&r) { self.ready.need_write(); } - trace!("write[{:p}] {:?} on {:?}", self, r, self.source.io()); return r } fn flush(&mut self) -> io::Result<()> { diff --git a/src/timeout.rs b/src/timeout.rs index 582a71fc6..93199be15 100644 --- a/src/timeout.rs +++ b/src/timeout.rs @@ -54,8 +54,6 @@ impl Future for Timeout { if *self.token.when() <= now { Poll::Ok(()) } else { - trace!("waiting for a timeout at {:?}", self.token.when()); - trace!("current time is {:?}", now); self.handle.update_timeout(&self.token); Poll::NotReady } diff --git a/src/udp.rs b/src/udp.rs index bc71ec41c..30c15317d 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -66,11 +66,21 @@ impl UdpSocket { } /// Test whether this socket is ready to be read or not. + /// + /// If the socket is *not* readable then the current task is scheduled to + /// get a notification when the socket does become readable. That is, this + /// is only suitable for calling in a `Future::poll` method and will + /// automatically handle ensuring a retry once the socket is readable again. pub fn poll_read(&self) -> Poll<(), io::Error> { self.ready.poll_read() } /// Test whether this socket is writey to be written to or not. + /// + /// If the socket is *not* writable then the current task is scheduled to + /// get a notification when the socket does become writable. That is, this + /// is only suitable for calling in a `Future::poll` method and will + /// automatically handle ensuring a retry once the socket is writable again. pub fn poll_write(&self) -> Poll<(), io::Error> { self.ready.poll_write() }