Add an example of compressing on a CPU pool

Alex Crichton 2017-09-11 14:40:18 -07:00
parent ecedea3404
commit 8a43472b35
3 changed files with 128 additions and 0 deletions

Cargo.toml

@@ -28,6 +28,8 @@ futures = "0.1.15"
[dev-dependencies]
env_logger = { version = "0.4", default-features = false }
flate2 = { version = "0.2", features = ["tokio"] }
futures-cpupool = "0.1"
http = "0.1"
httparse = "1.0"
libc = "0.2"

examples/README.md

@@ -41,6 +41,9 @@ A high level description of each example is:
objects.
* `udp-codec` - an example of using the `UdpCodec` trait along with a small
ping-pong protocol happening locally.
* `compress` - an echo-like server which, instead of echoing back everything it
reads, echoes back a gzip-compressed version of that data! All compression
occurs on a CPU pool to offload work from the event loop.

If you've got an example you'd like to see here, please feel free to open an
issue. Otherwise if you've got an example you'd like to add, please feel free

examples/compress.rs Normal file

@@ -0,0 +1,123 @@
//! An example of offloading work to a thread pool instead of doing work on the
//! main event loop.
//!
//! In this example the server will act as a form of echo server except that
//! it'll echo back gzip-compressed data. Each connected client will have the
//! data it writes streamed back as the compressed version becomes available, and all
//! compressing will occur on a thread pool rather than the main event loop.
//!
//! You can preview this example by running, in one terminal:
//!
//! cargo run --example compress
//!
//! and in another terminal:
//!
//! echo test | cargo run --example connect 127.0.0.1:8080 | gunzip
//!
//! The latter command will need to be tweaked for non-unix-like shells, but
//! you can also redirect the stdout of the `connect` program to a file
//! and then decompress that.
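//!
//! For example, a rough sketch of that file-redirect alternative (with a
//! hypothetical input file) might be:
//!
//!     cargo run --example connect 127.0.0.1:8080 < input.txt > out.gz
//!
//! after which `out.gz` can be decompressed with a tool of your choice.
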
extern crate futures;
extern crate futures_cpupool;
extern crate flate2;
extern crate tokio_core;
extern crate tokio_io;

use std::io;
use std::env;
use std::net::SocketAddr;
use futures::{Future, Stream, Poll};
use futures_cpupool::CpuPool;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::Core;
use tokio_io::{AsyncRead, AsyncWrite};
use flate2::write::GzEncoder;

fn main() {
// As with many other examples, parse our CLI arguments and prepare the
// reactor.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
// This is where we're going to offload our computationally heavy work
// (compressing) to. Here we just use a convenience constructor to create a
// pool of threads equal to the number of CPUs we have.
let pool = CpuPool::new_num_cpus();
// The compress logic will happen in the function below, but everything's
// still a future! Each client is spawned to concurrently get processed.
let server = socket.incoming().for_each(move |(socket, addr)| {
handle.spawn(compress(socket, &pool).then(move |result| {
match result {
Ok((r, w)) => println!("{}: compressed {} bytes to {}", addr, r, w),
Err(e) => println!("{}: failed when compressing: {}", addr, e),
}
Ok(())
}));
Ok(())
});
core.run(server).unwrap();
}

/// The main workhorse of this example. This'll compress all data read from
/// `socket` on the `pool` provided, writing it back out to `socket` as it's
/// available.
fn compress(socket: TcpStream, pool: &CpuPool)
-> Box<Future<Item = (u64, u64), Error = io::Error>>
{
use tokio_io::io;
// The general interface that `CpuPool` provides is that we'll *spawn a
// future* onto it. All execution of the future will occur on the `CpuPool`
// and we'll get back a handle representing the completed value of the
// future. In essence it's our job here to create a future that represents
// compressing `socket`, and then we'll simply spawn it at the very end.
//
// Here we exploit the fact that `TcpStream` itself is `Send` in this
// function as well. That is, we can read/write the TCP stream on any
// thread, and we'll get notifications about it being ready from the reactor
// thread.
//
// Otherwise this is the same as the echo server except that after splitting
// we apply some encoding to one side, followed by a `shutdown` when we're
// done to ensure that all gz footers are written.
let (read, write) = socket.split();
let write = Count { io: write, amt: 0 };
let write = GzEncoder::new(write, flate2::Compression::Best);
let process = io::copy(read, write).and_then(|(amt, _read, write)| {
io::shutdown(write).map(move |io| (amt, io.get_ref().amt))
});
// Spawn the future so it executes entirely on the thread pool here
Box::new(pool.spawn(process))
}
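
/// A small adapter which counts the number of bytes written through the
/// underlying I/O object.
///
/// The compressed side of each connection is wrapped in this type so that
/// once compression finishes we can report how many compressed bytes were
/// sent back to the client.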
struct Count<T> {
io: T,
amt: u64,
}

impl<T: io::Write> io::Write for Count<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let n = self.io.write(buf)?;
self.amt += n as u64;
Ok(n)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}

impl<T: AsyncWrite> AsyncWrite for Count<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.io.shutdown()
}
}
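
As an aside, the `CpuPool` interface described in the comments of `compress`
above can also be exercised on its own. The following is a minimal standalone
sketch (not part of this commit) using `futures-cpupool` 0.1's `spawn_fn`,
which runs a closure on a pool thread and hands back a future for its result:

extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures_cpupool::CpuPool;

fn main() {
    // One worker thread per CPU, as in the example above.
    let pool = CpuPool::new_num_cpus();

    // `spawn_fn` runs the closure on the pool and returns a future which
    // resolves with the closure's result.
    let work = pool.spawn_fn(|| {
        let sum: u64 = (1..1_000_001u64).sum();
        Ok::<u64, ()>(sum)
    });

    // Block the current thread until the pool has finished the work.
    println!("sum = {}", work.wait().unwrap());
}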