net: perform DNS lookup on connect / bind. (#1499)

A sealed `net::ToSocketAddrs` trait is added. This trait is not intended
to be used by users. Instead, it is an argument to `connect` and `bind`
functions.

The operating system's DNS lookup functionality is used. Blocking
operations are performed on a thread pool in order to avoid blocking the
runtime.
This commit is contained in:
Carl Lerche 2019-08-28 13:25:50 -07:00 committed by GitHub
parent de9f05d4d3
commit fc1640891e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 632 additions and 218 deletions

View File

@ -21,7 +21,7 @@ keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]
[features]
blocking = ["tokio-sync"]
blocking = ["tokio-sync", "lazy_static"]
current-thread = ["crossbeam-channel"]
threadpool = [
"tokio-sync",

View File

@ -24,7 +24,6 @@ categories = ["asynchronous", "network-programming"]
async-traits = []
process = [
"crossbeam-queue",
"futures-util-preview",
"libc",
"mio-named-pipes",
"signal",
@ -38,7 +37,6 @@ process = [
"winapi/winnt",
]
signal = [
"futures-util-preview",
"mio-uds",
"libc",
"signal-hook-registry",
@ -48,18 +46,15 @@ signal = [
]
tcp = [
"bytes",
"futures-util-preview",
"iovec",
]
udp = [
"bytes",
"futures-sink-preview",
"futures-util-preview",
]
uds = [
"bytes",
"mio-uds",
"futures-util-preview",
"iovec",
"libc",
]
@ -67,7 +62,7 @@ log = ["tracing/log"]
[dependencies]
tokio-codec = { version = "=0.2.0-alpha.2", path = "../tokio-codec" }
tokio-executor = { version = "=0.2.0-alpha.2", path = "../tokio-executor" }
tokio-executor = { version = "=0.2.0-alpha.2", features = ["blocking"], path = "../tokio-executor" }
tokio-io = { version = "=0.2.0-alpha.2", path = "../tokio-io" }
tokio-sync = { version = "=0.2.0-alpha.2", path = "../tokio-sync" }
@ -76,6 +71,7 @@ tracing = { version = "0.1.5", optional = true }
# driver implementation
crossbeam-utils = "0.6.0"
futures-core-preview = "=0.3.0-alpha.18"
futures-util-preview = "=0.3.0-alpha.18"
lazy_static = "1.0.2"
mio = "0.6.14"
num_cpus = "1.8.0"
@ -85,7 +81,6 @@ slab = "0.4.0"
# TCP / UDP
bytes = { version = "0.4", optional = true }
futures-sink-preview = { version = "=0.3.0-alpha.18", optional = true }
futures-util-preview = { version = "=0.3.0-alpha.18", optional = true }
iovec = { version = "0.1", optional = true }
[target.'cfg(unix)'.dependencies]
@ -114,7 +109,7 @@ tokio-io-pool = "0.1.4"
# UDS tests
tempfile = "3"
futures-preview = "=0.3.0-alpha.18"
futures-preview = { version = "=0.3.0-alpha.18", features = ["nightly", "async-await"] }
[package.metadata.docs.rs]
all-features = true

214
tokio-net/src/addr.rs Normal file
View File

@ -0,0 +1,214 @@
use tokio_executor::blocking;
use futures_util::future;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
/// Convert or resolve without blocking to one or more `SocketAddr` values.
///
/// Currently, this trait is only used as an argument to Tokio functions that
/// need to reference a target socket address.
///
/// This trait is sealed and is intended to be opaque. Users of Tokio should
/// only use `ToSocketAddrs` in trait bounds and __must not__ attempt to call
/// the functions directly or reference associated types. Changing these is not
/// considered a breaking change.
pub trait ToSocketAddrs: sealed::ToSocketAddrsPriv {}
type ReadyFuture<T> = future::Ready<io::Result<T>>;
// ===== impl SocketAddr =====
impl ToSocketAddrs for SocketAddr {}
impl sealed::ToSocketAddrsPriv for SocketAddr {
type Iter = std::option::IntoIter<SocketAddr>;
type Future = ReadyFuture<Self::Iter>;
fn to_socket_addrs(&self) -> Self::Future {
let iter = Some(*self).into_iter();
future::ready(Ok(iter))
}
}
// ===== impl str =====
impl ToSocketAddrs for str {}
impl sealed::ToSocketAddrsPriv for str {
type Iter = sealed::OneOrMore;
type Future = sealed::MaybeReady;
fn to_socket_addrs(&self) -> Self::Future {
use sealed::MaybeReady;
// First check if the input parses as a socket address
let res: Result<SocketAddr, _> = self.parse();
if let Ok(addr) = res {
return MaybeReady::Ready(Some(addr));
}
// Run DNS lookup on the blocking pool
let s = self.to_owned();
MaybeReady::Blocking(blocking::run(move || {
std::net::ToSocketAddrs::to_socket_addrs(&s)
}))
}
}
// ===== impl (&str, u16) =====
impl ToSocketAddrs for (&'_ str, u16) {}
impl sealed::ToSocketAddrsPriv for (&'_ str, u16) {
type Iter = sealed::OneOrMore;
type Future = sealed::MaybeReady;
fn to_socket_addrs(&self) -> Self::Future {
use sealed::MaybeReady;
use std::net::{SocketAddrV4, SocketAddrV6};
let (host, port) = *self;
// try to parse the host as a regular IP address first
if let Ok(addr) = host.parse::<Ipv4Addr>() {
let addr = SocketAddrV4::new(addr, port);
let addr = SocketAddr::V4(addr);
return MaybeReady::Ready(Some(addr));
}
if let Ok(addr) = host.parse::<Ipv6Addr>() {
let addr = SocketAddrV6::new(addr, port, 0, 0);
let addr = SocketAddr::V6(addr);
return MaybeReady::Ready(Some(addr));
}
let host = host.to_owned();
MaybeReady::Blocking(blocking::run(move || {
std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port))
}))
}
}
// ===== impl (IpAddr, u16) =====
impl ToSocketAddrs for (IpAddr, u16) {}
impl sealed::ToSocketAddrsPriv for (IpAddr, u16) {
type Iter = std::option::IntoIter<SocketAddr>;
type Future = ReadyFuture<Self::Iter>;
fn to_socket_addrs(&self) -> Self::Future {
let iter = Some(SocketAddr::from(*self)).into_iter();
future::ready(Ok(iter))
}
}
// ===== impl String =====
impl ToSocketAddrs for String {}
impl sealed::ToSocketAddrsPriv for String {
type Iter = <str as sealed::ToSocketAddrsPriv>::Iter;
type Future = <str as sealed::ToSocketAddrsPriv>::Future;
fn to_socket_addrs(&self) -> Self::Future {
(&self[..]).to_socket_addrs()
}
}
// ===== impl &'_ impl ToSocketAddrs =====
impl<T: ToSocketAddrs + ?Sized> ToSocketAddrs for &'_ T {}
impl<T> sealed::ToSocketAddrsPriv for &'_ T
where
T: sealed::ToSocketAddrsPriv + ?Sized,
{
type Iter = T::Iter;
type Future = T::Future;
fn to_socket_addrs(&self) -> Self::Future {
(**self).to_socket_addrs()
}
}
pub(crate) mod sealed {
//! The contents of this trait are intended to remain private and __not__
//! part of the `ToSocketAddrs` public API. The details will change over
//! time.
use tokio_executor::blocking::Blocking;
use futures_core::ready;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::option;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::vec;
#[doc(hidden)]
pub trait ToSocketAddrsPriv {
type Iter: Iterator<Item = SocketAddr> + Send + 'static;
type Future: Future<Output = io::Result<Self::Iter>> + Send + 'static;
fn to_socket_addrs(&self) -> Self::Future;
}
#[doc(hidden)]
#[derive(Debug)]
pub enum MaybeReady {
Ready(Option<SocketAddr>),
Blocking(Blocking<io::Result<vec::IntoIter<SocketAddr>>>),
}
#[doc(hidden)]
#[derive(Debug)]
pub enum OneOrMore {
One(option::IntoIter<SocketAddr>),
More(vec::IntoIter<SocketAddr>),
}
impl Future for MaybeReady {
type Output = io::Result<OneOrMore>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match *self {
MaybeReady::Ready(ref mut i) => {
let iter = OneOrMore::One(i.take().into_iter());
Poll::Ready(Ok(iter))
}
MaybeReady::Blocking(ref mut rx) => {
let res = ready!(Pin::new(rx).poll(cx)).map(OneOrMore::More);
Poll::Ready(res)
}
}
}
}
impl Iterator for OneOrMore {
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> {
match self {
OneOrMore::One(i) => i.next(),
OneOrMore::More(i) => i.next(),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self {
OneOrMore::One(i) => i.size_hint(),
OneOrMore::More(i) => i.size_hint(),
}
}
}
}

View File

@ -25,9 +25,7 @@
//!
//! # async fn process<T>(t: T) {}
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
//! let addr = "93.184.216.34:9243".parse()?;
//!
//! let stream = TcpStream::connect(&addr).await?;
//! let stream = TcpStream::connect("93.184.216.34:9243").await?;
//!
//! println!("successfully connected");
//!

View File

@ -38,6 +38,9 @@
#[macro_use]
mod tracing;
mod addr;
pub use addr::ToSocketAddrs;
pub mod driver;
pub mod util;

View File

@ -3,6 +3,7 @@ use super::incoming::Incoming;
use super::TcpStream;
use crate::driver::Handle;
use crate::util::PollEvented;
use crate::ToSocketAddrs;
use futures_core::ready;
use futures_util::future::poll_fn;
@ -22,13 +23,13 @@ use std::task::{Context, Poll};
///
/// ```no_run
/// use tokio::net::TcpListener;
/// use std::error::Error;
///
/// use std::io;
/// # async fn process_socket<T>(socket: T) {}
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let mut listener = TcpListener::bind(&addr)?;
/// async fn main() -> io::Result<()> {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
@ -41,24 +42,60 @@ pub struct TcpListener {
}
impl TcpListener {
/// Create a new TCP listener associated with this event loop.
/// Creates a new TcpListener which will be bound to the specified address.
///
/// The TCP listener will bind to the provided `addr` address, if available.
/// If the result is `Ok`, the socket has successfully bound.
/// The returned listener is ready for accepting connections.
///
/// Binding with a port number of 0 will request that the OS assigns a port
/// to this listener. The port allocated can be queried via the `local_addr`
/// method.
///
/// The address type can be any implementor of `ToSocketAddrs` trait.
///
/// If `addr` yields multiple addresses, bind will be attempted with each of
/// the addresses until one succeeds and returns the listener. If none of
/// the addresses succeed in creating a listener, the error returned from
/// the last attempt (the last address) is returned.
///
/// # Examples
///
/// ```
/// use std::net::SocketAddr;
/// ```no_run
/// use tokio::net::TcpListener;
///
/// let addr = "127.0.0.1:0".parse::<SocketAddr>()?;
/// let listener = TcpListener::bind(&addr)?;
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:0").await?;
///
/// // use the listener
///
/// Ok(())
/// }
/// ```
pub fn bind(addr: &SocketAddr) -> io::Result<TcpListener> {
let l = mio::net::TcpListener::bind(addr)?;
Ok(TcpListener::new(l))
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
let addrs = addr.to_socket_addrs().await?;
let mut last_err = None;
for addr in addrs {
match TcpListener::bind_addr(addr) {
Ok(listener) => return Ok(listener),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
}
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
let listener = mio::net::TcpListener::bind(&addr)?;
Ok(TcpListener::new(listener))
}
/// Accept a new incoming connection from this listener.
@ -71,18 +108,22 @@ impl TcpListener {
///
/// # Examples
///
/// ```
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// ```no_run
/// use tokio::net::TcpListener;
///
/// let addr = "127.0.0.1:8080".parse()?;
/// let mut listener = TcpListener::bind(&addr)?;
/// match listener.accept().await {
/// Ok((_socket, addr)) => println!("new client: {:?}", addr),
/// Err(e) => println!("couldn't get client: {:?}", e),
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// match listener.accept().await {
/// Ok((_socket, addr)) => println!("new client: {:?}", addr),
/// Err(e) => println!("couldn't get client: {:?}", e),
/// }
///
/// Ok(())
/// }
/// # Ok(())
/// # }
/// ```
pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
poll_fn(|cx| self.poll_accept(cx)).await
@ -178,13 +219,19 @@ impl TcpListener {
///
/// ```
/// use tokio::net::TcpListener;
///
/// use std::io;
/// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
///
/// let addr = "127.0.0.1:8080".parse::<SocketAddr>()?;
/// let listener = TcpListener::bind(&addr)?;
/// assert_eq!(listener.local_addr()?,
/// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// assert_eq!(listener.local_addr()?,
/// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)));
///
/// Ok(())
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
@ -215,17 +262,20 @@ impl TcpListener {
///
/// # Examples
///
/// ```
/// ```no_run
/// use tokio::net::TcpListener;
/// use std::net::SocketAddr;
///
/// let addr = "127.0.0.1:0".parse::<SocketAddr>()?;
/// let listener = TcpListener::bind(&addr)?;
/// use std::io;
///
/// listener.set_ttl(100).expect("could not set TTL");
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:0").await?;
///
/// assert_eq!(listener.ttl()?, 100);
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// listener.set_ttl(100).expect("could not set TTL");
/// assert_eq!(listener.ttl()?, 100);
///
/// Ok(())
/// }
/// ```
pub fn ttl(&self) -> io::Result<u32> {
self.io.get_ref().ttl()
@ -238,15 +288,19 @@ impl TcpListener {
///
/// # Examples
///
/// ```
/// ```no_run
/// use tokio::net::TcpListener;
/// use std::net::SocketAddr;
///
/// let addr = "127.0.0.1:0".parse::<SocketAddr>()?;
/// let listener = TcpListener::bind(&addr)?;
/// use std::io;
///
/// listener.set_ttl(100).expect("could not set TTL");
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:0").await?;
///
/// listener.set_ttl(100).expect("could not set TTL");
///
/// Ok(())
/// }
/// ```
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.io.get_ref().set_ttl(ttl)

View File

@ -4,6 +4,7 @@ use super::split::{
};
use crate::driver::Handle;
use crate::util::PollEvented;
use crate::ToSocketAddrs;
use tokio_io::{AsyncRead, AsyncWrite};
@ -38,10 +39,8 @@ use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
///
/// // Connect to a peer
/// let mut stream = TcpStream::connect(&addr).await?;
/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// // Write some data.
/// stream.write_all(b"hello world!").await?;
@ -54,12 +53,15 @@ pub struct TcpStream {
}
impl TcpStream {
/// Create a new TCP stream connected to the specified address.
/// Opens a TCP connection to a remote host.
///
/// This function will create a new TCP socket and attempt to connect it to
/// the `addr` provided. The returned future will be resolved once the
/// stream has successfully connected, or it will return an error if one
/// occurs.
/// `addr` is an address of the remote host. Anything which implements
/// `ToSocketAddrs` trait can be supplied for the address.
///
/// If `addr` yields multiple addresses, connect will be attempted with each
/// of the addresses until a connection is successful. If none of the
/// addresses result in a successful connection, the error returned from the
/// last connection attempt (the last address) is returned.
///
/// # Examples
///
@ -70,10 +72,8 @@ impl TcpStream {
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
///
/// // Connect to a peer
/// let mut stream = TcpStream::connect(&addr).await?;
/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// // Write some data.
/// stream.write_all(b"hello world!").await?;
@ -81,8 +81,29 @@ impl TcpStream {
/// Ok(())
/// }
/// ```
pub async fn connect(addr: &SocketAddr) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(addr)?;
pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
let addrs = addr.to_socket_addrs().await?;
let mut last_err = None;
for addr in addrs {
match TcpStream::connect_addr(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
}
/// Establish a connection to the specified `addr`.
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(&addr)?;
let stream = TcpStream::new(sys);
// Once we've connected, wait for the stream to be writable as
@ -136,11 +157,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.local_addr()?);
/// # Ok(())
@ -155,11 +174,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.peer_addr()?);
/// # Ok(())
@ -198,10 +215,8 @@ impl TcpStream {
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
///
/// // Connect to a peer
/// let mut stream = TcpStream::connect(&addr).await?;
/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// let mut b1 = [0; 10];
/// let mut b2 = [0; 10];
@ -236,10 +251,8 @@ impl TcpStream {
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
///
/// // Connect to a peer
/// let mut stream = TcpStream::connect(&addr).await?;
/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// // Shutdown the stream
/// stream.shutdown(Shutdown::Write)?;
@ -261,11 +274,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.nodelay()?);
/// # Ok(())
@ -287,11 +298,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// stream.set_nodelay(true)?;
/// # Ok(())
@ -311,11 +320,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.recv_buffer_size()?);
/// # Ok(())
@ -334,11 +341,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// stream.set_recv_buffer_size(100)?;
/// # Ok(())
@ -367,11 +372,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.send_buffer_size()?);
/// # Ok(())
@ -390,11 +393,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// stream.set_send_buffer_size(100)?;
/// # Ok(())
@ -415,11 +416,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.keepalive()?);
/// # Ok(())
@ -446,11 +445,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// stream.set_keepalive(None)?;
/// # Ok(())
@ -470,11 +467,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.ttl()?);
/// # Ok(())
@ -493,11 +488,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// stream.set_ttl(123)?;
/// # Ok(())
@ -518,11 +511,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// println!("{:?}", stream.linger()?);
/// # Ok(())
@ -548,11 +539,9 @@ impl TcpStream {
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::net::SocketAddr;
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let stream = TcpStream::connect(&addr).await?;
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// stream.set_linger(None)?;
/// # Ok(())

View File

@ -1,6 +1,7 @@
use super::split::{split, UdpSocketRecvHalf, UdpSocketSendHalf};
use crate::driver::Handle;
use crate::util::PollEvented;
use crate::ToSocketAddrs;
use futures_core::ready;
use futures_util::future::poll_fn;
@ -19,8 +20,27 @@ pub struct UdpSocket {
impl UdpSocket {
/// This function will create a new UDP socket and attempt to bind it to
/// the `addr` provided.
pub fn bind(addr: &SocketAddr) -> io::Result<UdpSocket> {
mio::net::UdpSocket::bind(addr).map(UdpSocket::new)
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> {
let addrs = addr.to_socket_addrs().await?;
let mut last_err = None;
for addr in addrs {
match UdpSocket::bind_addr(addr) {
Ok(socket) => return Ok(socket),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
}
fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> {
mio::net::UdpSocket::bind(&addr).map(UdpSocket::new)
}
fn new(socket: mio::net::UdpSocket) -> UdpSocket {
@ -63,8 +83,23 @@ impl UdpSocket {
/// Connects the UDP socket setting the default destination for send() and
/// limiting packets that are read via recv from the address specified in
/// `addr`.
pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> {
self.io.get_ref().connect(*addr)
pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> {
let addrs = addr.to_socket_addrs().await?;
let mut last_err = None;
for addr in addrs {
match self.io.get_ref().connect(addr) {
Ok(_) => return Ok(()),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
}
/// Returns a future that sends data on the socket to the remote address to which it is connected.
@ -141,8 +176,16 @@ impl UdpSocket {
///
/// The future will resolve to an error if the IP version of the socket does
/// not match that of `target`.
pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
poll_fn(|cx| self.poll_send_to_priv(cx, buf, target)).await
pub async fn send_to<A: ToSocketAddrs>(&mut self, buf: &[u8], target: A) -> io::Result<usize> {
let mut addrs = target.to_socket_addrs().await?;
match addrs.next() {
Some(target) => poll_fn(|cx| self.poll_send_to_priv(cx, buf, &target)).await,
None => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"no addresses to send data to",
)),
}
}
pub(crate) fn poll_send_to_priv(

View File

@ -4,21 +4,36 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot;
use tokio_test::assert_ok;
#[tokio::test]
async fn accept() {
let addr = "127.0.0.1:0".parse().unwrap();
let mut listener = assert_ok!(TcpListener::bind(&addr));
let addr = listener.local_addr().unwrap();
use std::net::{IpAddr, SocketAddr};
let (tx, rx) = oneshot::channel();
macro_rules! test_accept {
($(($ident:ident, $target:expr),)*) => {
$(
#[tokio::test]
async fn $ident() {
let mut listener = assert_ok!(TcpListener::bind($target).await);
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let (socket, _) = assert_ok!(listener.accept().await);
assert_ok!(tx.send(socket));
});
let (tx, rx) = oneshot::channel();
let cli = assert_ok!(TcpStream::connect(&addr).await);
let srv = assert_ok!(rx.await);
tokio::spawn(async move {
let (socket, _) = assert_ok!(listener.accept().await);
assert_ok!(tx.send(socket));
});
assert_eq!(cli.local_addr().unwrap(), srv.peer_addr().unwrap());
let cli = assert_ok!(TcpStream::connect(&addr).await);
let srv = assert_ok!(rx.await);
assert_eq!(cli.local_addr().unwrap(), srv.peer_addr().unwrap());
}
)*
}
}
test_accept! {
(ip_str, "127.0.0.1:0"),
(host_str, "localhost:0"),
(socket_addr, "127.0.0.1:0".parse::<SocketAddr>().unwrap()),
(str_port_tuple, ("127.0.0.1", 0)),
(ip_port_tuple, ("127.0.0.1".parse::<IpAddr>().unwrap(), 0)),
}

View File

@ -4,11 +4,13 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot;
use tokio_test::assert_ok;
use futures::join;
#[tokio::test]
async fn connect() {
let addr = assert_ok!("127.0.0.1:0".parse());
let mut srv = assert_ok!(TcpListener::bind(&addr));
async fn connect_v4() {
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
assert!(addr.is_ipv4());
let (tx, rx) = oneshot::channel();
@ -31,6 +33,135 @@ async fn connect() {
);
}
#[tokio::test]
async fn connect_v6() {
let mut srv = assert_ok!(TcpListener::bind("[::1]:0").await);
let addr = assert_ok!(srv.local_addr());
assert!(addr.is_ipv6());
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let (socket, addr) = assert_ok!(srv.accept().await);
assert_eq!(addr, assert_ok!(socket.peer_addr()));
assert_ok!(tx.send(socket));
});
let mine = assert_ok!(TcpStream::connect(&addr).await);
let theirs = assert_ok!(rx.await);
assert_eq!(
assert_ok!(mine.local_addr()),
assert_ok!(theirs.peer_addr())
);
assert_eq!(
assert_ok!(theirs.local_addr()),
assert_ok!(mine.peer_addr())
);
}
#[tokio::test]
async fn connect_addr_ip_string() {
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
let addr = format!("127.0.0.1:{}", addr.port());
let server = async {
assert_ok!(srv.accept().await);
};
let client = async {
assert_ok!(TcpStream::connect(addr).await);
};
join!(server, client);
}
#[tokio::test]
async fn connect_addr_ip_str_slice() {
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
let addr = format!("127.0.0.1:{}", addr.port());
let server = async {
assert_ok!(srv.accept().await);
};
let client = async {
assert_ok!(TcpStream::connect(&addr[..]).await);
};
join!(server, client);
}
#[tokio::test]
async fn connect_addr_host_string() {
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
let addr = format!("localhost:{}", addr.port());
let server = async {
assert_ok!(srv.accept().await);
};
let client = async {
assert_ok!(TcpStream::connect(addr).await);
};
join!(server, client);
}
#[tokio::test]
async fn connect_addr_ip_port_tuple() {
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
let addr = (addr.ip(), addr.port());
let server = async {
assert_ok!(srv.accept().await);
};
let client = async {
assert_ok!(TcpStream::connect(&addr).await);
};
join!(server, client);
}
#[tokio::test]
async fn connect_addr_ip_str_port_tuple() {
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
let addr = ("127.0.0.1", addr.port());
let server = async {
assert_ok!(srv.accept().await);
};
let client = async {
assert_ok!(TcpStream::connect(&addr).await);
};
join!(server, client);
}
#[tokio::test]
async fn connect_addr_host_str_port_tuple() {
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
let addr = ("localhost", addr.port());
let server = async {
assert_ok!(srv.accept().await);
};
let client = async {
assert_ok!(TcpStream::connect(&addr).await);
};
join!(server, client);
}
/*
* TODO: bring this back once TCP exposes HUP again
*

View File

@ -11,8 +11,7 @@ async fn echo_server() {
let (tx, rx) = oneshot::channel();
let addr = assert_ok!("127.0.0.1:0".parse());
let mut srv = assert_ok!(TcpListener::bind(&addr));
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
let msg = "foo bar baz";

View File

@ -7,8 +7,7 @@ use tokio_test::assert_ok;
#[tokio::test]
async fn shutdown() {
let addr = assert_ok!("127.0.0.1:0".parse());
let mut srv = assert_ok!(TcpListener::bind(&addr));
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
tokio::spawn(async move {

View File

@ -2,7 +2,7 @@ use tokio_net::tcp::{TcpListener, TcpStream};
#[tokio::test]
async fn split_reunite() -> std::io::Result<()> {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap())?;
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let stream = TcpStream::connect(&addr).await?;
@ -13,7 +13,7 @@ async fn split_reunite() -> std::io::Result<()> {
#[tokio::test]
async fn split_reunite_error() -> std::io::Result<()> {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap())?;
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let stream = TcpStream::connect(&addr).await?;
let stream1 = TcpStream::connect(&addr).await?;

View File

@ -9,11 +9,11 @@ use std::io;
#[tokio::test]
async fn send_recv() -> std::io::Result<()> {
let mut sender = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let mut receiver = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let mut sender = UdpSocket::bind("127.0.0.1:0").await?;
let mut receiver = UdpSocket::bind("127.0.0.1:0").await?;
sender.connect(&receiver.local_addr()?)?;
receiver.connect(&sender.local_addr()?)?;
sender.connect(receiver.local_addr()?).await?;
receiver.connect(sender.local_addr()?).await?;
let message = b"hello!";
sender.send(message).await?;
@ -27,8 +27,8 @@ async fn send_recv() -> std::io::Result<()> {
#[tokio::test]
async fn send_to_recv_from() -> std::io::Result<()> {
let mut sender = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let mut receiver = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let mut sender = UdpSocket::bind("127.0.0.1:0").await?;
let mut receiver = UdpSocket::bind("127.0.0.1:0").await?;
let message = b"hello!";
let receiver_addr = receiver.local_addr()?;
@ -44,7 +44,7 @@ async fn send_to_recv_from() -> std::io::Result<()> {
#[tokio::test]
async fn split() -> std::io::Result<()> {
let socket = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let (mut r, mut s) = socket.split();
let msg = b"hello";
@ -60,7 +60,7 @@ async fn split() -> std::io::Result<()> {
#[tokio::test]
async fn reunite() -> std::io::Result<()> {
let socket = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let (s, r) = socket.split();
assert!(s.reunite(r).is_ok());
Ok(())
@ -68,8 +68,8 @@ async fn reunite() -> std::io::Result<()> {
#[tokio::test]
async fn reunite_error() -> std::io::Result<()> {
let socket = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let socket1 = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let socket1 = UdpSocket::bind("127.0.0.1:0").await?;
let (s, _) = socket.split();
let (_, r1) = socket1.split();
assert!(s.reunite(r1).is_err());
@ -101,8 +101,8 @@ impl Encoder for ByteCodec {
#[tokio::test]
async fn send_framed() -> std::io::Result<()> {
let mut a_soc = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let mut b_soc = UdpSocket::bind(&"127.0.0.1:0".parse().unwrap())?;
let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
let a_addr = a_soc.local_addr()?;
let b_addr = b_soc.local_addr()?;

View File

@ -523,7 +523,7 @@ async fn client_to_server() {
drop(env_logger::try_init());
// Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse())));
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());
let (server_cx, client_cx) = contexts();
@ -558,7 +558,7 @@ async fn server_to_client() {
drop(env_logger::try_init());
// Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse())));
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());
let (server_cx, client_cx) = contexts();
@ -589,7 +589,7 @@ async fn one_byte_at_a_time() {
const AMT: usize = 1024;
drop(env_logger::try_init());
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse())));
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());
let (server_cx, client_cx) = contexts();

View File

@ -45,12 +45,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = Lock::new(Shared::new());
let addr = env::args().nth(1).unwrap_or("127.0.0.1:6142".to_string());
let addr = addr.parse::<SocketAddr>()?;
// Bind a TCP listener to the socket address.
//
// Note that this is the Tokio TcpListener, which is fully async.
let mut listener = TcpListener::bind(&addr)?;
let mut listener = TcpListener::bind(&addr).await?;
println!("server running on {}", addr);

View File

@ -126,13 +126,13 @@ mod udp {
// We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those.
let bind_addr = if addr.ip().is_ipv4() {
"0.0.0.0:0".parse()?
"0.0.0.0:0"
} else {
"[::]:0".parse()?
"[::]:0"
};
let socket = UdpSocket::bind(&bind_addr)?;
socket.connect(addr)?;
let socket = UdpSocket::bind(&bind_addr).await?;
socket.connect(addr).await?;
let (mut r, mut w) = socket.split();
future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?;

View File

@ -52,9 +52,8 @@ impl Server {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>()?;
let socket = UdpSocket::bind(&addr)?;
let socket = UdpSocket::bind(&addr).await?;
println!("Listening on: {}", socket.local_addr()?);
let server = Server {

View File

@ -27,7 +27,6 @@ use tokio::net::TcpListener;
use std::env;
use std::error::Error;
use std::net::SocketAddr;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
@ -35,12 +34,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
// program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>()?;
// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
// above and must be associated with an event loop.
let mut listener = TcpListener::bind(&addr)?;
let mut listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);
loop {

View File

@ -13,7 +13,6 @@
#![warn(rust_2018_idioms)]
use tokio;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
@ -21,12 +20,10 @@ use std::error::Error;
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let addr = "127.0.0.1:6142".parse()?;
// Open a TCP stream to the socket address.
//
// Note that this is the Tokio TcpStream, which is fully async.
let mut stream = TcpStream::connect(&addr).await?;
let mut stream = TcpStream::connect("127.0.0.1:6142").await?;
println!("created stream");
let result = stream.write(b"hello world\n").await;

View File

@ -54,28 +54,26 @@
#![warn(rust_2018_idioms)]
use std::env;
use std::net::SocketAddr;
use tokio;
use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;
use std::env;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>()?;
// Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined
// above and must be associated with an event loop, so we pass in a handle
// to our event loop. After the socket's created we inform that we're ready
// to go and start accepting connections.
let mut listener = TcpListener::bind(&addr)?;
let mut listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);
loop {

View File

@ -23,7 +23,7 @@
#![warn(rust_2018_idioms)]
use futures::{future::try_join, FutureExt, StreamExt};
use std::{env, error::Error, net::SocketAddr};
use std::{env, error::Error};
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
@ -32,18 +32,15 @@ use tokio::{
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
let listen_addr = listen_addr.parse::<SocketAddr>()?;
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>()?;
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
let mut incoming = TcpListener::bind(&listen_addr)?.incoming();
let mut incoming = TcpListener::bind(listen_addr).await?.incoming();
while let Some(Ok(inbound)) = incoming.next().await {
let transfer = transfer(inbound, server_addr).map(|r| {
let transfer = transfer(inbound, server_addr.clone()).map(|r| {
if let Err(e) = r {
println!("Failed to transfer; error={}", e);
}
@ -55,8 +52,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}
async fn transfer(inbound: TcpStream, proxy_addr: SocketAddr) -> Result<(), Box<dyn Error>> {
let outbound = TcpStream::connect(&proxy_addr).await?;
async fn transfer(inbound: TcpStream, proxy_addr: String) -> Result<(), Box<dyn Error>> {
let outbound = TcpStream::connect(proxy_addr).await?;
let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split();

View File

@ -44,7 +44,6 @@
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio;
@ -88,8 +87,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Parse the address we're going to run this server on
// and set up our TCP listener to accept connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>()?;
let mut listener = TcpListener::bind(&addr)?;
let mut listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);
// Create the shared state of this server that will be shared amongst all

View File

@ -17,7 +17,7 @@ use bytes::BytesMut;
use futures::{SinkExt, StreamExt};
use http::{header::HeaderValue, Request, Response, StatusCode};
use serde::Serialize;
use std::{env, error::Error, fmt, io, net::SocketAddr};
use std::{env, error::Error, fmt, io};
use tokio::{
codec::{Decoder, Encoder, Framed},
net::{TcpListener, TcpStream},
@ -28,9 +28,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Parse the arguments, bind the TCP socket we'll be listening to, spin up
// our worker threads, and start shipping sockets to those worker threads.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>()?;
let mut incoming = TcpListener::bind(&addr)?.incoming();
let mut incoming = TcpListener::bind(&addr).await?.incoming();
println!("Listening on: {}", addr);
while let Some(Ok(stream)) = incoming.next().await {

View File

@ -55,9 +55,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
.parse()?;
let mut socket = UdpSocket::bind(&local_addr)?;
let mut socket = UdpSocket::bind(local_addr).await?;
const MAX_DATAGRAM_SIZE: usize = 65_507;
socket.connect(&remote_addr)?;
socket.connect(&remote_addr).await?;
let data = get_stdin_data()?;
socket.send(&data).await?;
let mut data = vec![0u8; MAX_DATAGRAM_SIZE];

View File

@ -27,11 +27,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let _ = env_logger::init();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string());
let addr = addr.parse::<SocketAddr>()?;
// Bind both our sockets and then figure out what ports we got.
let a = UdpSocket::bind(&addr)?;
let b = UdpSocket::bind(&addr)?;
let a = UdpSocket::bind(&addr).await?;
let b = UdpSocket::bind(&addr).await?;
let b_addr = b.local_addr()?;

View File

@ -73,8 +73,7 @@ pub struct Spawn(());
///
/// # async fn process<T>(t: T) {}
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let addr = "127.0.0.1:8080".parse()?;
/// let mut listener = TcpListener::bind(&addr).unwrap();
/// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
@ -84,7 +83,6 @@ pub struct Spawn(());
/// process(socket).await
/// });
/// }
/// # Ok(())
/// # }
/// ```
///

View File

@ -35,8 +35,7 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let addr = "127.0.0.1:8080".parse()?;
//! let mut listener = TcpListener::bind(&addr).unwrap();
//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;

View File

@ -39,8 +39,7 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let addr = "127.0.0.1:8080".parse()?;
//! let mut listener = TcpListener::bind(&addr).unwrap();
//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;
@ -87,12 +86,11 @@
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Create the runtime
//! let mut rt = Runtime::new().unwrap();
//! let mut rt = Runtime::new()?;
//!
//! // Spawn the root task
//! rt.block_on(async {
//! let addr = "127.0.0.1:8080".parse()?;
//! let mut listener = TcpListener::bind(&addr).unwrap();
//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;

View File

@ -13,8 +13,7 @@ use std::thread;
async fn echo_server() {
const N: usize = 1024;
let addr = assert_ok!("127.0.0.1:0".parse());
let mut srv = assert_ok!(TcpListener::bind(&addr));
let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(srv.local_addr());
let msg = "foo bar baz";

View File

@ -50,9 +50,8 @@ fn test_drop_on_notify() {
// Define a task that just drains the listener
let task = Arc::new(Task::new(async move {
let addr = assert_ok!("127.0.0.1:0".parse());
// Create a listener
let mut listener = assert_ok!(TcpListener::bind(&addr));
let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Send the address
let addr = listener.local_addr().unwrap();

View File

@ -13,8 +13,7 @@ use std::time::{Duration, Instant};
use tokio::timer::delay;
async fn client_server(tx: mpsc::Sender<()>) {
let addr = assert_ok!("127.0.0.1:0".parse());
let mut server = assert_ok!(TcpListener::bind(&addr));
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());

View File

@ -15,8 +15,7 @@ use std::thread;
use std::time::{Duration, Instant};
async fn client_server(tx: mpsc::Sender<()>) {
let addr = assert_ok!("127.0.0.1:0".parse());
let mut server = assert_ok!(TcpListener::bind(&addr));
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
@ -78,11 +77,9 @@ fn block_on_socket() {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let addr = "127.0.0.1:0".parse().unwrap();
let (tx, rx) = oneshot::channel();
let mut listener = TcpListener::bind(&addr).unwrap();
let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {