From fc1640891e1cb4a2bf1ab032452e0e3ef6daf22c Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 28 Aug 2019 13:25:50 -0700 Subject: [PATCH] net: perform DNS lookup on connect / bind. (#1499) A sealed `net::ToSocketAddrs` trait is added. This trait is not intended to be used by users. Instead, it is an argument to `connect` and `bind` functions. The operating system's DNS lookup functionality is used. Blocking operations are performed on a thread pool in order to avoid blocking the runtime. --- tokio-executor/Cargo.toml | 2 +- tokio-net/Cargo.toml | 11 +- tokio-net/src/addr.rs | 214 ++++++++++++++++++++++++++ tokio-net/src/driver/mod.rs | 4 +- tokio-net/src/lib.rs | 3 + tokio-net/src/tcp/listener.rs | 138 ++++++++++++----- tokio-net/src/tcp/stream.rs | 111 ++++++------- tokio-net/src/udp/socket.rs | 55 ++++++- tokio-net/tests/tcp_accept.rs | 41 +++-- tokio-net/tests/tcp_connect.rs | 137 ++++++++++++++++- tokio-net/tests/tcp_echo.rs | 3 +- tokio-net/tests/tcp_shutdown.rs | 3 +- tokio-net/tests/tcp_split.rs | 4 +- tokio-net/tests/udp.rs | 24 +-- tokio-tls/tests/smoke.rs | 6 +- tokio/examples/chat.rs | 3 +- tokio/examples/connect.rs | 8 +- tokio/examples/echo-udp.rs | 3 +- tokio/examples/echo.rs | 4 +- tokio/examples/hello_world.rs | 5 +- tokio/examples/print_each_packet.rs | 8 +- tokio/examples/proxy.rs | 13 +- tokio/examples/tinydb.rs | 5 +- tokio/examples/tinyhttp.rs | 5 +- tokio/examples/udp-client.rs | 4 +- tokio/examples/udp-codec.rs | 5 +- tokio/src/executor.rs | 4 +- tokio/src/lib.rs | 3 +- tokio/src/runtime/mod.rs | 8 +- tokio/tests/buffered.rs | 3 +- tokio/tests/reactor.rs | 3 +- tokio/tests/runtime_current_thread.rs | 3 +- tokio/tests/runtime_threaded.rs | 7 +- 33 files changed, 632 insertions(+), 218 deletions(-) create mode 100644 tokio-net/src/addr.rs diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml index de25bbce4..b902e7bc4 100644 --- a/tokio-executor/Cargo.toml +++ b/tokio-executor/Cargo.toml @@ -21,7 +21,7 @@ keywords = ["futures", "tokio"] categories = ["concurrency", "asynchronous"] [features] -blocking = ["tokio-sync"] +blocking = ["tokio-sync", "lazy_static"] current-thread = ["crossbeam-channel"] threadpool = [ "tokio-sync", diff --git a/tokio-net/Cargo.toml b/tokio-net/Cargo.toml index a2e056382..fef187752 100644 --- a/tokio-net/Cargo.toml +++ b/tokio-net/Cargo.toml @@ -24,7 +24,6 @@ categories = ["asynchronous", "network-programming"] async-traits = [] process = [ "crossbeam-queue", - "futures-util-preview", "libc", "mio-named-pipes", "signal", @@ -38,7 +37,6 @@ process = [ "winapi/winnt", ] signal = [ - "futures-util-preview", "mio-uds", "libc", "signal-hook-registry", @@ -48,18 +46,15 @@ signal = [ ] tcp = [ "bytes", - "futures-util-preview", "iovec", ] udp = [ "bytes", "futures-sink-preview", - "futures-util-preview", ] uds = [ "bytes", "mio-uds", - "futures-util-preview", "iovec", "libc", ] @@ -67,7 +62,7 @@ log = ["tracing/log"] [dependencies] tokio-codec = { version = "=0.2.0-alpha.2", path = "../tokio-codec" } -tokio-executor = { version = "=0.2.0-alpha.2", path = "../tokio-executor" } +tokio-executor = { version = "=0.2.0-alpha.2", features = ["blocking"], path = "../tokio-executor" } tokio-io = { version = "=0.2.0-alpha.2", path = "../tokio-io" } tokio-sync = { version = "=0.2.0-alpha.2", path = "../tokio-sync" } @@ -76,6 +71,7 @@ tracing = { version = "0.1.5", optional = true } # driver implementation crossbeam-utils = "0.6.0" futures-core-preview = "=0.3.0-alpha.18" +futures-util-preview = "=0.3.0-alpha.18" lazy_static = "1.0.2" mio = "0.6.14" num_cpus = "1.8.0" @@ -85,7 +81,6 @@ slab = "0.4.0" # TCP / UDP bytes = { version = "0.4", optional = true } futures-sink-preview = { version = "=0.3.0-alpha.18", optional = true } -futures-util-preview = { version = "=0.3.0-alpha.18", optional = true } iovec = { version = "0.1", optional = true } [target.'cfg(unix)'.dependencies] @@ -114,7 +109,7 @@ tokio-io-pool = "0.1.4" # UDS tests tempfile = "3" -futures-preview = "=0.3.0-alpha.18" +futures-preview = { version = "=0.3.0-alpha.18", features = ["nightly", "async-await"] } [package.metadata.docs.rs] all-features = true diff --git a/tokio-net/src/addr.rs b/tokio-net/src/addr.rs new file mode 100644 index 000000000..8b782cfa3 --- /dev/null +++ b/tokio-net/src/addr.rs @@ -0,0 +1,214 @@ +use tokio_executor::blocking; + +use futures_util::future; +use std::io; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + +/// Convert or resolve without blocking to one or more `SocketAddr` values. +/// +/// Currently, this trait is only used as an argument to Tokio functions that +/// need to reference a target socket address. +/// +/// This trait is sealed and is intended to be opaque. Users of Tokio should +/// only use `ToSocketAddrs` in trait bounds and __must not__ attempt to call +/// the functions directly or reference associated types. Changing these is not +/// considered a breaking change. +pub trait ToSocketAddrs: sealed::ToSocketAddrsPriv {} + +type ReadyFuture = future::Ready>; + +// ===== impl SocketAddr ===== + +impl ToSocketAddrs for SocketAddr {} + +impl sealed::ToSocketAddrsPriv for SocketAddr { + type Iter = std::option::IntoIter; + type Future = ReadyFuture; + + fn to_socket_addrs(&self) -> Self::Future { + let iter = Some(*self).into_iter(); + future::ready(Ok(iter)) + } +} + +// ===== impl str ===== + +impl ToSocketAddrs for str {} + +impl sealed::ToSocketAddrsPriv for str { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self) -> Self::Future { + use sealed::MaybeReady; + + // First check if the input parses as a socket address + let res: Result = self.parse(); + + if let Ok(addr) = res { + return MaybeReady::Ready(Some(addr)); + } + + // Run DNS lookup on the blocking pool + let s = self.to_owned(); + + MaybeReady::Blocking(blocking::run(move || { + std::net::ToSocketAddrs::to_socket_addrs(&s) + })) + } +} + +// ===== impl (&str, u16) ===== + +impl ToSocketAddrs for (&'_ str, u16) {} + +impl sealed::ToSocketAddrsPriv for (&'_ str, u16) { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self) -> Self::Future { + use sealed::MaybeReady; + use std::net::{SocketAddrV4, SocketAddrV6}; + + let (host, port) = *self; + + // try to parse the host as a regular IP address first + if let Ok(addr) = host.parse::() { + let addr = SocketAddrV4::new(addr, port); + let addr = SocketAddr::V4(addr); + + return MaybeReady::Ready(Some(addr)); + } + + if let Ok(addr) = host.parse::() { + let addr = SocketAddrV6::new(addr, port, 0, 0); + let addr = SocketAddr::V6(addr); + + return MaybeReady::Ready(Some(addr)); + } + + let host = host.to_owned(); + + MaybeReady::Blocking(blocking::run(move || { + std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port)) + })) + } +} + +// ===== impl (IpAddr, u16) ===== + +impl ToSocketAddrs for (IpAddr, u16) {} + +impl sealed::ToSocketAddrsPriv for (IpAddr, u16) { + type Iter = std::option::IntoIter; + type Future = ReadyFuture; + + fn to_socket_addrs(&self) -> Self::Future { + let iter = Some(SocketAddr::from(*self)).into_iter(); + future::ready(Ok(iter)) + } +} + +// ===== impl String ===== + +impl ToSocketAddrs for String {} + +impl sealed::ToSocketAddrsPriv for String { + type Iter = ::Iter; + type Future = ::Future; + + fn to_socket_addrs(&self) -> Self::Future { + (&self[..]).to_socket_addrs() + } +} + +// ===== impl &'_ impl ToSocketAddrs ===== + +impl ToSocketAddrs for &'_ T {} + +impl sealed::ToSocketAddrsPriv for &'_ T +where + T: sealed::ToSocketAddrsPriv + ?Sized, +{ + type Iter = T::Iter; + type Future = T::Future; + + fn to_socket_addrs(&self) -> Self::Future { + (**self).to_socket_addrs() + } +} + +pub(crate) mod sealed { + //! The contents of this trait are intended to remain private and __not__ + //! part of the `ToSocketAddrs` public API. The details will change over + //! time. + + use tokio_executor::blocking::Blocking; + + use futures_core::ready; + use std::future::Future; + use std::io; + use std::net::SocketAddr; + use std::option; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::vec; + + #[doc(hidden)] + pub trait ToSocketAddrsPriv { + type Iter: Iterator + Send + 'static; + type Future: Future> + Send + 'static; + + fn to_socket_addrs(&self) -> Self::Future; + } + + #[doc(hidden)] + #[derive(Debug)] + pub enum MaybeReady { + Ready(Option), + Blocking(Blocking>>), + } + + #[doc(hidden)] + #[derive(Debug)] + pub enum OneOrMore { + One(option::IntoIter), + More(vec::IntoIter), + } + + impl Future for MaybeReady { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match *self { + MaybeReady::Ready(ref mut i) => { + let iter = OneOrMore::One(i.take().into_iter()); + Poll::Ready(Ok(iter)) + } + MaybeReady::Blocking(ref mut rx) => { + let res = ready!(Pin::new(rx).poll(cx)).map(OneOrMore::More); + + Poll::Ready(res) + } + } + } + } + + impl Iterator for OneOrMore { + type Item = SocketAddr; + + fn next(&mut self) -> Option { + match self { + OneOrMore::One(i) => i.next(), + OneOrMore::More(i) => i.next(), + } + } + + fn size_hint(&self) -> (usize, Option) { + match self { + OneOrMore::One(i) => i.size_hint(), + OneOrMore::More(i) => i.size_hint(), + } + } + } +} diff --git a/tokio-net/src/driver/mod.rs b/tokio-net/src/driver/mod.rs index aab2cbc21..381c9e57e 100644 --- a/tokio-net/src/driver/mod.rs +++ b/tokio-net/src/driver/mod.rs @@ -25,9 +25,7 @@ //! //! # async fn process(t: T) {} //! # async fn dox() -> Result<(), Box> { -//! let addr = "93.184.216.34:9243".parse()?; -//! -//! let stream = TcpStream::connect(&addr).await?; +//! let stream = TcpStream::connect("93.184.216.34:9243").await?; //! //! println!("successfully connected"); //! diff --git a/tokio-net/src/lib.rs b/tokio-net/src/lib.rs index 576aa32b3..5618c512f 100644 --- a/tokio-net/src/lib.rs +++ b/tokio-net/src/lib.rs @@ -38,6 +38,9 @@ #[macro_use] mod tracing; +mod addr; +pub use addr::ToSocketAddrs; + pub mod driver; pub mod util; diff --git a/tokio-net/src/tcp/listener.rs b/tokio-net/src/tcp/listener.rs index 8a064b7af..81cd71b6d 100644 --- a/tokio-net/src/tcp/listener.rs +++ b/tokio-net/src/tcp/listener.rs @@ -3,6 +3,7 @@ use super::incoming::Incoming; use super::TcpStream; use crate::driver::Handle; use crate::util::PollEvented; +use crate::ToSocketAddrs; use futures_core::ready; use futures_util::future::poll_fn; @@ -22,13 +23,13 @@ use std::task::{Context, Poll}; /// /// ```no_run /// use tokio::net::TcpListener; -/// use std::error::Error; +/// +/// use std::io; /// # async fn process_socket(socket: T) {} /// /// #[tokio::main] -/// async fn main() -> Result<(), Box> { -/// let addr = "127.0.0.1:8080".parse()?; -/// let mut listener = TcpListener::bind(&addr)?; +/// async fn main() -> io::Result<()> { +/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// loop { /// let (socket, _) = listener.accept().await?; @@ -41,24 +42,60 @@ pub struct TcpListener { } impl TcpListener { - /// Create a new TCP listener associated with this event loop. + /// Creates a new TcpListener which will be bound to the specified address. /// - /// The TCP listener will bind to the provided `addr` address, if available. - /// If the result is `Ok`, the socket has successfully bound. + /// The returned listener is ready for accepting connections. + /// + /// Binding with a port number of 0 will request that the OS assigns a port + /// to this listener. The port allocated can be queried via the `local_addr` + /// method. + /// + /// The address type can be any implementor of `ToSocketAddrs` trait. + /// + /// If `addr` yields multiple addresses, bind will be attempted with each of + /// the addresses until one succeeds and returns the listener. If none of + /// the addresses succeed in creating a listener, the error returned from + /// the last attempt (the last address) is returned. /// /// # Examples /// - /// ``` - /// use std::net::SocketAddr; + /// ```no_run /// use tokio::net::TcpListener; /// - /// let addr = "127.0.0.1:0".parse::()?; - /// let listener = TcpListener::bind(&addr)?; - /// # Ok::<_, Box>(()) + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// + /// // use the listener + /// + /// Ok(()) + /// } /// ``` - pub fn bind(addr: &SocketAddr) -> io::Result { - let l = mio::net::TcpListener::bind(addr)?; - Ok(TcpListener::new(l)) + pub async fn bind(addr: A) -> io::Result { + let addrs = addr.to_socket_addrs().await?; + + let mut last_err = None; + + for addr in addrs { + match TcpListener::bind_addr(addr) { + Ok(listener) => return Ok(listener), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) + } + + fn bind_addr(addr: SocketAddr) -> io::Result { + let listener = mio::net::TcpListener::bind(&addr)?; + Ok(TcpListener::new(listener)) } /// Accept a new incoming connection from this listener. @@ -71,18 +108,22 @@ impl TcpListener { /// /// # Examples /// - /// ``` - /// # async fn dox() -> Result<(), Box> { + /// ```no_run /// use tokio::net::TcpListener; /// - /// let addr = "127.0.0.1:8080".parse()?; - /// let mut listener = TcpListener::bind(&addr)?; - /// match listener.accept().await { - /// Ok((_socket, addr)) => println!("new client: {:?}", addr), - /// Err(e) => println!("couldn't get client: {:?}", e), + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// match listener.accept().await { + /// Ok((_socket, addr)) => println!("new client: {:?}", addr), + /// Err(e) => println!("couldn't get client: {:?}", e), + /// } + /// + /// Ok(()) /// } - /// # Ok(()) - /// # } /// ``` pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { poll_fn(|cx| self.poll_accept(cx)).await @@ -178,13 +219,19 @@ impl TcpListener { /// /// ``` /// use tokio::net::TcpListener; + /// + /// use std::io; /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; /// - /// let addr = "127.0.0.1:8080".parse::()?; - /// let listener = TcpListener::bind(&addr)?; - /// assert_eq!(listener.local_addr()?, - /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))); - /// # Ok::<_, Box>(()) + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// assert_eq!(listener.local_addr()?, + /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))); + /// + /// Ok(()) + /// } /// ``` pub fn local_addr(&self) -> io::Result { self.io.get_ref().local_addr() @@ -215,17 +262,20 @@ impl TcpListener { /// /// # Examples /// - /// ``` + /// ```no_run /// use tokio::net::TcpListener; - /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:0".parse::()?; - /// let listener = TcpListener::bind(&addr)?; + /// use std::io; /// - /// listener.set_ttl(100).expect("could not set TTL"); + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; /// - /// assert_eq!(listener.ttl()?, 100); - /// # Ok::<_, Box>(()) + /// listener.set_ttl(100).expect("could not set TTL"); + /// assert_eq!(listener.ttl()?, 100); + /// + /// Ok(()) + /// } /// ``` pub fn ttl(&self) -> io::Result { self.io.get_ref().ttl() @@ -238,15 +288,19 @@ impl TcpListener { /// /// # Examples /// - /// ``` + /// ```no_run /// use tokio::net::TcpListener; - /// use std::net::SocketAddr; /// - /// let addr = "127.0.0.1:0".parse::()?; - /// let listener = TcpListener::bind(&addr)?; + /// use std::io; /// - /// listener.set_ttl(100).expect("could not set TTL"); - /// # Ok::<_, Box>(()) + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// + /// listener.set_ttl(100).expect("could not set TTL"); + /// + /// Ok(()) + /// } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { self.io.get_ref().set_ttl(ttl) diff --git a/tokio-net/src/tcp/stream.rs b/tokio-net/src/tcp/stream.rs index 8741355f6..9edde0273 100644 --- a/tokio-net/src/tcp/stream.rs +++ b/tokio-net/src/tcp/stream.rs @@ -4,6 +4,7 @@ use super::split::{ }; use crate::driver::Handle; use crate::util::PollEvented; +use crate::ToSocketAddrs; use tokio_io::{AsyncRead, AsyncWrite}; @@ -38,10 +39,8 @@ use std::time::Duration; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { -/// let addr = "127.0.0.1:8080".parse()?; -/// /// // Connect to a peer -/// let mut stream = TcpStream::connect(&addr).await?; +/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// // Write some data. /// stream.write_all(b"hello world!").await?; @@ -54,12 +53,15 @@ pub struct TcpStream { } impl TcpStream { - /// Create a new TCP stream connected to the specified address. + /// Opens a TCP connection to a remote host. /// - /// This function will create a new TCP socket and attempt to connect it to - /// the `addr` provided. The returned future will be resolved once the - /// stream has successfully connected, or it will return an error if one - /// occurs. + /// `addr` is an address of the remote host. Anything which implements + /// `ToSocketAddrs` trait can be supplied for the address. + /// + /// If `addr` yields multiple addresses, connect will be attempted with each + /// of the addresses until a connection is successful. If none of the + /// addresses result in a successful connection, the error returned from the + /// last connection attempt (the last address) is returned. /// /// # Examples /// @@ -70,10 +72,8 @@ impl TcpStream { /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// /// // Connect to a peer - /// let mut stream = TcpStream::connect(&addr).await?; + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// // Write some data. /// stream.write_all(b"hello world!").await?; @@ -81,8 +81,29 @@ impl TcpStream { /// Ok(()) /// } /// ``` - pub async fn connect(addr: &SocketAddr) -> io::Result { - let sys = mio::net::TcpStream::connect(addr)?; + pub async fn connect(addr: A) -> io::Result { + let addrs = addr.to_socket_addrs().await?; + + let mut last_err = None; + + for addr in addrs { + match TcpStream::connect_addr(addr).await { + Ok(stream) => return Ok(stream), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) + } + + /// Establish a connection to the specified `addr`. + async fn connect_addr(addr: SocketAddr) -> io::Result { + let sys = mio::net::TcpStream::connect(&addr)?; let stream = TcpStream::new(sys); // Once we've connected, wait for the stream to be writable as @@ -136,11 +157,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.local_addr()?); /// # Ok(()) @@ -155,11 +174,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.peer_addr()?); /// # Ok(()) @@ -198,10 +215,8 @@ impl TcpStream { /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// /// // Connect to a peer - /// let mut stream = TcpStream::connect(&addr).await?; + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// let mut b1 = [0; 10]; /// let mut b2 = [0; 10]; @@ -236,10 +251,8 @@ impl TcpStream { /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// /// // Connect to a peer - /// let mut stream = TcpStream::connect(&addr).await?; + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// // Shutdown the stream /// stream.shutdown(Shutdown::Write)?; @@ -261,11 +274,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.nodelay()?); /// # Ok(()) @@ -287,11 +298,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_nodelay(true)?; /// # Ok(()) @@ -311,11 +320,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.recv_buffer_size()?); /// # Ok(()) @@ -334,11 +341,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_recv_buffer_size(100)?; /// # Ok(()) @@ -367,11 +372,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.send_buffer_size()?); /// # Ok(()) @@ -390,11 +393,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_send_buffer_size(100)?; /// # Ok(()) @@ -415,11 +416,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.keepalive()?); /// # Ok(()) @@ -446,11 +445,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_keepalive(None)?; /// # Ok(()) @@ -470,11 +467,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.ttl()?); /// # Ok(()) @@ -493,11 +488,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_ttl(123)?; /// # Ok(()) @@ -518,11 +511,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.linger()?); /// # Ok(()) @@ -548,11 +539,9 @@ impl TcpStream { /// /// ```no_run /// use tokio::net::TcpStream; - /// use std::net::SocketAddr; /// /// # async fn dox() -> Result<(), Box> { - /// let addr = "127.0.0.1:8080".parse()?; - /// let stream = TcpStream::connect(&addr).await?; + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_linger(None)?; /// # Ok(()) diff --git a/tokio-net/src/udp/socket.rs b/tokio-net/src/udp/socket.rs index 90d1c1e90..00a42469a 100644 --- a/tokio-net/src/udp/socket.rs +++ b/tokio-net/src/udp/socket.rs @@ -1,6 +1,7 @@ use super::split::{split, UdpSocketRecvHalf, UdpSocketSendHalf}; use crate::driver::Handle; use crate::util::PollEvented; +use crate::ToSocketAddrs; use futures_core::ready; use futures_util::future::poll_fn; @@ -19,8 +20,27 @@ pub struct UdpSocket { impl UdpSocket { /// This function will create a new UDP socket and attempt to bind it to /// the `addr` provided. - pub fn bind(addr: &SocketAddr) -> io::Result { - mio::net::UdpSocket::bind(addr).map(UdpSocket::new) + pub async fn bind(addr: A) -> io::Result { + let addrs = addr.to_socket_addrs().await?; + let mut last_err = None; + + for addr in addrs { + match UdpSocket::bind_addr(addr) { + Ok(socket) => return Ok(socket), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) + } + + fn bind_addr(addr: SocketAddr) -> io::Result { + mio::net::UdpSocket::bind(&addr).map(UdpSocket::new) } fn new(socket: mio::net::UdpSocket) -> UdpSocket { @@ -63,8 +83,23 @@ impl UdpSocket { /// Connects the UDP socket setting the default destination for send() and /// limiting packets that are read via recv from the address specified in /// `addr`. - pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> { - self.io.get_ref().connect(*addr) + pub async fn connect(&self, addr: A) -> io::Result<()> { + let addrs = addr.to_socket_addrs().await?; + let mut last_err = None; + + for addr in addrs { + match self.io.get_ref().connect(addr) { + Ok(_) => return Ok(()), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) } /// Returns a future that sends data on the socket to the remote address to which it is connected. @@ -141,8 +176,16 @@ impl UdpSocket { /// /// The future will resolve to an error if the IP version of the socket does /// not match that of `target`. - pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result { - poll_fn(|cx| self.poll_send_to_priv(cx, buf, target)).await + pub async fn send_to(&mut self, buf: &[u8], target: A) -> io::Result { + let mut addrs = target.to_socket_addrs().await?; + + match addrs.next() { + Some(target) => poll_fn(|cx| self.poll_send_to_priv(cx, buf, &target)).await, + None => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "no addresses to send data to", + )), + } } pub(crate) fn poll_send_to_priv( diff --git a/tokio-net/tests/tcp_accept.rs b/tokio-net/tests/tcp_accept.rs index ecb10e471..09a2a6035 100644 --- a/tokio-net/tests/tcp_accept.rs +++ b/tokio-net/tests/tcp_accept.rs @@ -4,21 +4,36 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot; use tokio_test::assert_ok; -#[tokio::test] -async fn accept() { - let addr = "127.0.0.1:0".parse().unwrap(); - let mut listener = assert_ok!(TcpListener::bind(&addr)); - let addr = listener.local_addr().unwrap(); +use std::net::{IpAddr, SocketAddr}; - let (tx, rx) = oneshot::channel(); +macro_rules! test_accept { + ($(($ident:ident, $target:expr),)*) => { + $( + #[tokio::test] + async fn $ident() { + let mut listener = assert_ok!(TcpListener::bind($target).await); + let addr = listener.local_addr().unwrap(); - tokio::spawn(async move { - let (socket, _) = assert_ok!(listener.accept().await); - assert_ok!(tx.send(socket)); - }); + let (tx, rx) = oneshot::channel(); - let cli = assert_ok!(TcpStream::connect(&addr).await); - let srv = assert_ok!(rx.await); + tokio::spawn(async move { + let (socket, _) = assert_ok!(listener.accept().await); + assert_ok!(tx.send(socket)); + }); - assert_eq!(cli.local_addr().unwrap(), srv.peer_addr().unwrap()); + let cli = assert_ok!(TcpStream::connect(&addr).await); + let srv = assert_ok!(rx.await); + + assert_eq!(cli.local_addr().unwrap(), srv.peer_addr().unwrap()); + } + )* + } +} + +test_accept! { + (ip_str, "127.0.0.1:0"), + (host_str, "localhost:0"), + (socket_addr, "127.0.0.1:0".parse::().unwrap()), + (str_port_tuple, ("127.0.0.1", 0)), + (ip_port_tuple, ("127.0.0.1".parse::().unwrap(), 0)), } diff --git a/tokio-net/tests/tcp_connect.rs b/tokio-net/tests/tcp_connect.rs index f452ca898..daf0b3c94 100644 --- a/tokio-net/tests/tcp_connect.rs +++ b/tokio-net/tests/tcp_connect.rs @@ -4,11 +4,13 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot; use tokio_test::assert_ok; +use futures::join; + #[tokio::test] -async fn connect() { - let addr = assert_ok!("127.0.0.1:0".parse()); - let mut srv = assert_ok!(TcpListener::bind(&addr)); +async fn connect_v4() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); let addr = assert_ok!(srv.local_addr()); + assert!(addr.is_ipv4()); let (tx, rx) = oneshot::channel(); @@ -31,6 +33,135 @@ async fn connect() { ); } +#[tokio::test] +async fn connect_v6() { + let mut srv = assert_ok!(TcpListener::bind("[::1]:0").await); + let addr = assert_ok!(srv.local_addr()); + assert!(addr.is_ipv6()); + + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let (socket, addr) = assert_ok!(srv.accept().await); + assert_eq!(addr, assert_ok!(socket.peer_addr())); + assert_ok!(tx.send(socket)); + }); + + let mine = assert_ok!(TcpStream::connect(&addr).await); + let theirs = assert_ok!(rx.await); + + assert_eq!( + assert_ok!(mine.local_addr()), + assert_ok!(theirs.peer_addr()) + ); + assert_eq!( + assert_ok!(theirs.local_addr()), + assert_ok!(mine.peer_addr()) + ); +} + +#[tokio::test] +async fn connect_addr_ip_string() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("127.0.0.1:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_str_slice() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("127.0.0.1:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr[..]).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_host_string() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("localhost:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = (addr.ip(), addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_str_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = ("127.0.0.1", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_host_str_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = ("localhost", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + /* * TODO: bring this back once TCP exposes HUP again * diff --git a/tokio-net/tests/tcp_echo.rs b/tokio-net/tests/tcp_echo.rs index 1fb6cfdcc..bbcd02cde 100644 --- a/tokio-net/tests/tcp_echo.rs +++ b/tokio-net/tests/tcp_echo.rs @@ -11,8 +11,7 @@ async fn echo_server() { let (tx, rx) = oneshot::channel(); - let addr = assert_ok!("127.0.0.1:0".parse()); - let mut srv = assert_ok!(TcpListener::bind(&addr)); + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); let addr = assert_ok!(srv.local_addr()); let msg = "foo bar baz"; diff --git a/tokio-net/tests/tcp_shutdown.rs b/tokio-net/tests/tcp_shutdown.rs index bf1ebf990..f9570ff9a 100644 --- a/tokio-net/tests/tcp_shutdown.rs +++ b/tokio-net/tests/tcp_shutdown.rs @@ -7,8 +7,7 @@ use tokio_test::assert_ok; #[tokio::test] async fn shutdown() { - let addr = assert_ok!("127.0.0.1:0".parse()); - let mut srv = assert_ok!(TcpListener::bind(&addr)); + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); let addr = assert_ok!(srv.local_addr()); tokio::spawn(async move { diff --git a/tokio-net/tests/tcp_split.rs b/tokio-net/tests/tcp_split.rs index 07c758a62..269dd895f 100644 --- a/tokio-net/tests/tcp_split.rs +++ b/tokio-net/tests/tcp_split.rs @@ -2,7 +2,7 @@ use tokio_net::tcp::{TcpListener, TcpStream}; #[tokio::test] async fn split_reunite() -> std::io::Result<()> { - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap())?; + let listener = TcpListener::bind("127.0.0.1:0").await?; let addr = listener.local_addr()?; let stream = TcpStream::connect(&addr).await?; @@ -13,7 +13,7 @@ async fn split_reunite() -> std::io::Result<()> { #[tokio::test] async fn split_reunite_error() -> std::io::Result<()> { - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap())?; + let listener = TcpListener::bind("127.0.0.1:0").await?; let addr = listener.local_addr()?; let stream = TcpStream::connect(&addr).await?; let stream1 = TcpStream::connect(&addr).await?; diff --git a/tokio-net/tests/udp.rs b/tokio-net/tests/udp.rs index a0f0f4e40..fb8f068a2 100644 --- a/tokio-net/tests/udp.rs +++ b/tokio-net/tests/udp.rs @@ -9,11 +9,11 @@ use std::io; #[tokio::test] async fn send_recv() -> std::io::Result<()> { - let mut sender = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; - let mut receiver = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; + let mut sender = UdpSocket::bind("127.0.0.1:0").await?; + let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; - sender.connect(&receiver.local_addr()?)?; - receiver.connect(&sender.local_addr()?)?; + sender.connect(receiver.local_addr()?).await?; + receiver.connect(sender.local_addr()?).await?; let message = b"hello!"; sender.send(message).await?; @@ -27,8 +27,8 @@ async fn send_recv() -> std::io::Result<()> { #[tokio::test] async fn send_to_recv_from() -> std::io::Result<()> { - let mut sender = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; - let mut receiver = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; + let mut sender = UdpSocket::bind("127.0.0.1:0").await?; + let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; let message = b"hello!"; let receiver_addr = receiver.local_addr()?; @@ -44,7 +44,7 @@ async fn send_to_recv_from() -> std::io::Result<()> { #[tokio::test] async fn split() -> std::io::Result<()> { - let socket = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; + let socket = UdpSocket::bind("127.0.0.1:0").await?; let (mut r, mut s) = socket.split(); let msg = b"hello"; @@ -60,7 +60,7 @@ async fn split() -> std::io::Result<()> { #[tokio::test] async fn reunite() -> std::io::Result<()> { - let socket = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; + let socket = UdpSocket::bind("127.0.0.1:0").await?; let (s, r) = socket.split(); assert!(s.reunite(r).is_ok()); Ok(()) @@ -68,8 +68,8 @@ async fn reunite() -> std::io::Result<()> { #[tokio::test] async fn reunite_error() -> std::io::Result<()> { - let socket = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; - let socket1 = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let socket1 = UdpSocket::bind("127.0.0.1:0").await?; let (s, _) = socket.split(); let (_, r1) = socket1.split(); assert!(s.reunite(r1).is_err()); @@ -101,8 +101,8 @@ impl Encoder for ByteCodec { #[tokio::test] async fn send_framed() -> std::io::Result<()> { - let mut a_soc = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; - let mut b_soc = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?; + let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?; + let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?; let a_addr = a_soc.local_addr()?; let b_addr = b_soc.local_addr()?; diff --git a/tokio-tls/tests/smoke.rs b/tokio-tls/tests/smoke.rs index 6b27370b1..d1a3863b2 100644 --- a/tokio-tls/tests/smoke.rs +++ b/tokio-tls/tests/smoke.rs @@ -523,7 +523,7 @@ async fn client_to_server() { drop(env_logger::try_init()); // Create a server listening on a port, then figure out what that port is - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let srv = t!(TcpListener::bind("127.0.0.1:0").await); let addr = t!(srv.local_addr()); let (server_cx, client_cx) = contexts(); @@ -558,7 +558,7 @@ async fn server_to_client() { drop(env_logger::try_init()); // Create a server listening on a port, then figure out what that port is - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let srv = t!(TcpListener::bind("127.0.0.1:0").await); let addr = t!(srv.local_addr()); let (server_cx, client_cx) = contexts(); @@ -589,7 +589,7 @@ async fn one_byte_at_a_time() { const AMT: usize = 1024; drop(env_logger::try_init()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let srv = t!(TcpListener::bind("127.0.0.1:0").await); let addr = t!(srv.local_addr()); let (server_cx, client_cx) = contexts(); diff --git a/tokio/examples/chat.rs b/tokio/examples/chat.rs index 8ace878c9..719a25309 100644 --- a/tokio/examples/chat.rs +++ b/tokio/examples/chat.rs @@ -45,12 +45,11 @@ async fn main() -> Result<(), Box> { let state = Lock::new(Shared::new()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:6142".to_string()); - let addr = addr.parse::()?; // Bind a TCP listener to the socket address. // // Note that this is the Tokio TcpListener, which is fully async. - let mut listener = TcpListener::bind(&addr)?; + let mut listener = TcpListener::bind(&addr).await?; println!("server running on {}", addr); diff --git a/tokio/examples/connect.rs b/tokio/examples/connect.rs index 6036eec30..0ba8a6f2d 100644 --- a/tokio/examples/connect.rs +++ b/tokio/examples/connect.rs @@ -126,13 +126,13 @@ mod udp { // We'll bind our UDP socket to a local IP/port, but for now we // basically let the OS pick both of those. let bind_addr = if addr.ip().is_ipv4() { - "0.0.0.0:0".parse()? + "0.0.0.0:0" } else { - "[::]:0".parse()? + "[::]:0" }; - let socket = UdpSocket::bind(&bind_addr)?; - socket.connect(addr)?; + let socket = UdpSocket::bind(&bind_addr).await?; + socket.connect(addr).await?; let (mut r, mut w) = socket.split(); future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?; diff --git a/tokio/examples/echo-udp.rs b/tokio/examples/echo-udp.rs index 330a3cd83..f1e8134df 100644 --- a/tokio/examples/echo-udp.rs +++ b/tokio/examples/echo-udp.rs @@ -52,9 +52,8 @@ impl Server { #[tokio::main] async fn main() -> Result<(), Box> { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::()?; - let socket = UdpSocket::bind(&addr)?; + let socket = UdpSocket::bind(&addr).await?; println!("Listening on: {}", socket.local_addr()?); let server = Server { diff --git a/tokio/examples/echo.rs b/tokio/examples/echo.rs index c8842b737..455aebde0 100644 --- a/tokio/examples/echo.rs +++ b/tokio/examples/echo.rs @@ -27,7 +27,6 @@ use tokio::net::TcpListener; use std::env; use std::error::Error; -use std::net::SocketAddr; #[tokio::main] async fn main() -> Result<(), Box> { @@ -35,12 +34,11 @@ async fn main() -> Result<(), Box> { // program, but otherwise we'll just set up our TCP listener on // 127.0.0.1:8080 for connections. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::()?; // Next up we create a TCP listener which will listen for incoming // connections. This TCP listener is bound to the address we determined // above and must be associated with an event loop. - let mut listener = TcpListener::bind(&addr)?; + let mut listener = TcpListener::bind(&addr).await?; println!("Listening on: {}", addr); loop { diff --git a/tokio/examples/hello_world.rs b/tokio/examples/hello_world.rs index 69c3b3f60..8ff40902d 100644 --- a/tokio/examples/hello_world.rs +++ b/tokio/examples/hello_world.rs @@ -13,7 +13,6 @@ #![warn(rust_2018_idioms)] -use tokio; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; @@ -21,12 +20,10 @@ use std::error::Error; #[tokio::main] pub async fn main() -> Result<(), Box> { - let addr = "127.0.0.1:6142".parse()?; - // Open a TCP stream to the socket address. // // Note that this is the Tokio TcpStream, which is fully async. - let mut stream = TcpStream::connect(&addr).await?; + let mut stream = TcpStream::connect("127.0.0.1:6142").await?; println!("created stream"); let result = stream.write(b"hello world\n").await; diff --git a/tokio/examples/print_each_packet.rs b/tokio/examples/print_each_packet.rs index b801ac8bd..3729c3e96 100644 --- a/tokio/examples/print_each_packet.rs +++ b/tokio/examples/print_each_packet.rs @@ -54,28 +54,26 @@ #![warn(rust_2018_idioms)] -use std::env; -use std::net::SocketAddr; - use tokio; use tokio::codec::{BytesCodec, Decoder}; use tokio::net::TcpListener; use tokio::prelude::*; +use std::env; + #[tokio::main] async fn main() -> Result<(), Box> { // Allow passing an address to listen on as the first argument of this // program, but otherwise we'll just set up our TCP listener on // 127.0.0.1:8080 for connections. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::()?; // Next up we create a TCP listener which will listen for incoming // connections. This TCP listener is bound to the address we determined // above and must be associated with an event loop, so we pass in a handle // to our event loop. After the socket's created we inform that we're ready // to go and start accepting connections. - let mut listener = TcpListener::bind(&addr)?; + let mut listener = TcpListener::bind(&addr).await?; println!("Listening on: {}", addr); loop { diff --git a/tokio/examples/proxy.rs b/tokio/examples/proxy.rs index 848c0a40d..7946be8a9 100644 --- a/tokio/examples/proxy.rs +++ b/tokio/examples/proxy.rs @@ -23,7 +23,7 @@ #![warn(rust_2018_idioms)] use futures::{future::try_join, FutureExt, StreamExt}; -use std::{env, error::Error, net::SocketAddr}; +use std::{env, error::Error}; use tokio::{ io::AsyncReadExt, net::{TcpListener, TcpStream}, @@ -32,18 +32,15 @@ use tokio::{ #[tokio::main] async fn main() -> Result<(), Box> { let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); - let listen_addr = listen_addr.parse::()?; - let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); - let server_addr = server_addr.parse::()?; println!("Listening on: {}", listen_addr); println!("Proxying to: {}", server_addr); - let mut incoming = TcpListener::bind(&listen_addr)?.incoming(); + let mut incoming = TcpListener::bind(listen_addr).await?.incoming(); while let Some(Ok(inbound)) = incoming.next().await { - let transfer = transfer(inbound, server_addr).map(|r| { + let transfer = transfer(inbound, server_addr.clone()).map(|r| { if let Err(e) = r { println!("Failed to transfer; error={}", e); } @@ -55,8 +52,8 @@ async fn main() -> Result<(), Box> { Ok(()) } -async fn transfer(inbound: TcpStream, proxy_addr: SocketAddr) -> Result<(), Box> { - let outbound = TcpStream::connect(&proxy_addr).await?; +async fn transfer(inbound: TcpStream, proxy_addr: String) -> Result<(), Box> { + let outbound = TcpStream::connect(proxy_addr).await?; let (mut ri, mut wi) = inbound.split(); let (mut ro, mut wo) = outbound.split(); diff --git a/tokio/examples/tinydb.rs b/tokio/examples/tinydb.rs index fa3cefee5..be4951dc5 100644 --- a/tokio/examples/tinydb.rs +++ b/tokio/examples/tinydb.rs @@ -44,7 +44,6 @@ use std::collections::HashMap; use std::env; use std::error::Error; -use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use tokio; @@ -88,8 +87,8 @@ async fn main() -> Result<(), Box> { // Parse the address we're going to run this server on // and set up our TCP listener to accept connections. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::()?; - let mut listener = TcpListener::bind(&addr)?; + + let mut listener = TcpListener::bind(&addr).await?; println!("Listening on: {}", addr); // Create the shared state of this server that will be shared amongst all diff --git a/tokio/examples/tinyhttp.rs b/tokio/examples/tinyhttp.rs index 6aa5a183b..65074018f 100644 --- a/tokio/examples/tinyhttp.rs +++ b/tokio/examples/tinyhttp.rs @@ -17,7 +17,7 @@ use bytes::BytesMut; use futures::{SinkExt, StreamExt}; use http::{header::HeaderValue, Request, Response, StatusCode}; use serde::Serialize; -use std::{env, error::Error, fmt, io, net::SocketAddr}; +use std::{env, error::Error, fmt, io}; use tokio::{ codec::{Decoder, Encoder, Framed}, net::{TcpListener, TcpStream}, @@ -28,9 +28,8 @@ async fn main() -> Result<(), Box> { // Parse the arguments, bind the TCP socket we'll be listening to, spin up // our worker threads, and start shipping sockets to those worker threads. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr.parse::()?; - let mut incoming = TcpListener::bind(&addr)?.incoming(); + let mut incoming = TcpListener::bind(&addr).await?.incoming(); println!("Listening on: {}", addr); while let Some(Ok(stream)) = incoming.next().await { diff --git a/tokio/examples/udp-client.rs b/tokio/examples/udp-client.rs index 915c06d94..5437daf60 100644 --- a/tokio/examples/udp-client.rs +++ b/tokio/examples/udp-client.rs @@ -55,9 +55,9 @@ async fn main() -> Result<(), Box> { } .parse()?; - let mut socket = UdpSocket::bind(&local_addr)?; + let mut socket = UdpSocket::bind(local_addr).await?; const MAX_DATAGRAM_SIZE: usize = 65_507; - socket.connect(&remote_addr)?; + socket.connect(&remote_addr).await?; let data = get_stdin_data()?; socket.send(&data).await?; let mut data = vec![0u8; MAX_DATAGRAM_SIZE]; diff --git a/tokio/examples/udp-codec.rs b/tokio/examples/udp-codec.rs index c055eaa40..7d0aaf69d 100644 --- a/tokio/examples/udp-codec.rs +++ b/tokio/examples/udp-codec.rs @@ -27,11 +27,10 @@ async fn main() -> Result<(), Box> { let _ = env_logger::init(); let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string()); - let addr = addr.parse::()?; // Bind both our sockets and then figure out what ports we got. - let a = UdpSocket::bind(&addr)?; - let b = UdpSocket::bind(&addr)?; + let a = UdpSocket::bind(&addr).await?; + let b = UdpSocket::bind(&addr).await?; let b_addr = b.local_addr()?; diff --git a/tokio/src/executor.rs b/tokio/src/executor.rs index 7026cd63e..75f0f9d4e 100644 --- a/tokio/src/executor.rs +++ b/tokio/src/executor.rs @@ -73,8 +73,7 @@ pub struct Spawn(()); /// /// # async fn process(t: T) {} /// # async fn dox() -> Result<(), Box> { -/// let addr = "127.0.0.1:8080".parse()?; -/// let mut listener = TcpListener::bind(&addr).unwrap(); +/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// loop { /// let (socket, _) = listener.accept().await?; @@ -84,7 +83,6 @@ pub struct Spawn(()); /// process(socket).await /// }); /// } -/// # Ok(()) /// # } /// ``` /// diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 2c804c770..7ddcefdf3 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -35,8 +35,7 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { -//! let addr = "127.0.0.1:8080".parse()?; -//! let mut listener = TcpListener::bind(&addr).unwrap(); +//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 9b630d87e..4ba6663a2 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -39,8 +39,7 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { -//! let addr = "127.0.0.1:8080".parse()?; -//! let mut listener = TcpListener::bind(&addr).unwrap(); +//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; @@ -87,12 +86,11 @@ //! //! fn main() -> Result<(), Box> { //! // Create the runtime -//! let mut rt = Runtime::new().unwrap(); +//! let mut rt = Runtime::new()?; //! //! // Spawn the root task //! rt.block_on(async { -//! let addr = "127.0.0.1:8080".parse()?; -//! let mut listener = TcpListener::bind(&addr).unwrap(); +//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; diff --git a/tokio/tests/buffered.rs b/tokio/tests/buffered.rs index 351cb30a0..fe4190997 100644 --- a/tokio/tests/buffered.rs +++ b/tokio/tests/buffered.rs @@ -13,8 +13,7 @@ use std::thread; async fn echo_server() { const N: usize = 1024; - let addr = assert_ok!("127.0.0.1:0".parse()); - let mut srv = assert_ok!(TcpListener::bind(&addr)); + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); let addr = assert_ok!(srv.local_addr()); let msg = "foo bar baz"; diff --git a/tokio/tests/reactor.rs b/tokio/tests/reactor.rs index 59ff81281..f1ed8703f 100644 --- a/tokio/tests/reactor.rs +++ b/tokio/tests/reactor.rs @@ -50,9 +50,8 @@ fn test_drop_on_notify() { // Define a task that just drains the listener let task = Arc::new(Task::new(async move { - let addr = assert_ok!("127.0.0.1:0".parse()); // Create a listener - let mut listener = assert_ok!(TcpListener::bind(&addr)); + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); // Send the address let addr = listener.local_addr().unwrap(); diff --git a/tokio/tests/runtime_current_thread.rs b/tokio/tests/runtime_current_thread.rs index 9f77ac7c3..ff742707e 100644 --- a/tokio/tests/runtime_current_thread.rs +++ b/tokio/tests/runtime_current_thread.rs @@ -13,8 +13,7 @@ use std::time::{Duration, Instant}; use tokio::timer::delay; async fn client_server(tx: mpsc::Sender<()>) { - let addr = assert_ok!("127.0.0.1:0".parse()); - let mut server = assert_ok!(TcpListener::bind(&addr)); + let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); // Get the assigned address let addr = assert_ok!(server.local_addr()); diff --git a/tokio/tests/runtime_threaded.rs b/tokio/tests/runtime_threaded.rs index bd8ce6a02..36f652dda 100644 --- a/tokio/tests/runtime_threaded.rs +++ b/tokio/tests/runtime_threaded.rs @@ -15,8 +15,7 @@ use std::thread; use std::time::{Duration, Instant}; async fn client_server(tx: mpsc::Sender<()>) { - let addr = assert_ok!("127.0.0.1:0".parse()); - let mut server = assert_ok!(TcpListener::bind(&addr)); + let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); // Get the assigned address let addr = assert_ok!(server.local_addr()); @@ -78,11 +77,9 @@ fn block_on_socket() { let rt = Runtime::new().unwrap(); rt.block_on(async move { - let addr = "127.0.0.1:0".parse().unwrap(); - let (tx, rx) = oneshot::channel(); - let mut listener = TcpListener::bind(&addr).unwrap(); + let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move {