mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
Add a multithreaded echo server example
This commit is contained in:
parent
645ae7051d
commit
7b94cf307d
@ -29,3 +29,4 @@ futures = "0.1.15"
|
||||
[dev-dependencies]
|
||||
env_logger = { version = "0.4", default-features = false }
|
||||
libc = "0.2"
|
||||
num_cpus = "1.0"
|
||||
|
100
examples/echo-threads.rs
Normal file
100
examples/echo-threads.rs
Normal file
@ -0,0 +1,100 @@
|
||||
//! A multithreaded version of an echo server
|
||||
//!
|
||||
//! This server implements the same functionality as the `echo` example, except
|
||||
//! that this example will use all cores of the machine to do I/O instead of
|
||||
//! just one. This examples works by having the main thread using blocking I/O
|
||||
//! and shipping accepted sockets to worker threads in a round-robin fashion.
|
||||
//!
|
||||
//! To see this server in action, you can run this in one terminal:
|
||||
//!
|
||||
//! cargo run --example echoe-threads
|
||||
//!
|
||||
//! and in another terminal you can run:
|
||||
//!
|
||||
//! cargo run --example connect 127.0.0.1:8080
|
||||
|
||||
extern crate futures;
|
||||
extern crate num_cpus;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_io;
|
||||
|
||||
use std::env;
|
||||
use std::net::{self, SocketAddr};
|
||||
use std::thread;
|
||||
|
||||
use futures::Future;
|
||||
use futures::stream::Stream;
|
||||
use futures::sync::mpsc;
|
||||
use tokio_io::AsyncRead;
|
||||
use tokio_io::io::copy;
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_core::reactor::Core;
|
||||
|
||||
fn main() {
|
||||
// First argument, the address to bind
|
||||
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
|
||||
let addr = addr.parse::<SocketAddr>().unwrap();
|
||||
|
||||
// Second argument, the number of threads we'll be using
|
||||
let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
|
||||
.unwrap_or(num_cpus::get());
|
||||
|
||||
// Use `std::net` to bind the requested port, we'll use this on the main
|
||||
// thread below
|
||||
let listener = net::TcpListener::bind(&addr).expect("failed to bind");
|
||||
println!("Listening on: {}", addr);
|
||||
|
||||
// Spin up our worker threads, creating a channel routing to each worker
|
||||
// thread that we'll use below.
|
||||
let mut channels = Vec::new();
|
||||
for _ in 0..num_threads {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
channels.push(tx);
|
||||
thread::spawn(|| worker(rx));
|
||||
}
|
||||
|
||||
// Infinitely accept sockets from our `std::net::TcpListener`, as this'll do
|
||||
// blocking I/O. Each socket is then shipped round-robin to a particular
|
||||
// thread which will associate the socket with the corresponding event loop
|
||||
// and process the connection.
|
||||
let mut next = 0;
|
||||
for socket in listener.incoming() {
|
||||
let socket = socket.expect("failed to accept");
|
||||
channels[next].unbounded_send(socket).expect("worker thread died");
|
||||
next = (next + 1) % channels.len();
|
||||
}
|
||||
}
|
||||
|
||||
fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
|
||||
let mut core = Core::new().unwrap();
|
||||
let handle = core.handle();
|
||||
|
||||
let done = rx.for_each(move |socket| {
|
||||
// First up when we receive a socket we associate it with our event loop
|
||||
// using the `TcpStream::from_stream` API. After that the socket is not
|
||||
// a `tokio_core::net::TcpStream` meaning it's in nonblocking mode and
|
||||
// ready to be used with Tokio
|
||||
let socket = TcpStream::from_stream(socket, &handle)
|
||||
.expect("failed to associate TCP stream");
|
||||
let addr = socket.peer_addr().expect("failed to get remote address");
|
||||
|
||||
// Like the single-threaded `echo` example we split the socket halves
|
||||
// and use the `copy` helper to ship bytes back and forth. Afterwards we
|
||||
// spawn the task to run concurrently on this thread, and then print out
|
||||
// what happened afterwards
|
||||
let (reader, writer) = socket.split();
|
||||
let amt = copy(reader, writer);
|
||||
let msg = amt.then(move |result| {
|
||||
match result {
|
||||
Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
|
||||
Err(e) => println!("error on {}: {}", addr, e),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
handle.spawn(msg);
|
||||
|
||||
Ok(())
|
||||
});
|
||||
core.run(done).unwrap();
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
//! An "hello world" echo server with tokio-core
|
||||
//! A "hello world" echo server with tokio-core
|
||||
//!
|
||||
//! This server will create a TCP listener, accept connections in a loop, and
|
||||
//! simply write back everything that's read off of each TCP connection. Each
|
||||
|
Loading…
x
Reference in New Issue
Block a user