From 6947933b6b90c97a8255e275b31647219f9a9524 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 1 Sep 2016 09:27:42 -0700 Subject: [PATCH] Be sure to call poll before doing I/O Ensures that we can properly clear the readiness bits and manage them correctly. Closes #10 --- src/channel.rs | 4 ++++ src/tcp.rs | 34 +++++++++++++++++++--------------- src/udp.rs | 8 ++++++++ 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 3da44b531..813484b70 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -87,6 +87,10 @@ impl Stream for Receiver { type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { + match self.rx.poll_read() { + Poll::Ok(()) => {} + _ => return Poll::NotReady, + } match self.rx.get_ref().try_recv() { Ok(t) => Poll::Ok(Some(t)), Err(TryRecvError::Empty) => { diff --git a/src/tcp.rs b/src/tcp.rs index 26226ec8a..ce50e70c6 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -92,6 +92,10 @@ impl TcpListener { type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { + match self.inner.io.poll_read() { + Poll::Ok(()) => {} + _ => return Poll::NotReady, + } match self.inner.io.get_ref().accept() { Ok(pair) => Poll::Ok(Some(pair)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { @@ -288,33 +292,25 @@ impl Future for TcpStreamNew { impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { - let r = self.io.get_ref().read(buf); - if is_wouldblock(&r) { - self.io.need_read(); - } - return r + <&TcpStream>::read(&mut &*self, buf) } } impl Write for TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { - let r = self.io.get_ref().write(buf); - if is_wouldblock(&r) { - self.io.need_write(); - } - return r + <&TcpStream>::write(&mut &*self, buf) } fn flush(&mut self) -> io::Result<()> { - let r = self.io.get_ref().flush(); - if is_wouldblock(&r) { - self.io.need_write(); - } - return r + <&TcpStream>::flush(&mut &*self) } } impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self.io.poll_read() { + Poll::Ok(()) => {} + _ => return Err(mio::would_block()), + } let r = self.io.get_ref().read(buf); if is_wouldblock(&r) { self.io.need_read(); @@ -325,6 +321,10 @@ impl<'a> Read for &'a TcpStream { impl<'a> Write for &'a TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { + match self.io.poll_write() { + Poll::Ok(()) => {} + _ => return Err(mio::would_block()), + } let r = self.io.get_ref().write(buf); if is_wouldblock(&r) { self.io.need_write(); @@ -333,6 +333,10 @@ impl<'a> Write for &'a TcpStream { } fn flush(&mut self) -> io::Result<()> { + match self.io.poll_write() { + Poll::Ok(()) => {} + _ => return Err(mio::would_block()), + } let r = self.io.get_ref().flush(); if is_wouldblock(&r) { self.io.need_write(); diff --git a/src/udp.rs b/src/udp.rs index b0a4dc6f9..23b3dd5d0 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -84,6 +84,10 @@ impl UdpSocket { /// Address type can be any implementor of `ToSocketAddrs` trait. See its /// documentation for concrete examples. pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result { + match self.io.poll_write() { + Poll::Ok(()) => {} + _ => return Err(mio::would_block()), + } match self.io.get_ref().send_to(buf, target) { Ok(Some(n)) => Ok(n), Ok(None) => { @@ -97,6 +101,10 @@ impl UdpSocket { /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + match self.io.poll_read() { + Poll::Ok(()) => {} + _ => return Err(mio::would_block()), + } match self.io.get_ref().recv_from(buf) { Ok(Some(n)) => Ok(n), Ok(None) => {