mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
Touch up TcpStream::connect
and revert breaking change
This commit is contained in:
parent
2499dd35a2
commit
64c360c30e
@ -5,10 +5,10 @@ use std::net::{self, SocketAddr, Shutdown};
|
|||||||
|
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
use futures::{self, Future, failed, Poll, Async};
|
use futures::{Future, Poll, Async};
|
||||||
use mio;
|
use mio;
|
||||||
|
|
||||||
use io::Io;
|
use io::{Io, IoFuture};
|
||||||
use reactor::{Handle, PollEvented};
|
use reactor::{Handle, PollEvented};
|
||||||
|
|
||||||
/// An I/O object representing a TCP socket listening for incoming connections.
|
/// An I/O object representing a TCP socket listening for incoming connections.
|
||||||
@ -224,42 +224,12 @@ pub struct TcpStream {
|
|||||||
/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
|
/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
|
||||||
/// when the stream is connected.
|
/// when the stream is connected.
|
||||||
pub struct TcpStreamNew {
|
pub struct TcpStreamNew {
|
||||||
inner: TcpStreamNewFuture,
|
inner: TcpStreamNewState,
|
||||||
}
|
}
|
||||||
|
|
||||||
type TcpStreamNewConnected = ::futures::AndThen<
|
enum TcpStreamNewState {
|
||||||
::futures::future::FutureResult<
|
|
||||||
::reactor::PollEvented<::mio::tcp::TcpStream>,
|
|
||||||
::std::io::Error,
|
|
||||||
>,
|
|
||||||
TcpStreamConnect,
|
|
||||||
fn (::reactor::PollEvented<::mio::tcp::TcpStream>) -> ::net::tcp::TcpStreamConnect
|
|
||||||
>;
|
|
||||||
|
|
||||||
enum TcpStreamNewFuture {
|
|
||||||
Connected(TcpStreamNewConnected),
|
|
||||||
Error(::futures::future::Err<TcpStream, ::std::io::Error>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Future for TcpStreamNewFuture {
|
|
||||||
type Item = TcpStream;
|
|
||||||
type Error = io::Error;
|
|
||||||
fn poll(&mut self) -> Result<futures::Async<TcpStream>, ::std::io::Error> {
|
|
||||||
match *self {
|
|
||||||
TcpStreamNewFuture::Connected(ref mut stream) => stream.poll(),
|
|
||||||
TcpStreamNewFuture::Error(ref mut error) => error.poll(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TcpStreamConnect {
|
|
||||||
fn from_stream(io: ::reactor::PollEvented<::mio::tcp::TcpStream>) -> Self {
|
|
||||||
TcpStreamConnect::Waiting(TcpStream { io: io })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum TcpStreamConnect {
|
|
||||||
Waiting(TcpStream),
|
Waiting(TcpStream),
|
||||||
|
Error(io::Error),
|
||||||
Empty,
|
Empty,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,20 +242,19 @@ impl TcpStream {
|
|||||||
/// connection or during the socket creation, that error will be returned to
|
/// connection or during the socket creation, that error will be returned to
|
||||||
/// the future instead.
|
/// the future instead.
|
||||||
pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew {
|
pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew {
|
||||||
let future = match mio::tcp::TcpStream::connect(addr) {
|
let inner = match mio::tcp::TcpStream::connect(addr) {
|
||||||
Ok(tcp) => TcpStream::new(tcp, handle),
|
Ok(tcp) => TcpStream::new(tcp, handle),
|
||||||
Err(e) => TcpStreamNewFuture::Error(failed(e)),
|
Err(e) => TcpStreamNewState::Error(e),
|
||||||
};
|
};
|
||||||
TcpStreamNew { inner: future }
|
TcpStreamNew { inner: inner }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle)
|
fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle)
|
||||||
-> TcpStreamNewFuture
|
-> TcpStreamNewState {
|
||||||
{
|
match PollEvented::new(connected_stream, handle) {
|
||||||
let tcp = PollEvented::new(connected_stream, handle);
|
Ok(io) => TcpStreamNewState::Waiting(TcpStream { io: io }),
|
||||||
TcpStreamNewFuture::Connected(
|
Err(e) => TcpStreamNewState::Error(e),
|
||||||
futures::done(tcp).and_then(TcpStreamConnect::from_stream)
|
}
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new `TcpStream` from the pending socket inside the given
|
/// Creates a new `TcpStream` from the pending socket inside the given
|
||||||
@ -308,12 +277,12 @@ impl TcpStream {
|
|||||||
/// (perhaps to `INADDR_ANY`) before this method is called.
|
/// (perhaps to `INADDR_ANY`) before this method is called.
|
||||||
pub fn connect_stream(stream: net::TcpStream,
|
pub fn connect_stream(stream: net::TcpStream,
|
||||||
addr: &SocketAddr,
|
addr: &SocketAddr,
|
||||||
handle: &Handle) -> TcpStreamNew {
|
handle: &Handle) -> IoFuture<TcpStream> {
|
||||||
let future = match mio::tcp::TcpStream::connect_stream(stream, addr) {
|
let state = match mio::tcp::TcpStream::connect_stream(stream, addr) {
|
||||||
Ok(tcp) => TcpStream::new(tcp, handle),
|
Ok(tcp) => TcpStream::new(tcp, handle),
|
||||||
Err(e) => TcpStreamNewFuture::Error(failed(e)),
|
Err(e) => TcpStreamNewState::Error(e),
|
||||||
};
|
};
|
||||||
TcpStreamNew { inner: future }
|
state.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test whether this socket is ready to be read or not.
|
/// Test whether this socket is ready to be read or not.
|
||||||
@ -485,15 +454,22 @@ impl Future for TcpStreamNew {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for TcpStreamConnect {
|
impl Future for TcpStreamNewState {
|
||||||
type Item = TcpStream;
|
type Item = TcpStream;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<TcpStream, io::Error> {
|
fn poll(&mut self) -> Poll<TcpStream, io::Error> {
|
||||||
{
|
{
|
||||||
let stream = match *self {
|
let stream = match *self {
|
||||||
TcpStreamConnect::Waiting(ref s) => s,
|
TcpStreamNewState::Waiting(ref s) => s,
|
||||||
TcpStreamConnect::Empty => panic!("can't poll TCP stream twice"),
|
TcpStreamNewState::Error(_) => {
|
||||||
|
let e = match mem::replace(self, TcpStreamNewState::Empty) {
|
||||||
|
TcpStreamNewState::Error(e) => e,
|
||||||
|
_ => panic!(),
|
||||||
|
};
|
||||||
|
return Err(e)
|
||||||
|
}
|
||||||
|
TcpStreamNewState::Empty => panic!("can't poll TCP stream twice"),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Once we've connected, wait for the stream to be writable as
|
// Once we've connected, wait for the stream to be writable as
|
||||||
@ -509,9 +485,9 @@ impl Future for TcpStreamConnect {
|
|||||||
return Err(e)
|
return Err(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
match mem::replace(self, TcpStreamConnect::Empty) {
|
match mem::replace(self, TcpStreamNewState::Empty) {
|
||||||
TcpStreamConnect::Waiting(stream) => Ok(Async::Ready(stream)),
|
TcpStreamNewState::Waiting(stream) => Ok(Async::Ready(stream)),
|
||||||
TcpStreamConnect::Empty => panic!(),
|
_ => panic!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user