update connect example (#1787)

This commit is contained in:
João Oliveira 2019-12-19 00:54:06 +00:00 committed by Lucio Franco
parent 2d78cfe56a
commit 58b5abdb99

View File

@ -16,8 +16,9 @@
#![warn(rust_2018_idioms)]
use futures::StreamExt;
use tokio::io;
use tokio_util::codec::{FramedRead, FramedWrite};
use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
use std::env;
use std::error::Error;
@ -41,8 +42,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
.ok_or("this program requires at least one argument")?;
let addr = addr.parse::<SocketAddr>()?;
let stdin = FramedRead::new(io::stdin(), codec::Bytes);
let stdout = FramedWrite::new(io::stdout(), codec::Bytes);
let stdin = FramedRead::new(io::stdin(), BytesCodec::new());
let stdin = stdin.map(|i| i.map(|bytes| bytes.freeze()));
let stdout = FramedWrite::new(io::stdout(), BytesCodec::new());
if tcp {
tcp::connect(&addr, stdin, stdout).await?;
@ -54,25 +56,26 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
mod tcp {
use super::codec;
use futures::StreamExt;
use futures::{future, Sink, SinkExt};
use bytes::Bytes;
use futures::{future, Sink, SinkExt, Stream, StreamExt};
use std::{error::Error, io, net::SocketAddr};
use tokio::net::TcpStream;
use tokio::stream::Stream;
use tokio_util::codec::{FramedRead, FramedWrite};
use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
pub async fn connect(
addr: &SocketAddr,
mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(addr).await?;
let (r, w) = stream.split();
let mut sink = FramedWrite::new(w, codec::Bytes);
let mut stream = FramedRead::new(r, codec::Bytes)
let mut sink = FramedWrite::new(w, BytesCodec::new());
// filter map Result<BytesMut, Error> stream into just a Bytes stream to match stdout Sink
// on the event of an Error, log the error and end the stream
let mut stream = FramedRead::new(r, BytesCodec::new())
.filter_map(|i| match i {
Ok(i) => future::ready(Some(i)),
//BytesMut into Bytes
Ok(i) => future::ready(Some(i.freeze())),
Err(e) => {
println!("failed to read from socket; error={}", e);
future::ready(None)
@ -88,19 +91,18 @@ mod tcp {
}
mod udp {
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
use tokio::stream::{Stream, StreamExt};
use futures::{future, Sink, SinkExt};
use bytes::Bytes;
use futures::{future, Sink, SinkExt, Stream, StreamExt};
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
pub async fn connect(
addr: &SocketAddr,
stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
// We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those.
@ -120,7 +122,7 @@ mod udp {
}
async fn send(
mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
writer: &mut SendHalf,
) -> Result<(), io::Error> {
while let Some(item) = stdin.next().await {
@ -132,7 +134,7 @@ mod udp {
}
async fn recv(
mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
reader: &mut RecvHalf,
) -> Result<(), io::Error> {
loop {
@ -140,47 +142,8 @@ mod udp {
let n = reader.recv(&mut buf[..]).await?;
if n > 0 {
stdout.send(buf).await?;
stdout.send(Bytes::from(buf)).await?;
}
}
}
}
mod codec {
use bytes::{BufMut, BytesMut};
use std::io;
use tokio_util::codec::{Decoder, Encoder};
/// A simple `Codec` implementation that just ships bytes around.
///
/// This type is used for "framing" a TCP/UDP stream of bytes but it's really
/// just a convenient method for us to work with streams/sinks for now.
/// This'll just take any data read and interpret it as a "frame" and
/// conversely just shove data into the output location without looking at
/// it.
pub struct Bytes;
impl Decoder for Bytes {
type Item = Vec<u8>;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Vec<u8>>> {
if !buf.is_empty() {
let len = buf.len();
Ok(Some(buf.split_to(len).into_iter().collect()))
} else {
Ok(None)
}
}
}
impl Encoder for Bytes {
type Item = Vec<u8>;
type Error = io::Error;
fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
buf.put(&data[..]);
Ok(())
}
}
}