diff --git a/src/channel.rs b/src/channel.rs index 813484b70..c0c03b5eb 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,7 +1,7 @@ use std::io; use std::sync::mpsc::TryRecvError; -use futures::{Future, Poll}; +use futures::{Future, Poll, Async}; use futures::stream::Stream; use mio::channel; @@ -87,17 +87,14 @@ impl Stream for Receiver { type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { - match self.rx.poll_read() { - Poll::Ok(()) => {} - _ => return Poll::NotReady, - } + try_ready!(self.rx.poll_read()); match self.rx.get_ref().try_recv() { - Ok(t) => Poll::Ok(Some(t)), + Ok(t) => Ok(Async::Ready(Some(t))), Err(TryRecvError::Empty) => { self.rx.need_read(); - Poll::NotReady + Ok(Async::NotReady) } - Err(TryRecvError::Disconnected) => Poll::Ok(None), + Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)), } } } diff --git a/src/event_loop/mod.rs b/src/event_loop/mod.rs index fe64a87b5..90ac92d12 100644 --- a/src/event_loop/mod.rs +++ b/src/event_loop/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use std::time::{Instant, Duration}; -use futures::{Future, Poll, IntoFuture}; +use futures::{Future, Poll, IntoFuture, Async}; use futures::task::{self, Unpark, Task, Spawn}; use mio; use slab::Slab; @@ -223,9 +223,9 @@ impl Loop { self._run(&mut || { assert!(res.is_none()); match task.poll_future(ready.clone()) { - Poll::NotReady => {} - Poll::Ok(e) => res = Some(Ok(e)), - Poll::Err(e) => res = Some(Err(e)), + Ok(Async::NotReady) => {} + Ok(Async::Ready(e)) => res = Some(Ok(e)), + Err(e) => res = Some(Err(e)), } res.is_some() }); @@ -344,12 +344,12 @@ impl Loop { let res = CURRENT_LOOP.set(self, || task.poll_future(wake)); let mut dispatch = self.task_dispatch.borrow_mut(); match res { - Poll::NotReady => { + Ok(Async::NotReady) => { assert!(dispatch[token].spawn.is_none()); dispatch[token].spawn = Some(task); } - Poll::Ok(()) | - Poll::Err(()) => { + Ok(Async::Ready(())) | + Err(()) => { dispatch.remove(token).unwrap(); } } @@ -599,14 +599,15 @@ impl LoopFuture Some((ref result, ref mut token)) => { result.cancel(*token); match result.try_consume() { - Ok(t) => return t.into(), + Ok(Ok(t)) => return Ok(t.into()), + Ok(Err(e)) => return Err(e), Err(_) => {} } let task = task::park(); *token = result.on_full(move |_| { task.unpark(); }); - return Poll::NotReady + Ok(Async::NotReady) } None => { let data = &mut self.data; @@ -615,7 +616,7 @@ impl LoopFuture }); if let Some(ret) = ret { debug!("loop future done immediately on event loop"); - return ret.into() + return ret.map(|e| e.into()) } debug!("loop future needs to send info to event loop"); @@ -626,7 +627,7 @@ impl LoopFuture }); self.result = Some((result.clone(), token)); self.loop_handle.send(g(data.take().unwrap(), result)); - Poll::NotReady + Ok(Async::NotReady) } } } diff --git a/src/event_loop/source.rs b/src/event_loop/source.rs index 4ab84bd87..12c841a93 100644 --- a/src/event_loop/source.rs +++ b/src/event_loop/source.rs @@ -160,7 +160,7 @@ impl Future for AddSource type Error = io::Error; fn poll(&mut self) -> Poll<(E, IoToken), io::Error> { - let res = self.inner.poll(|lp, io| { + let res = try_ready!(self.inner.poll(|lp, io| { let pair = try!(lp.add_source(&io)); Ok((io, pair)) }, |io, slot| { @@ -169,10 +169,9 @@ impl Future for AddSource slot.try_produce(res).ok() .expect("add source try_produce intereference"); })) - }); + })); - res.map(|(io, (ready, token))| { - (io, IoToken { token: token, readiness: ready }) - }) + let (io, (ready, token)) = res; + Ok((io, IoToken { token: token, readiness: ready }).into()) } } diff --git a/src/event_loop/timeout.rs b/src/event_loop/timeout.rs index a51c26036..98320019d 100644 --- a/src/event_loop/timeout.rs +++ b/src/event_loop/timeout.rs @@ -58,12 +58,12 @@ impl Future for AddTimeout { type Error = io::Error; fn poll(&mut self) -> Poll { - self.inner.poll(Loop::add_timeout, Message::AddTimeout).map(|(t, i)| { - TimeoutToken { - token: t, - when: i, - } - }) + let (t, i) = try_ready!(self.inner.poll(Loop::add_timeout, + Message::AddTimeout)); + Ok(TimeoutToken { + token: t, + when: i, + }.into()) } } diff --git a/src/io/copy.rs b/src/io/copy.rs index 042e819bd..04a5470b4 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -75,7 +75,7 @@ impl Future for Copy // done with the entire transfer. if self.pos == self.cap && self.read_done { try_nb!(self.writer.flush()); - return Poll::Ok(self.amt) + return Ok(self.amt.into()) } } } diff --git a/src/io/flush.rs b/src/io/flush.rs index 159a178b6..3e42073d9 100644 --- a/src/io/flush.rs +++ b/src/io/flush.rs @@ -1,6 +1,6 @@ use std::io::{self, Write}; -use futures::{Poll, Future}; +use futures::{Poll, Future, Async}; /// A future used to fully flush an I/O object. /// @@ -33,7 +33,7 @@ impl Future for Flush fn poll(&mut self) -> Poll { try_nb!(self.a.as_mut().unwrap().flush()); - Poll::Ok(self.a.take().unwrap()) + Ok(Async::Ready(self.a.take().unwrap())) } } diff --git a/src/io/mod.rs b/src/io/mod.rs index 41a739782..6eb97686a 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -25,9 +25,9 @@ macro_rules! try_nb { ($e:expr) => (match $e { Ok(t) => t, Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { - return ::futures::Poll::NotReady + return Ok(::futures::Async::NotReady) } - Err(e) => return ::futures::Poll::Err(e.into()), + Err(e) => return Err(e.into()), }) } diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs index 417e13ce9..629b8aafb 100644 --- a/src/io/read_exact.rs +++ b/src/io/read_exact.rs @@ -62,7 +62,7 @@ impl Future for ReadExact let n = try_nb!(a.read(&mut buf[*pos..])); *pos += n; if n == 0 { - return Poll::Err(eof()) + return Err(eof()) } } } @@ -70,7 +70,7 @@ impl Future for ReadExact } match mem::replace(&mut self.state, State::Empty) { - State::Reading { a, buf, .. } => Poll::Ok((a, buf)), + State::Reading { a, buf, .. } => Ok((a, buf).into()), State::Empty => panic!(), } } diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs index 80e7cd6c5..4fbf7b042 100644 --- a/src/io/read_to_end.rs +++ b/src/io/read_to_end.rs @@ -55,7 +55,7 @@ impl Future for ReadToEnd } match mem::replace(&mut self.state, State::Empty) { - State::Reading { a, buf } => Poll::Ok((a, buf)), + State::Reading { a, buf } => Ok((a, buf).into()), State::Empty => unreachable!(), } } diff --git a/src/io/write_all.rs b/src/io/write_all.rs index df4751df9..3324c201e 100644 --- a/src/io/write_all.rs +++ b/src/io/write_all.rs @@ -65,7 +65,7 @@ impl Future for WriteAll let n = try_nb!(a.write(&buf[*pos..])); *pos += n; if n == 0 { - return Poll::Err(zero_write()) + return Err(zero_write()) } } } @@ -73,7 +73,7 @@ impl Future for WriteAll } match mem::replace(&mut self.state, State::Empty) { - State::Writing { a, buf, .. } => Poll::Ok((a, buf)), + State::Writing { a, buf, .. } => Ok((a, buf).into()), State::Empty => panic!(), } } diff --git a/src/lib.rs b/src/lib.rs index d58e10581..0b0864f65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ #![deny(missing_docs)] +#[macro_use] extern crate futures; extern crate mio; extern crate slab; diff --git a/src/readiness_stream.rs b/src/readiness_stream.rs index a2e44e7a7..15c7720fe 100644 --- a/src/readiness_stream.rs +++ b/src/readiness_stream.rs @@ -1,7 +1,7 @@ use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use futures::{Future, Poll}; +use futures::{Future, Poll, Async}; use mio; use event_loop::{IoToken, LoopHandle, AddSource}; @@ -60,14 +60,14 @@ impl ReadinessStream { /// `Future::poll` method. pub fn poll_read(&self) -> Poll<(), io::Error> { if self.readiness.load(Ordering::SeqCst) & 1 != 0 { - return Poll::Ok(()) + return Ok(Async::Ready(())) } self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); if self.readiness.load(Ordering::SeqCst) & 1 != 0 { - Poll::Ok(()) + Ok(Async::Ready(())) } else { self.handle.schedule_read(&self.token); - Poll::NotReady + Ok(Async::NotReady) } } @@ -80,14 +80,14 @@ impl ReadinessStream { /// `Future::poll` method. pub fn poll_write(&self) -> Poll<(), io::Error> { if self.readiness.load(Ordering::SeqCst) & 2 != 0 { - return Poll::Ok(()) + return Ok(Async::Ready(())) } self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); if self.readiness.load(Ordering::SeqCst) & 2 != 0 { - Poll::Ok(()) + Ok(Async::Ready(())) } else { self.handle.schedule_write(&self.token); - Poll::NotReady + Ok(Async::NotReady) } } @@ -149,14 +149,13 @@ impl Future for ReadinessStreamNew type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { - self.inner.poll().map(|(io, token)| { - ReadinessStream { - token: token, - handle: self.handle.clone(), - io: io, - readiness: AtomicUsize::new(0), - } - }) + let (io, token) = try_ready!(self.inner.poll()); + Ok(ReadinessStream { + token: token, + handle: self.handle.clone(), + io: io, + readiness: AtomicUsize::new(0), + }.into()) } } diff --git a/src/tcp.rs b/src/tcp.rs index 4aa6683b8..f38488958 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -4,7 +4,7 @@ use std::mem; use std::net::{self, SocketAddr, Shutdown}; use futures::stream::Stream; -use futures::{Future, IntoFuture, failed, Poll}; +use futures::{Future, IntoFuture, failed, Poll, Async}; use mio; use {ReadinessStream, LoopHandle}; @@ -92,17 +92,14 @@ impl TcpListener { type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { - match self.inner.io.poll_read() { - Poll::Ok(()) => {} - _ => return Poll::NotReady, - } + try_ready!(self.inner.io.poll_read()); match self.inner.io.get_ref().accept() { - Ok(pair) => Poll::Ok(Some(pair)), + Ok(pair) => Ok(Async::Ready(Some(pair))), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.inner.io.need_read(); - Poll::NotReady + Ok(Async::NotReady) } - Err(e) => Poll::Err(e) + Err(e) => Err(e) } } } @@ -348,30 +345,27 @@ impl Future for TcpStreamNew { type Error = io::Error; fn poll(&mut self) -> Poll { - let stream = match mem::replace(self, TcpStreamNew::Empty) { - TcpStreamNew::Waiting(s) => s, - TcpStreamNew::Empty => panic!("can't poll TCP stream twice"), - }; + { + let stream = match *self { + TcpStreamNew::Waiting(ref s) => s, + TcpStreamNew::Empty => panic!("can't poll TCP stream twice"), + }; - // Once we've connected, wait for the stream to be writable as that's - // when the actual connection has been initiated. Once we're writable we - // check for `take_socket_error` to see if the connect actually hit an - // error or not. - // - // If all that succeeded then we ship everything on up. - match stream.io.poll_write() { - Poll::Ok(()) => { - match stream.io.get_ref().take_error() { - Ok(Some(e)) => return Poll::Err(e), - Ok(None) => return Poll::Ok(stream), - Err(e) => return Poll::Err(e), - } + // Once we've connected, wait for the stream to be writable as + // that's when the actual connection has been initiated. Once we're + // writable we check for `take_socket_error` to see if the connect + // actually hit an error or not. + // + // If all that succeeded then we ship everything on up. + try_ready!(stream.io.poll_write()); + if let Some(e) = try!(stream.io.get_ref().take_error()) { + return Err(e) } - Poll::Err(e) => return Poll::Err(e), - Poll::NotReady => {} } - *self = TcpStreamNew::Waiting(stream); - Poll::NotReady + match mem::replace(self, TcpStreamNew::Empty) { + TcpStreamNew::Waiting(stream) => Ok(Async::Ready(stream)), + TcpStreamNew::Empty => panic!(), + } } } @@ -392,9 +386,8 @@ impl Write for TcpStream { impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { - match self.io.poll_read() { - Poll::Ok(()) => {} - _ => return Err(mio::would_block()), + if let Async::NotReady = try!(self.io.poll_read()) { + return Err(mio::would_block()) } let r = self.io.get_ref().read(buf); if is_wouldblock(&r) { @@ -406,9 +399,8 @@ impl<'a> Read for &'a TcpStream { impl<'a> Write for &'a TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { - match self.io.poll_write() { - Poll::Ok(()) => {} - _ => return Err(mio::would_block()), + if let Async::NotReady = try!(self.io.poll_write()) { + return Err(mio::would_block()) } let r = self.io.get_ref().write(buf); if is_wouldblock(&r) { @@ -418,9 +410,8 @@ impl<'a> Write for &'a TcpStream { } fn flush(&mut self) -> io::Result<()> { - match self.io.poll_write() { - Poll::Ok(()) => {} - _ => return Err(mio::would_block()), + if let Async::NotReady = try!(self.io.poll_write()) { + return Err(mio::would_block()) } let r = self.io.get_ref().flush(); if is_wouldblock(&r) { diff --git a/src/timeout.rs b/src/timeout.rs index e4ada461b..da0bf21b1 100644 --- a/src/timeout.rs +++ b/src/timeout.rs @@ -1,7 +1,7 @@ use std::io; use std::time::{Duration, Instant}; -use futures::{Future, Poll}; +use futures::{Future, Poll, Async}; use LoopHandle; use io::IoFuture; @@ -52,10 +52,10 @@ impl Future for Timeout { // TODO: is this fast enough? let now = Instant::now(); if *self.token.when() <= now { - Poll::Ok(()) + Ok(Async::Ready(())) } else { self.handle.update_timeout(&self.token); - Poll::NotReady + Ok(Async::NotReady) } } } diff --git a/src/udp.rs b/src/udp.rs index 23b3dd5d0..73afcd669 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -2,7 +2,7 @@ use std::io; use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; use std::fmt; -use futures::{Future, failed, Poll}; +use futures::{Future, failed, Poll, Async}; use mio; use {ReadinessStream, LoopHandle}; @@ -84,9 +84,8 @@ impl UdpSocket { /// Address type can be any implementor of `ToSocketAddrs` trait. See its /// documentation for concrete examples. pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result { - match self.io.poll_write() { - Poll::Ok(()) => {} - _ => return Err(mio::would_block()), + if let Async::NotReady = try!(self.io.poll_write()) { + return Err(mio::would_block()) } match self.io.get_ref().send_to(buf, target) { Ok(Some(n)) => Ok(n), @@ -101,9 +100,8 @@ impl UdpSocket { /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - match self.io.poll_read() { - Poll::Ok(()) => {} - _ => return Err(mio::would_block()), + if let Async::NotReady = try!(self.io.poll_read()) { + return Err(mio::would_block()) } match self.io.get_ref().recv_from(buf) { Ok(Some(n)) => Ok(n), diff --git a/tests/poll.rs b/tests/poll.rs index bedcde397..3049c431d 100644 --- a/tests/poll.rs +++ b/tests/poll.rs @@ -3,7 +3,7 @@ extern crate futures; extern crate mio; extern crate tokio_core; -use futures::{Future, Poll}; +use futures::{Future, Poll, Async}; use futures::task; use tokio_core::{Loop, IoToken, LoopHandle}; @@ -17,9 +17,9 @@ impl Future for Next { if self.0 == 0 { task::park().unpark(); self.0 += 1; - Poll::NotReady + Ok(Async::NotReady) } else { - Poll::Ok(()) + Ok(().into()) } } } @@ -54,10 +54,10 @@ fn poll_after_ready() { if self.n == 0 { self.handle.schedule_read(&self.token); self.n += 1; - Poll::NotReady + Ok(Async::NotReady) } else { assert!(self.token.take_readiness() & 1 != 0); - Poll::Ok(()) + Ok(().into()) } } } diff --git a/tests/udp.rs b/tests/udp.rs index 1bfe512aa..86b9ceee7 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -1,4 +1,5 @@ extern crate futures; +#[macro_use] extern crate tokio_core; use std::io; @@ -38,12 +39,9 @@ impl Future for SendMessage { type Error = io::Error; fn poll(&mut self) -> Poll<(), io::Error> { - match self.socket.send_to(b"1234", &self.addr) { - Ok(4) => Poll::Ok(()), - Ok(n) => panic!("didn't send 4 bytes: {}", n), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::NotReady, - Err(e) => Poll::Err(e), - } + let n = try_nb!(self.socket.send_to(b"1234", &self.addr)); + assert_eq!(n, 4); + Ok(().into()) } } @@ -58,15 +56,10 @@ impl Future for RecvMessage { fn poll(&mut self) -> Poll<(), io::Error> { let mut buf = [0; 32]; - match self.socket.recv_from(&mut buf) { - Ok((4, addr)) => { - assert_eq!(&buf[..4], b"1234"); - assert_eq!(addr, self.expected_addr); - Poll::Ok(()) - } - Ok((n, _)) => panic!("didn't read 4 bytes: {}", n), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::NotReady, - Err(e) => Poll::Err(e), - } + let (n, addr) = try_nb!(self.socket.recv_from(&mut buf)); + assert_eq!(n, 4); + assert_eq!(&buf[..4], b"1234"); + assert_eq!(addr, self.expected_addr); + Ok(().into()) } }