fix(chat): Implement alexcrichton's suggestions

* Remove unnecessary clone
* Improve rightward drift
* Remove unnecessary lazy future
* Improve utf-8 handling
* Refactor to make code more understandable
This commit is contained in:
oberien 2016-10-06 15:38:20 +02:00
parent 0205b855d0
commit 6961efa8dd

View File

@ -8,7 +8,7 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::iter;
use std::env;
use std::io::BufReader;
use std::io::{Error, ErrorKind, BufReader};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
@ -28,12 +28,11 @@ fn main() {
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let connections = connections.clone();
let future = socket.incoming().for_each(move |(stream, addr)| {
let connections = connections.clone();
let handle_inner = handle.clone();
// We create a new future in which we create all other futures.
// This makes `stream` be bound on the `lazy` future's task, allowing
// This makes `stream` be bound on the outer future's task, allowing
// `ReadHalf` and `WriteHalf` to be shared between inner futures.
handle.spawn_fn(move || {
println!("New Connection: {}", addr);
@ -43,50 +42,65 @@ fn main() {
// add sender to hashmap of all current connections
connections.borrow_mut().insert(addr, tx);
let reader = BufReader::new(reader);
let connections_inner = connections.clone();
// https://users.rust-lang.org/t/loop-futures-for-client-handling/6950/2
// We have an endless loop reading from a client.
// In order to fuse the reading and writing futures in the end, we need to have the same
// output type. Therefore we use `(Option<BufReader<ReadHalf<TcpStream>>>,
// Option<WriteHalf<TcpStream>>)`.
let reader = BufReader::new(reader);
let socket_reader = stream::iter::<_, _, std::io::Error>(iter::repeat(()).map(Ok)).fold((Some(reader),None), move |(reader, _), _| {
let reader = reader.unwrap();
// First we need to get an infinite iterator
let iter = stream::iter::<_, _, std::io::Error>(iter::repeat(()).map(Ok));
// Then we fold it as infinite loop
let socket_reader = iter.fold(reader, move |reader, _| {
let connections = connections_inner.clone();
// read and parse length prefix
io::read_until(reader, '\n' as u8, vec![])
.and_then(|(reader, vec)| futures::lazy(|| {
// EOF was hit without reading a delimiter
if vec.len() == 0 {
futures::failed((std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Broken Pipe"))).boxed()
} else {
futures::finished((reader, vec)).boxed()
// read line
let amt = io::read_until(reader, '\n' as u8, vec![]);
// check if we hit EOF and need to close the connection
let amt = amt.and_then(|(reader, vec)| {
// EOF was hit without reading a delimiter
if vec.len() == 0 {
let err = Error::new(ErrorKind::BrokenPipe, "Broken Pipe");
futures::failed(err).boxed()
} else {
futures::finished((reader, vec)).boxed()
}
});
// convert bytes into string
let amt = amt.map(|(reader, vec)| (reader, String::from_utf8(vec)));
amt.and_then(move |(reader, message)| {
println!("{}: {:?}", addr, message);
let conns = connections.borrow_mut();
if let Ok(msg) = message {
// For each open connection except the sender, send the string
// via the channel
let iter = conns.iter().filter(|&(&k,_)| k != addr).map(|(_,v)| v);
for tx in iter {
tx.send(msg.clone()).unwrap();
}
}))
// convert bytes into string
.map(|(reader, vec)| (reader, String::from_utf8(vec).unwrap()))
.and_then(move |(reader, message)| {
println!("{}: {:?}", addr, message);
// For each open connection except the sender, send the string via the channel
for tx in connections.borrow_mut().iter().filter(|&(&k,_)| k != addr).map(|(_,v)| v) {
tx.send(message.clone()).unwrap();
}
futures::finished((Some(reader),None))
})
} else {
let tx = conns.get(&addr).unwrap();
tx.send("You didn't send valid UTF-8.".to_string()).unwrap();
}
futures::finished(reader)
})
});
// Whenever we receive a string on the Receiver, we write it to `WriteHalf<TcpStream>`.
let socket_writer = rx.fold((None, Some(writer)), move |(_, writer), msg| {
let writer = writer.unwrap();
io::write_all(writer, msg.into_bytes()).map(|(writer, _)| (None, Some(writer))).boxed()
let socket_writer = rx.fold(writer, |writer, msg| {
let amt = io::write_all(writer, msg.into_bytes());
let amt = amt.map(|(writer, _)| writer);
amt
});
socket_reader.select(socket_writer)
.then(move |_| {
connections.borrow_mut().remove(&addr);
println!("Connection {:?} closed.", addr);
Ok(())
})
// In order to fuse the reading and writing futures in the end, we need to have the
// same output type. Therefore we use `(Option<BufReader<ReadHalf<TcpStream>>>,
// Option<WriteHalf<TcpStream>>)`.
let socket_reader = socket_reader.map(|reader| (Some(reader), None));
let socket_writer = socket_writer.map(|writer| (None, Some(writer)));
let amt = socket_reader.select(socket_writer);
amt.then(move |_| {
connections.borrow_mut().remove(&addr);
println!("Connection {:?} closed.", addr);
Ok(())
})
});
Ok(())
});
@ -94,3 +108,4 @@ fn main() {
// exectue server
core.run(future).unwrap();
}