diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index afc70d75f..894d21b42 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -8,21 +8,20 @@ extern crate log; use std::io; use std::net::{SocketAddr}; use futures::{future, Future, Stream, Sink}; -use tokio_core::io::{CodecUdp}; -use tokio_core::net::{UdpSocket}; +use tokio_core::net::{UdpSocket, UdpCodec}; use tokio_core::reactor::{Core, Timeout}; use std::time::Duration; use std::str; /// This is a basic example of leveraging `FramedUdp` to create -/// a simple UDP client and server which speak a custom Protocol. +/// a simple UDP client and server which speak a custom Protocol. /// `FramedUdp` applies a `Codec` to the input and output of an /// `Evented` -/// Simple Newline based parser, +/// Simple Newline based parser, /// This is for a connectionless server, it must keep track /// of the Socket address of the last peer to contact it -/// so that it can respond back. +/// so that it can respond back. /// In the real world, one would probably /// want an associative of remote peers to their state /// @@ -32,7 +31,7 @@ pub struct LineCodec { addr : Option } -impl CodecUdp for LineCodec { +impl UdpCodec for LineCodec { type In = Vec>; type Out = Vec; @@ -48,12 +47,12 @@ impl CodecUdp for LineCodec { "failed to find newline in datagram")) } } - - fn encode(&mut self, item: &Vec, into: &mut Vec) -> SocketAddr { + + fn encode(&mut self, item: Vec, into: &mut Vec) -> SocketAddr { trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap()); into.extend_from_slice(item.as_slice()); into.push('\n' as u8); - + self.addr.unwrap() } } @@ -64,7 +63,7 @@ fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); - //create the line codec parser for each + //create the line codec parser for each let srvcodec = LineCodec { addr : None }; let clicodec = LineCodec { addr : None }; @@ -82,24 +81,24 @@ fn main() { let (client, _buf) = core.run(job).unwrap(); //We create a FramedUdp instance, which associates a socket - //with a codec. We then immediate split that into the + //with a codec. We then immediate split that into the //receiving side `Stream` and the writing side `Sink` - let (srvstream, srvsink) = server.framed(srvcodec).split(); + let (srvsink, srvstream) = server.framed(srvcodec).split(); //`Stream::fold` runs once per every received datagram. //Note that we pass srvsink into fold, so that it can be //supplied to every iteration. The reason for this is //sink.send moves itself into `send` and then returns itself - let srvloop = srvstream.fold(srvsink, move |sink, lines| { + let srvloop = srvstream.fold(srvsink, move |sink, lines| { println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); sink.send(b"PONG".to_vec()) }).map(|_| ()); - + //We create another FramedUdp instance, this time for the client socket - let (clistream, clisink) = client.framed(clicodec).split(); + let (clisink, clistream) = client.framed(clicodec).split(); //And another infinite iteration - let cliloop = clistream.fold(clisink, move |sink, lines| { + let cliloop = clistream.fold(clisink, move |sink, lines| { println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); sink.send(b"PING".to_vec()) }).map(|_| ()); diff --git a/src/io/mod.rs b/src/io/mod.rs index fa907fa52..3a17d394d 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -33,7 +33,6 @@ macro_rules! try_nb { mod copy; mod frame; -mod udp_frame; mod flush; mod read_exact; mod read_to_end; @@ -44,7 +43,6 @@ mod window; mod write_all; pub use self::copy::{copy, Copy}; pub use self::frame::{EasyBuf, EasyBufMut, Framed, Codec}; -pub use self::udp_frame::{FramedUdp, framed_udp, FramedUdpRead, FramedUdpWrite, CodecUdp, VecDGramCodec}; pub use self::flush::{flush, Flush}; pub use self::read_exact::{read_exact, ReadExact}; pub use self::read_to_end::{read_to_end, ReadToEnd}; diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs deleted file mode 100644 index 99cc90a5a..000000000 --- a/src/io/udp_frame.rs +++ /dev/null @@ -1,262 +0,0 @@ -use std::io; -use std::net::SocketAddr; -use net::UdpSocket; -use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; -use futures::sync::BiLock; - -/// Encoding of frames via buffers. -/// -/// This trait is used when constructing an instance of `FramedUdp`. It provides -/// one type: `Out` for encoding outgoing frames according to a protocol. -/// -/// Because UDP is a connectionless protocol, the encode method will also be -/// responsible for determining the remote host to which the datagram should be -/// sent -/// -/// The trait itself is implemented on a type that can track state for decoding -/// or encoding, which is particularly useful for streaming parsers. In many -/// cases, though, this type will simply be a unit struct (e.g. `struct -/// HttpCodec`). -pub trait CodecUdp { - - /// The type of frames to be encoded. - type Out; - - /// The type of decoded frames. - type In; - - - /// Encodes a frame into the buffer provided. - /// - /// This method will encode `msg` into the byte buffer provided by `buf`. - /// The `buf` provided is an internal buffer of the `Framed` instance and - /// will be written out when possible. - /// - /// The encode method also determines the destination to which the buffer should - /// be directed, which will be returned as a SocketAddr; - fn encode(&mut self, msg: &Self::Out, buf: &mut Vec) -> SocketAddr; - - /// Attempts to decode a frame from the provided buffer of bytes. - /// - /// This method is called by `FramedUdp` on a single datagram which has been - /// read from a socket. - /// - /// It is required that the decode method empty the read buffer in every call to - /// decode, as the next poll_read that occurs will write the next datagram - /// into the buffer, without regard for what is already there. - /// - /// Finally, if the bytes in the buffer are malformed then an error is - /// returned indicating why. This informs `Framed` that the stream is now - /// corrupt and should be terminated. - /// - fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result; -} - -/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using -/// the `CodecUdp` trait to encode and decode frames. -/// -/// You can acquire a `Framed` instance by using the `Io::framed` adapter. -pub struct FramedUdp { - socket: UdpSocket, - codec: C, - out_addr : Option, - rd: Vec, - wr: Vec, -} - -impl Stream for FramedUdp { - type Item = C::In; - type Error = io::Error; - - fn poll(&mut self) -> Poll, io::Error> { - loop { - - let ret = self.socket.recv_from(self.rd.as_mut_slice()); - match ret { - Ok((n, addr)) => { - trace!("read {} bytes", n); - trace!("attempting to decode a frame"); - let frame = try!(self.codec.decode(&addr, & self.rd[.. n])); - trace!("frame decoded from buffer"); - return Ok(Async::Ready(Some(frame))); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - return Ok(Async::NotReady) - } - Err(e) => return Err(e), - } - } - } -} - -impl Sink for FramedUdp { - type SinkItem = C::Out; - type SinkError = io::Error; - - fn start_send(&mut self, item: C::Out) -> StartSend { - if self.wr.len() > 0 { - try!(self.poll_complete()); - if self.wr.len() > 0 { - return Ok(AsyncSink::NotReady(item)); - } - } - - self.out_addr = Some(self.codec.encode(&item, &mut self.wr)); - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), io::Error> { - trace!("flushing framed transport"); - - if !self.wr.is_empty() { - if let Some(outaddr) = self.out_addr { - let remaining = self.wr.len(); - trace!("writing; remaining={}", remaining); - let n = try_nb!(self.socket.send_to(self.wr.as_slice(), &outaddr)); - trace!("written {}", n); - self.wr.clear(); - self.out_addr = None; - if n != remaining { - return Err(io::Error::new(io::ErrorKind::Other, - "failed to write entire datagram to socket")); - } - } - else { - panic!("outbound stream in invalid state: out_addr is not known"); - } - } - - return Ok(Async::Ready(())); - } -} - -/// Helper function that Creates a new FramedUdp object. It moves the supplied socket, codec -/// into the resulting FramedUdp -pub fn framed_udp(socket : UdpSocket, codec : C) -> FramedUdp { - FramedUdp::new( - socket, - codec, - vec![0; 64 * 1024], - Vec::with_capacity(64 * 1024) - ) -} - -impl FramedUdp { - - /// Creates a new FramedUdp object. It moves the supplied socket, codec - /// supplied vecs. - fn new(sock : UdpSocket, codec : C, rd_buf : Vec, wr_buf : Vec) -> FramedUdp { - FramedUdp { - socket: sock, - codec : codec, - out_addr: None, - rd: rd_buf, - wr: wr_buf - } - } - - - /// Splits this `Stream + Sink` object into separate `Stream` and `Sink` - /// objects, which can be useful when you want to split ownership between - /// tasks, or allow direct interaction between the two objects (e.g. via - /// `Sink::send_all`). - pub fn split(self) -> (FramedUdpRead, FramedUdpWrite) { - let (a, b) = BiLock::new(self); - let read = FramedUdpRead { framed: a }; - let write = FramedUdpWrite { framed: b }; - (read, write) - } - - /// Returns a reference to the underlying I/O stream wrapped by `Framed`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise being - /// worked with. - pub fn get_ref(&self) -> &UdpSocket { - &self.socket - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `Framed`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise being - /// worked with. - pub fn get_mut(&mut self) -> &mut UdpSocket { - &mut self.socket - } - - /// Consumes the `Framed`, returning its underlying I/O stream. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise being - /// worked with. - pub fn into_inner(self) -> UdpSocket { - self.socket - } -} -/// A `Stream` interface to an underlying `Io` object, using the `CodecUdp` trait -/// to decode frames. -pub struct FramedUdpRead { - framed: BiLock>, -} - -impl Stream for FramedUdpRead { - type Item = C::In; - type Error = io::Error; - - fn poll(&mut self) -> Poll, io::Error> { - if let Async::Ready(mut guard) = self.framed.poll_lock() { - guard.poll() - } else { - Ok(Async::NotReady) - } - } -} - -/// A `Sink` interface to an underlying `Io` object, using the `CodecUdp` trait -/// to encode frames. -pub struct FramedUdpWrite { - framed: BiLock>, -} - -impl Sink for FramedUdpWrite { - type SinkItem = C::Out; - type SinkError = io::Error; - - fn start_send(&mut self, item: C::Out) -> StartSend { - if let Async::Ready(mut guard) = self.framed.poll_lock() { - guard.start_send(item) - } else { - Ok(AsyncSink::NotReady(item)) - } - } - - fn poll_complete(&mut self) -> Poll<(), io::Error> { - if let Async::Ready(mut guard) = self.framed.poll_lock() { - guard.poll_complete() - } else { - Ok(Async::NotReady) - } - } -} - -/// Default implementation of a DGram "parser" -/// This receives and produces a tuple of ('SocketAddr', `Vec`) -pub struct VecDGramCodec; - -impl CodecUdp for VecDGramCodec { - type In = (SocketAddr, Vec); - type Out = (SocketAddr, Vec); - - fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result { - Ok((*addr, buf.into())) - } - - fn encode(&mut self, item: &Self::Out, into: &mut Vec) -> SocketAddr { - into.extend_from_slice(item.1.as_slice()); - into.push('\n' as u8); - item.0 - } -} - diff --git a/src/net/mod.rs b/src/net/mod.rs index 159985c49..737f62c79 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -8,4 +8,4 @@ mod udp; pub use self::tcp::{TcpStream, TcpStreamNew}; pub use self::tcp::{TcpListener, Incoming}; -pub use self::udp::UdpSocket; +pub use self::udp::{UdpSocket, UdpCodec, UdpFramed}; diff --git a/src/net/udp/frame.rs b/src/net/udp/frame.rs new file mode 100644 index 000000000..566d9058a --- /dev/null +++ b/src/net/udp/frame.rs @@ -0,0 +1,154 @@ +use std::io; +use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4}; + +use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; + +use net::UdpSocket; + +/// Encoding of frames via buffers. +/// +/// This trait is used when constructing an instance of `UdpFramed` and provides +/// the `In` and `Out` types which are decoded and encoded from the socket, +/// respectively. +/// +/// Because UDP is a connectionless protocol, the `decode` method receives the +/// address where data came from and the `encode` method is also responsible for +/// determining the remote host to which the datagram should be sent +/// +/// The trait itself is implemented on a type that can track state for decoding +/// or encoding, which is particularly useful for streaming parsers. In many +/// cases, though, this type will simply be a unit struct (e.g. `struct +/// HttpCodec`). +pub trait UdpCodec { + /// The type of decoded frames. + type In; + + /// The type of frames to be encoded. + type Out; + + /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// This method is called by `UdpFramed` on a single datagram which has been + /// read from a socket. The `buf` argument contains the data that was + /// received from the remote address, and `src` is the address the data came + /// from. Note that typically this method should require the entire contents + /// of `buf` to be valid or otherwise return an error with trailing data. + /// + /// Finally, if the bytes in the buffer are malformed then an error is + /// returned indicating why. This informs `Framed` that the stream is now + /// corrupt and should be terminated. + fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result; + + /// Encodes a frame into the buffer provided. + /// + /// This method will encode `msg` into the byte buffer provided by `buf`. + /// The `buf` provided is an internal buffer of the `Framed` instance and + /// will be written out when possible. + /// + /// The encode method also determines the destination to which the buffer + /// should be directed, which will be returned as a `SocketAddr`. + fn encode(&mut self, msg: Self::Out, buf: &mut Vec) -> SocketAddr; +} + +/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using +/// the `UdpCodec` trait to encode and decode frames. +/// +/// You can acquire a `UdpFramed` instance by using the `UdpSocket::framed` +/// adapter. +pub struct UdpFramed { + socket: UdpSocket, + codec: C, + rd: Vec, + wr: Vec, + out_addr: SocketAddr, +} + +impl Stream for UdpFramed { + type Item = C::In; + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { + let (n, addr) = try_nb!(self.socket.recv_from(&mut self.rd)); + trace!("received {} bytes, decoding", n); + let frame = try!(self.codec.decode(&addr, &self.rd[..n])); + trace!("frame decoded from buffer"); + Ok(Async::Ready(Some(frame))) + } +} + +impl Sink for UdpFramed { + type SinkItem = C::Out; + type SinkError = io::Error; + + fn start_send(&mut self, item: C::Out) -> StartSend { + if self.wr.len() > 0 { + try!(self.poll_complete()); + if self.wr.len() > 0 { + return Ok(AsyncSink::NotReady(item)); + } + } + + self.out_addr = self.codec.encode(item, &mut self.wr); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), io::Error> { + trace!("flushing framed transport"); + + if self.wr.is_empty() { + return Ok(Async::Ready(())) + } + + trace!("writing; remaining={}", self.wr.len()); + let n = try_nb!(self.socket.send_to(&self.wr, &self.out_addr)); + trace!("written {}", n); + let wrote_all = n == self.wr.len(); + self.wr.clear(); + if wrote_all { + Ok(Async::Ready(())) + } else { + Err(io::Error::new(io::ErrorKind::Other, + "failed to write entire datagram to socket")) + } + } +} + +pub fn new(socket: UdpSocket, codec: C) -> UdpFramed { + UdpFramed { + socket: socket, + codec: codec, + out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)), + rd: vec![0; 64 * 1024], + wr: Vec::with_capacity(8 * 1024), + } +} + +impl UdpFramed { + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_ref(&self) -> &UdpSocket { + &self.socket + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut UdpSocket { + &mut self.socket + } + + /// Consumes the `Framed`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> UdpSocket { + self.socket + } +} diff --git a/src/net/udp.rs b/src/net/udp/mod.rs similarity index 69% rename from src/net/udp.rs rename to src/net/udp/mod.rs index 7fb6b9a78..980d6f29a 100644 --- a/src/net/udp.rs +++ b/src/net/udp/mod.rs @@ -1,10 +1,10 @@ use std::io; +use std::mem; use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; use std::fmt; -use io::{FramedUdp, framed_udp}; + use futures::{Async, Future, Poll}; use mio; -use std::mem; use reactor::{Handle, PollEvented}; @@ -13,6 +13,9 @@ pub struct UdpSocket { io: PollEvented, } +mod frame; +pub use self::frame::{UdpFramed, UdpCodec}; + impl UdpSocket { /// Create a new UDP socket bound to the specified address. /// @@ -43,11 +46,27 @@ impl UdpSocket { UdpSocket::new(udp, handle) } - /// Creates a FramedUdp object, which leverages a supplied `EncodeUdp` - /// and `DecodeUdp` to implement `Stream` and `Sink` - /// This moves the socket into the newly created FramedUdp object - pub fn framed(self, codec : C) -> FramedUdp { - framed_udp(self, codec) + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `UdpSocket` object, using the provided `UdpCodec` to read and write the + /// raw data. + /// + /// Raw UDP sockets work with datagrams, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of this socket by using the `UdpCodec` + /// trait to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things which require both read and write access to the underlying + /// object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `UdpFramed` returned by this method, which will + /// break them into separate objects, allowing them to interact more + /// easily. + pub fn framed(self, codec: C) -> UdpFramed { + frame::new(self, codec) } /// Returns the local address that this stream is bound to. @@ -93,27 +112,27 @@ impl UdpSocket { Err(e) => Err(e), } } - - /// Creates a future that will write the entire contents of the buffer `buf` to - /// the stream `a` provided. + + /// Creates a future that will write the entire contents of the buffer + /// `buf` provided as a datagram to this socket. /// - /// The returned future will return after data has been written to the outbound - /// socket. - /// The future will resolve to the stream as well as the buffer (for reuse if - /// needed). + /// The returned future will return after data has been written to the + /// outbound socket. The future will resolve to the stream as well as the + /// buffer (for reuse if needed). /// - /// Any error which happens during writing will cause both the stream and the - /// buffer to get destroyed. + /// Any error which happens during writing will cause both the stream and + /// the buffer to get destroyed. Note that failure to write the entire + /// buffer is considered an error for the purposes of sending a datagram. /// - /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should - /// be broadly applicable to accepting data which can be converted to a slice. - /// The `Window` struct is also available in this crate to provide a different - /// window into a slice if necessary. - pub fn send_dgram(self, buf: T, addr : SocketAddr) -> SendDGram + /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which + /// should be broadly applicable to accepting data which can be converted + /// to a slice. The `Window` struct is also available in this crate to + /// provide a different window into a slice if necessary. + pub fn send_dgram(self, buf: T, addr: SocketAddr) -> SendDgram where T: AsRef<[u8]>, { - SendDGram { - state: UdpState::Writing { + SendDgram { + state: SendState::Writing { sock: self, addr: addr, buf: buf, @@ -137,6 +156,31 @@ impl UdpSocket { } } + /// Creates a future that receive a datagram to be written to the buffer + /// provided. + /// + /// The returned future will return after a datagram has been received on + /// this socket. The future will resolve to the socket, the buffer, the + /// amount of data read, and the address the data was received from. + /// + /// An error during reading will cause the socket and buffer to get + /// destroyed and the socket will be returned. + /// + /// The `buf` parameter here only requires the `AsMut<[u8]>` trait, which + /// should be broadly applicable to accepting data which can be converted + /// to a slice. The `Window` struct is also available in this crate to + /// provide a different window into a slice if necessary. + pub fn recv_dgram(self, buf: T) -> RecvDgram + where T: AsMut<[u8]>, + { + RecvDgram { + state: RecvState::Reading { + sock: self, + buf: buf, + }, + } + } + /// Gets the value of the `SO_BROADCAST` option for this socket. /// /// For more information about this option, see @@ -284,16 +328,14 @@ impl fmt::Debug for UdpSocket { } } -/// A future used to write the entire contents of some data to a stream. +/// A future used to write the entire contents of some data to a UDP socket. /// -/// This is created by the [`write_all`] top-level method. -/// -/// [`write_all`]: fn.write_all.html -pub struct SendDGram { - state: UdpState, +/// This is created by the `UdpSocket::send_dgram` method. +pub struct SendDgram { + state: SendState, } -enum UdpState { +enum SendState { Writing { sock: UdpSocket, buf: T, @@ -302,11 +344,11 @@ enum UdpState { Empty, } -fn incomplete_write(reason : &str) -> io::Error { +fn incomplete_write(reason: &str) -> io::Error { io::Error::new(io::ErrorKind::Other, reason) } -impl Future for SendDGram +impl Future for SendDgram where T: AsRef<[u8]>, { type Item = (UdpSocket, T); @@ -314,19 +356,59 @@ impl Future for SendDGram fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> { match self.state { - UdpState::Writing { ref sock, ref buf, ref addr} => { - let buf = buf.as_ref(); - let n = try_nb!(sock.send_to(&buf, addr)); - if n != buf.len() { - return Err(incomplete_write("Failed to send entire message in datagram")) + SendState::Writing { ref sock, ref buf, ref addr } => { + let n = try_nb!(sock.send_to(buf.as_ref(), addr)); + if n != buf.as_ref().len() { + return Err(incomplete_write("failed to send entire message \ + in datagram")) } } - UdpState::Empty => panic!("poll a SendDGram after it's done"), + SendState::Empty => panic!("poll a SendDgram after it's done"), } - match mem::replace(&mut self.state, UdpState::Empty) { - UdpState::Writing { sock, buf, .. } => Ok(Async::Ready((sock, (buf).into()))), - UdpState::Empty => panic!(), + match mem::replace(&mut self.state, SendState::Empty) { + SendState::Writing { sock, buf, addr: _ } => { + Ok(Async::Ready((sock, buf))) + } + SendState::Empty => panic!(), + } + } +} + +/// A future used to receive a datagram from a UDP socket. +/// +/// This is created by the `UdpSocket::recv_dgram` method. +pub struct RecvDgram { + state: RecvState, +} + +enum RecvState { + Reading { + sock: UdpSocket, + buf: T, + }, + Empty, +} + +impl Future for RecvDgram + where T: AsMut<[u8]>, +{ + type Item = (UdpSocket, T, usize, SocketAddr); + type Error = io::Error; + + fn poll(&mut self) -> Poll { + let (n, addr) = match self.state { + RecvState::Reading { ref sock, ref mut buf } => { + try_nb!(sock.recv_from(buf.as_mut())) + } + RecvState::Empty => panic!("poll a RecvDgram after it's done"), + }; + + match mem::replace(&mut self.state, RecvState::Empty) { + RecvState::Reading { sock, buf } => { + Ok(Async::Ready((sock, buf, n, addr))) + } + RecvState::Empty => panic!(), } } }