tests: handle errors properly in examples (#748)

This commit is contained in:
Liran Ringel 2018-11-20 18:10:36 +02:00 committed by Toby Lawrence
parent 477fa5580a
commit 9b1a45cc6a
19 changed files with 144 additions and 119 deletions

View File

@ -34,12 +34,12 @@ use std::env;
use std::io::{BufReader}; use std::io::{BufReader};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
// Create the TCP listener we'll accept connections on. // Create the TCP listener we'll accept connections on.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse().unwrap(); let addr = addr.parse()?;
let socket = TcpListener::bind(&addr).unwrap(); let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr); println!("Listening on: {}", addr);
// This is running on the Tokio runtime, so it will be multi-threaded. The // This is running on the Tokio runtime, so it will be multi-threaded. The
@ -49,10 +49,10 @@ fn main() {
// The server task asynchronously iterates over and processes each incoming // The server task asynchronously iterates over and processes each incoming
// connection. // connection.
let srv = socket.incoming() let srv = socket.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e)) .map_err(|e| {println!("failed to accept socket; error = {:?}", e); e})
.for_each(move |stream| { .for_each(move |stream| {
// The client's socket address // The client's socket address
let addr = stream.peer_addr().unwrap(); let addr = stream.peer_addr()?;
println!("New Connection: {}", addr); println!("New Connection: {}", addr);
@ -143,8 +143,10 @@ fn main() {
})); }));
Ok(()) Ok(())
}); })
.map_err(|err| println!("error occurred: {:?}", err));
// execute server // execute server
tokio::run(srv); tokio::run(srv);
Ok(())
} }

View File

@ -426,7 +426,7 @@ fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
tokio::spawn(connection); tokio::spawn(connection);
} }
pub fn main() { pub fn main() -> Result<(), Box<std::error::Error>> {
// Create the shared state. This is how all the peers communicate. // Create the shared state. This is how all the peers communicate.
// //
// The server task will hold a handle to this. For every new client, the // The server task will hold a handle to this. For every new client, the
@ -434,12 +434,12 @@ pub fn main() {
// client connection. // client connection.
let state = Arc::new(Mutex::new(Shared::new())); let state = Arc::new(Mutex::new(Shared::new()));
let addr = "127.0.0.1:6142".parse().unwrap(); let addr = "127.0.0.1:6142".parse()?;
// Bind a TCP listener to the socket address. // Bind a TCP listener to the socket address.
// //
// Note that this is the Tokio TcpListener, which is fully async. // Note that this is the Tokio TcpListener, which is fully async.
let listener = TcpListener::bind(&addr).unwrap(); let listener = TcpListener::bind(&addr)?;
// The server task asynchronously iterates over and processes each // The server task asynchronously iterates over and processes each
// incoming connection. // incoming connection.
@ -471,4 +471,5 @@ pub fn main() {
// In our example, we have not defined a shutdown strategy, so this will // In our example, we have not defined a shutdown strategy, so this will
// block until `ctrl-c` is pressed at the terminal. // block until `ctrl-c` is pressed at the terminal.
tokio::run(server); tokio::run(server);
Ok(())
} }

View File

@ -29,7 +29,7 @@ use std::thread;
use tokio::prelude::*; use tokio::prelude::*;
use futures::sync::mpsc; use futures::sync::mpsc;
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
// Determine if we're going to run in TCP or UDP mode // Determine if we're going to run in TCP or UDP mode
let mut args = env::args().skip(1).collect::<Vec<_>>(); let mut args = env::args().skip(1).collect::<Vec<_>>();
let tcp = match args.iter().position(|a| a == "--udp") { let tcp = match args.iter().position(|a| a == "--udp") {
@ -41,10 +41,11 @@ fn main() {
}; };
// Parse what address we're going to connect to // Parse what address we're going to connect to
let addr = args.first().unwrap_or_else(|| { let addr = match args.first() {
panic!("this program requires at least one argument") Some(addr) => addr,
}); None => Err("this program requires at least one argument")?,
let addr = addr.parse::<SocketAddr>().unwrap(); };
let addr = addr.parse::<SocketAddr>()?;
// Right now Tokio doesn't support a handle to stdin running on the event // Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will // loop, so we farm out that work to a separate thread. This thread will
@ -52,15 +53,15 @@ fn main() {
// loop over a standard futures channel. // loop over a standard futures channel.
let (stdin_tx, stdin_rx) = mpsc::channel(0); let (stdin_tx, stdin_rx) = mpsc::channel(0);
thread::spawn(|| read_stdin(stdin_tx)); thread::spawn(|| read_stdin(stdin_tx));
let stdin_rx = stdin_rx.map_err(|_| panic!()); // errors not possible on rx let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx"));
// Now that we've got our stdin read we either set up our TCP connection or // Now that we've got our stdin read we either set up our TCP connection or
// our UDP connection to get a stream of bytes we're going to emit to // our UDP connection to get a stream of bytes we're going to emit to
// stdout. // stdout.
let stdout = if tcp { let stdout = if tcp {
tcp::connect(&addr, Box::new(stdin_rx)) tcp::connect(&addr, Box::new(stdin_rx))?
} else { } else {
udp::connect(&addr, Box::new(stdin_rx)) udp::connect(&addr, Box::new(stdin_rx))?
}; };
// And now with our stream of bytes to write to stdout, we execute that in // And now with our stream of bytes to write to stdout, we execute that in
@ -77,6 +78,7 @@ fn main() {
}) })
.map_err(|e| println!("error reading stdout; error = {:?}", e)) .map_err(|e| println!("error reading stdout; error = {:?}", e))
}); });
Ok(())
} }
mod codec { mod codec {
@ -127,12 +129,13 @@ mod tcp {
use bytes::BytesMut; use bytes::BytesMut;
use codec::Bytes; use codec::Bytes;
use std::error::Error;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
pub fn connect(addr: &SocketAddr, pub fn connect(addr: &SocketAddr,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error> + Send> -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>
{ {
let tcp = TcpStream::connect(addr); let tcp = TcpStream::connect(addr);
@ -151,22 +154,24 @@ mod tcp {
// You'll also note that we *spawn* the work to read stdin and write it // You'll also note that we *spawn* the work to read stdin and write it
// to the TCP stream. This is done to ensure that happens concurrently // to the TCP stream. This is done to ensure that happens concurrently
// with us reading data from the stream. // with us reading data from the stream.
Box::new(tcp.map(move |stream| { let stream = Box::new(tcp.map(move |stream| {
let (sink, stream) = Bytes.framed(stream).split(); let (sink, stream) = Bytes.framed(stream).split();
tokio::spawn(stdin.forward(sink).then(|result| { tokio::spawn(stdin.forward(sink).then(|result| {
if let Err(e) = result { if let Err(e) = result {
panic!("failed to write to socket: {}", e) println!("failed to write to socket: {}", e)
} }
Ok(()) Ok(())
})); }));
stream stream
}).flatten_stream()) }).flatten_stream());
Ok(stream)
} }
} }
mod udp { mod udp {
use std::error::Error;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -179,17 +184,19 @@ mod udp {
pub fn connect(&addr: &SocketAddr, pub fn connect(&addr: &SocketAddr,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error> + Send> -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>
{ {
// We'll bind our UDP socket to a local IP/port, but for now we // We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those. // basically let the OS pick both of those.
let addr_to_bind = if addr.ip().is_ipv4() { let addr_to_bind = if addr.ip().is_ipv4() {
"0.0.0.0:0".parse().unwrap() "0.0.0.0:0".parse()?
} else { } else {
"[::]:0".parse().unwrap() "[::]:0".parse()?
};
let udp = match UdpSocket::bind(&addr_to_bind) {
Ok(udp) => udp,
Err(_) => Err("failed to bind socket")?,
}; };
let udp = UdpSocket::bind(&addr_to_bind)
.expect("failed to bind socket");
// Like above with TCP we use an instance of `Bytes` codec to transform // Like above with TCP we use an instance of `Bytes` codec to transform
// this UDP socket into a framed sink/stream which operates over // this UDP socket into a framed sink/stream which operates over
@ -203,7 +210,7 @@ mod udp {
(chunk, addr) (chunk, addr)
}).forward(sink).then(|result| { }).forward(sink).then(|result| {
if let Err(e) = result { if let Err(e) = result {
panic!("failed to write to socket: {}", e) println!("failed to write to socket: {}", e)
} }
Ok(()) Ok(())
}); });
@ -218,10 +225,11 @@ mod udp {
} }
}); });
Box::new(future::lazy(|| { let stream = Box::new(future::lazy(|| {
tokio::spawn(forward_stdin); tokio::spawn(forward_stdin);
future::ok(receive) future::ok(receive)
}).flatten_stream()) }).flatten_stream());
Ok(stream)
} }
} }

View File

@ -50,12 +50,12 @@ impl Future for Server {
} }
} }
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap(); let addr = addr.parse::<SocketAddr>()?;
let socket = UdpSocket::bind(&addr).unwrap(); let socket = UdpSocket::bind(&addr)?;
println!("Listening on: {}", socket.local_addr().unwrap()); println!("Listening on: {}", socket.local_addr()?);
let server = Server { let server = Server {
socket: socket, socket: socket,
@ -70,4 +70,5 @@ fn main() {
// //
// `tokio::run` spawns the task on the Tokio runtime and starts running. // `tokio::run` spawns the task on the Tokio runtime and starts running.
tokio::run(server.map_err(|e| println!("server error = {:?}", e))); tokio::run(server.map_err(|e| println!("server error = {:?}", e)));
Ok(())
} }

View File

@ -30,19 +30,19 @@ use tokio::prelude::*;
use std::env; use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
// Allow passing an address to listen on as the first argument of this // Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on // program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections. // 127.0.0.1:8080 for connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap(); let addr = addr.parse::<SocketAddr>()?;
// Next up we create a TCP listener which will listen for incoming // Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined // connections. This TCP listener is bound to the address we determined
// above and must be associated with an event loop, so we pass in a handle // above and must be associated with an event loop, so we pass in a handle
// to our event loop. After the socket's created we inform that we're ready // to our event loop. After the socket's created we inform that we're ready
// to go and start accepting connections. // to go and start accepting connections.
let socket = TcpListener::bind(&addr).unwrap(); let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr); println!("Listening on: {}", addr);
// Here we convert the `TcpListener` to a stream of incoming connections // Here we convert the `TcpListener` to a stream of incoming connections
@ -111,4 +111,5 @@ fn main() {
// never completes (it just keeps accepting sockets), `tokio::run` blocks // never completes (it just keeps accepting sockets), `tokio::run` blocks
// forever (until ctrl-c is pressed). // forever (until ctrl-c is pressed).
tokio::run(done); tokio::run(done);
Ok(())
} }

View File

@ -19,8 +19,8 @@ use tokio::io;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::prelude::*; use tokio::prelude::*;
pub fn main() { pub fn main() -> Result<(), Box<std::error::Error>> {
let addr = "127.0.0.1:6142".parse().unwrap(); let addr = "127.0.0.1:6142".parse()?;
// Open a TCP stream to the socket address. // Open a TCP stream to the socket address.
// //
@ -52,4 +52,6 @@ pub fn main() {
println!("About to create the stream and write to it..."); println!("About to create the stream and write to it...");
tokio::run(client); tokio::run(client);
println!("Stream has been created and written to."); println!("Stream has been created and written to.");
Ok(())
} }

View File

@ -60,7 +60,7 @@ fn run<F: Future<Item = (), Error = ()>>(f: F) -> Result<(), IoError> {
Ok(()) Ok(())
} }
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
run(future::lazy(|| { run(future::lazy(|| {
// Here comes the application logic. It can spawn further tasks by tokio_current_thread::spawn(). // Here comes the application logic. It can spawn further tasks by tokio_current_thread::spawn().
// It also can use the default reactor and create timeouts. // It also can use the default reactor and create timeouts.
@ -82,5 +82,6 @@ fn main() {
// We can spawn on the default executor, which is also the local one. // We can spawn on the default executor, which is also the local one.
tokio::executor::spawn(deadline); tokio::executor::spawn(deadline);
Ok(()) Ok(())
})).unwrap(); }))?;
Ok(())
} }

View File

@ -65,19 +65,19 @@ use tokio::codec::Decoder;
use std::env; use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
// Allow passing an address to listen on as the first argument of this // Allow passing an address to listen on as the first argument of this
// program, but otherwise we'll just set up our TCP listener on // program, but otherwise we'll just set up our TCP listener on
// 127.0.0.1:8080 for connections. // 127.0.0.1:8080 for connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap(); let addr = addr.parse::<SocketAddr>()?;
// Next up we create a TCP listener which will listen for incoming // Next up we create a TCP listener which will listen for incoming
// connections. This TCP listener is bound to the address we determined // connections. This TCP listener is bound to the address we determined
// above and must be associated with an event loop, so we pass in a handle // above and must be associated with an event loop, so we pass in a handle
// to our event loop. After the socket's created we inform that we're ready // to our event loop. After the socket's created we inform that we're ready
// to go and start accepting connections. // to go and start accepting connections.
let socket = TcpListener::bind(&addr).unwrap(); let socket = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr); println!("Listening on: {}", addr);
// Here we convert the `TcpListener` to a stream of incoming connections // Here we convert the `TcpListener` to a stream of incoming connections
@ -146,4 +146,5 @@ fn main() {
// never completes (it just keeps accepting sockets), `tokio::run` blocks // never completes (it just keeps accepting sockets), `tokio::run` blocks
// forever (until ctrl-c is pressed). // forever (until ctrl-c is pressed).
tokio::run(done); tokio::run(done);
Ok(())
} }

View File

@ -33,15 +33,15 @@ use tokio::io::{copy, shutdown};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*; use tokio::prelude::*;
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
let listen_addr = listen_addr.parse::<SocketAddr>().unwrap(); let listen_addr = listen_addr.parse::<SocketAddr>()?;
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>().unwrap(); let server_addr = server_addr.parse::<SocketAddr>()?;
// Create a TCP listener which will listen for incoming connections. // Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr).unwrap(); let socket = TcpListener::bind(&listen_addr)?;
println!("Listening on: {}", listen_addr); println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr); println!("Proxying to: {}", server_addr);
@ -94,6 +94,7 @@ fn main() {
}); });
tokio::run(done); tokio::run(done);
Ok(())
} }
// This is a custom type used to have a custom implementation of the // This is a custom type used to have a custom implementation of the

View File

@ -74,12 +74,12 @@ enum Response {
Error { msg: String }, Error { msg: String },
} }
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
// Parse the address we're going to run this server on // Parse the address we're going to run this server on
// and set up our TCP listener to accept connections. // and set up our TCP listener to accept connections.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap(); let addr = addr.parse::<SocketAddr>()?;
let listener = TcpListener::bind(&addr).expect("failed to bind"); let listener = TcpListener::bind(&addr).map_err(|_| "failed to bind")?;
println!("Listening on: {}", addr); println!("Listening on: {}", addr);
// Create the shared state of this server that will be shared amongst all // Create the shared state of this server that will be shared amongst all
@ -156,6 +156,7 @@ fn main() {
}); });
tokio::run(done); tokio::run(done);
Ok(())
} }
impl Request { impl Request {

View File

@ -34,13 +34,13 @@ use bytes::BytesMut;
use http::header::HeaderValue; use http::header::HeaderValue;
use http::{Request, Response, StatusCode}; use http::{Request, Response, StatusCode};
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
// Parse the arguments, bind the TCP socket we'll be listening to, spin up // Parse the arguments, bind the TCP socket we'll be listening to, spin up
// our worker threads, and start shipping sockets to those worker threads. // our worker threads, and start shipping sockets to those worker threads.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap(); let addr = addr.parse::<SocketAddr>()?;
let listener = TcpListener::bind(&addr).expect("failed to bind"); let listener = TcpListener::bind(&addr)?;
println!("Listening on: {}", addr); println!("Listening on: {}", addr);
tokio::run({ tokio::run({
@ -51,6 +51,7 @@ fn main() {
Ok(()) Ok(())
}) })
}); });
Ok(())
} }
fn process(socket: TcpStream) { fn process(socket: TcpStream) {
@ -84,28 +85,32 @@ fn process(socket: TcpStream) {
fn respond(req: Request<()>) fn respond(req: Request<()>)
-> Box<Future<Item = Response<String>, Error = io::Error> + Send> -> Box<Future<Item = Response<String>, Error = io::Error> + Send>
{ {
let mut ret = Response::builder(); let f = future::lazy(move || {
let mut response = Response::builder();
let body = match req.uri().path() { let body = match req.uri().path() {
"/plaintext" => { "/plaintext" => {
ret.header("Content-Type", "text/plain"); response.header("Content-Type", "text/plain");
"Hello, World!".to_string() "Hello, World!".to_string()
} }
"/json" => { "/json" => {
ret.header("Content-Type", "application/json"); response.header("Content-Type", "application/json");
#[derive(Serialize)] #[derive(Serialize)]
struct Message { struct Message {
message: &'static str, message: &'static str,
} }
serde_json::to_string(&Message { message: "Hello, World!" }) serde_json::to_string(&Message { message: "Hello, World!" })?
.unwrap()
} }
_ => { _ => {
ret.status(StatusCode::NOT_FOUND); response.status(StatusCode::NOT_FOUND);
String::new() String::new()
} }
}; };
Box::new(future::ok(ret.body(body).unwrap())) let response = response.body(body).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok(response)
});
Box::new(f)
} }
struct Http; struct Http;

View File

@ -35,29 +35,27 @@ use std::net::SocketAddr;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::prelude::*; use tokio::prelude::*;
fn get_stdin_data() -> Vec<u8> { fn get_stdin_data() -> Result<Vec<u8>, Box<std::error::Error>> {
let mut buf = Vec::new(); let mut buf = Vec::new();
stdin().read_to_end(&mut buf).unwrap(); stdin().read_to_end(&mut buf)?;
buf Ok(buf)
} }
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
let remote_addr: SocketAddr = env::args() let remote_addr: SocketAddr = env::args()
.nth(1) .nth(1)
.unwrap_or("127.0.0.1:8080".into()) .unwrap_or("127.0.0.1:8080".into())
.parse() .parse()?;
.unwrap();
// We use port 0 to let the operating system allocate an available port for us. // We use port 0 to let the operating system allocate an available port for us.
let local_addr: SocketAddr = if remote_addr.is_ipv4() { let local_addr: SocketAddr = if remote_addr.is_ipv4() {
"0.0.0.0:0" "0.0.0.0:0"
} else { } else {
"[::]:0" "[::]:0"
}.parse() }.parse()?;
.unwrap(); let socket = UdpSocket::bind(&local_addr)?;
let socket = UdpSocket::bind(&local_addr).unwrap();
const MAX_DATAGRAM_SIZE: usize = 65_507; const MAX_DATAGRAM_SIZE: usize = 65_507;
let processing = socket socket
.send_dgram(get_stdin_data(), &remote_addr) .send_dgram(get_stdin_data()?, &remote_addr)
.and_then(|(socket, _)| socket.recv_dgram(vec![0u8; MAX_DATAGRAM_SIZE])) .and_then(|(socket, _)| socket.recv_dgram(vec![0u8; MAX_DATAGRAM_SIZE]))
.map(|(_, data, len, _)| { .map(|(_, data, len, _)| {
println!( println!(
@ -66,9 +64,6 @@ fn main() {
String::from_utf8_lossy(&data[..len]) String::from_utf8_lossy(&data[..len])
) )
}) })
.wait(); .wait()?;
match processing { Ok(())
Ok(_) => {}
Err(e) => eprintln!("Encountered an error: {}", e),
}
} }

View File

@ -19,15 +19,15 @@ use tokio::prelude::*;
use tokio::net::{UdpSocket, UdpFramed}; use tokio::net::{UdpSocket, UdpFramed};
use tokio_codec::BytesCodec; use tokio_codec::BytesCodec;
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
let _ = env_logger::init(); let _ = env_logger::init();
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: SocketAddr = "127.0.0.1:0".parse()?;
// Bind both our sockets and then figure out what ports we got. // Bind both our sockets and then figure out what ports we got.
let a = UdpSocket::bind(&addr).unwrap(); let a = UdpSocket::bind(&addr)?;
let b = UdpSocket::bind(&addr).unwrap(); let b = UdpSocket::bind(&addr)?;
let b_addr = b.local_addr().unwrap(); let b_addr = b.local_addr()?;
// We're parsing each socket with the `BytesCodec` included in `tokio_io`, and then we // We're parsing each socket with the `BytesCodec` included in `tokio_io`, and then we
// `split` each codec into the sink/stream halves. // `split` each codec into the sink/stream halves.
@ -61,4 +61,5 @@ fn main() {
.map(|_| ()) .map(|_| ())
.map_err(|e| println!("error = {:?}", e)) .map_err(|e| println!("error = {:?}", e))
}); });
Ok(())
} }

View File

@ -14,7 +14,7 @@ use futures::{Future, Stream, Sink};
use std::io; use std::io;
pub fn main() { pub fn main() -> Result<(), Box<std::error::Error>> {
let pool = Builder::new() let pool = Builder::new()
.pool_size(1) .pool_size(1)
.build(); .build();
@ -44,5 +44,6 @@ pub fn main() {
.map_err(|e| panic!("io error = {:?}", e)) .map_err(|e| panic!("io error = {:?}", e))
}); });
pool.shutdown_on_idle().wait().unwrap(); pool.shutdown_on_idle().wait().map_err(|_| "failed to shutdown the thread pool")?;
Ok(())
} }

View File

@ -7,7 +7,7 @@ use futures::{Future, Stream};
/// how many signals to handle before exiting /// how many signals to handle before exiting
const STOP_AFTER: u64 = 10; const STOP_AFTER: u64 = 10;
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
// tokio_signal provides a convenience builder for Ctrl+C // tokio_signal provides a convenience builder for Ctrl+C
// this even works cross-platform: linux and windows! // this even works cross-platform: linux and windows!
// //
@ -53,7 +53,8 @@ fn main() {
// Up until now, we haven't really DONE anything, just prepared // Up until now, we haven't really DONE anything, just prepared
// now it's time to actually schedule, and thus execute, the stream // now it's time to actually schedule, and thus execute, the stream
// on our event loop // on our event loop
tokio::runtime::current_thread::block_on_all(future).unwrap(); tokio::runtime::current_thread::block_on_all(future)?;
println!("Stream ended, quiting the program."); println!("Stream ended, quiting the program.");
Ok(())
} }

View File

@ -11,7 +11,7 @@ mod platform {
use futures::{Future, Stream}; use futures::{Future, Stream};
use tokio_signal::unix::{Signal, SIGINT, SIGTERM}; use tokio_signal::unix::{Signal, SIGINT, SIGTERM};
pub fn main() { pub fn main() -> Result<(), Box<::std::error::Error>> {
// Create a stream for each of the signals we'd like to handle. // Create a stream for each of the signals we'd like to handle.
let sigint = Signal::new(SIGINT).flatten_stream(); let sigint = Signal::new(SIGINT).flatten_stream();
let sigterm = Signal::new(SIGTERM).flatten_stream(); let sigterm = Signal::new(SIGTERM).flatten_stream();
@ -27,26 +27,26 @@ mod platform {
(i.e. this binary)" (i.e. this binary)"
); );
let (item, _rest) = ::tokio::runtime::current_thread::block_on_all(stream.into_future()) let (item, _rest) = ::tokio::runtime::current_thread::block_on_all(stream.into_future())
.ok() .map_err(|_| "failed to wait for signals")?;
.unwrap();
// Figure out which signal we received // Figure out which signal we received
let item = item.unwrap(); let item = item.ok_or("received no signal")?;
if item == SIGINT { if item == SIGINT {
println!("received SIGINT"); println!("received SIGINT");
} else { } else {
assert_eq!(item, SIGTERM); assert_eq!(item, SIGTERM);
println!("received SIGTERM"); println!("received SIGTERM");
} }
Ok(())
} }
} }
#[cfg(not(unix))] #[cfg(not(unix))]
mod platform { mod platform {
pub fn main() {} pub fn main() -> Result<(), Box<::std::error::Error>> {Ok(())}
} }
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
platform::main() platform::main()
} }

View File

@ -9,7 +9,7 @@ mod platform {
use futures::{Future, Stream}; use futures::{Future, Stream};
use tokio_signal::unix::{Signal, SIGHUP}; use tokio_signal::unix::{Signal, SIGHUP};
pub fn main() { pub fn main() -> Result<(), Box<::std::error::Error>> {
// on Unix, we can listen to whatever signal we want, in this case: SIGHUP // on Unix, we can listen to whatever signal we want, in this case: SIGHUP
let stream = Signal::new(SIGHUP).flatten_stream(); let stream = Signal::new(SIGHUP).flatten_stream();
@ -35,16 +35,17 @@ mod platform {
// Up until now, we haven't really DONE anything, just prepared // Up until now, we haven't really DONE anything, just prepared
// now it's time to actually schedule, and thus execute, the stream // now it's time to actually schedule, and thus execute, the stream
// on our event loop, and loop forever // on our event loop, and loop forever
::tokio::runtime::current_thread::block_on_all(future).unwrap(); ::tokio::runtime::current_thread::block_on_all(future)?;
Ok(())
} }
} }
#[cfg(not(unix))] #[cfg(not(unix))]
mod platform { mod platform {
pub fn main() {} pub fn main() -> Result<(), Box<::std::error::Error>> {Ok(())}
} }
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
platform::main() platform::main()
} }

View File

@ -12,12 +12,12 @@ use native_tls::TlsConnector;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
let mut runtime = Runtime::new().unwrap(); let mut runtime = Runtime::new()?;
let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap(); let addr = "www.rust-lang.org:443".to_socket_addrs()?.next().ok_or("failed to resolve www.rust-lang.org")?;
let socket = TcpStream::connect(&addr); let socket = TcpStream::connect(&addr);
let cx = TlsConnector::builder().build().unwrap(); let cx = TlsConnector::builder().build()?;
let cx = tokio_tls::TlsConnector::from(cx); let cx = tokio_tls::TlsConnector::from(cx);
let tls_handshake = socket.and_then(move |socket| { let tls_handshake = socket.and_then(move |socket| {
@ -36,6 +36,7 @@ fn main() {
tokio_io::io::read_to_end(socket, Vec::new()) tokio_io::io::read_to_end(socket, Vec::new())
}); });
let (_, data) = runtime.block_on(response).unwrap(); let (_, data) = runtime.block_on(response)?;
println!("{}", String::from_utf8_lossy(&data)); println!("{}", String::from_utf8_lossy(&data));
Ok(())
} }

View File

@ -8,16 +8,16 @@ use tokio::io;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::prelude::*; use tokio::prelude::*;
fn main() { fn main() -> Result<(), Box<std::error::Error>> {
// Bind the server's socket // Bind the server's socket
let addr = "127.0.0.1:12345".parse().unwrap(); let addr = "127.0.0.1:12345".parse()?;
let tcp = TcpListener::bind(&addr).unwrap(); let tcp = TcpListener::bind(&addr)?;
// Create the TLS acceptor. // Create the TLS acceptor.
let der = include_bytes!("identity.p12"); let der = include_bytes!("identity.p12");
let cert = Identity::from_pkcs12(der, "mypass").unwrap(); let cert = Identity::from_pkcs12(der, "mypass")?;
let tls_acceptor = tokio_tls::TlsAcceptor::from( let tls_acceptor = tokio_tls::TlsAcceptor::from(
native_tls::TlsAcceptor::builder(cert).build().unwrap()); native_tls::TlsAcceptor::builder(cert).build()?);
// Iterate incoming connections // Iterate incoming connections
let server = tcp.incoming().for_each(move |tcp| { let server = tcp.incoming().for_each(move |tcp| {
@ -56,4 +56,5 @@ fn main() {
// Start the runtime and spin up the server // Start the runtime and spin up the server
tokio::run(server); tokio::run(server);
Ok(())
} }