extern crate tokio_core; extern crate env_logger; extern crate futures; #[macro_use] extern crate log; use std::io; use std::net::{SocketAddr}; use futures::{future, Future, Stream, Sink}; use tokio_core::net::{UdpSocket, UdpCodec}; use tokio_core::reactor::{Core, Timeout}; use std::time::Duration; use std::str; /// This is a basic example of leveraging `FramedUdp` to create /// a simple UDP client and server which speak a custom Protocol. /// `FramedUdp` applies a `Codec` to the input and output of an /// `Evented` /// Simple Newline based parser, /// This is for a connectionless server, it must keep track /// of the Socket address of the last peer to contact it /// so that it can respond back. /// In the real world, one would probably /// want an associative of remote peers to their state /// /// Note that this takes a pretty draconian stance by returning /// an error if it can't find a newline in the datagram it received pub struct LineCodec { addr : Option } impl UdpCodec for LineCodec { type In = Vec>; type Out = Vec; fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result { trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr); self.addr = Some(*addr); let res : Vec> = buf.split(|c| *c == b'\n').map(|s| s.into()).collect(); if res.len() > 0 { Ok(res) } else { Err(io::Error::new(io::ErrorKind::Other, "failed to find newline in datagram")) } } fn encode(&mut self, item: Vec, into: &mut Vec) -> SocketAddr { trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap()); into.extend_from_slice(item.as_slice()); into.push('\n' as u8); self.addr.unwrap() } } fn main() { drop(env_logger::init()); let mut core = Core::new().unwrap(); let handle = core.handle(); //create the line codec parser for each let srvcodec = LineCodec { addr : None }; let clicodec = LineCodec { addr : None }; let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap(); let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap(); //We bind each socket to a specific port let server = UdpSocket::bind(&srvaddr, &handle).unwrap(); let client = UdpSocket::bind(&clientaddr, &handle).unwrap(); //start things off by sending a ping from the client to the server //This doesn't utilize the codec to encode the message, but rather //it sends raw data directly to the remote peer with the send_dgram future let job = client.send_dgram(b"PING\n", srvaddr); let (client, _buf) = core.run(job).unwrap(); //We create a FramedUdp instance, which associates a socket //with a codec. We then immediate split that into the //receiving side `Stream` and the writing side `Sink` let (srvsink, srvstream) = server.framed(srvcodec).split(); //`Stream::fold` runs once per every received datagram. //Note that we pass srvsink into fold, so that it can be //supplied to every iteration. The reason for this is //sink.send moves itself into `send` and then returns itself let srvloop = srvstream.fold(srvsink, move |sink, lines| { println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); sink.send(b"PONG".to_vec()) }).map(|_| ()); //We create another FramedUdp instance, this time for the client socket let (clisink, clistream) = client.framed(clicodec).split(); //And another infinite iteration let cliloop = clistream.fold(clisink, move |sink, lines| { println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); sink.send(b"PING".to_vec()) }).map(|_| ()); let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap(); //`select_all` takes an `Iterable` of `Future` and returns a future itself //This future waits until the first `Future` completes, it then returns //that result. let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]); //Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll` //has completed if let Err(e) = core.run(wait) { error!("{}", e.0); } }