//! A proxy that forwards data to another server and forwards that server's //! responses back to clients. extern crate futures; extern crate tokio_core; extern crate tokio_io; use std::sync::Arc; use std::env; use std::net::{Shutdown, SocketAddr}; use std::io::{self, Read, Write}; use futures::stream::Stream; use futures::{Future, Poll}; use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor::Core; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::{copy, shutdown}; fn main() { let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); let listen_addr = listen_addr.parse::().unwrap(); let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); let server_addr = server_addr.parse::().unwrap(); // Create the event loop that will drive this server. let mut l = Core::new().unwrap(); let handle = l.handle(); // Create a TCP listener which will listen for incoming connections. let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap(); println!("Listening on: {}", listen_addr); println!("Proxying to: {}", server_addr); let done = socket.incoming().for_each(move |(client, client_addr)| { let server = TcpStream::connect(&server_addr, &handle); let amounts = server.and_then(move |server| { // Create separate read/write handles for the TCP clients that we're // proxying data between. Note that typically you'd use // `AsyncRead::split` for this operation, but we want our writer // handles to have a custom implementation of `shutdown` which // actually calls `TcpStream::shutdown` to ensure that EOF is // transmitted properly across the proxied connection. // // As a result, we wrap up our client/server manually in arcs and // use the impls below on our custom `MyTcpStream` type. let client_reader = MyTcpStream(Arc::new(client)); let client_writer = client_reader.clone(); let server_reader = MyTcpStream(Arc::new(server)); let server_writer = server_reader.clone(); // Copy the data (in parallel) between the client and the server. // After the copy is done we indicate to the remote side that we've // finished by shutting down the connection. let client_to_server = copy(client_reader, server_writer) .and_then(|(n, _, server_writer)| { shutdown(server_writer).map(move |_| n) }); let server_to_client = copy(server_reader, client_writer) .and_then(|(n, _, client_writer)| { shutdown(client_writer).map(move |_| n) }); client_to_server.join(server_to_client) }); let msg = amounts.map(move |(from_client, from_server)| { println!("client at {} wrote {} bytes and received {} bytes", client_addr, from_client, from_server); }).map_err(|e| { // Don't panic. Maybe the client just disconnected too soon. println!("error: {}", e); }); handle.spawn(msg); Ok(()) }); l.run(done).unwrap(); } // This is a custom type used to have a custom implementation of the // `AsyncWrite::shutdown` method which actually calls `TcpStream::shutdown` to // notify the remote end that we're done writing. #[derive(Clone)] struct MyTcpStream(Arc); impl Read for MyTcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { (&*self.0).read(buf) } } impl Write for MyTcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { (&*self.0).write(buf) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } impl AsyncRead for MyTcpStream {} impl AsyncWrite for MyTcpStream { fn shutdown(&mut self) -> Poll<(), io::Error> { try!(self.0.shutdown(Shutdown::Write)); Ok(().into()) } }