diff --git a/nightly-examples/Cargo.toml b/nightly-examples/Cargo.toml index c88b7605..0a323e89 100644 --- a/nightly-examples/Cargo.toml +++ b/nightly-examples/Cargo.toml @@ -17,3 +17,4 @@ tracing-fmt = "0.0.1-alpha.3" tracing-futures = { path = "../tracing-futures", default-features = false, features = ["std-future"] } tokio = { git = "https://github.com/tokio-rs/tokio.git" } tracing-attributes = { path = "../tracing-attributes" } +futures-preview = "0.3.0-alpha.18" diff --git a/nightly-examples/examples/async_fn.rs b/nightly-examples/examples/async_fn.rs index d67c51e9..e2a5e097 100644 --- a/nightly-examples/examples/async_fn.rs +++ b/nightly-examples/examples/async_fn.rs @@ -24,7 +24,7 @@ use tokio::net::TcpStream; use tracing::info; use tracing_attributes::instrument; -use std::{io, error::Error, net::SocketAddr}; +use std::{error::Error, io, net::SocketAddr}; #[instrument] async fn connect(addr: &SocketAddr) -> io::Result { diff --git a/nightly-examples/examples/echo.rs b/nightly-examples/examples/echo.rs new file mode 100644 index 00000000..39aefaa5 --- /dev/null +++ b/nightly-examples/examples/echo.rs @@ -0,0 +1,89 @@ +//! A "hello world" echo server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! write back everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example echo +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be echo'd back to +//! you! If you open up multiple terminals running the `connect` example you +//! should be able to see them all make progress simultaneously. + +#![feature(async_await)] +#![warn(rust_2018_idioms)] + +use tokio; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +use std::env; +use std::error::Error; +use std::net::SocketAddr; + +use tracing::{debug, error, info, warn}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let subscriber = tracing_fmt::FmtSubscriber::builder().finish(); + tracing::subscriber::set_global_default(subscriber)?; + + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:3000".to_string()); + let addr = addr.parse::()?; + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop. + let mut listener = TcpListener::bind(&addr)?; + info!("Listening on: {}", addr); + + loop { + // Asynchronously wait for an inbound socket. + let (mut socket, peer_addr) = listener.accept().await?; + + info!("Got connection from: {}", peer_addr); + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + + tokio::spawn(async move { + let mut buf = [0; 1024]; + + // In a loop, read data from the socket and write the data back. + loop { + let n = socket + .read(&mut buf) + .await + .expect("failed to read data from socket"); + + if n == 0 { + return; + } + + socket + .write_all(&buf[0..n]) + .await + .expect("failed to write data to socket"); + + info!(message = "echo'd data", peer_addr = ?peer_addr, size = n); + } + }); + } +} diff --git a/nightly-examples/examples/proxy_server.rs b/nightly-examples/examples/proxy_server.rs new file mode 100644 index 00000000..7d18953f --- /dev/null +++ b/nightly-examples/examples/proxy_server.rs @@ -0,0 +1,103 @@ +#![feature(async_await)] +#![deny(rust_2018_idioms)] + + +//! A proxy that forwards data to another server and forwards that server's +//! responses back to clients. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! You can showcase this by running this in one terminal: +//! +//! cargo +nightly run --example proxy_server +//! +//! This in another terminal +//! +//! cargo run --example echo +//! +//! And finally this in another terminal +//! +//! nc localhost 8081 +//! +//! This final terminal will connect to our proxy, which will in turn connect to +//! the echo server, and you'll be able to see data flowing between them. + +use futures::{future::try_join, FutureExt}; +use tokio::{ + self, + io::AsyncReadExt, + net::{TcpListener, TcpStream}, + prelude::*, +}; + +use tracing::{debug, info, warn}; +use tracing_attributes::instrument; + +use std::{env, net::SocketAddr}; + +#[instrument] +async fn transfer( + inbound: TcpStream, + proxy_addr: SocketAddr, +) -> Result<(), Box> { + let outbound = TcpStream::connect(&proxy_addr).await?; + + let (mut ri, mut wi) = inbound.split(); + let (mut ro, mut wo) = outbound.split(); + + let client_to_server = ri.copy(&mut wo); + let server_to_client = ro.copy(&mut wi); + + let (client_to_server, server_to_client) = try_join(client_to_server, server_to_client).await?; + info!( + message = "transfer completed", + client_to_server, server_to_client + ); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let subscriber = tracing_fmt::FmtSubscriber::builder().finish(); + tracing::subscriber::set_global_default(subscriber)?; + + let listen_addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8081".to_string()); + let listen_addr = listen_addr.parse::()?; + + let server_addr = env::args() + .nth(2) + .unwrap_or_else(|| "127.0.0.1:3000".to_string()); + let server_addr = server_addr.parse::()?; + + let mut listener = TcpListener::bind(&listen_addr)?.incoming(); + info!("Listening on: {}", listen_addr); + info!("Proxying to: {}", server_addr); + + while let Some(Ok(inbound)) = listener.next().await { + match inbound.peer_addr() { + Ok(addr) => { + info!(message = "client connected", client_addr = %addr); + } + Err(error) => warn!( + message = "Could not get client information", + %error + ), + } + + let transfer = transfer(inbound, server_addr).map(|r| { + if let Err(err) = r { + // Don't panic, maybe the client just disconnected too soon + debug!(%err); + } + }); + + tokio::spawn(transfer); + } + + Ok(()) +}