Update with Poll/Async changes

This commit is contained in:
Alex Crichton 2016-09-01 16:42:48 -07:00
parent 3282b3ec0d
commit 3794cf7f1d
17 changed files with 103 additions and 124 deletions

View File

@ -1,7 +1,7 @@
use std::io; use std::io;
use std::sync::mpsc::TryRecvError; use std::sync::mpsc::TryRecvError;
use futures::{Future, Poll}; use futures::{Future, Poll, Async};
use futures::stream::Stream; use futures::stream::Stream;
use mio::channel; use mio::channel;
@ -87,17 +87,14 @@ impl<T> Stream for Receiver<T> {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Option<T>, io::Error> { fn poll(&mut self) -> Poll<Option<T>, io::Error> {
match self.rx.poll_read() { try_ready!(self.rx.poll_read());
Poll::Ok(()) => {}
_ => return Poll::NotReady,
}
match self.rx.get_ref().try_recv() { match self.rx.get_ref().try_recv() {
Ok(t) => Poll::Ok(Some(t)), Ok(t) => Ok(Async::Ready(Some(t))),
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
self.rx.need_read(); self.rx.need_read();
Poll::NotReady Ok(Async::NotReady)
} }
Err(TryRecvError::Disconnected) => Poll::Ok(None), Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),
} }
} }
} }

View File

@ -6,7 +6,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use futures::{Future, Poll, IntoFuture}; use futures::{Future, Poll, IntoFuture, Async};
use futures::task::{self, Unpark, Task, Spawn}; use futures::task::{self, Unpark, Task, Spawn};
use mio; use mio;
use slab::Slab; use slab::Slab;
@ -223,9 +223,9 @@ impl Loop {
self._run(&mut || { self._run(&mut || {
assert!(res.is_none()); assert!(res.is_none());
match task.poll_future(ready.clone()) { match task.poll_future(ready.clone()) {
Poll::NotReady => {} Ok(Async::NotReady) => {}
Poll::Ok(e) => res = Some(Ok(e)), Ok(Async::Ready(e)) => res = Some(Ok(e)),
Poll::Err(e) => res = Some(Err(e)), Err(e) => res = Some(Err(e)),
} }
res.is_some() res.is_some()
}); });
@ -344,12 +344,12 @@ impl Loop {
let res = CURRENT_LOOP.set(self, || task.poll_future(wake)); let res = CURRENT_LOOP.set(self, || task.poll_future(wake));
let mut dispatch = self.task_dispatch.borrow_mut(); let mut dispatch = self.task_dispatch.borrow_mut();
match res { match res {
Poll::NotReady => { Ok(Async::NotReady) => {
assert!(dispatch[token].spawn.is_none()); assert!(dispatch[token].spawn.is_none());
dispatch[token].spawn = Some(task); dispatch[token].spawn = Some(task);
} }
Poll::Ok(()) | Ok(Async::Ready(())) |
Poll::Err(()) => { Err(()) => {
dispatch.remove(token).unwrap(); dispatch.remove(token).unwrap();
} }
} }
@ -599,14 +599,15 @@ impl<T, U> LoopFuture<T, U>
Some((ref result, ref mut token)) => { Some((ref result, ref mut token)) => {
result.cancel(*token); result.cancel(*token);
match result.try_consume() { match result.try_consume() {
Ok(t) => return t.into(), Ok(Ok(t)) => return Ok(t.into()),
Ok(Err(e)) => return Err(e),
Err(_) => {} Err(_) => {}
} }
let task = task::park(); let task = task::park();
*token = result.on_full(move |_| { *token = result.on_full(move |_| {
task.unpark(); task.unpark();
}); });
return Poll::NotReady Ok(Async::NotReady)
} }
None => { None => {
let data = &mut self.data; let data = &mut self.data;
@ -615,7 +616,7 @@ impl<T, U> LoopFuture<T, U>
}); });
if let Some(ret) = ret { if let Some(ret) = ret {
debug!("loop future done immediately on event loop"); debug!("loop future done immediately on event loop");
return ret.into() return ret.map(|e| e.into())
} }
debug!("loop future needs to send info to event loop"); debug!("loop future needs to send info to event loop");
@ -626,7 +627,7 @@ impl<T, U> LoopFuture<T, U>
}); });
self.result = Some((result.clone(), token)); self.result = Some((result.clone(), token));
self.loop_handle.send(g(data.take().unwrap(), result)); self.loop_handle.send(g(data.take().unwrap(), result));
Poll::NotReady Ok(Async::NotReady)
} }
} }
} }

View File

@ -160,7 +160,7 @@ impl<E> Future for AddSource<E>
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<(E, IoToken), io::Error> { fn poll(&mut self) -> Poll<(E, IoToken), io::Error> {
let res = self.inner.poll(|lp, io| { let res = try_ready!(self.inner.poll(|lp, io| {
let pair = try!(lp.add_source(&io)); let pair = try!(lp.add_source(&io));
Ok((io, pair)) Ok((io, pair))
}, |io, slot| { }, |io, slot| {
@ -169,10 +169,9 @@ impl<E> Future for AddSource<E>
slot.try_produce(res).ok() slot.try_produce(res).ok()
.expect("add source try_produce intereference"); .expect("add source try_produce intereference");
})) }))
}); }));
res.map(|(io, (ready, token))| { let (io, (ready, token)) = res;
(io, IoToken { token: token, readiness: ready }) Ok((io, IoToken { token: token, readiness: ready }).into())
})
} }
} }

View File

@ -58,12 +58,12 @@ impl Future for AddTimeout {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<TimeoutToken, io::Error> { fn poll(&mut self) -> Poll<TimeoutToken, io::Error> {
self.inner.poll(Loop::add_timeout, Message::AddTimeout).map(|(t, i)| { let (t, i) = try_ready!(self.inner.poll(Loop::add_timeout,
TimeoutToken { Message::AddTimeout));
Ok(TimeoutToken {
token: t, token: t,
when: i, when: i,
} }.into())
})
} }
} }

View File

@ -75,7 +75,7 @@ impl<R, W> Future for Copy<R, W>
// done with the entire transfer. // done with the entire transfer.
if self.pos == self.cap && self.read_done { if self.pos == self.cap && self.read_done {
try_nb!(self.writer.flush()); try_nb!(self.writer.flush());
return Poll::Ok(self.amt) return Ok(self.amt.into())
} }
} }
} }

View File

@ -1,6 +1,6 @@
use std::io::{self, Write}; use std::io::{self, Write};
use futures::{Poll, Future}; use futures::{Poll, Future, Async};
/// A future used to fully flush an I/O object. /// A future used to fully flush an I/O object.
/// ///
@ -33,7 +33,7 @@ impl<A> Future for Flush<A>
fn poll(&mut self) -> Poll<A, io::Error> { fn poll(&mut self) -> Poll<A, io::Error> {
try_nb!(self.a.as_mut().unwrap().flush()); try_nb!(self.a.as_mut().unwrap().flush());
Poll::Ok(self.a.take().unwrap()) Ok(Async::Ready(self.a.take().unwrap()))
} }
} }

View File

@ -25,9 +25,9 @@ macro_rules! try_nb {
($e:expr) => (match $e { ($e:expr) => (match $e {
Ok(t) => t, Ok(t) => t,
Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
return ::futures::Poll::NotReady return Ok(::futures::Async::NotReady)
} }
Err(e) => return ::futures::Poll::Err(e.into()), Err(e) => return Err(e.into()),
}) })
} }

View File

@ -62,7 +62,7 @@ impl<A, T> Future for ReadExact<A, T>
let n = try_nb!(a.read(&mut buf[*pos..])); let n = try_nb!(a.read(&mut buf[*pos..]));
*pos += n; *pos += n;
if n == 0 { if n == 0 {
return Poll::Err(eof()) return Err(eof())
} }
} }
} }
@ -70,7 +70,7 @@ impl<A, T> Future for ReadExact<A, T>
} }
match mem::replace(&mut self.state, State::Empty) { match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, buf, .. } => Poll::Ok((a, buf)), State::Reading { a, buf, .. } => Ok((a, buf).into()),
State::Empty => panic!(), State::Empty => panic!(),
} }
} }

View File

@ -55,7 +55,7 @@ impl<A> Future for ReadToEnd<A>
} }
match mem::replace(&mut self.state, State::Empty) { match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, buf } => Poll::Ok((a, buf)), State::Reading { a, buf } => Ok((a, buf).into()),
State::Empty => unreachable!(), State::Empty => unreachable!(),
} }
} }

View File

@ -65,7 +65,7 @@ impl<A, T> Future for WriteAll<A, T>
let n = try_nb!(a.write(&buf[*pos..])); let n = try_nb!(a.write(&buf[*pos..]));
*pos += n; *pos += n;
if n == 0 { if n == 0 {
return Poll::Err(zero_write()) return Err(zero_write())
} }
} }
} }
@ -73,7 +73,7 @@ impl<A, T> Future for WriteAll<A, T>
} }
match mem::replace(&mut self.state, State::Empty) { match mem::replace(&mut self.state, State::Empty) {
State::Writing { a, buf, .. } => Poll::Ok((a, buf)), State::Writing { a, buf, .. } => Ok((a, buf).into()),
State::Empty => panic!(), State::Empty => panic!(),
} }
} }

View File

@ -5,6 +5,7 @@
#![deny(missing_docs)] #![deny(missing_docs)]
#[macro_use]
extern crate futures; extern crate futures;
extern crate mio; extern crate mio;
extern crate slab; extern crate slab;

View File

@ -1,7 +1,7 @@
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{Future, Poll}; use futures::{Future, Poll, Async};
use mio; use mio;
use event_loop::{IoToken, LoopHandle, AddSource}; use event_loop::{IoToken, LoopHandle, AddSource};
@ -60,14 +60,14 @@ impl<E> ReadinessStream<E> {
/// `Future::poll` method. /// `Future::poll` method.
pub fn poll_read(&self) -> Poll<(), io::Error> { pub fn poll_read(&self) -> Poll<(), io::Error> {
if self.readiness.load(Ordering::SeqCst) & 1 != 0 { if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
return Poll::Ok(()) return Ok(Async::Ready(()))
} }
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
if self.readiness.load(Ordering::SeqCst) & 1 != 0 { if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
Poll::Ok(()) Ok(Async::Ready(()))
} else { } else {
self.handle.schedule_read(&self.token); self.handle.schedule_read(&self.token);
Poll::NotReady Ok(Async::NotReady)
} }
} }
@ -80,14 +80,14 @@ impl<E> ReadinessStream<E> {
/// `Future::poll` method. /// `Future::poll` method.
pub fn poll_write(&self) -> Poll<(), io::Error> { pub fn poll_write(&self) -> Poll<(), io::Error> {
if self.readiness.load(Ordering::SeqCst) & 2 != 0 { if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
return Poll::Ok(()) return Ok(Async::Ready(()))
} }
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
if self.readiness.load(Ordering::SeqCst) & 2 != 0 { if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
Poll::Ok(()) Ok(Async::Ready(()))
} else { } else {
self.handle.schedule_write(&self.token); self.handle.schedule_write(&self.token);
Poll::NotReady Ok(Async::NotReady)
} }
} }
@ -149,14 +149,13 @@ impl<E> Future for ReadinessStreamNew<E>
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<ReadinessStream<E>, io::Error> { fn poll(&mut self) -> Poll<ReadinessStream<E>, io::Error> {
self.inner.poll().map(|(io, token)| { let (io, token) = try_ready!(self.inner.poll());
ReadinessStream { Ok(ReadinessStream {
token: token, token: token,
handle: self.handle.clone(), handle: self.handle.clone(),
io: io, io: io,
readiness: AtomicUsize::new(0), readiness: AtomicUsize::new(0),
} }.into())
})
} }
} }

View File

@ -4,7 +4,7 @@ use std::mem;
use std::net::{self, SocketAddr, Shutdown}; use std::net::{self, SocketAddr, Shutdown};
use futures::stream::Stream; use futures::stream::Stream;
use futures::{Future, IntoFuture, failed, Poll}; use futures::{Future, IntoFuture, failed, Poll, Async};
use mio; use mio;
use {ReadinessStream, LoopHandle}; use {ReadinessStream, LoopHandle};
@ -92,17 +92,14 @@ impl TcpListener {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
match self.inner.io.poll_read() { try_ready!(self.inner.io.poll_read());
Poll::Ok(()) => {}
_ => return Poll::NotReady,
}
match self.inner.io.get_ref().accept() { match self.inner.io.get_ref().accept() {
Ok(pair) => Poll::Ok(Some(pair)), Ok(pair) => Ok(Async::Ready(Some(pair))),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.inner.io.need_read(); self.inner.io.need_read();
Poll::NotReady Ok(Async::NotReady)
} }
Err(e) => Poll::Err(e) Err(e) => Err(e)
} }
} }
} }
@ -348,30 +345,27 @@ impl Future for TcpStreamNew {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<TcpStream, io::Error> { fn poll(&mut self) -> Poll<TcpStream, io::Error> {
let stream = match mem::replace(self, TcpStreamNew::Empty) { {
TcpStreamNew::Waiting(s) => s, let stream = match *self {
TcpStreamNew::Waiting(ref s) => s,
TcpStreamNew::Empty => panic!("can't poll TCP stream twice"), TcpStreamNew::Empty => panic!("can't poll TCP stream twice"),
}; };
// Once we've connected, wait for the stream to be writable as that's // Once we've connected, wait for the stream to be writable as
// when the actual connection has been initiated. Once we're writable we // that's when the actual connection has been initiated. Once we're
// check for `take_socket_error` to see if the connect actually hit an // writable we check for `take_socket_error` to see if the connect
// error or not. // actually hit an error or not.
// //
// If all that succeeded then we ship everything on up. // If all that succeeded then we ship everything on up.
match stream.io.poll_write() { try_ready!(stream.io.poll_write());
Poll::Ok(()) => { if let Some(e) = try!(stream.io.get_ref().take_error()) {
match stream.io.get_ref().take_error() { return Err(e)
Ok(Some(e)) => return Poll::Err(e),
Ok(None) => return Poll::Ok(stream),
Err(e) => return Poll::Err(e),
} }
} }
Poll::Err(e) => return Poll::Err(e), match mem::replace(self, TcpStreamNew::Empty) {
Poll::NotReady => {} TcpStreamNew::Waiting(stream) => Ok(Async::Ready(stream)),
TcpStreamNew::Empty => panic!(),
} }
*self = TcpStreamNew::Waiting(stream);
Poll::NotReady
} }
} }
@ -392,9 +386,8 @@ impl Write for TcpStream {
impl<'a> Read for &'a TcpStream { impl<'a> Read for &'a TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.io.poll_read() { if let Async::NotReady = try!(self.io.poll_read()) {
Poll::Ok(()) => {} return Err(mio::would_block())
_ => return Err(mio::would_block()),
} }
let r = self.io.get_ref().read(buf); let r = self.io.get_ref().read(buf);
if is_wouldblock(&r) { if is_wouldblock(&r) {
@ -406,9 +399,8 @@ impl<'a> Read for &'a TcpStream {
impl<'a> Write for &'a TcpStream { impl<'a> Write for &'a TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.io.poll_write() { if let Async::NotReady = try!(self.io.poll_write()) {
Poll::Ok(()) => {} return Err(mio::would_block())
_ => return Err(mio::would_block()),
} }
let r = self.io.get_ref().write(buf); let r = self.io.get_ref().write(buf);
if is_wouldblock(&r) { if is_wouldblock(&r) {
@ -418,9 +410,8 @@ impl<'a> Write for &'a TcpStream {
} }
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
match self.io.poll_write() { if let Async::NotReady = try!(self.io.poll_write()) {
Poll::Ok(()) => {} return Err(mio::would_block())
_ => return Err(mio::would_block()),
} }
let r = self.io.get_ref().flush(); let r = self.io.get_ref().flush();
if is_wouldblock(&r) { if is_wouldblock(&r) {

View File

@ -1,7 +1,7 @@
use std::io; use std::io;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use futures::{Future, Poll}; use futures::{Future, Poll, Async};
use LoopHandle; use LoopHandle;
use io::IoFuture; use io::IoFuture;
@ -52,10 +52,10 @@ impl Future for Timeout {
// TODO: is this fast enough? // TODO: is this fast enough?
let now = Instant::now(); let now = Instant::now();
if *self.token.when() <= now { if *self.token.when() <= now {
Poll::Ok(()) Ok(Async::Ready(()))
} else { } else {
self.handle.update_timeout(&self.token); self.handle.update_timeout(&self.token);
Poll::NotReady Ok(Async::NotReady)
} }
} }
} }

View File

@ -2,7 +2,7 @@ use std::io;
use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr};
use std::fmt; use std::fmt;
use futures::{Future, failed, Poll}; use futures::{Future, failed, Poll, Async};
use mio; use mio;
use {ReadinessStream, LoopHandle}; use {ReadinessStream, LoopHandle};
@ -84,9 +84,8 @@ impl UdpSocket {
/// Address type can be any implementor of `ToSocketAddrs` trait. See its /// Address type can be any implementor of `ToSocketAddrs` trait. See its
/// documentation for concrete examples. /// documentation for concrete examples.
pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
match self.io.poll_write() { if let Async::NotReady = try!(self.io.poll_write()) {
Poll::Ok(()) => {} return Err(mio::would_block())
_ => return Err(mio::would_block()),
} }
match self.io.get_ref().send_to(buf, target) { match self.io.get_ref().send_to(buf, target) {
Ok(Some(n)) => Ok(n), Ok(Some(n)) => Ok(n),
@ -101,9 +100,8 @@ impl UdpSocket {
/// Receives data from the socket. On success, returns the number of bytes /// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came. /// read and the address from whence the data came.
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
match self.io.poll_read() { if let Async::NotReady = try!(self.io.poll_read()) {
Poll::Ok(()) => {} return Err(mio::would_block())
_ => return Err(mio::would_block()),
} }
match self.io.get_ref().recv_from(buf) { match self.io.get_ref().recv_from(buf) {
Ok(Some(n)) => Ok(n), Ok(Some(n)) => Ok(n),

View File

@ -3,7 +3,7 @@ extern crate futures;
extern crate mio; extern crate mio;
extern crate tokio_core; extern crate tokio_core;
use futures::{Future, Poll}; use futures::{Future, Poll, Async};
use futures::task; use futures::task;
use tokio_core::{Loop, IoToken, LoopHandle}; use tokio_core::{Loop, IoToken, LoopHandle};
@ -17,9 +17,9 @@ impl Future for Next {
if self.0 == 0 { if self.0 == 0 {
task::park().unpark(); task::park().unpark();
self.0 += 1; self.0 += 1;
Poll::NotReady Ok(Async::NotReady)
} else { } else {
Poll::Ok(()) Ok(().into())
} }
} }
} }
@ -54,10 +54,10 @@ fn poll_after_ready() {
if self.n == 0 { if self.n == 0 {
self.handle.schedule_read(&self.token); self.handle.schedule_read(&self.token);
self.n += 1; self.n += 1;
Poll::NotReady Ok(Async::NotReady)
} else { } else {
assert!(self.token.take_readiness() & 1 != 0); assert!(self.token.take_readiness() & 1 != 0);
Poll::Ok(()) Ok(().into())
} }
} }
} }

View File

@ -1,4 +1,5 @@
extern crate futures; extern crate futures;
#[macro_use]
extern crate tokio_core; extern crate tokio_core;
use std::io; use std::io;
@ -38,12 +39,9 @@ impl Future for SendMessage {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> { fn poll(&mut self) -> Poll<(), io::Error> {
match self.socket.send_to(b"1234", &self.addr) { let n = try_nb!(self.socket.send_to(b"1234", &self.addr));
Ok(4) => Poll::Ok(()), assert_eq!(n, 4);
Ok(n) => panic!("didn't send 4 bytes: {}", n), Ok(().into())
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::NotReady,
Err(e) => Poll::Err(e),
}
} }
} }
@ -58,15 +56,10 @@ impl Future for RecvMessage {
fn poll(&mut self) -> Poll<(), io::Error> { fn poll(&mut self) -> Poll<(), io::Error> {
let mut buf = [0; 32]; let mut buf = [0; 32];
match self.socket.recv_from(&mut buf) { let (n, addr) = try_nb!(self.socket.recv_from(&mut buf));
Ok((4, addr)) => { assert_eq!(n, 4);
assert_eq!(&buf[..4], b"1234"); assert_eq!(&buf[..4], b"1234");
assert_eq!(addr, self.expected_addr); assert_eq!(addr, self.expected_addr);
Poll::Ok(()) Ok(().into())
}
Ok((n, _)) => panic!("didn't read 4 bytes: {}", n),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::NotReady,
Err(e) => Poll::Err(e),
}
} }
} }