From e24b1a0864889a390494f80acd92e323e450dff8 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Wed, 6 Jan 2021 23:03:57 -0800 Subject: [PATCH] feat(core): support blocking access on BufStream --- sqlx-core/src/io/buf_stream.rs | 117 +++++++++++++++++++++++---------- 1 file changed, 83 insertions(+), 34 deletions(-) diff --git a/sqlx-core/src/io/buf_stream.rs b/sqlx-core/src/io/buf_stream.rs index 141932ea..907eac34 100644 --- a/sqlx-core/src/io/buf_stream.rs +++ b/sqlx-core/src/io/buf_stream.rs @@ -57,8 +57,71 @@ impl BufStream { pub fn write(&mut self, buf: &[u8]) { self.wbuf.extend_from_slice(buf); } + + // returns a mutable reference to the write buffer + pub fn buffer(&mut self) -> &mut Vec { + &mut self.wbuf + } + + fn clear_wbuf(&mut self) { + // fully written buffer, move cursor back to the beginning + self.wbuf_offset = 0; + self.wbuf.clear(); + } } +macro_rules! read { + ($(@$blocking:ident)? $self:ident, $offset:ident, $n:ident) => {{ + // before waiting to receive data; ensure that the write buffer is flushed + if !$self.wbuf.is_empty() { + read!($(@$blocking)? @flush $self); + } + + // while our read buffer is too small to satisfy the requested amount + while $self.rbuf.len() < ($offset + $n) { + // ensure that there is room in the read buffer + $self.rbuf.reserve($n.max(128)); + + #[allow(unsafe_code)] + unsafe { + // prepare a chunk of uninitialized memory to write to + // this is UB if the Read impl of the stream reads from the write buffer + let b = $self.rbuf.chunk_mut(); + let b = from_raw_parts_mut(b.as_mut_ptr(), b.len()); + + // read as much as we can and return when the stream or our buffer is exhausted + let read = read!($(@$blocking)? @read $self, b); + + // [!] read more than the length of our buffer + debug_assert!(read <= b.len()); + + // update the len of the read buffer to let the safe world that its okay + // to look at these bytes now + $self.rbuf.advance_mut(read); + } + } + + Ok(()) + }}; + + (@blocking @flush $self:ident) => { + $self.flush()?; + }; + + (@flush $self:ident) => { + $self.flush_async().await?; + }; + + (@blocking @read $self:ident, $b:ident) => { + $self.stream.read($b)?; + }; + + (@read $self:ident, $b:ident) => { + $self.stream.read($b).await?; + }; +} + + #[cfg(feature = "async")] impl BufStream where @@ -72,43 +135,29 @@ where self.wbuf_offset += self.stream.write(&self.wbuf[self.wbuf_offset..]).await?; } - // fully written buffer, move cursor back to the beginning - self.wbuf_offset = 0; - self.wbuf.clear(); + self.clear_wbuf(); Ok(()) } - pub async fn read_async(&mut self, n: usize) -> crate::Result<()> { - // before waiting to receive data; ensure that the write buffer is flushed - if !self.wbuf.is_empty() { - self.flush_async().await?; - } - - // while our read buffer is too small to satisfy the requested amount - while self.rbuf.len() < n { - // ensure that there is room in the read buffer - self.rbuf.reserve(n.max(128)); - - #[allow(unsafe_code)] - unsafe { - // prepare a chunk of uninitialized memory to write to - // this is UB if the Read impl of the stream reads from the write buffer - let b = self.rbuf.chunk_mut(); - let b = from_raw_parts_mut(b.as_mut_ptr(), b.len()); - - // read as much as we can and return when the stream or our buffer is exhausted - let n = self.stream.read(b).await?; - - // [!] read more than the length of our buffer - debug_assert!(n <= b.len()); - - // update the len of the read buffer to let the safe world that its okay - // to look at these bytes now - self.rbuf.advance_mut(n); - } - } - - Ok(()) + pub async fn read_async(&mut self, offset: usize, n: usize) -> crate::Result<()> { + read!(self, offset, n) + } +} + +#[cfg(feature = "blocking")] +impl BufStream +where + S: Read + Write, +{ + pub fn flush(&mut self) -> crate::Result<()> { + self.stream.write_all(&self.wbuf[self.wbuf_offset..])?; + self.clear_wbuf(); + + Ok(()) + } + + pub fn read(&mut self, offset: usize, n: usize) -> crate::Result<()> { + read!(@blocking self, offset, n) } }