mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
No more need for lazy in chat example
This commit is contained in:
parent
6dc8333fb2
commit
4744a2e48b
110
examples/chat.rs
110
examples/chat.rs
@ -37,6 +37,7 @@ fn main() {
|
||||
|
||||
let srv = socket.incoming().for_each(move |(stream, addr)| {
|
||||
println!("New Connection: {}", addr);
|
||||
let (reader, writer) = stream.split();
|
||||
|
||||
// Create a channel for our stream, which other sockets will use to
|
||||
// send us messages. Then register our address with the stream to send
|
||||
@ -44,84 +45,67 @@ fn main() {
|
||||
let (tx, rx) = tokio_core::channel::channel(&handle).unwrap();
|
||||
connections.borrow_mut().insert(addr, tx);
|
||||
|
||||
// Note that below we're calling `spawn` to spawn a new future for this
|
||||
// connection. As a result we use `futures::lazy` here to ensure that
|
||||
// the call to `.split()` happens on the right task.
|
||||
//
|
||||
// This `split` will give us a read/write half to work with each portion
|
||||
// of the socket separately.
|
||||
let pair = futures::lazy(|| Ok(stream.split()));
|
||||
|
||||
// Define here what we do for the actual I/O. That is, read a bunch of
|
||||
// lines from the socket and dispatch them while we also write any lines
|
||||
// from other sockets.
|
||||
let connections_inner = connections.clone();
|
||||
let pair = pair.map(move |(reader, writer)| {
|
||||
let reader = BufReader::new(reader);
|
||||
let reader = BufReader::new(reader);
|
||||
|
||||
// Model the read portion of this socket by mapping an infinite
|
||||
// iterator to each line off the socket. This "loop" is then
|
||||
// terminated with an error once we hit EOF on the socket.
|
||||
let iter = stream::iter(iter::repeat(()).map(Ok));
|
||||
let socket_reader = iter.fold(reader, move |reader, _| {
|
||||
// Read a line off the socket, failing if we're at EOF
|
||||
let line = io::read_until(reader, b'\n', Vec::new());
|
||||
let line = line.and_then(|(reader, vec)| {
|
||||
if vec.len() == 0 {
|
||||
Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"))
|
||||
} else {
|
||||
Ok((reader, vec))
|
||||
}
|
||||
});
|
||||
|
||||
// Convert the bytes we read into a string, and then send that
|
||||
// string to all other connected clients.
|
||||
let line = line.map(|(reader, vec)| {
|
||||
(reader, String::from_utf8(vec))
|
||||
});
|
||||
let connections = connections_inner.clone();
|
||||
line.map(move |(reader, message)| {
|
||||
println!("{}: {:?}", addr, message);
|
||||
let conns = connections.borrow_mut();
|
||||
if let Ok(msg) = message {
|
||||
// For each open connection except the sender, send the
|
||||
// string via the channel.
|
||||
let iter = conns.iter()
|
||||
.filter(|&(&k, _)| k != addr)
|
||||
.map(|(_, v)| v);
|
||||
for tx in iter {
|
||||
tx.send(format!("{}: {}", addr, msg)).unwrap();
|
||||
}
|
||||
} else {
|
||||
let tx = conns.get(&addr).unwrap();
|
||||
tx.send("You didn't send valid UTF-8.".to_string()).unwrap();
|
||||
}
|
||||
reader
|
||||
})
|
||||
// Model the read portion of this socket by mapping an infinite
|
||||
// iterator to each line off the socket. This "loop" is then
|
||||
// terminated with an error once we hit EOF on the socket.
|
||||
let iter = stream::iter(iter::repeat(()).map(Ok));
|
||||
let socket_reader = iter.fold(reader, move |reader, _| {
|
||||
// Read a line off the socket, failing if we're at EOF
|
||||
let line = io::read_until(reader, b'\n', Vec::new());
|
||||
let line = line.and_then(|(reader, vec)| {
|
||||
if vec.len() == 0 {
|
||||
Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"))
|
||||
} else {
|
||||
Ok((reader, vec))
|
||||
}
|
||||
});
|
||||
|
||||
// Whenever we receive a string on the Receiver, we write it to
|
||||
// `WriteHalf<TcpStream>`.
|
||||
let socket_writer = rx.fold(writer, |writer, msg| {
|
||||
let amt = io::write_all(writer, msg.into_bytes());
|
||||
let amt = amt.map(|(writer, _)| writer);
|
||||
amt
|
||||
// Convert the bytes we read into a string, and then send that
|
||||
// string to all other connected clients.
|
||||
let line = line.map(|(reader, vec)| {
|
||||
(reader, String::from_utf8(vec))
|
||||
});
|
||||
let connections = connections_inner.clone();
|
||||
line.map(move |(reader, message)| {
|
||||
println!("{}: {:?}", addr, message);
|
||||
let conns = connections.borrow_mut();
|
||||
if let Ok(msg) = message {
|
||||
// For each open connection except the sender, send the
|
||||
// string via the channel.
|
||||
let iter = conns.iter()
|
||||
.filter(|&(&k, _)| k != addr)
|
||||
.map(|(_, v)| v);
|
||||
for tx in iter {
|
||||
tx.send(format!("{}: {}", addr, msg)).unwrap();
|
||||
}
|
||||
} else {
|
||||
let tx = conns.get(&addr).unwrap();
|
||||
tx.send("You didn't send valid UTF-8.".to_string()).unwrap();
|
||||
}
|
||||
reader
|
||||
})
|
||||
});
|
||||
|
||||
(socket_reader, socket_writer)
|
||||
// Whenever we receive a string on the Receiver, we write it to
|
||||
// `WriteHalf<TcpStream>`.
|
||||
let socket_writer = rx.fold(writer, |writer, msg| {
|
||||
let amt = io::write_all(writer, msg.into_bytes());
|
||||
let amt = amt.map(|(writer, _)| writer);
|
||||
amt
|
||||
});
|
||||
|
||||
// Now that we've got futures representing each half of the socket, we
|
||||
// use the `select` combinator to wait for either half to be done to
|
||||
// tear down the other. Then we spawn off the result.
|
||||
let connections = connections.clone();
|
||||
let addr = addr;
|
||||
handle.spawn(pair.and_then(|(reader, writer)| {
|
||||
let reader = reader.map(|_| ());
|
||||
let writer = writer.map(|_| ());
|
||||
|
||||
reader.select(writer)
|
||||
}).then(move |_| {
|
||||
let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
|
||||
handle.spawn(connection.then(move |_| {
|
||||
connections.borrow_mut().remove(&addr);
|
||||
println!("Connection {} closed.", addr);
|
||||
Ok(())
|
||||
|
Loading…
x
Reference in New Issue
Block a user