//! A chat server that broadcasts a message to all connections. //! //! This example is explicitly more verbose than it has to be. This is to //! illustrate more concepts. //! //! A chat server for telnet clients. After a telnet client connects, the first //! line should contain the client's name. After that, all lines sent by a //! client are broadcasted to all other connected clients. //! //! Because the client is telnet, lines are delimited by "\r\n". //! //! You can test this out by running: //! //! cargo run --example chat //! //! And then in another terminal run: //! //! telnet localhost 6142 //! //! You can run the `telnet` command in any number of additional windows. //! //! You can run the second command in multiple windows and then chat between the //! two, seeing the messages from the other client as they're received. For all //! connected clients they'll all join the same room and see everyone else's //! messages. #![warn(rust_2018_idioms)] use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; use tokio_stream::StreamExt; use tokio_util::codec::{Framed, LinesCodec}; use futures::SinkExt; use std::collections::HashMap; use std::env; use std::error::Error; use std::io; use std::net::SocketAddr; use std::sync::Arc; #[tokio::main] async fn main() -> Result<(), Box> { use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter}; // Configure a `tracing` subscriber that logs traces emitted by the chat // server. tracing_subscriber::fmt() // Filter what traces are displayed based on the RUST_LOG environment // variable. // // Traces emitted by the example code will always be displayed. You // can set `RUST_LOG=tokio=trace` to enable additional traces emitted by // Tokio itself. .with_env_filter(EnvFilter::from_default_env().add_directive("chat=info".parse()?)) // Log events when `tracing` spans are created, entered, exited, or // closed. When Tokio's internal tracing support is enabled (as // described above), this can be used to track the lifecycle of spawned // tasks on the Tokio runtime. .with_span_events(FmtSpan::FULL) // Set this subscriber as the default, to collect all traces emitted by // the program. .init(); // Create the shared state. This is how all the peers communicate. // // The server task will hold a handle to this. For every new client, the // `state` handle is cloned and passed into the task that processes the // client connection. let state = Arc::new(Mutex::new(Shared::new())); let addr = env::args() .nth(1) .unwrap_or_else(|| "127.0.0.1:6142".to_string()); // Bind a TCP listener to the socket address. // // Note that this is the Tokio TcpListener, which is fully async. let listener = TcpListener::bind(&addr).await?; tracing::info!("server running on {}", addr); loop { // Asynchronously wait for an inbound TcpStream. let (stream, addr) = listener.accept().await?; // Clone a handle to the `Shared` state for the new connection. let state = Arc::clone(&state); // Spawn our handler to be run asynchronously. tokio::spawn(async move { tracing::debug!("accepted connection"); if let Err(e) = process(state, stream, addr).await { tracing::info!("an error occurred; error = {:?}", e); } }); } } /// Shorthand for the transmit half of the message channel. type Tx = mpsc::UnboundedSender; /// Shorthand for the receive half of the message channel. type Rx = mpsc::UnboundedReceiver; /// Data that is shared between all peers in the chat server. /// /// This is the set of `Tx` handles for all connected clients. Whenever a /// message is received from a client, it is broadcasted to all peers by /// iterating over the `peers` entries and sending a copy of the message on each /// `Tx`. struct Shared { peers: HashMap, } /// The state for each connected client. struct Peer { /// The TCP socket wrapped with the `Lines` codec, defined below. /// /// This handles sending and receiving data on the socket. When using /// `Lines`, we can work at the line level instead of having to manage the /// raw byte operations. lines: Framed, /// Receive half of the message channel. /// /// This is used to receive messages from peers. When a message is received /// off of this `Rx`, it will be written to the socket. rx: Rx, } impl Shared { /// Create a new, empty, instance of `Shared`. fn new() -> Self { Shared { peers: HashMap::new(), } } /// Send a `LineCodec` encoded message to every peer, except /// for the sender. async fn broadcast(&mut self, sender: SocketAddr, message: &str) { for peer in self.peers.iter_mut() { if *peer.0 != sender { let _ = peer.1.send(message.into()); } } } } impl Peer { /// Create a new instance of `Peer`. async fn new( state: Arc>, lines: Framed, ) -> io::Result { // Get the client socket address let addr = lines.get_ref().peer_addr()?; // Create a channel for this peer let (tx, rx) = mpsc::unbounded_channel(); // Add an entry for this `Peer` in the shared state map. state.lock().await.peers.insert(addr, tx); Ok(Peer { lines, rx }) } } /// Process an individual chat client async fn process( state: Arc>, stream: TcpStream, addr: SocketAddr, ) -> Result<(), Box> { let mut lines = Framed::new(stream, LinesCodec::new()); // Send a prompt to the client to enter their username. lines.send("Please enter your username:").await?; // Read the first line from the `LineCodec` stream to get the username. let username = match lines.next().await { Some(Ok(line)) => line, // We didn't get a line so we return early here. _ => { tracing::error!("Failed to get username from {}. Client disconnected.", addr); return Ok(()); } }; // Register our peer with state which internally sets up some channels. let mut peer = Peer::new(state.clone(), lines).await?; // A client has connected, let's let everyone know. { let mut state = state.lock().await; let msg = format!("{username} has joined the chat"); tracing::info!("{}", msg); state.broadcast(addr, &msg).await; } // Process incoming messages until our stream is exhausted by a disconnect. loop { tokio::select! { // A message was received from a peer. Send it to the current user. Some(msg) = peer.rx.recv() => { peer.lines.send(&msg).await?; } result = peer.lines.next() => match result { // A message was received from the current user, we should // broadcast this message to the other users. Some(Ok(msg)) => { let mut state = state.lock().await; let msg = format!("{username}: {msg}"); state.broadcast(addr, &msg).await; } // An error occurred. Some(Err(e)) => { tracing::error!( "an error occurred while processing messages for {}; error = {:?}", username, e ); } // The stream has been exhausted. None => break, }, } } // If this section is reached it means that the client was disconnected! // Let's let everyone still connected know about it. { let mut state = state.lock().await; state.peers.remove(&addr); let msg = format!("{username} has left the chat"); tracing::info!("{}", msg); state.broadcast(addr, &msg).await; } Ok(()) }