This commit is contained in:
Alex Crichton 2016-11-22 09:44:29 -08:00
commit 0d10b0e05a
4 changed files with 467 additions and 2 deletions

119
examples/udp-codec.rs Normal file
View File

@ -0,0 +1,119 @@
extern crate tokio_core;
extern crate env_logger;
extern crate futures;
#[macro_use]
extern crate log;
use std::io;
use std::net::{SocketAddr};
use futures::{future, Future, Stream, Sink};
use tokio_core::io::{CodecUdp};
use tokio_core::net::{UdpSocket};
use tokio_core::reactor::{Core, Timeout};
use std::time::Duration;
use std::str;
/// This is a basic example of leveraging `FramedUdp` to create
/// a simple UDP client and server which speak a custom Protocol.
/// `FramedUdp` applies a `Codec` to the input and output of an
/// `Evented`
/// Simple Newline based parser,
/// This is for a connectionless server, it must keep track
/// of the Socket address of the last peer to contact it
/// so that it can respond back.
/// In the real world, one would probably
/// want an associative of remote peers to their state
///
/// Note that this takes a pretty draconian stance by returning
/// an error if it can't find a newline in the datagram it received
pub struct LineCodec {
addr : Option<SocketAddr>
}
impl CodecUdp for LineCodec {
type In = Vec<Vec<u8>>;
type Out = Vec<u8>;
fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> {
trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr);
self.addr = Some(*addr);
let res : Vec<Vec<u8>> = buf.split(|c| *c == b'\n').map(|s| s.into()).collect();
if res.len() > 0 {
Ok(res)
}
else {
Err(io::Error::new(io::ErrorKind::Other,
"failed to find newline in datagram"))
}
}
fn encode(&mut self, item: &Vec<u8>, into: &mut Vec<u8>) -> SocketAddr {
trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap());
into.extend_from_slice(item.as_slice());
into.push('\n' as u8);
self.addr.unwrap()
}
}
fn main() {
drop(env_logger::init());
let mut core = Core::new().unwrap();
let handle = core.handle();
//create the line codec parser for each
let srvcodec = LineCodec { addr : None };
let clicodec = LineCodec { addr : None };
let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap();
let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap();
//We bind each socket to a specific port
let server = UdpSocket::bind(&srvaddr, &handle).unwrap();
let client = UdpSocket::bind(&clientaddr, &handle).unwrap();
//start things off by sending a ping from the client to the server
//This doesn't utilize the codec to encode the message, but rather
//it sends raw data directly to the remote peer with the send_dgram future
let job = client.send_dgram(b"PING\n", srvaddr);
let (client, _buf) = core.run(job).unwrap();
//We create a FramedUdp instance, which associates a socket
//with a codec. We then immediate split that into the
//receiving side `Stream` and the writing side `Sink`
let (srvstream, srvsink) = server.framed(srvcodec).split();
//`Stream::fold` runs once per every received datagram.
//Note that we pass srvsink into fold, so that it can be
//supplied to every iteration. The reason for this is
//sink.send moves itself into `send` and then returns itself
let srvloop = srvstream.fold(srvsink, move |sink, lines| {
println!("{}", str::from_utf8(lines[0].as_slice()).unwrap());
sink.send(b"PONG".to_vec())
}).map(|_| ());
//We create another FramedUdp instance, this time for the client socket
let (clistream, clisink) = client.framed(clicodec).split();
//And another infinite iteration
let cliloop = clistream.fold(clisink, move |sink, lines| {
println!("{}", str::from_utf8(lines[0].as_slice()).unwrap());
sink.send(b"PING".to_vec())
}).map(|_| ());
let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap();
//`select_all` takes an `Iterable` of `Future` and returns a future itself
//This future waits until the first `Future` completes, it then returns
//that result.
let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]);
//Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll`
//has completed
if let Err(e) = core.run(wait) {
error!("{}", e.0);
}
}

View File

@ -33,6 +33,7 @@ macro_rules! try_nb {
mod copy;
mod frame;
mod udp_frame;
mod flush;
mod read_exact;
mod read_to_end;
@ -43,6 +44,7 @@ mod window;
mod write_all;
pub use self::copy::{copy, Copy};
pub use self::frame::{EasyBuf, EasyBufMut, Framed, Codec};
pub use self::udp_frame::{FramedUdp, framed_udp, FramedUdpRead, FramedUdpWrite, CodecUdp, VecDGramCodec};
pub use self::flush::{flush, Flush};
pub use self::read_exact::{read_exact, ReadExact};
pub use self::read_to_end::{read_to_end, ReadToEnd};

262
src/io/udp_frame.rs Normal file
View File

@ -0,0 +1,262 @@
use std::io;
use std::net::SocketAddr;
use net::UdpSocket;
use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
use futures::sync::BiLock;
/// Encoding of frames via buffers.
///
/// This trait is used when constructing an instance of `FramedUdp`. It provides
/// one type: `Out` for encoding outgoing frames according to a protocol.
///
/// Because UDP is a connectionless protocol, the encode method will also be
/// responsible for determining the remote host to which the datagram should be
/// sent
///
/// The trait itself is implemented on a type that can track state for decoding
/// or encoding, which is particularly useful for streaming parsers. In many
/// cases, though, this type will simply be a unit struct (e.g. `struct
/// HttpCodec`).
pub trait CodecUdp {
/// The type of frames to be encoded.
type Out;
/// The type of decoded frames.
type In;
/// Encodes a frame into the buffer provided.
///
/// This method will encode `msg` into the byte buffer provided by `buf`.
/// The `buf` provided is an internal buffer of the `Framed` instance and
/// will be written out when possible.
///
/// The encode method also determines the destination to which the buffer should
/// be directed, which will be returned as a SocketAddr;
fn encode(&mut self, msg: &Self::Out, buf: &mut Vec<u8>) -> SocketAddr;
/// Attempts to decode a frame from the provided buffer of bytes.
///
/// This method is called by `FramedUdp` on a single datagram which has been
/// read from a socket.
///
/// It is required that the decode method empty the read buffer in every call to
/// decode, as the next poll_read that occurs will write the next datagram
/// into the buffer, without regard for what is already there.
///
/// Finally, if the bytes in the buffer are malformed then an error is
/// returned indicating why. This informs `Framed` that the stream is now
/// corrupt and should be terminated.
///
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error>;
}
/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using
/// the `CodecUdp` trait to encode and decode frames.
///
/// You can acquire a `Framed` instance by using the `Io::framed` adapter.
pub struct FramedUdp<C> {
socket: UdpSocket,
codec: C,
out_addr : Option<SocketAddr>,
rd: Vec<u8>,
wr: Vec<u8>,
}
impl<C : CodecUdp> Stream for FramedUdp<C> {
type Item = C::In;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
loop {
let ret = self.socket.recv_from(self.rd.as_mut_slice());
match ret {
Ok((n, addr)) => {
trace!("read {} bytes", n);
trace!("attempting to decode a frame");
let frame = try!(self.codec.decode(&addr, & self.rd[.. n]));
trace!("frame decoded from buffer");
return Ok(Async::Ready(Some(frame)));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(Async::NotReady)
}
Err(e) => return Err(e),
}
}
}
}
impl<C : CodecUdp> Sink for FramedUdp<C> {
type SinkItem = C::Out;
type SinkError = io::Error;
fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
if self.wr.len() > 0 {
try!(self.poll_complete());
if self.wr.len() > 0 {
return Ok(AsyncSink::NotReady(item));
}
}
self.out_addr = Some(self.codec.encode(&item, &mut self.wr));
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
trace!("flushing framed transport");
if !self.wr.is_empty() {
if let Some(outaddr) = self.out_addr {
let remaining = self.wr.len();
trace!("writing; remaining={}", remaining);
let n = try_nb!(self.socket.send_to(self.wr.as_slice(), &outaddr));
trace!("written {}", n);
self.wr.clear();
self.out_addr = None;
if n != remaining {
return Err(io::Error::new(io::ErrorKind::Other,
"failed to write entire datagram to socket"));
}
}
else {
panic!("outbound stream in invalid state: out_addr is not known");
}
}
return Ok(Async::Ready(()));
}
}
/// Helper function that Creates a new FramedUdp object. It moves the supplied socket, codec
/// into the resulting FramedUdp
pub fn framed_udp<C>(socket : UdpSocket, codec : C) -> FramedUdp<C> {
FramedUdp::new(
socket,
codec,
vec![0; 64 * 1024],
Vec::with_capacity(64 * 1024)
)
}
impl<C> FramedUdp<C> {
/// Creates a new FramedUdp object. It moves the supplied socket, codec
/// supplied vecs.
fn new(sock : UdpSocket, codec : C, rd_buf : Vec<u8>, wr_buf : Vec<u8>) -> FramedUdp<C> {
FramedUdp {
socket: sock,
codec : codec,
out_addr: None,
rd: rd_buf,
wr: wr_buf
}
}
/// Splits this `Stream + Sink` object into separate `Stream` and `Sink`
/// objects, which can be useful when you want to split ownership between
/// tasks, or allow direct interaction between the two objects (e.g. via
/// `Sink::send_all`).
pub fn split(self) -> (FramedUdpRead<C>, FramedUdpWrite<C>) {
let (a, b) = BiLock::new(self);
let read = FramedUdpRead { framed: a };
let write = FramedUdpWrite { framed: b };
(read, write)
}
/// Returns a reference to the underlying I/O stream wrapped by `Framed`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn get_ref(&self) -> &UdpSocket {
&self.socket
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Framed`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn get_mut(&mut self) -> &mut UdpSocket {
&mut self.socket
}
/// Consumes the `Framed`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn into_inner(self) -> UdpSocket {
self.socket
}
}
/// A `Stream` interface to an underlying `Io` object, using the `CodecUdp` trait
/// to decode frames.
pub struct FramedUdpRead<C> {
framed: BiLock<FramedUdp<C>>,
}
impl<C : CodecUdp> Stream for FramedUdpRead<C> {
type Item = C::In;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
if let Async::Ready(mut guard) = self.framed.poll_lock() {
guard.poll()
} else {
Ok(Async::NotReady)
}
}
}
/// A `Sink` interface to an underlying `Io` object, using the `CodecUdp` trait
/// to encode frames.
pub struct FramedUdpWrite<C> {
framed: BiLock<FramedUdp<C>>,
}
impl<C : CodecUdp> Sink for FramedUdpWrite<C> {
type SinkItem = C::Out;
type SinkError = io::Error;
fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
if let Async::Ready(mut guard) = self.framed.poll_lock() {
guard.start_send(item)
} else {
Ok(AsyncSink::NotReady(item))
}
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
if let Async::Ready(mut guard) = self.framed.poll_lock() {
guard.poll_complete()
} else {
Ok(Async::NotReady)
}
}
}
/// Default implementation of a DGram "parser"
/// This receives and produces a tuple of ('SocketAddr', `Vec`<u8>)
pub struct VecDGramCodec;
impl CodecUdp for VecDGramCodec {
type In = (SocketAddr, Vec<u8>);
type Out = (SocketAddr, Vec<u8>);
fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> {
Ok((*addr, buf.into()))
}
fn encode(&mut self, item: &Self::Out, into: &mut Vec<u8>) -> SocketAddr {
into.extend_from_slice(item.1.as_slice());
into.push('\n' as u8);
item.0
}
}

View File

@ -1,9 +1,10 @@
use std::io;
use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr};
use std::fmt;
use futures::Async;
use io::{FramedUdp, framed_udp};
use futures::{Async, Future, Poll};
use mio;
use std::mem;
use reactor::{Handle, PollEvented};
@ -42,6 +43,13 @@ impl UdpSocket {
UdpSocket::new(udp, handle)
}
/// Creates a FramedUdp object, which leverages a supplied `EncodeUdp`
/// and `DecodeUdp` to implement `Stream` and `Sink`
/// This moves the socket into the newly created FramedUdp object
pub fn framed<C>(self, codec : C) -> FramedUdp<C> {
framed_udp(self, codec)
}
/// Returns the local address that this stream is bound to.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
@ -85,6 +93,33 @@ impl UdpSocket {
Err(e) => Err(e),
}
}
/// Creates a future that will write the entire contents of the buffer `buf` to
/// the stream `a` provided.
///
/// The returned future will return after data has been written to the outbound
/// socket.
/// The future will resolve to the stream as well as the buffer (for reuse if
/// needed).
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
///
/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
/// be broadly applicable to accepting data which can be converted to a slice.
/// The `Window` struct is also available in this crate to provide a different
/// window into a slice if necessary.
pub fn send_dgram<T>(self, buf: T, addr : SocketAddr) -> SendDGram<T>
where T: AsRef<[u8]>,
{
SendDGram {
state: UdpState::Writing {
sock: self,
addr: addr,
buf: buf,
},
}
}
/// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came.
@ -249,6 +284,53 @@ impl fmt::Debug for UdpSocket {
}
}
/// A future used to write the entire contents of some data to a stream.
///
/// This is created by the [`write_all`] top-level method.
///
/// [`write_all`]: fn.write_all.html
pub struct SendDGram<T> {
state: UdpState<T>,
}
enum UdpState<T> {
Writing {
sock: UdpSocket,
buf: T,
addr: SocketAddr,
},
Empty,
}
fn incomplete_write(reason : &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, reason)
}
impl<T> Future for SendDGram<T>
where T: AsRef<[u8]>,
{
type Item = (UdpSocket, T);
type Error = io::Error;
fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> {
match self.state {
UdpState::Writing { ref sock, ref buf, ref addr} => {
let buf = buf.as_ref();
let n = try_nb!(sock.send_to(&buf, addr));
if n != buf.len() {
return Err(incomplete_write("Failed to send entire message in datagram"))
}
}
UdpState::Empty => panic!("poll a SendDGram after it's done"),
}
match mem::replace(&mut self.state, UdpState::Empty) {
UdpState::Writing { sock, buf, .. } => Ok(Async::Ready((sock, (buf).into()))),
UdpState::Empty => panic!(),
}
}
}
#[cfg(unix)]
mod sys {
use std::os::unix::prelude::*;