Update docs of futures-mio

This commit is contained in:
Alex Crichton 2016-08-18 10:21:25 -07:00
parent 12a05b9568
commit 0a707ffccb
5 changed files with 103 additions and 16 deletions

View File

@ -201,7 +201,26 @@ impl Loop {
/// Runs a future until completion, driving the event loop while we're
/// otherwise waiting for the future to complete.
///
/// Returns the value that the future resolves to.
/// This function will begin executing the event loop and will finish once
/// the provided future is resolve. Note that the future argument here
/// crucially does not require the `'static` nor `Send` bounds. As a result
/// the future will be "pinned" to not only this thread but also this stack
/// frame.
///
/// This function will returns the value that the future resolves to once
/// the future has finished. If the future never resolves then this function
/// will never return.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
///
/// Similarly, becuase the provided future will be pinned not only to this
/// thread but also to this task, any attempt to poll the future on a
/// separate thread will result in a panic. That is, calls to
/// `task::poll_on` must be avoided.
pub fn run<F>(&mut self, mut f: F) -> Result<F::Item, F::Error>
where F: Future,
{
@ -214,9 +233,19 @@ impl Loop {
}
}
// First up, create the task that will drive this future. The task here
// isn't a "normal task" but rather one where we define what to do when
// a readiness notification comes in.
//
// We translate readiness notifications to a `set_readiness` of our
// `future_readiness` structure we have stored internally.
let mut task = Task::new_notify(MyNotify(self.future_readiness.clone()));
let ready = self.future_readiness.clone();
// Next, move all that data into a dynamically dispatched closure to cut
// down on monomorphization costs. Inside this closure we unset the
// readiness of the future (as we're about to poll it) and then we check
// to see if it's done. If it's not then the event loop will turn again.
let mut res = None;
self._run(&mut || {
ready.set_readiness(mio::EventSet::none())
@ -233,7 +262,13 @@ impl Loop {
}
fn _run(&mut self, done: &mut FnMut() -> bool) {
while CURRENT_LOOP.set(self, || !done()) {
// Check to see if we're done immediately, if so we shouldn't do any
// work.
if CURRENT_LOOP.set(self, || done()) {
return
}
loop {
let amt;
// On Linux, Poll::poll is epoll_wait, which may return EINTR if a
// ptracer attaches. This retry loop prevents crashing when
@ -282,7 +317,9 @@ impl Loop {
});
continue
} else if token == 1 {
debug!("ZOMG IT'S HERE");
if CURRENT_LOOP.set(self, || done()) {
return
}
continue
}
@ -320,8 +357,6 @@ impl Loop {
debug!("loop process - {} events, {:?}", amt, start.elapsed());
}
debug!("loop is done!");
}
fn consume_timeouts(&mut self, now: Instant) {
@ -1100,6 +1135,7 @@ impl<E: ?Sized> Source<E> {
/// The event loop will fill in this information and then inform futures
/// that they're ready to go with the `schedule` method, and then the `poll`
/// method can use this to figure out what happened.
// TODO: shouldn't return a usize here, but rather some kind of newtype
pub fn take_readiness(&self) -> usize {
self.readiness.swap(0, Ordering::SeqCst)
}

View File

@ -12,12 +12,15 @@ use event_loop::{IoSource, LoopHandle, AddSource};
/// associated with a specific event loop and source of events that will be
/// registered with an event loop.
///
/// Currently readiness streams have "edge" semantics. That is, if a stream
/// receives a readable notification it will not receive another readable
/// notification until all bytes have been read from the stream.
/// Each readiness stream has a number of methods to test whether the underlying
/// object is readable or writable. Once the methods return that an object is
/// readable/writable, then it will continue to do so until the `need_read` or
/// `need_write` methods are called.
///
/// Note that the precise semantics of when notifications are received will
/// likely be configurable in the future.
/// That is, this object is typically wrapped in another form of I/O object.
/// It's the responsibility of the wrapper to inform the readiness stream when a
/// "would block" I/O event is seen. The readiness stream will then take care of
/// any scheduling necessary to get notified when the event is ready again.
pub struct ReadinessStream {
io_token: usize,
loop_handle: LoopHandle,
@ -47,6 +50,12 @@ impl ReadinessStream {
}
/// Tests to see if this source is ready to be read from or not.
///
/// If this stream is not ready for a read then `NotReady` will be returned
/// and the current task will be scheduled to receive a notification when
/// the stream is readable again. In other words, this method is only safe
/// to call from within the context of a future's task, typically done in a
/// `Future::poll` method.
pub fn poll_read(&self) -> Poll<(), io::Error> {
if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
return Poll::Ok(())
@ -61,6 +70,12 @@ impl ReadinessStream {
}
/// Tests to see if this source is ready to be written to or not.
///
/// If this stream is not ready for a write then `NotReady` will be returned
/// and the current task will be scheduled to receive a notification when
/// the stream is writable again. In other words, this method is only safe
/// to call from within the context of a future's task, typically done in a
/// `Future::poll` method.
pub fn poll_write(&self) -> Poll<(), io::Error> {
if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
return Poll::Ok(())
@ -74,13 +89,33 @@ impl ReadinessStream {
}
}
/// Tests to see if this source is ready to be read from or not.
/// Indicates to this source of events that the corresponding I/O object is
/// no longer readable, but it needs to be.
///
/// This function, like `poll_read`, is only safe to call from the context
/// of a future's task (typically in a `Future::poll` implementation). It
/// informs this readiness stream that the underlying object is no longer
/// readable, typically because a "would block" error was seen.
///
/// The flag indicating that this stream is readable is unset and the
/// current task is scheduled to receive a notification when the stream is
/// then again readable.
pub fn need_read(&self) {
self.readiness.fetch_and(!1, Ordering::SeqCst);
self.loop_handle.schedule_read(self.io_token);
}
/// Tests to see if this source is ready to be written to or not.
/// Indicates to this source of events that the corresponding I/O object is
/// no longer writable, but it needs to be.
///
/// This function, like `poll_write`, is only safe to call from the context
/// of a future's task (typically in a `Future::poll` implementation). It
/// informs this readiness stream that the underlying object is no longer
/// writable, typically because a "would block" error was seen.
///
/// The flag indicating that this stream is writable is unset and the
/// current task is scheduled to receive a notification when the stream is
/// then again writable.
pub fn need_write(&self) {
self.readiness.fetch_and(!2, Ordering::SeqCst);
self.loop_handle.schedule_write(self.io_token);

View File

@ -227,11 +227,21 @@ impl TcpStream {
}
/// Test whether this socket is ready to be read or not.
///
/// If the socket is *not* readable then the current task is scheduled to
/// get a notification when the socket does become readable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again.
pub fn poll_read(&self) -> Poll<(), io::Error> {
self.ready.poll_read()
}
/// Test whether this socket is writey to be written to or not.
///
/// If the socket is *not* writable then the current task is scheduled to
/// get a notification when the socket does become writable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is writable again.
pub fn poll_write(&self) -> Poll<(), io::Error> {
self.ready.poll_write()
}
@ -303,7 +313,6 @@ impl Read for TcpStream {
if is_wouldblock(&r) {
self.ready.need_read();
}
trace!("read[{:p}] {:?} on {:?}", self, r, self.source.io());
return r
}
}
@ -314,7 +323,6 @@ impl Write for TcpStream {
if is_wouldblock(&r) {
self.ready.need_write();
}
trace!("write[{:p}] {:?} on {:?}", self, r, self.source.io());
return r
}
fn flush(&mut self) -> io::Result<()> {

View File

@ -54,8 +54,6 @@ impl Future for Timeout {
if *self.token.when() <= now {
Poll::Ok(())
} else {
trace!("waiting for a timeout at {:?}", self.token.when());
trace!("current time is {:?}", now);
self.handle.update_timeout(&self.token);
Poll::NotReady
}

View File

@ -66,11 +66,21 @@ impl UdpSocket {
}
/// Test whether this socket is ready to be read or not.
///
/// If the socket is *not* readable then the current task is scheduled to
/// get a notification when the socket does become readable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again.
pub fn poll_read(&self) -> Poll<(), io::Error> {
self.ready.poll_read()
}
/// Test whether this socket is writey to be written to or not.
///
/// If the socket is *not* writable then the current task is scheduled to
/// get a notification when the socket does become writable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is writable again.
pub fn poll_write(&self) -> Poll<(), io::Error> {
self.ready.poll_write()
}