diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs new file mode 100644 index 000000000..afc70d75f --- /dev/null +++ b/examples/udp-codec.rs @@ -0,0 +1,119 @@ +extern crate tokio_core; +extern crate env_logger; +extern crate futures; + +#[macro_use] +extern crate log; + +use std::io; +use std::net::{SocketAddr}; +use futures::{future, Future, Stream, Sink}; +use tokio_core::io::{CodecUdp}; +use tokio_core::net::{UdpSocket}; +use tokio_core::reactor::{Core, Timeout}; +use std::time::Duration; +use std::str; + +/// This is a basic example of leveraging `FramedUdp` to create +/// a simple UDP client and server which speak a custom Protocol. +/// `FramedUdp` applies a `Codec` to the input and output of an +/// `Evented` + +/// Simple Newline based parser, +/// This is for a connectionless server, it must keep track +/// of the Socket address of the last peer to contact it +/// so that it can respond back. +/// In the real world, one would probably +/// want an associative of remote peers to their state +/// +/// Note that this takes a pretty draconian stance by returning +/// an error if it can't find a newline in the datagram it received +pub struct LineCodec { + addr : Option +} + +impl CodecUdp for LineCodec { + type In = Vec>; + type Out = Vec; + + fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result { + trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr); + self.addr = Some(*addr); + let res : Vec> = buf.split(|c| *c == b'\n').map(|s| s.into()).collect(); + if res.len() > 0 { + Ok(res) + } + else { + Err(io::Error::new(io::ErrorKind::Other, + "failed to find newline in datagram")) + } + } + + fn encode(&mut self, item: &Vec, into: &mut Vec) -> SocketAddr { + trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap()); + into.extend_from_slice(item.as_slice()); + into.push('\n' as u8); + + self.addr.unwrap() + } +} + +fn main() { + drop(env_logger::init()); + + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + //create the line codec parser for each + let srvcodec = LineCodec { addr : None }; + let clicodec = LineCodec { addr : None }; + + let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap(); + let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap(); + + //We bind each socket to a specific port + let server = UdpSocket::bind(&srvaddr, &handle).unwrap(); + let client = UdpSocket::bind(&clientaddr, &handle).unwrap(); + + //start things off by sending a ping from the client to the server + //This doesn't utilize the codec to encode the message, but rather + //it sends raw data directly to the remote peer with the send_dgram future + let job = client.send_dgram(b"PING\n", srvaddr); + let (client, _buf) = core.run(job).unwrap(); + + //We create a FramedUdp instance, which associates a socket + //with a codec. We then immediate split that into the + //receiving side `Stream` and the writing side `Sink` + let (srvstream, srvsink) = server.framed(srvcodec).split(); + + //`Stream::fold` runs once per every received datagram. + //Note that we pass srvsink into fold, so that it can be + //supplied to every iteration. The reason for this is + //sink.send moves itself into `send` and then returns itself + let srvloop = srvstream.fold(srvsink, move |sink, lines| { + println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); + sink.send(b"PONG".to_vec()) + }).map(|_| ()); + + //We create another FramedUdp instance, this time for the client socket + let (clistream, clisink) = client.framed(clicodec).split(); + + //And another infinite iteration + let cliloop = clistream.fold(clisink, move |sink, lines| { + println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); + sink.send(b"PING".to_vec()) + }).map(|_| ()); + + let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap(); + + //`select_all` takes an `Iterable` of `Future` and returns a future itself + //This future waits until the first `Future` completes, it then returns + //that result. + let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]); + + //Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll` + //has completed + if let Err(e) = core.run(wait) { + error!("{}", e.0); + } +} diff --git a/src/io/mod.rs b/src/io/mod.rs index 3a17d394d..fa907fa52 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -33,6 +33,7 @@ macro_rules! try_nb { mod copy; mod frame; +mod udp_frame; mod flush; mod read_exact; mod read_to_end; @@ -43,6 +44,7 @@ mod window; mod write_all; pub use self::copy::{copy, Copy}; pub use self::frame::{EasyBuf, EasyBufMut, Framed, Codec}; +pub use self::udp_frame::{FramedUdp, framed_udp, FramedUdpRead, FramedUdpWrite, CodecUdp, VecDGramCodec}; pub use self::flush::{flush, Flush}; pub use self::read_exact::{read_exact, ReadExact}; pub use self::read_to_end::{read_to_end, ReadToEnd}; diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs new file mode 100644 index 000000000..99cc90a5a --- /dev/null +++ b/src/io/udp_frame.rs @@ -0,0 +1,262 @@ +use std::io; +use std::net::SocketAddr; +use net::UdpSocket; +use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; +use futures::sync::BiLock; + +/// Encoding of frames via buffers. +/// +/// This trait is used when constructing an instance of `FramedUdp`. It provides +/// one type: `Out` for encoding outgoing frames according to a protocol. +/// +/// Because UDP is a connectionless protocol, the encode method will also be +/// responsible for determining the remote host to which the datagram should be +/// sent +/// +/// The trait itself is implemented on a type that can track state for decoding +/// or encoding, which is particularly useful for streaming parsers. In many +/// cases, though, this type will simply be a unit struct (e.g. `struct +/// HttpCodec`). +pub trait CodecUdp { + + /// The type of frames to be encoded. + type Out; + + /// The type of decoded frames. + type In; + + + /// Encodes a frame into the buffer provided. + /// + /// This method will encode `msg` into the byte buffer provided by `buf`. + /// The `buf` provided is an internal buffer of the `Framed` instance and + /// will be written out when possible. + /// + /// The encode method also determines the destination to which the buffer should + /// be directed, which will be returned as a SocketAddr; + fn encode(&mut self, msg: &Self::Out, buf: &mut Vec) -> SocketAddr; + + /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// This method is called by `FramedUdp` on a single datagram which has been + /// read from a socket. + /// + /// It is required that the decode method empty the read buffer in every call to + /// decode, as the next poll_read that occurs will write the next datagram + /// into the buffer, without regard for what is already there. + /// + /// Finally, if the bytes in the buffer are malformed then an error is + /// returned indicating why. This informs `Framed` that the stream is now + /// corrupt and should be terminated. + /// + fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result; +} + +/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using +/// the `CodecUdp` trait to encode and decode frames. +/// +/// You can acquire a `Framed` instance by using the `Io::framed` adapter. +pub struct FramedUdp { + socket: UdpSocket, + codec: C, + out_addr : Option, + rd: Vec, + wr: Vec, +} + +impl Stream for FramedUdp { + type Item = C::In; + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { + loop { + + let ret = self.socket.recv_from(self.rd.as_mut_slice()); + match ret { + Ok((n, addr)) => { + trace!("read {} bytes", n); + trace!("attempting to decode a frame"); + let frame = try!(self.codec.decode(&addr, & self.rd[.. n])); + trace!("frame decoded from buffer"); + return Ok(Async::Ready(Some(frame))); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + return Ok(Async::NotReady) + } + Err(e) => return Err(e), + } + } + } +} + +impl Sink for FramedUdp { + type SinkItem = C::Out; + type SinkError = io::Error; + + fn start_send(&mut self, item: C::Out) -> StartSend { + if self.wr.len() > 0 { + try!(self.poll_complete()); + if self.wr.len() > 0 { + return Ok(AsyncSink::NotReady(item)); + } + } + + self.out_addr = Some(self.codec.encode(&item, &mut self.wr)); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), io::Error> { + trace!("flushing framed transport"); + + if !self.wr.is_empty() { + if let Some(outaddr) = self.out_addr { + let remaining = self.wr.len(); + trace!("writing; remaining={}", remaining); + let n = try_nb!(self.socket.send_to(self.wr.as_slice(), &outaddr)); + trace!("written {}", n); + self.wr.clear(); + self.out_addr = None; + if n != remaining { + return Err(io::Error::new(io::ErrorKind::Other, + "failed to write entire datagram to socket")); + } + } + else { + panic!("outbound stream in invalid state: out_addr is not known"); + } + } + + return Ok(Async::Ready(())); + } +} + +/// Helper function that Creates a new FramedUdp object. It moves the supplied socket, codec +/// into the resulting FramedUdp +pub fn framed_udp(socket : UdpSocket, codec : C) -> FramedUdp { + FramedUdp::new( + socket, + codec, + vec![0; 64 * 1024], + Vec::with_capacity(64 * 1024) + ) +} + +impl FramedUdp { + + /// Creates a new FramedUdp object. It moves the supplied socket, codec + /// supplied vecs. + fn new(sock : UdpSocket, codec : C, rd_buf : Vec, wr_buf : Vec) -> FramedUdp { + FramedUdp { + socket: sock, + codec : codec, + out_addr: None, + rd: rd_buf, + wr: wr_buf + } + } + + + /// Splits this `Stream + Sink` object into separate `Stream` and `Sink` + /// objects, which can be useful when you want to split ownership between + /// tasks, or allow direct interaction between the two objects (e.g. via + /// `Sink::send_all`). + pub fn split(self) -> (FramedUdpRead, FramedUdpWrite) { + let (a, b) = BiLock::new(self); + let read = FramedUdpRead { framed: a }; + let write = FramedUdpWrite { framed: b }; + (read, write) + } + + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_ref(&self) -> &UdpSocket { + &self.socket + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut UdpSocket { + &mut self.socket + } + + /// Consumes the `Framed`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> UdpSocket { + self.socket + } +} +/// A `Stream` interface to an underlying `Io` object, using the `CodecUdp` trait +/// to decode frames. +pub struct FramedUdpRead { + framed: BiLock>, +} + +impl Stream for FramedUdpRead { + type Item = C::In; + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { + if let Async::Ready(mut guard) = self.framed.poll_lock() { + guard.poll() + } else { + Ok(Async::NotReady) + } + } +} + +/// A `Sink` interface to an underlying `Io` object, using the `CodecUdp` trait +/// to encode frames. +pub struct FramedUdpWrite { + framed: BiLock>, +} + +impl Sink for FramedUdpWrite { + type SinkItem = C::Out; + type SinkError = io::Error; + + fn start_send(&mut self, item: C::Out) -> StartSend { + if let Async::Ready(mut guard) = self.framed.poll_lock() { + guard.start_send(item) + } else { + Ok(AsyncSink::NotReady(item)) + } + } + + fn poll_complete(&mut self) -> Poll<(), io::Error> { + if let Async::Ready(mut guard) = self.framed.poll_lock() { + guard.poll_complete() + } else { + Ok(Async::NotReady) + } + } +} + +/// Default implementation of a DGram "parser" +/// This receives and produces a tuple of ('SocketAddr', `Vec`) +pub struct VecDGramCodec; + +impl CodecUdp for VecDGramCodec { + type In = (SocketAddr, Vec); + type Out = (SocketAddr, Vec); + + fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result { + Ok((*addr, buf.into())) + } + + fn encode(&mut self, item: &Self::Out, into: &mut Vec) -> SocketAddr { + into.extend_from_slice(item.1.as_slice()); + into.push('\n' as u8); + item.0 + } +} + diff --git a/src/net/udp.rs b/src/net/udp.rs index a88a8ca16..7fb6b9a78 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,9 +1,10 @@ use std::io; use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; use std::fmt; - -use futures::Async; +use io::{FramedUdp, framed_udp}; +use futures::{Async, Future, Poll}; use mio; +use std::mem; use reactor::{Handle, PollEvented}; @@ -42,6 +43,13 @@ impl UdpSocket { UdpSocket::new(udp, handle) } + /// Creates a FramedUdp object, which leverages a supplied `EncodeUdp` + /// and `DecodeUdp` to implement `Stream` and `Sink` + /// This moves the socket into the newly created FramedUdp object + pub fn framed(self, codec : C) -> FramedUdp { + framed_udp(self, codec) + } + /// Returns the local address that this stream is bound to. pub fn local_addr(&self) -> io::Result { self.io.get_ref().local_addr() @@ -85,6 +93,33 @@ impl UdpSocket { Err(e) => Err(e), } } + + /// Creates a future that will write the entire contents of the buffer `buf` to + /// the stream `a` provided. + /// + /// The returned future will return after data has been written to the outbound + /// socket. + /// The future will resolve to the stream as well as the buffer (for reuse if + /// needed). + /// + /// Any error which happens during writing will cause both the stream and the + /// buffer to get destroyed. + /// + /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should + /// be broadly applicable to accepting data which can be converted to a slice. + /// The `Window` struct is also available in this crate to provide a different + /// window into a slice if necessary. + pub fn send_dgram(self, buf: T, addr : SocketAddr) -> SendDGram + where T: AsRef<[u8]>, + { + SendDGram { + state: UdpState::Writing { + sock: self, + addr: addr, + buf: buf, + }, + } + } /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. @@ -249,6 +284,53 @@ impl fmt::Debug for UdpSocket { } } +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the [`write_all`] top-level method. +/// +/// [`write_all`]: fn.write_all.html +pub struct SendDGram { + state: UdpState, +} + +enum UdpState { + Writing { + sock: UdpSocket, + buf: T, + addr: SocketAddr, + }, + Empty, +} + +fn incomplete_write(reason : &str) -> io::Error { + io::Error::new(io::ErrorKind::Other, reason) +} + +impl Future for SendDGram + where T: AsRef<[u8]>, +{ + type Item = (UdpSocket, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> { + match self.state { + UdpState::Writing { ref sock, ref buf, ref addr} => { + let buf = buf.as_ref(); + let n = try_nb!(sock.send_to(&buf, addr)); + if n != buf.len() { + return Err(incomplete_write("Failed to send entire message in datagram")) + } + } + UdpState::Empty => panic!("poll a SendDGram after it's done"), + } + + match mem::replace(&mut self.state, UdpState::Empty) { + UdpState::Writing { sock, buf, .. } => Ok(Async::Ready((sock, (buf).into()))), + UdpState::Empty => panic!(), + } + } +} + #[cfg(unix)] mod sys { use std::os::unix::prelude::*;