This commit is contained in:
Alex Crichton 2017-01-06 11:30:18 -08:00
commit 2499dd35a2

View File

@ -8,7 +8,7 @@ use futures::sync::oneshot;
use futures::{self, Future, failed, Poll, Async}; use futures::{self, Future, failed, Poll, Async};
use mio; use mio;
use io::{Io, IoFuture}; use io::Io;
use reactor::{Handle, PollEvented}; use reactor::{Handle, PollEvented};
/// An I/O object representing a TCP socket listening for incoming connections. /// An I/O object representing a TCP socket listening for incoming connections.
@ -224,7 +224,38 @@ pub struct TcpStream {
/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream` /// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
/// when the stream is connected. /// when the stream is connected.
pub struct TcpStreamNew { pub struct TcpStreamNew {
inner: IoFuture<TcpStream>, inner: TcpStreamNewFuture,
}
type TcpStreamNewConnected = ::futures::AndThen<
::futures::future::FutureResult<
::reactor::PollEvented<::mio::tcp::TcpStream>,
::std::io::Error,
>,
TcpStreamConnect,
fn (::reactor::PollEvented<::mio::tcp::TcpStream>) -> ::net::tcp::TcpStreamConnect
>;
enum TcpStreamNewFuture {
Connected(TcpStreamNewConnected),
Error(::futures::future::Err<TcpStream, ::std::io::Error>),
}
impl Future for TcpStreamNewFuture {
type Item = TcpStream;
type Error = io::Error;
fn poll(&mut self) -> Result<futures::Async<TcpStream>, ::std::io::Error> {
match *self {
TcpStreamNewFuture::Connected(ref mut stream) => stream.poll(),
TcpStreamNewFuture::Error(ref mut error) => error.poll(),
}
}
}
impl TcpStreamConnect {
fn from_stream(io: ::reactor::PollEvented<::mio::tcp::TcpStream>) -> Self {
TcpStreamConnect::Waiting(TcpStream { io: io })
}
} }
enum TcpStreamConnect { enum TcpStreamConnect {
@ -243,17 +274,18 @@ impl TcpStream {
pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew { pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew {
let future = match mio::tcp::TcpStream::connect(addr) { let future = match mio::tcp::TcpStream::connect(addr) {
Ok(tcp) => TcpStream::new(tcp, handle), Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => failed(e).boxed(), Err(e) => TcpStreamNewFuture::Error(failed(e)),
}; };
TcpStreamNew { inner: future } TcpStreamNew { inner: future }
} }
fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle) fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle)
-> IoFuture<TcpStream> { -> TcpStreamNewFuture
{
let tcp = PollEvented::new(connected_stream, handle); let tcp = PollEvented::new(connected_stream, handle);
futures::done(tcp).and_then(|io| { TcpStreamNewFuture::Connected(
TcpStreamConnect::Waiting(TcpStream { io: io }) futures::done(tcp).and_then(TcpStreamConnect::from_stream)
}).boxed() )
} }
/// Creates a new `TcpStream` from the pending socket inside the given /// Creates a new `TcpStream` from the pending socket inside the given
@ -276,11 +308,12 @@ impl TcpStream {
/// (perhaps to `INADDR_ANY`) before this method is called. /// (perhaps to `INADDR_ANY`) before this method is called.
pub fn connect_stream(stream: net::TcpStream, pub fn connect_stream(stream: net::TcpStream,
addr: &SocketAddr, addr: &SocketAddr,
handle: &Handle) -> IoFuture<TcpStream> { handle: &Handle) -> TcpStreamNew {
match mio::tcp::TcpStream::connect_stream(stream, addr) { let future = match mio::tcp::TcpStream::connect_stream(stream, addr) {
Ok(tcp) => TcpStream::new(tcp, handle), Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => failed(e).boxed(), Err(e) => TcpStreamNewFuture::Error(failed(e)),
} };
TcpStreamNew { inner: future }
} }
/// Test whether this socket is ready to be read or not. /// Test whether this socket is ready to be read or not.