Mirror of https://github.com/tokio-rs/tokio.git (synced 2025-09-25 12:00:35 +00:00)
Update examples to track latest Tokio changes (#180)
The examples included in the repository have lagged behind the changes made to Tokio. Specifically, they do not use the new runtime construct. This patch updates the examples to use the latest features of Tokio.
This commit is contained in:
parent
56c5797872
commit
f1cb12e14f
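
For context, the core change these examples adopt is replacing hand-driven execution (`wait()`, `CpuPool`, `current_thread`) with the Tokio runtime. Below is a minimal, hedged sketch of the new entry point, assuming the `tokio` 0.1 and `futures` 0.1 crates of this era:

```rust
extern crate futures;
extern crate tokio;

use futures::future;

fn main() {
    // `tokio::run` starts the runtime (reactor + worker threads), spawns the
    // given future as the root task, and blocks until all spawned tasks finish.
    tokio::run(future::lazy(|| {
        println!("running inside the Tokio runtime");
        Ok(())
    }));
}
```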
examples/README.md
@@ -1,9 +1,7 @@
-## Examples of `tokio-core`
+## Examples of how to use Tokio
 
 This directory contains a number of examples showcasing various capabilities of
-the `tokio` crate. Most of these examples also leverage the `futures` and
-`tokio_io` crates, along with a number of other miscellaneous dependencies for
-various tasks.
+the `tokio` crate.
 
 All examples can be executed with:
 
@@ -13,37 +11,37 @@ cargo run --example $name
 
 A high level description of each example is:
 
-* `hello` - a tiny server that simply writes "Hello!" to all connected clients
-  and then terminates the connection, should help see how to create and
+* `hello_world` - a tiny server that writes "hello world" to all connected
+  clients and then terminates the connection, should help see how to create and
   initialize `tokio`.
 
-* `echo` - this is your standard TCP "echo server" which simply accepts
-  connections and then echos back any contents that are read from each connected
-  client.
+* `echo` - this is your standard TCP "echo server" which accepts connections and
+  then echos back any contents that are read from each connected client.
 
 * `echo-udp` - again your standard "echo server", except for UDP instead of TCP.
   This will echo back any packets received to the original sender.
 
-* `echo-threads` - servers the same purpose as the `echo` example, except this
-  shows off using multiple cores on a machine for doing I/O processing.
-
 * `connect` - this is a `nc`-like clone which can be used to interact with most
   other examples. The program creates a TCP connection or UDP socket to sends
   all information read on stdin to the remote peer, displaying any data received
   on stdout. Often quite useful when interacting with the various other servers
   here!
 
 * `chat` - this spins up a local TCP server which will broadcast from any
   connected client to all other connected clients. You can connect to this in
   multiple terminals and use it to chat between the terminals.
 
 * `chat-combinator` - Similar to `chat`, but this uses a much more functional
   programming approch using combinators.
 
 * `proxy` - an example proxy server that will forward all connected TCP clients
   to the remote address specified when starting the program.
 
-* `sink` - a benchmark-like example which shows writing 0s infinitely to any
-  connected client.
-
 * `tinyhttp` - a tiny HTTP/1.1 server which doesn't support HTTP request bodies
   showcasing running on multiple cores, working with futures and spawning
   tasks, and finally framing a TCP connection to discrete request/response
   objects.
 
 * `udp-codec` - an example of using the `Encoder`/`Decoder` traits for UDP
   along with a small ping-pong protocol happening locally.
 
-* `compress` - an echo-like server where instead of echoing back everything read
-  it echos back a gzip-compressed version of everything read! All compression
-  occurs on a CPU pool to offload work from the event loop.
-
 * `tinydb` - an in-memory database which shows sharing state between all
   connected clients, notably the key/value store of this database.
 
examples/chat-combinator.rs
@@ -1,8 +1,10 @@
 //! A chat server that broadcasts a message to all connections.
 //!
-//! This is a simple line-based server which accepts connections, reads lines
-//! from those connections, and broadcasts the lines to all other connected
-//! clients. In a sense this is a bit of a "poor man's chat server".
+//! This is a line-based server which accepts connections, reads lines from
+//! those connections, and broadcasts the lines to all other connected clients.
+//!
+//! This example is similar to chat.rs, but uses combinators and a much more
+//! functional style.
 //!
 //! You can test this out by running:
 //!
@@ -17,122 +19,132 @@
 //! connected clients they'll all join the same room and see everyone else's
 //! messages.
 
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
 extern crate tokio;
-extern crate tokio_io;
+extern crate futures;
 
+use tokio::io;
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+
 use std::collections::HashMap;
 use std::iter;
 use std::env;
-use std::io::{Error, ErrorKind, BufReader};
+use std::io::{BufReader};
 use std::sync::{Arc, Mutex};
 
-use futures::Future;
-use futures::future::Executor;
-use futures::stream::{self, Stream};
-use futures_cpupool::CpuPool;
-use tokio::net::TcpListener;
-use tokio_io::io;
-use tokio_io::AsyncRead;
-
 fn main() {
+    // Create the TCP listener we'll accept connections on.
     let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
     let addr = addr.parse().unwrap();
 
-    // Create the TCP listener we'll accept connections on.
     let socket = TcpListener::bind(&addr).unwrap();
     println!("Listening on: {}", addr);
 
-    // This is currently a multi threaded server.
-    //
-    // Once the same thread executor lands, transition to single threaded.
+    // This is running on the Tokio runtime, so it will be multi-threaded. The
+    // `Arc<Mutex<...>>` allows state to be shared across the threads.
     let connections = Arc::new(Mutex::new(HashMap::new()));
 
-    let srv = socket.incoming().for_each(move |stream| {
-        let addr = stream.peer_addr().unwrap();
-
-        println!("New Connection: {}", addr);
-        let (reader, writer) = stream.split();
-
-        // Create a channel for our stream, which other sockets will use to
-        // send us messages. Then register our address with the stream to send
-        // data to us.
-        let (tx, rx) = futures::sync::mpsc::unbounded();
-        connections.lock().unwrap().insert(addr, tx);
-
-        // Define here what we do for the actual I/O. That is, read a bunch of
-        // lines from the socket and dispatch them while we also write any lines
-        // from other sockets.
-        let connections_inner = connections.clone();
-        let reader = BufReader::new(reader);
-
-        // Model the read portion of this socket by mapping an infinite
-        // iterator to each line off the socket. This "loop" is then
-        // terminated with an error once we hit EOF on the socket.
-        let iter = stream::iter_ok::<_, Error>(iter::repeat(()));
-        let socket_reader = iter.fold(reader, move |reader, _| {
-            // Read a line off the socket, failing if we're at EOF
-            let line = io::read_until(reader, b'\n', Vec::new());
-            let line = line.and_then(|(reader, vec)| {
-                if vec.len() == 0 {
-                    Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"))
-                } else {
-                    Ok((reader, vec))
-                }
-            });
-
-            // Convert the bytes we read into a string, and then send that
-            // string to all other connected clients.
-            let line = line.map(|(reader, vec)| {
-                (reader, String::from_utf8(vec))
-            });
-            let connections = connections_inner.clone();
-            line.map(move |(reader, message)| {
-                println!("{}: {:?}", addr, message);
-                let mut conns = connections.lock().unwrap();
-                if let Ok(msg) = message {
-                    // For each open connection except the sender, send the
-                    // string via the channel.
-                    let iter = conns.iter_mut()
-                        .filter(|&(&k, _)| k != addr)
-                        .map(|(_, v)| v);
-                    for tx in iter {
-                        tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
-                    }
-                } else {
-                    let tx = conns.get_mut(&addr).unwrap();
-                    tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap();
-                }
-                reader
-            })
-        });
-
-        // Whenever we receive a string on the Receiver, we write it to
-        // `WriteHalf<TcpStream>`.
-        let socket_writer = rx.fold(writer, |writer, msg| {
-            let amt = io::write_all(writer, msg.into_bytes());
-            let amt = amt.map(|(writer, _)| writer);
-            amt.map_err(|_| ())
-        });
-
-        let pool = CpuPool::new(1);
-
-        // Now that we've got futures representing each half of the socket, we
-        // use the `select` combinator to wait for either half to be done to
-        // tear down the other. Then we spawn off the result.
-        let connections = connections.clone();
-        let socket_reader = socket_reader.map_err(|_| ());
-        let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
-        pool.execute(connection.then(move |_| {
-            connections.lock().unwrap().remove(&addr);
-            println!("Connection {} closed.", addr);
-            Ok(())
-        })).unwrap();
-
-        Ok(())
-    });
-
-    // execute server
-    srv.wait().unwrap();
+    // The server task asynchronously iterates over and processes each incoming
+    // connection.
+    let srv = socket.incoming()
+        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
+        .for_each(move |stream| {
+            // The client's socket address
+            let addr = stream.peer_addr().unwrap();
+
+            println!("New Connection: {}", addr);
+
+            // Split the TcpStream into two separate handles. One handle for reading
+            // and one handle for writing. This lets us use separate tasks for
+            // reading and writing.
+            let (reader, writer) = stream.split();
+
+            // Create a channel for our stream, which other sockets will use to
+            // send us messages. Then register our address with the stream to send
+            // data to us.
+            let (tx, rx) = futures::sync::mpsc::unbounded();
+            connections.lock().unwrap().insert(addr, tx);
+
+            // Define here what we do for the actual I/O. That is, read a bunch of
+            // lines from the socket and dispatch them while we also write any lines
+            // from other sockets.
+            let connections_inner = connections.clone();
+            let reader = BufReader::new(reader);
+
+            // Model the read portion of this socket by mapping an infinite
+            // iterator to each line off the socket. This "loop" is then
+            // terminated with an error once we hit EOF on the socket.
+            let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
+
+            let socket_reader = iter.fold(reader, move |reader, _| {
+                // Read a line off the socket, failing if we're at EOF
+                let line = io::read_until(reader, b'\n', Vec::new());
+                let line = line.and_then(|(reader, vec)| {
+                    if vec.len() == 0 {
+                        Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
+                    } else {
+                        Ok((reader, vec))
+                    }
+                });
+
+                // Convert the bytes we read into a string, and then send that
+                // string to all other connected clients.
+                let line = line.map(|(reader, vec)| {
+                    (reader, String::from_utf8(vec))
+                });
+
+                // Move the connection state into the closure below.
+                let connections = connections_inner.clone();
+
+                line.map(move |(reader, message)| {
+                    println!("{}: {:?}", addr, message);
+                    let mut conns = connections.lock().unwrap();
+
+                    if let Ok(msg) = message {
+                        // For each open connection except the sender, send the
+                        // string via the channel.
+                        let iter = conns.iter_mut()
+                            .filter(|&(&k, _)| k != addr)
+                            .map(|(_, v)| v);
+                        for tx in iter {
+                            tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
+                        }
+                    } else {
+                        let tx = conns.get_mut(&addr).unwrap();
+                        tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap();
+                    }
+
+                    reader
+                })
+            });
+
+            // Whenever we receive a string on the Receiver, we write it to
+            // `WriteHalf<TcpStream>`.
+            let socket_writer = rx.fold(writer, |writer, msg| {
+                let amt = io::write_all(writer, msg.into_bytes());
+                let amt = amt.map(|(writer, _)| writer);
+                amt.map_err(|_| ())
+            });
+
+            // Now that we've got futures representing each half of the socket, we
+            // use the `select` combinator to wait for either half to be done to
+            // tear down the other. Then we spawn off the result.
+            let connections = connections.clone();
+            let socket_reader = socket_reader.map_err(|_| ());
+            let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
+
+            // Spawn a task to process the connection
+            tokio::spawn(connection.then(move |_| {
+                connections.lock().unwrap().remove(&addr);
+                println!("Connection {} closed.", addr);
+                Ok(())
+            }));
+
+            Ok(())
+        });
+
+    tokio::run(srv);
 }
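
As a reference for readers, here is a minimal, self-contained sketch (with made-up peer ids) of the channel-per-peer broadcast pattern both chat examples rely on, assuming `futures` 0.1:

```rust
extern crate futures;

use std::collections::HashMap;

use futures::prelude::*;
use futures::sync::mpsc;

fn main() {
    // Each connected peer gets the transmit half of an unbounded channel;
    // broadcasting a line means iterating the map and sending to everyone
    // except the original sender.
    let mut peers: HashMap<u32, mpsc::UnboundedSender<String>> = HashMap::new();

    let (tx, rx) = mpsc::unbounded();
    peers.insert(1, tx);

    let sender_id = 0;
    for (id, tx) in peers.iter() {
        if *id != sender_id {
            tx.unbounded_send(format!("{}: hello", sender_id)).unwrap();
        }
    }

    // Dropping the senders ends the stream so `collect` can finish.
    drop(peers);
    let received = rx.collect().wait().unwrap();
    println!("{:?}", received);
}
```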
examples/chat.rs
@@ -26,26 +26,21 @@
 
 #![deny(warnings)]
 
-#[macro_use]
-extern crate futures;
 extern crate tokio;
 #[macro_use]
-extern crate tokio_io;
+extern crate futures;
 extern crate bytes;
 
-use tokio::executor::current_thread;
+use tokio::io;
 use tokio::net::{TcpListener, TcpStream};
-use tokio_io::{AsyncRead};
-use futures::prelude::*;
+use tokio::prelude::*;
 use futures::sync::mpsc;
 use futures::future::{self, Either};
 use bytes::{BytesMut, Bytes, BufMut};
 
-use std::io::{self, Write};
-use std::cell::RefCell;
 use std::collections::HashMap;
 use std::net::SocketAddr;
-use std::rc::Rc;
+use std::sync::{Arc, Mutex};
 
 /// Shorthand for the transmit half of the message channel.
 type Tx = mpsc::UnboundedSender<Bytes>;
@@ -88,7 +83,7 @@ struct Peer {
     ///
     /// This is used to broadcast messages read off the socket to all connected
     /// peers.
-    state: Rc<RefCell<Shared>>,
+    state: Arc<Mutex<Shared>>,
 
     /// Receive half of the message channel.
     ///
@@ -137,7 +132,7 @@ impl Shared {
 impl Peer {
     /// Create a new instance of `Peer`.
     fn new(name: BytesMut,
-           state: Rc<RefCell<Shared>>,
+           state: Arc<Mutex<Shared>>,
            lines: Lines) -> Peer
     {
         // Get the client socket address
@@ -147,7 +142,8 @@ impl Peer {
         let (tx, rx) = mpsc::unbounded();
 
         // Add an entry for this `Peer` in the shared state map.
-        state.borrow_mut().peers.insert(addr, tx);
+        state.lock().unwrap()
+            .peers.insert(addr, tx);
 
         Peer {
             name,
@@ -213,7 +209,7 @@ impl Future for Peer {
             let line = line.freeze();
 
             // Now, send the line to all other peers
-            for (addr, tx) in &self.state.borrow().peers {
+            for (addr, tx) in &self.state.lock().unwrap().peers {
                 // Don't send the message to ourselves
                 if *addr != self.addr {
                     // The send only fails if the rx half has been dropped,
@@ -240,7 +236,7 @@ impl Future for Peer {
 
 impl Drop for Peer {
     fn drop(&mut self) {
-        self.state.borrow_mut().peers
+        self.state.lock().unwrap().peers
             .remove(&self.addr);
     }
 }
@@ -275,7 +271,7 @@ impl Lines {
             //
             // In the case of `io::Result`, an error of `WouldBlock` is
             // equivalent to `Async::NotReady.
-            let n = try_nb!(self.socket.write(&self.wr));
+            let n = try_ready!(self.socket.poll_write(&self.wr));
 
             // As long as the wr is not empty, a successful write should
             // never write 0 bytes.
@@ -344,7 +340,7 @@ impl Stream for Lines {
 ///
 /// This will read the first line from the socket to identify the client, then
 /// add the client to the set of connected peers in the chat service.
-fn process(socket: TcpStream, state: Rc<RefCell<Shared>>) {
+fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
     // Wrap the socket with the `Lines` codec that we wrote above.
     //
     // By doing this, we can operate at the line level instead of doing raw byte
@@ -406,8 +402,8 @@ fn process(socket: TcpStream, state: Rc<RefCell<Shared>>) {
         println!("connection error = {:?}", e);
     });
 
-    // Spawn a new task that processes the socket:
-    current_thread::spawn(connection);
+    // Return the connection processing task
+    tokio::spawn(connection);
 }
 
 pub fn main() {
@@ -416,7 +412,7 @@ pub fn main() {
     // The server task will hold a handle to this. For every new client, the
     // `state` handle is cloned and passed into the task that processes the
    // client connection.
-    let state = Rc::new(RefCell::new(Shared::new()));
+    let state = Arc::new(Mutex::new(Shared::new()));
 
     let addr = "127.0.0.1:6142".parse().unwrap();
 
@@ -428,6 +424,7 @@ pub fn main() {
     // The server task asynchronously iterates over and processes each
     // incoming connection.
     let server = listener.incoming().for_each(move |socket| {
+        // Spawn a task to process the connection
         process(socket, state.clone());
         Ok(())
     })
@@ -460,5 +457,5 @@ pub fn main() {
     //
     // In our example, we have not defined a shutdown strategy, so this will
     // block until `ctrl-c` is pressed at the terminal.
-    current_thread::block_on_all(server).unwrap();
+    tokio::run(server);
 }
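
The recurring change in this file is mechanical but important: `Rc<RefCell<Shared>>` became `Arc<Mutex<Shared>>` because tasks spawned onto the runtime's thread pool must be `Send`. A small std-only sketch of the same idea using plain threads:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // `Rc<RefCell<_>>` is confined to a single thread; `Arc<Mutex<_>>` is
    // `Send + Sync`, so clones of the handle can hop across threads.
    let state = Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let state = state.clone();
            thread::spawn(move || {
                state.lock().unwrap().insert(i, format!("peer {}", i));
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    println!("{} peers registered", state.lock().unwrap().len());
}
```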
examples/compress.rs (deleted)
@@ -1,122 +0,0 @@
-//! An example of offloading work to a thread pool instead of doing work on the
-//! main event loop.
-//!
-//! In this example the server will act as a form of echo server except that
-//! it'll echo back gzip-compressed data. Each connected client will have the
-//! data written streamed back as the compressed version is available, and all
-//! compressing will occur on a thread pool rather than the main event loop.
-//!
-//! You can preview this example with in one terminal:
-//!
-//!     cargo run --example compress
-//!
-//! and in another terminal;
-//!
-//!     echo test | cargo run --example connect 127.0.0.1:8080 | gunzip
-//!
-//! The latter command will need to be tweaked for non-unix-like shells, but
-//! you can also redirect the stdout of the `connect` program to a file
-//! and then decompress that.
-
-extern crate futures;
-extern crate futures_cpupool;
-extern crate flate2;
-extern crate tokio;
-extern crate tokio_io;
-
-use std::io;
-use std::env;
-use std::net::SocketAddr;
-
-use futures::{Future, Stream, Poll};
-use futures::future::Executor;
-use futures_cpupool::CpuPool;
-use tokio::net::{TcpListener, TcpStream};
-use tokio_io::{AsyncRead, AsyncWrite};
-use flate2::write::GzEncoder;
-
-fn main() {
-    // As with many other examples, parse our CLI arguments and prepare the
-    // reactor.
-    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
-    let addr = addr.parse::<SocketAddr>().unwrap();
-    let socket = TcpListener::bind(&addr).unwrap();
-    println!("Listening on: {}", addr);
-
-    // This is where we're going to offload our computationally heavy work
-    // (compressing) to. Here we just use a convenience constructor to create a
-    // pool of threads equal to the number of CPUs we have.
-    let pool = CpuPool::new_num_cpus();
-
-    // The compress logic will happen in the function below, but everything's
-    // still a future! Each client is spawned to concurrently get processed.
-    let server = socket.incoming().for_each(move |socket| {
-        let addr = socket.peer_addr().unwrap();
-        pool.execute(compress(socket, &pool).then(move |result| {
-            match result {
-                Ok((r, w)) => println!("{}: compressed {} bytes to {}", addr, r, w),
-                Err(e) => println!("{}: failed when compressing: {}", addr, e),
-            }
-            Ok(())
-        })).unwrap();
-        Ok(())
-    });
-
-    server.wait().unwrap();
-}
-
-/// The main workhorse of this example. This'll compress all data read from
-/// `socket` on the `pool` provided, writing it back out to `socket` as it's
-/// available.
-fn compress(socket: TcpStream, pool: &CpuPool)
-    -> Box<Future<Item = (u64, u64), Error = io::Error> + Send>
-{
-    use tokio_io::io;
-
-    // The general interface that `CpuPool` provides is that we'll *spawn a
-    // future* onto it. All execution of the future will occur on the `CpuPool`
-    // and we'll get back a handle representing the completed value of the
-    // future. In essence it's our job here to create a future that represents
-    // compressing `socket`, and then we'll simply spawn it at the very end.
-    //
-    // Here we exploit the fact that `TcpStream` itself is `Send` in this
-    // function as well. That is, we can read/write the TCP stream on any
-    // thread, and we'll get notifications about it being ready from the reactor
-    // thread.
-    //
-    // Otherwise this is the same as the echo server except that after splitting
-    // we apply some encoding to one side, followed by a `shutdown` when we're
-    // done to ensure that all gz footers are written.
-    let (read, write) = socket.split();
-    let write = Count { io: write, amt: 0 };
-    let write = GzEncoder::new(write, flate2::Compression::best());
-    let process = io::copy(read, write).and_then(|(amt, _read, write)| {
-        io::shutdown(write).map(move |io| (amt, io.get_ref().amt))
-    });
-
-    // Spawn the future so is executes entirely on the thread pool here
-    Box::new(pool.spawn(process))
-}
-
-struct Count<T> {
-    io: T,
-    amt: u64,
-}
-
-impl<T: io::Write> io::Write for Count<T> {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let n = self.io.write(buf)?;
-        self.amt += n as u64;
-        Ok(n)
-    }
-
-    fn flush(&mut self) -> io::Result<()> {
-        self.io.flush()
-    }
-}
-
-impl<T: AsyncWrite> AsyncWrite for Count<T> {
-    fn shutdown(&mut self) -> Poll<(), io::Error> {
-        self.io.shutdown()
-    }
-}
examples/connect.rs
@@ -14,10 +14,11 @@
 //! this repository! Many of them recommend running this as a simple "hook up
 //! stdin/stdout to a server" to get up and running.
 
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
 extern crate tokio;
-extern crate tokio_io;
+extern crate futures;
 extern crate bytes;
 
 use std::env;
@@ -25,9 +26,8 @@ use std::io::{self, Read, Write};
 use std::net::SocketAddr;
 use std::thread;
 
+use tokio::prelude::*;
 use futures::sync::mpsc;
-use futures::{Future, Sink, Stream};
-use futures_cpupool::CpuPool;
 
 fn main() {
     // Determine if we're going to run in TCP or UDP mode
@@ -46,8 +46,6 @@ fn main() {
     });
     let addr = addr.parse::<SocketAddr>().unwrap();
 
-    let pool = CpuPool::new(1);
-
     // Right now Tokio doesn't support a handle to stdin running on the event
     // loop, so we farm out that work to a separate thread. This thread will
     // read data (with blocking I/O) from stdin and then send it to the event
@@ -60,9 +58,9 @@ fn main() {
     // our UDP connection to get a stream of bytes we're going to emit to
     // stdout.
     let stdout = if tcp {
-        tcp::connect(&addr, &pool, Box::new(stdin_rx))
+        tcp::connect(&addr, Box::new(stdin_rx))
     } else {
-        udp::connect(&addr, &pool, Box::new(stdin_rx))
+        udp::connect(&addr, Box::new(stdin_rx))
     };
 
@@ -71,15 +69,21 @@ fn main() {
     // loop. In this case, though, we know it's ok as the event loop isn't
     // otherwise running anything useful.
     let mut out = io::stdout();
-    stdout.for_each(|chunk| {
-        out.write_all(&chunk)
-    }).wait().unwrap();
+
+    tokio::run({
+        stdout
+            .for_each(move |chunk| {
+                out.write_all(&chunk)
+            })
+            .map_err(|e| println!("error reading stdout; error = {:?}", e))
+    });
 }
 
 mod codec {
     use std::io;
     use bytes::{BufMut, BytesMut};
     use tokio_io::codec::{Encoder, Decoder};
 
     /// A simple `Codec` implementation that just ships bytes around.
     ///
     /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
@@ -115,24 +119,21 @@ mod codec {
 }
 
 mod tcp {
+    use tokio;
+    use tokio::net::TcpStream;
+    use tokio::prelude::*;
+
+    use bytes::BytesMut;
+    use codec::Bytes;
+
     use std::io;
     use std::net::SocketAddr;
 
-    use bytes::BytesMut;
-    use futures::{Future, Stream};
-    use futures::future::Executor;
-    use futures_cpupool::CpuPool;
-    use tokio::net::TcpStream;
-    use tokio_io::AsyncRead;
-    use codec::Bytes;
-
     pub fn connect(addr: &SocketAddr,
-                   pool: &CpuPool,
                    stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-        -> Box<Stream<Item = BytesMut, Error = io::Error>>
+        -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
     {
         let tcp = TcpStream::connect(addr);
-        let pool = pool.clone();
 
         // After the TCP connection has been established, we set up our client
         // to start forwarding data.
@@ -151,12 +152,14 @@ mod tcp {
         // with us reading data from the stream.
         Box::new(tcp.map(move |stream| {
             let (sink, stream) = stream.framed(Bytes).split();
-            pool.execute(stdin.forward(sink).then(|result| {
+
+            tokio::spawn(stdin.forward(sink).then(|result| {
                 if let Err(e) = result {
                     panic!("failed to write to socket: {}", e)
                 }
                 Ok(())
-            })).unwrap();
+            }));
 
             stream
         }).flatten_stream())
     }
@@ -166,17 +169,16 @@ mod udp {
     use std::io;
     use std::net::SocketAddr;
 
-    use bytes::BytesMut;
-    use futures::{Future, Stream};
-    use futures::future::Executor;
-    use futures_cpupool::CpuPool;
+    use tokio;
     use tokio::net::{UdpSocket, UdpFramed};
+    use tokio::prelude::*;
+    use bytes::BytesMut;
 
     use codec::Bytes;
 
     pub fn connect(&addr: &SocketAddr,
-                   pool: &CpuPool,
                    stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-        -> Box<Stream<Item = BytesMut, Error = io::Error>>
+        -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
     {
         // We'll bind our UDP socket to a local IP/port, but for now we
        // basically let the OS pick both of those.
@@ -196,14 +198,14 @@ mod udp {
 
         // All bytes from `stdin` will go to the `addr` specified in our
         // argument list. Like with TCP this is spawned concurrently
-        pool.execute(stdin.map(move |chunk| {
+        tokio::spawn(stdin.map(move |chunk| {
             (chunk, addr)
         }).forward(sink).then(|result| {
             if let Err(e) = result {
                 panic!("failed to write to socket: {}", e)
             }
             Ok(())
-        })).unwrap();
+        }));
 
         // With UDP we could receive data from any source, so filter out
         // anything coming from a different address
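
`connect` now hands its stdin stream straight to `tokio::spawn(stdin.forward(sink)...)` instead of a one-thread `CpuPool`. For reference, a minimal sketch of the `forward` combinator itself, using an in-memory channel in place of a socket sink (assumes `futures` 0.1):

```rust
extern crate futures;

use futures::prelude::*;
use futures::stream;
use futures::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::unbounded::<u32>();

    // `forward` drives every item of the stream into the sink, flushing as it
    // goes, and resolves to `(stream, sink)` once the stream is exhausted.
    let items = stream::iter_ok::<_, mpsc::SendError<u32>>(vec![1, 2, 3]);
    items.forward(tx).wait().unwrap();

    // The returned sink was dropped above, so the receiving stream terminates.
    let received = rx.collect().wait().unwrap();
    assert_eq!(received, vec![1, 2, 3]);
}
```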
examples/echo-threads.rs (deleted)
@@ -1,92 +0,0 @@
-//! A multithreaded version of an echo server
-//!
-//! This server implements the same functionality as the `echo` example, except
-//! that this example will use all cores of the machine to do I/O instead of
-//! just one. This examples works by having the main thread using blocking I/O
-//! and shipping accepted sockets to worker threads in a round-robin fashion.
-//!
-//! To see this server in action, you can run this in one terminal:
-//!
-//!     cargo run --example echo-threads
-//!
-//! and in another terminal you can run:
-//!
-//!     cargo run --example connect 127.0.0.1:8080
-
-extern crate futures;
-extern crate futures_cpupool;
-extern crate num_cpus;
-extern crate tokio;
-extern crate tokio_io;
-
-use std::env;
-use std::net::SocketAddr;
-use std::thread;
-
-use futures::prelude::*;
-use futures::future::Executor;
-use futures::sync::mpsc;
-use futures_cpupool::CpuPool;
-use tokio_io::AsyncRead;
-use tokio_io::io::copy;
-use tokio::net::{TcpStream, TcpListener};
-
-fn main() {
-    // First argument, the address to bind
-    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
-    let addr = addr.parse::<SocketAddr>().unwrap();
-
-    // Second argument, the number of threads we'll be using
-    let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
-        .unwrap_or(num_cpus::get());
-
-    let listener = TcpListener::bind(&addr).expect("failed to bind");
-    println!("Listening on: {}", addr);
-
-    // Spin up our worker threads, creating a channel routing to each worker
-    // thread that we'll use below.
-    let mut channels = Vec::new();
-    for _ in 0..num_threads {
-        let (tx, rx) = mpsc::unbounded();
-        channels.push(tx);
-        thread::spawn(|| worker(rx));
-    }
-
-    // Infinitely accept sockets from our `TcpListener`. Each socket is then
-    // shipped round-robin to a particular thread which will associate the
-    // socket with the corresponding event loop and process the connection.
-    let mut next = 0;
-    let srv = listener.incoming().for_each(|socket| {
-        channels[next].unbounded_send(socket).expect("worker thread died");
-        next = (next + 1) % channels.len();
-        Ok(())
-    });
-    srv.wait().unwrap();
-}
-
-fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
-    let pool = CpuPool::new(1);
-
-    let done = rx.for_each(move |socket| {
-        let addr = socket.peer_addr().expect("failed to get remote address");
-
-        // Like the single-threaded `echo` example we split the socket halves
-        // and use the `copy` helper to ship bytes back and forth. Afterwards we
-        // spawn the task to run concurrently on this thread, and then print out
-        // what happened afterwards
-        let (reader, writer) = socket.split();
-        let amt = copy(reader, writer);
-        let msg = amt.then(move |result| {
-            match result {
-                Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
-                Err(e) => println!("error on {}: {}", addr, e),
-            }
-
-            Ok(())
-        });
-        pool.execute(msg).unwrap();
-
-        Ok(())
-    });
-    done.wait().unwrap();
-}
examples/echo-udp.rs
@@ -10,15 +10,16 @@
 //!
 //! Each line you type in to the `nc` terminal should be echo'd back to you!
 
+#![deny(warnings)]
+
 #[macro_use]
 extern crate futures;
 extern crate tokio;
-extern crate tokio_io;
 
 use std::{env, io};
 use std::net::SocketAddr;
 
-use futures::{Future, Poll};
+use tokio::prelude::*;
 use tokio::net::UdpSocket;
 
 struct Server {
@@ -56,11 +57,17 @@ fn main() {
     let socket = UdpSocket::bind(&addr).unwrap();
     println!("Listening on: {}", socket.local_addr().unwrap());
 
-    // Next we'll create a future to spawn (the one we defined above) and then
-    // we'll block our current thread waiting on the result of the future
-    Server {
+    let server = Server {
         socket: socket,
         buf: vec![0; 1024],
         to_send: None,
-    }.wait().unwrap();
+    };
+
+    // This starts the server task.
+    //
+    // `map_err` handles the error by logging it and maps the future to a type
+    // that can be spawned.
+    //
+    // `tokio::run` spanws the task on the Tokio runtime and starts running.
+    tokio::run(server.map_err(|e| println!("server error = {:?}", e)));
 }
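
The `Server` type here is a hand-written future; the commit only changes how it is launched (`tokio::run(server.map_err(...))` instead of `wait()`). For reference, a tiny hand-rolled future in the same style, assuming `futures` 0.1:

```rust
extern crate futures;

use futures::{Async, Future, Poll};

// Like `Server` in echo-udp, this type implements `Future` by hand: `poll`
// does some work and reports `Ready` once there is nothing left to do.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<u32, ()> {
        let mut steps = 0;
        while self.remaining > 0 {
            self.remaining -= 1;
            steps += 1;
        }
        Ok(Async::Ready(steps))
    }
}

fn main() {
    let steps = CountDown { remaining: 3 }.wait().unwrap();
    println!("finished after {} steps", steps);
}
```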
examples/echo.rs
@@ -1,9 +1,11 @@
-//! A "hello world" echo server with tokio-core
+//! A "hello world" echo server with Tokio
 //!
 //! This server will create a TCP listener, accept connections in a loop, and
-//! simply write back everything that's read off of each TCP connection. Each
-//! TCP connection is processed concurrently with all other TCP connections, and
-//! each connection will have its own buffer that it's reading in/out of.
+//! write back everything that's read off of each TCP connection.
+//!
+//! Because the Tokio runtime uses a thread poool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
 //!
 //! To see this server in action, you can run this in one terminal:
 //!
@@ -17,22 +19,17 @@
 //! you! If you open up multiple terminals running the `connect` example you
 //! should be able to see them all make progress simultaneously.
 
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
 extern crate tokio;
-extern crate tokio_io;
 
+use tokio::io;
+use tokio::net::TcpListener;
+use tokio::prelude::*;
+
 use std::env;
 use std::net::SocketAddr;
 
-use futures::Future;
-use futures::future::Executor;
-use futures::stream::Stream;
-use futures_cpupool::CpuPool;
-use tokio_io::AsyncRead;
-use tokio_io::io::copy;
-use tokio::net::TcpListener;
-
 fn main() {
     // Allow passing an address to listen on as the first argument of this
     // program, but otherwise we'll just set up our TCP listener on
@@ -48,9 +45,6 @@ fn main() {
     let socket = TcpListener::bind(&addr).unwrap();
     println!("Listening on: {}", addr);
 
-    // A CpuPool allows futures to be executed concurrently.
-    let pool = CpuPool::new(1);
-
     // Here we convert the `TcpListener` to a stream of incoming connections
     // with the `incoming` method. We then define how to process each element in
     // the stream with the `for_each` method.
@@ -60,59 +54,61 @@ fn main() {
     // connections made to the server). The return value of the `for_each`
     // method is itself a future representing processing the entire stream of
     // connections, and ends up being our server.
-    let done = socket.incoming().for_each(move |socket| {
-        // Once we're inside this closure this represents an accepted client
-        // from our server. The `socket` is the client connection (similar to
-        // how the standard library operates).
-        //
-        // We just want to copy all data read from the socket back onto the
-        // socket itself (e.g. "echo"). We can use the standard `io::copy`
-        // combinator in the `tokio-core` crate to do precisely this!
-        //
-        // The `copy` function takes two arguments, where to read from and where
-        // to write to. We only have one argument, though, with `socket`.
-        // Luckily there's a method, `Io::split`, which will split an Read/Write
-        // stream into its two halves. This operation allows us to work with
-        // each stream independently, such as pass them as two arguments to the
-        // `copy` function.
-        //
-        // The `copy` function then returns a future, and this future will be
-        // resolved when the copying operation is complete, resolving to the
-        // amount of data that was copied.
-        let (reader, writer) = socket.split();
-        let amt = copy(reader, writer);
-
-        // After our copy operation is complete we just print out some helpful
-        // information.
-        let msg = amt.then(move |result| {
-            match result {
-                Ok((amt, _, _)) => println!("wrote {} bytes", amt),
-                Err(e) => println!("error: {}", e),
-            }
-
-            Ok(())
-        });
-
-        // And this is where much of the magic of this server happens. We
-        // crucially want all clients to make progress concurrently, rather than
-        // blocking one on completion of another. To achieve this we use the
-        // `execute` function on the `Executor` trait to essentially execute
-        // some work in the background.
-        //
-        // This function will transfer ownership of the future (`msg` in this
-        // case) to the event loop that `handle` points to. The event loop will
-        // then drive the future to completion.
-        //
-        // Essentially here we're executing a new task to run concurrently,
-        // which will allow all of our clients to be processed concurrently.
-        pool.execute(msg).unwrap();
-
-        Ok(())
-    });
-
-    // And finally now that we've define what our server is, we run it! Here we
-    // just need to execute the future we've created and wait for it to complete
-    // using the standard methods in the `futures` crate.
-    done.wait().unwrap();
+    let done = socket.incoming()
+        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
+        .for_each(move |socket| {
+            // Once we're inside this closure this represents an accepted client
+            // from our server. The `socket` is the client connection (similar to
+            // how the standard library operates).
+            //
+            // We just want to copy all data read from the socket back onto the
+            // socket itself (e.g. "echo"). We can use the standard `io::copy`
+            // combinator in the `tokio-core` crate to do precisely this!
+            //
+            // The `copy` function takes two arguments, where to read from and where
+            // to write to. We only have one argument, though, with `socket`.
+            // Luckily there's a method, `Io::split`, which will split an Read/Write
+            // stream into its two halves. This operation allows us to work with
+            // each stream independently, such as pass them as two arguments to the
+            // `copy` function.
+            //
+            // The `copy` function then returns a future, and this future will be
+            // resolved when the copying operation is complete, resolving to the
+            // amount of data that was copied.
+            let (reader, writer) = socket.split();
+            let amt = io::copy(reader, writer);
+
+            // After our copy operation is complete we just print out some helpful
+            // information.
+            let msg = amt.then(move |result| {
+                match result {
+                    Ok((amt, _, _)) => println!("wrote {} bytes", amt),
+                    Err(e) => println!("error: {}", e),
+                }
+
+                Ok(())
+            });
+
+            // And this is where much of the magic of this server happens. We
+            // crucially want all clients to make progress concurrently, rather than
+            // blocking one on completion of another. To achieve this we use the
+            // `tokio::spawn` function to execute the work in the background.
+            //
+            // This function will transfer ownership of the future (`msg` in this
+            // case) to the Tokio runtime thread pool that. The thread pool will
+            // drive the future to completion.
+            //
+            // Essentially here we're executing a new task to run concurrently,
+            // which will allow all of our clients to be processed concurrently.
+            tokio::spawn(msg)
+        });
+
+    // And finally now that we've define what our server is, we run it!
+    //
+    // This starts the Tokio runtime, spawns the server task, and blocks the
+    // current thread until all tasks complete execution. Since the `done` task
+    // never completes (it just keeps accepting sockets), `tokio::run` blocks
+    // forever (until ctrl-c is pressed).
+    tokio::run(done);
 }
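
One detail worth calling out: `tokio::run` wants a future with `Item = ()` and `Error = ()`, which is why the accept loop now starts with `map_err`. A minimal sketch of that shape over an in-memory stream, assuming `futures` 0.1 and `tokio` 0.1:

```rust
extern crate futures;
extern crate tokio;

use futures::prelude::*;
use futures::stream;

fn main() {
    // Errors are logged and converted to `()` so the resulting future
    // satisfies `tokio::run`'s `Future<Item = (), Error = ()>` bound.
    let items = stream::iter_result(vec![Ok(1), Ok(2), Err("boom")]);

    tokio::run(
        items
            .map_err(|e| println!("error = {:?}", e))
            .for_each(|n| {
                println!("item = {}", n);
                Ok(())
            }),
    );
}
```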
examples/hello.rs (deleted)
@@ -1,44 +0,0 @@
-//! A small example of a server that accepts TCP connections and writes out
-//! `Hello!` to them, afterwards closing the connection.
-//!
-//! You can test this out by running:
-//!
-//!     cargo run --example hello
-//!
-//! and then in another terminal executing
-//!
-//!     cargo run --example connect 127.0.0.1:8080
-//!
-//! You should see `Hello!` printed out and then the `nc` program will exit.
-
-extern crate env_logger;
-extern crate futures;
-extern crate tokio;
-extern crate tokio_io;
-
-use std::env;
-use std::net::SocketAddr;
-
-use futures::prelude::*;
-use tokio::net::TcpListener;
-
-fn main() {
-    env_logger::init().unwrap();
-    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
-    let addr = addr.parse::<SocketAddr>().unwrap();
-
-    let listener = TcpListener::bind(&addr).unwrap();
-
-    let addr = listener.local_addr().unwrap();
-    println!("Listening for connections on {}", addr);
-
-    let clients = listener.incoming();
-    let welcomes = clients.and_then(|socket| {
-        tokio_io::io::write_all(socket, b"Hello!\n")
-    });
-    let server = welcomes.for_each(|(_socket, _welcome)| {
-        Ok(())
-    });
-
-    server.wait().unwrap();
-}
examples/hello_world.rs
@@ -15,13 +15,10 @@
 #![deny(warnings)]
 
 extern crate tokio;
-extern crate tokio_io;
-extern crate futures;
 
-use tokio::executor::current_thread;
+use tokio::io;
 use tokio::net::TcpListener;
-use tokio_io::io;
-use futures::{Future, Stream};
+use tokio::prelude::*;
 
 pub fn main() {
     let addr = "127.0.0.1:6142".parse().unwrap();
@@ -43,7 +40,7 @@ pub fn main() {
         });
 
         // Spawn a new task that processes the socket:
-        current_thread::spawn(connection);
+        tokio::spawn(connection);
 
         Ok(())
     })
@@ -76,5 +73,5 @@ pub fn main() {
     //
     // In our example, we have not defined a shutdown strategy, so this will
     // block until `ctrl-c` is pressed at the terminal.
-    current_thread::block_on_all(server).unwrap();
+    tokio::run(server);
 }
examples/proxy.rs
@@ -1,6 +1,10 @@
 //! A proxy that forwards data to another server and forwards that server's
 //! responses back to clients.
 //!
+//! Because the Tokio runtime uses a thread poool, each TCP connection is
+//! processed concurrently with all other TCP connections across multiple
+//! threads.
+//!
 //! You can showcase this by running this in one terminal:
 //!
 //!     cargo run --example proxy
@@ -16,23 +20,18 @@
 //! This final terminal will connect to our proxy, which will in turn connect to
 //! the echo server, and you'll be able to see data flowing between them.
 
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
 extern crate tokio;
-extern crate tokio_io;
 
 use std::sync::{Arc, Mutex};
 use std::env;
 use std::net::{Shutdown, SocketAddr};
 use std::io::{self, Read, Write};
 
-use futures::stream::Stream;
-use futures::{Future, Poll};
-use futures::future::{Executor};
-use futures_cpupool::CpuPool;
+use tokio::io::{copy, shutdown};
 use tokio::net::{TcpListener, TcpStream};
-use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_io::io::{copy, shutdown};
+use tokio::prelude::*;
 
 fn main() {
     let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
@@ -41,59 +40,60 @@ fn main() {
     let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
     let server_addr = server_addr.parse::<SocketAddr>().unwrap();
 
-    let pool = CpuPool::new(1);
-
     // Create a TCP listener which will listen for incoming connections.
     let socket = TcpListener::bind(&listen_addr).unwrap();
     println!("Listening on: {}", listen_addr);
     println!("Proxying to: {}", server_addr);
 
-    let done = socket.incoming().for_each(move |client| {
-        let server = TcpStream::connect(&server_addr);
-        let amounts = server.and_then(move |server| {
-            // Create separate read/write handles for the TCP clients that we're
-            // proxying data between. Note that typically you'd use
-            // `AsyncRead::split` for this operation, but we want our writer
-            // handles to have a custom implementation of `shutdown` which
-            // actually calls `TcpStream::shutdown` to ensure that EOF is
-            // transmitted properly across the proxied connection.
-            //
-            // As a result, we wrap up our client/server manually in arcs and
-            // use the impls below on our custom `MyTcpStream` type.
-            let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
-            let client_writer = client_reader.clone();
-            let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
-            let server_writer = server_reader.clone();
-
-            // Copy the data (in parallel) between the client and the server.
-            // After the copy is done we indicate to the remote side that we've
-            // finished by shutting down the connection.
-            let client_to_server = copy(client_reader, server_writer)
-                .and_then(|(n, _, server_writer)| {
-                    shutdown(server_writer).map(move |_| n)
-                });
-
-            let server_to_client = copy(server_reader, client_writer)
-                .and_then(|(n, _, client_writer)| {
-                    shutdown(client_writer).map(move |_| n)
-                });
-
-            client_to_server.join(server_to_client)
-        });
-
-        let msg = amounts.map(move |(from_client, from_server)| {
-            println!("client wrote {} bytes and received {} bytes",
-                     from_client, from_server);
-        }).map_err(|e| {
-            // Don't panic. Maybe the client just disconnected too soon.
-            println!("error: {}", e);
-        });
-        pool.execute(msg).unwrap();
-
-        Ok(())
-    });
-
-    done.wait().unwrap();
+    let done = socket.incoming()
+        .map_err(|e| println!("error accepting socket; error = {:?}", e))
+        .for_each(move |client| {
+            let server = TcpStream::connect(&server_addr);
+            let amounts = server.and_then(move |server| {
+                // Create separate read/write handles for the TCP clients that we're
+                // proxying data between. Note that typically you'd use
+                // `AsyncRead::split` for this operation, but we want our writer
+                // handles to have a custom implementation of `shutdown` which
+                // actually calls `TcpStream::shutdown` to ensure that EOF is
+                // transmitted properly across the proxied connection.
+                //
+                // As a result, we wrap up our client/server manually in arcs and
+                // use the impls below on our custom `MyTcpStream` type.
+                let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
+                let client_writer = client_reader.clone();
+                let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
+                let server_writer = server_reader.clone();
+
+                // Copy the data (in parallel) between the client and the server.
+                // After the copy is done we indicate to the remote side that we've
+                // finished by shutting down the connection.
+                let client_to_server = copy(client_reader, server_writer)
+                    .and_then(|(n, _, server_writer)| {
+                        shutdown(server_writer).map(move |_| n)
+                    });
+
+                let server_to_client = copy(server_reader, client_writer)
+                    .and_then(|(n, _, client_writer)| {
+                        shutdown(client_writer).map(move |_| n)
+                    });
+
+                client_to_server.join(server_to_client)
+            });
+
+            let msg = amounts.map(move |(from_client, from_server)| {
+                println!("client wrote {} bytes and received {} bytes",
+                         from_client, from_server);
+            }).map_err(|e| {
+                // Don't panic. Maybe the client just disconnected too soon.
+                println!("error: {}", e);
+            });
+
+            tokio::spawn(msg);
+
+            Ok(())
+        });
+
+    tokio::run(done);
 }
 
 // This is a custom type used to have a custom implementation of the
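
The proxy copies in both directions and `join`s the two transfer futures; only the spawning mechanism changed in this commit. A minimal sketch of `join`, assuming `futures` 0.1:

```rust
extern crate futures;

use futures::future;
use futures::prelude::*;

fn main() {
    // `join` runs both futures and yields both results as a tuple, mirroring
    // how the proxy combines its client-to-server and server-to-client copies.
    let client_to_server = future::ok::<u64, ()>(128);
    let server_to_client = future::ok::<u64, ()>(256);

    let (from_client, from_server) = client_to_server
        .join(server_to_client)
        .wait()
        .unwrap();

    println!(
        "client wrote {} bytes and received {} bytes",
        from_client, from_server
    );
}
```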
examples/sink.rs (deleted)
@@ -1,58 +0,0 @@
-//! A small server that writes as many nul bytes on all connections it receives.
-//!
-//! There is no concurrency in this server, only one connection is written to at
-//! a time. You can use this as a benchmark for the raw performance of writing
-//! data to a socket by measuring how much data is being written on each
-//! connection.
-//!
-//! Typically you'll want to run this example with:
-//!
-//!     cargo run --example sink --release
-//!
-//! And then you can connect to it via:
-//!
-//!     cargo run --example connect 127.0.0.1:8080 > /dev/null
-//!
-//! You should see your CPUs light up as data's being shove into the ether.
-
-extern crate env_logger;
-extern crate futures;
-extern crate futures_cpupool;
-extern crate tokio;
-extern crate tokio_io;
-
-use std::env;
-use std::iter;
-use std::net::SocketAddr;
-
-use futures::Future;
-use futures::future::Executor;
-use futures::stream::{self, Stream};
-use futures_cpupool::CpuPool;
-use tokio_io::IoFuture;
-use tokio::net::{TcpListener, TcpStream};
-
-fn main() {
-    env_logger::init().unwrap();
-    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
-    let addr = addr.parse::<SocketAddr>().unwrap();
-
-    let pool = CpuPool::new(1);
-
-    let socket = TcpListener::bind(&addr).unwrap();
-    println!("Listening on: {}", addr);
-    let server = socket.incoming().for_each(|socket| {
-        println!("got a socket: {}", socket.peer_addr().unwrap());
-        pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
-        Ok(())
-    });
-    server.wait().unwrap();
-}
-
-fn write(socket: TcpStream) -> IoFuture<()> {
-    static BUF: &'static [u8] = &[0; 64 * 1024];
-    let iter = iter::repeat(());
-    Box::new(stream::iter_ok(iter).fold(socket, |socket, ()| {
-        tokio_io::io::write_all(socket, BUF).map(|(socket, _)| socket)
-    }).map(|_| ()))
-}
@ -39,10 +39,9 @@
|
||||
//! * `SET $key $value` - this will set the value of `$key` to `$value`,
|
||||
//! returning the previous value, if any.
|
||||
|
||||
extern crate futures;
|
||||
extern crate futures_cpupool;
|
||||
#![deny(warnings)]
|
||||
|
||||
extern crate tokio;
|
||||
extern crate tokio_io;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::io::BufReader;
|
||||
@ -50,12 +49,9 @@ use std::env;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::future::Executor;
|
||||
use futures_cpupool::CpuPool;
|
||||
use tokio::io::{lines, write_all};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_io::AsyncRead;
|
||||
use tokio_io::io::{lines, write_all};
|
||||
use tokio::prelude::*;
|
||||
|
||||
/// The in-memory database shared amongst all clients.
|
||||
///
|
||||
@ -86,9 +82,6 @@ fn main() {
|
||||
let listener = TcpListener::bind(&addr).expect("failed to bind");
|
||||
println!("Listening on: {}", addr);
|
||||
|
||||
// Create a CpuPool to execute tasks
|
||||
let pool = CpuPool::new(1);
|
||||
|
||||
// Create the shared state of this server that will be shared amongst all
|
||||
// clients. We populate the initial database and then create the `Database`
|
||||
// structure. Note the usage of `Arc` here which will be used to ensure that
|
||||
@ -100,67 +93,69 @@ fn main() {
|
||||
map: Mutex::new(initial_db),
|
||||
});
|
||||
|
||||
let done = listener.incoming().for_each(move |socket| {
|
||||
// As with many other small examples, the first thing we'll do is
|
||||
// *split* this TCP stream into two separately owned halves. This'll
|
||||
// allow us to work with the read and write halves independently.
|
||||
let (reader, writer) = socket.split();
|
||||
let done = listener.incoming()
|
||||
.map_err(|e| println!("error accepting socket; error = {:?}", e))
|
||||
.for_each(move |socket| {
|
||||
// As with many other small examples, the first thing we'll do is
|
||||
// *split* this TCP stream into two separately owned halves. This'll
|
||||
// allow us to work with the read and write halves independently.
|
||||
let (reader, writer) = socket.split();
|
||||
|
||||
// Since our protocol is line-based we use `tokio_io`'s `lines` utility
|
||||
// to convert our stream of bytes, `reader`, into a `Stream` of lines.
|
||||
let lines = lines(BufReader::new(reader));
|
||||
// Since our protocol is line-based we use `tokio_io`'s `lines` utility
|
||||
// to convert our stream of bytes, `reader`, into a `Stream` of lines.
|
||||
let lines = lines(BufReader::new(reader));
|
||||
|
||||
// Here's where the meat of the processing in this server happens. First
|
||||
// we see a clone of the database being created, which is creating a
|
||||
// new reference for this connected client to use. Also note the `move`
|
||||
// keyword on the closure here which moves ownership of the reference
|
||||
// into the closure, which we'll need for spawning the client below.
|
||||
//
|
||||
// The `map` function here means that we'll run some code for all
|
||||
// requests (lines) we receive from the client. The actual handling here
|
||||
// is pretty simple, first we parse the request and if it's valid we
|
||||
// generate a response based on the values in the database.
|
||||
let db = db.clone();
|
||||
let responses = lines.map(move |line| {
|
||||
let request = match Request::parse(&line) {
|
||||
Ok(req) => req,
|
||||
Err(e) => return Response::Error { msg: e },
|
||||
};
|
||||
// Here's where the meat of the processing in this server happens. First
|
||||
// we see a clone of the database being created, which is creating a
|
||||
// new reference for this connected client to use. Also note the `move`
|
||||
// keyword on the closure here which moves ownership of the reference
|
||||
// into the closure, which we'll need for spawning the client below.
|
||||
//
|
||||
// The `map` function here means that we'll run some code for all
|
||||
// requests (lines) we receive from the client. The actual handling here
|
||||
// is pretty simple, first we parse the request and if it's valid we
|
||||
// generate a response based on the values in the database.
|
||||
let db = db.clone();
|
||||
let responses = lines.map(move |line| {
|
||||
let request = match Request::parse(&line) {
|
||||
Ok(req) => req,
|
||||
Err(e) => return Response::Error { msg: e },
|
||||
};
|
||||
|
||||
let mut db = db.map.lock().unwrap();
|
||||
match request {
|
||||
Request::Get { key } => {
|
||||
match db.get(&key) {
|
||||
Some(value) => Response::Value { key, value: value.clone() },
|
||||
None => Response::Error { msg: format!("no key {}", key) },
|
||||
let mut db = db.map.lock().unwrap();
|
||||
match request {
|
||||
Request::Get { key } => {
|
||||
match db.get(&key) {
|
||||
Some(value) => Response::Value { key, value: value.clone() },
|
||||
None => Response::Error { msg: format!("no key {}", key) },
|
||||
}
|
||||
}
|
||||
Request::Set { key, value } => {
|
||||
let previous = db.insert(key.clone(), value.clone());
|
||||
Response::Set { key, value, previous }
|
||||
}
|
||||
}
|
||||
Request::Set { key, value } => {
|
||||
let previous = db.insert(key.clone(), value.clone());
|
||||
Response::Set { key, value, previous }
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
    // At this point `responses` is a stream of `Response` types which we
    // now want to write back out to the client. To do that we use
    // `Stream::fold` to perform a loop here, serializing each response and
    // then writing it out to the client.
    let writes = responses.fold(writer, |writer, response| {
        let mut response = response.serialize();
        response.push('\n');
        write_all(writer, response.into_bytes()).map(|(w, _)| w)
    });

    // Like with other small servers, we'll `spawn` this client to ensure it
    // runs concurrently with all other clients, for now ignoring any errors
    // that we see.
    let msg = writes.then(move |_| Ok(()));

    pool.execute(msg).unwrap();
    Ok(())
    tokio::spawn(msg)
});

done.wait().unwrap();
tokio::run(done);
}
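The recurring change in this hunk swaps the old `CpuPool` plumbing (`pool.execute(..)` plus `Future::wait`) for the Tokio runtime (`tokio::spawn` inside `tokio::run`). A minimal sketch of that pattern, assuming the tokio 0.1 and futures 0.1 crates these examples build against:

```rust
extern crate futures;
extern crate tokio;

use futures::future::{self, Future};

fn main() {
    // `tokio::run` replaces `done.wait().unwrap()`: it starts the runtime,
    // drives the given future, and blocks until all spawned tasks finish.
    tokio::run(future::lazy(|| {
        // `tokio::spawn` replaces `pool.execute(msg).unwrap()`, handing the
        // per-client future to the runtime's executor.
        tokio::spawn(future::ok::<(), ()>(()).map(|()| println!("client task ran")));
        Ok(())
    }));
}
```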
impl Request {
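As an aside before the next file, here's a standalone sketch of the `Stream::fold` write loop used above, assuming futures 0.1; a `Vec<u8>` stands in for the socket writer so the sketch runs without any I/O:

```rust
extern crate futures;

use futures::{future, stream, Future, Stream};

fn main() {
    // A stand-in for the stream of serialized responses.
    let responses = stream::iter_ok::<_, ()>(vec!["ok", "error"]);

    // The writer is threaded through each iteration as the fold state, so
    // responses are written strictly in order, one after another.
    let writes = responses.fold(Vec::new(), |mut writer, response| {
        let mut response = response.to_string();
        response.push('\n');
        writer.extend_from_slice(response.as_bytes());
        future::ok::<_, ()>(writer)
    });

    println!("wrote {:?}", writes.wait().unwrap());
}
```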
@ -11,12 +11,11 @@
//! respectively. By default this will run I/O on all the cores your system has
//! available, and it doesn't support HTTP request bodies.

#![deny(warnings)]

extern crate bytes;
extern crate futures;
extern crate futures_cpupool;
extern crate http;
extern crate httparse;
extern crate num_cpus;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
@ -24,73 +23,58 @@ extern crate time;
extern crate tokio;
extern crate tokio_io;

use std::env;
use std::fmt;
use std::io;
use std::net::{self, SocketAddr};
use std::thread;
use std::{env, fmt, io};
use std::net::SocketAddr;

use tokio::net::{TcpStream, TcpListener};
use tokio::prelude::*;
use tokio_io::codec::{Encoder, Decoder};

use bytes::BytesMut;
use futures::future::{self, Executor};
use futures::sync::mpsc;
use futures::{Stream, Future, Sink};
use futures_cpupool::CpuPool;
use http::header::HeaderValue;
use http::{Request, Response, StatusCode};
use tokio::net::TcpStream;
use tokio::reactor::Handle;
use tokio_io::codec::{Encoder, Decoder};
use tokio_io::{AsyncRead};

fn main() {
    // Parse the arguments, bind the TCP socket we'll be listening to, spin up
    // our worker threads, and start shipping sockets to those worker threads.
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse::<SocketAddr>().unwrap();
    let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
        .unwrap_or(num_cpus::get());

    let listener = net::TcpListener::bind(&addr).expect("failed to bind");
    let listener = TcpListener::bind(&addr).expect("failed to bind");
    println!("Listening on: {}", addr);

    let mut channels = Vec::new();
    for _ in 0..num_threads {
        let (tx, rx) = mpsc::unbounded();
        channels.push(tx);
        thread::spawn(|| worker(rx));
    }
    let mut next = 0;
    for socket in listener.incoming() {
        if let Ok(socket) = socket {
            channels[next].unbounded_send(socket).expect("worker thread died");
            next = (next + 1) % channels.len();
        }
    }
    tokio::run({
        listener.incoming()
            .map_err(|e| println!("failed to accept socket; error = {:?}", e))
            .for_each(|socket| {
                process(socket);
                Ok(())
            })
    });
}

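The new `main` drives the accept loop entirely on the runtime: `incoming()` yields one `TcpStream` per connection as a `Stream`, and `for_each` handles each in turn. A sketch of that loop in isolation, assuming tokio 0.1 (the bind address is just illustrative; this runs until the listener errors):

```rust
extern crate tokio;

use tokio::net::TcpListener;
use tokio::prelude::*;

fn main() {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&addr).expect("failed to bind");

    tokio::run(
        listener.incoming()
            // `tokio::run` needs a future with `Error = ()`, so accept errors
            // are logged and swallowed rather than propagated.
            .map_err(|e| println!("failed to accept socket; error = {:?}", e))
            .for_each(|socket| {
                println!("accepted connection from {:?}", socket.peer_addr());
                Ok(())
            }),
    );
}
```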
fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
    let handle = Handle::default();
    let pool = CpuPool::new(1);

    let done = rx.for_each(move |socket| {
        // Associate each socket we get with our local event loop, and then use
        // the codec support in the tokio-io crate to deal with discrete
        // request/response types instead of bytes. Here we'll just use our
        // framing defined below and then use the `send_all` helper to send the
        // responses back on the socket after we've processed them.
        let socket = future::result(TcpStream::from_std(socket, &handle));
        let req = socket.and_then(|socket| {
            let (tx, rx) = socket.framed(Http).split();
            tx.send_all(rx.and_then(respond))
        });
        pool.execute(req.then(move |result| {
            drop(result);
            Ok(())
        })).unwrap();
        Ok(())
    });
    done.wait().unwrap();
}

fn process(socket: TcpStream) {
    let (tx, rx) = socket
        // Frame the socket using the `Http` protocol. This maps the TCP socket
        // to a Stream + Sink of HTTP frames.
        .framed(Http)
        // This splits a single `Stream + Sink` value into two separate handles
        // that can be used independently (even on different tasks or threads).
        .split();

    // Map all requests into responses and send them back to the client.
    let task = tx.send_all(rx.and_then(respond))
        .then(|res| {
            if let Err(e) = res {
                println!("failed to process connection; error = {:?}", e);
            }

            Ok(())
        });

    // Spawn the task that handles the connection.
    tokio::spawn(task);
}

/// "Server logic" is implemented in this function.
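The framing half of this example (`framed(..)` + `split()` + `send_all`) generalizes to any codec. A minimal sketch, assuming tokio-io 0.1's `Encoder`/`Decoder` traits, with a toy newline-delimited `LineCodec` standing in for the `Http` codec defined later in the file (the port and the echo behavior are illustrative):

```rust
extern crate bytes;
extern crate tokio;
extern crate tokio_io;

use bytes::BytesMut;
use std::io;
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio_io::codec::{Decoder, Encoder};
use tokio_io::AsyncRead;

struct LineCodec;

impl Decoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<String>> {
        // Wait until a full line has arrived before yielding a frame.
        match buf.iter().position(|&b| b == b'\n') {
            Some(i) => {
                let line = buf.split_to(i + 1);
                Ok(Some(String::from_utf8_lossy(&line[..i]).into_owned()))
            }
            None => Ok(None),
        }
    }
}

impl Encoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    fn encode(&mut self, item: String, buf: &mut BytesMut) -> io::Result<()> {
        buf.extend_from_slice(item.as_bytes());
        buf.extend_from_slice(b"\n");
        Ok(())
    }
}

fn process(socket: TcpStream) {
    // `framed` wraps the raw byte stream in the codec, yielding a combined
    // `Stream + Sink` of frames; `split` separates the two halves.
    let (tx, rx) = socket.framed(LineCodec).split();

    // Echo every decoded frame back through the encoder.
    let task = tx.send_all(rx)
        .map(|_| ())
        .map_err(|e| println!("connection error = {:?}", e));

    tokio::spawn(task);
}

fn main() {
    let addr = "127.0.0.1:12345".parse().unwrap();
    let listener = tokio::net::TcpListener::bind(&addr).unwrap();
    tokio::run(
        listener.incoming()
            .map_err(|e| println!("accept error = {:?}", e))
            .for_each(|socket| {
                process(socket);
                Ok(())
            }),
    );
}
```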
@ -1,29 +1,25 @@
//! This is a basic example of leveraging `BytesCodec` to create a simple UDP
//! client and server which speak a custom protocol.
//! This example leverages `BytesCodec` to create a UDP client and server which
//! speak a custom protocol.
//!
//! Here we're using the codec from tokio-io to convert a UDP socket to a stream of
//! client messages. These messages are then processed and returned back as a
//! new message with a new destination. Overall, we then use this to construct a
//! "ping pong" pair where two sockets are sending messages back and forth.

#![deny(warnings)]

extern crate tokio;
extern crate tokio_io;
extern crate env_logger;
extern crate futures;
extern crate futures_cpupool;

use std::net::SocketAddr;

use futures::{Future, Stream, Sink};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::prelude::*;
use tokio::net::{UdpSocket, UdpFramed};
use tokio_io::codec::BytesCodec;

fn main() {
    drop(env_logger::init());

    let pool = CpuPool::new(1);
    let _ = env_logger::init();

    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

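As the module docs above describe, `UdpFramed` plus `BytesCodec` turns a UDP socket into a stream of messages. A self-contained sketch of that conversion, assuming tokio 0.1 (a plain echo here rather than the example's ping-pong pair; it runs until an error occurs):

```rust
extern crate tokio;
extern crate tokio_io;

use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;
use tokio_io::codec::BytesCodec;

fn main() {
    let addr = "127.0.0.1:0".parse().unwrap();
    let socket = UdpSocket::bind(&addr).unwrap();

    // `UdpFramed` pairs every datagram with its peer address; `BytesCodec`
    // passes the bytes through untouched.
    let (sink, stream) = UdpFramed::new(socket, BytesCodec::new()).split();

    // Echo every datagram back to whoever sent it. `freeze` converts the
    // received `BytesMut` into the `Bytes` the sink expects.
    let echo = sink.send_all(stream.map(|(msg, addr)| (msg.freeze(), addr)))
        .map(|_| ())
        .map_err(|e| println!("error = {:?}", e));

    tokio::run(echo);
}
```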
@ -59,6 +55,9 @@ fn main() {
    let b = b_sink.send_all(b_stream);

    // Spawn the sender of pongs and then wait for our pinger to finish.
    pool.execute(b.then(|_| Ok(()))).unwrap();
    drop(a.wait());
    tokio::run({
        b.join(a)
            .map(|_| ())
            .map_err(|e| println!("error = {:?}", e))
    });
}
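The replacement uses `Future::join` so the runtime drives both sides of the ping-pong to completion, instead of executing one on a `CpuPool` and blocking on the other with `wait()`. A tiny sketch of `join`, assuming futures 0.1 (the messages are illustrative):

```rust
extern crate futures;
extern crate tokio;

use futures::future::{self, Future};

fn main() {
    let a = future::ok::<&str, ()>("ping finished");
    let b = future::ok::<&str, ()>("pong finished");

    tokio::run(
        b.join(a)
            // `join` resolves to a tuple once *both* futures complete.
            .map(|(b_msg, a_msg)| println!("{} / {}", b_msg, a_msg))
            .map_err(|e| println!("error = {:?}", e)),
    );
}
```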