mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
io: ensure ReadHalf/WriteHalf do not return WouldBlock directly (#655)
* io: ensure ReadHalf/WriteHalf do not return WouldBlock directly These facades were passing back WouldBlock when the internal BiLock couldn't be acquired, which does not fit the intended behavior. Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com> * io: pull from the local crate, not crates.io
This commit is contained in:
parent
20ca59114a
commit
1119d572ee
@ -21,3 +21,6 @@ categories = ["asynchronous"]
|
||||
bytes = "0.4.7"
|
||||
futures = "0.1.18"
|
||||
log = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-current-thread = { version = "0.1.1", path = "../tokio-current-thread" }
|
||||
|
@ -38,10 +38,8 @@ impl<T: AsyncRead> Read for ReadHalf<T> {
|
||||
|
||||
impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
|
||||
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
match self.handle.poll_lock() {
|
||||
Async::Ready(mut l) => l.read_buf(buf),
|
||||
Async::NotReady => Err(would_block()),
|
||||
}
|
||||
let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
|
||||
l.read_buf(buf)
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,18 +61,111 @@ impl<T: AsyncWrite> Write for WriteHalf<T> {
|
||||
|
||||
impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
match self.handle.poll_lock() {
|
||||
Async::Ready(mut l) => l.shutdown(),
|
||||
Async::NotReady => Err(would_block()),
|
||||
}
|
||||
let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
|
||||
l.shutdown()
|
||||
}
|
||||
|
||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
|
||||
where Self: Sized,
|
||||
{
|
||||
match self.handle.poll_lock() {
|
||||
Async::Ready(mut l) => l.write_buf(buf),
|
||||
Async::NotReady => Err(would_block()),
|
||||
}
|
||||
let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
|
||||
l.write_buf(buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn wrap_as_io<T>(t: Async<T>) -> Result<Async<T>, io::Error> {
|
||||
Ok(t)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
extern crate tokio_current_thread;
|
||||
|
||||
use super::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
|
||||
use bytes::{BytesMut, IntoBuf};
|
||||
use futures::{Async, Poll, future::lazy, future::ok};
|
||||
use futures::sync::BiLock;
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
|
||||
struct RW;
|
||||
|
||||
impl Read for RW {
|
||||
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
|
||||
Ok(1)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for RW {}
|
||||
|
||||
impl Write for RW {
|
||||
fn write(&mut self, _: &[u8]) -> io::Result<usize> {
|
||||
Ok(1)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for RW {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn split_readhalf_translate_wouldblock_to_not_ready() {
|
||||
tokio_current_thread::block_on_all(lazy(move || {
|
||||
let rw = RW {};
|
||||
let (a, b) = BiLock::new(rw);
|
||||
let mut rx = ReadHalf { handle: a };
|
||||
|
||||
let mut buf = BytesMut::with_capacity(64);
|
||||
|
||||
// First read is uncontended, should go through.
|
||||
assert!(rx.read_buf(&mut buf).unwrap().is_ready());
|
||||
|
||||
// Take lock from write side.
|
||||
let lock = b.poll_lock();
|
||||
|
||||
// Second read should be NotReady.
|
||||
assert!(!rx.read_buf(&mut buf).unwrap().is_ready());
|
||||
|
||||
drop(lock);
|
||||
|
||||
// Back to uncontended.
|
||||
assert!(rx.read_buf(&mut buf).unwrap().is_ready());
|
||||
|
||||
ok::<(), ()>(())
|
||||
})).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn split_writehalf_translate_wouldblock_to_not_ready() {
|
||||
tokio_current_thread::block_on_all(lazy(move || {
|
||||
let rw = RW {};
|
||||
let (a, b) = BiLock::new(rw);
|
||||
let mut tx = WriteHalf { handle: a };
|
||||
|
||||
let bufmut = BytesMut::with_capacity(64);
|
||||
let mut buf = bufmut.into_buf();
|
||||
|
||||
// First write is uncontended, should go through.
|
||||
assert!(tx.write_buf(&mut buf).unwrap().is_ready());
|
||||
|
||||
// Take lock from read side.
|
||||
let lock = b.poll_lock();
|
||||
|
||||
// Second write should be NotReady.
|
||||
assert!(!tx.write_buf(&mut buf).unwrap().is_ready());
|
||||
|
||||
drop(lock);
|
||||
|
||||
// Back to uncontended.
|
||||
assert!(tx.write_buf(&mut buf).unwrap().is_ready());
|
||||
|
||||
ok::<(), ()>(())
|
||||
})).unwrap();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user