feat(core): support blocking access on BufStream

This commit is contained in:
Ryan Leckey 2021-01-06 23:03:57 -08:00
parent 86576106e8
commit e24b1a0864
No known key found for this signature in database
GPG Key ID: F8AA68C235AB08C9

View File

@ -57,8 +57,71 @@ impl<S> BufStream<S> {
pub fn write(&mut self, buf: &[u8]) {
self.wbuf.extend_from_slice(buf);
}
// returns a mutable reference to the write buffer
pub fn buffer(&mut self) -> &mut Vec<u8> {
&mut self.wbuf
}
fn clear_wbuf(&mut self) {
// fully written buffer, move cursor back to the beginning
self.wbuf_offset = 0;
self.wbuf.clear();
}
}
macro_rules! read {
($(@$blocking:ident)? $self:ident, $offset:ident, $n:ident) => {{
// before waiting to receive data; ensure that the write buffer is flushed
if !$self.wbuf.is_empty() {
read!($(@$blocking)? @flush $self);
}
// while our read buffer is too small to satisfy the requested amount
while $self.rbuf.len() < ($offset + $n) {
// ensure that there is room in the read buffer
$self.rbuf.reserve($n.max(128));
#[allow(unsafe_code)]
unsafe {
// prepare a chunk of uninitialized memory to write to
// this is UB if the Read impl of the stream reads from the write buffer
let b = $self.rbuf.chunk_mut();
let b = from_raw_parts_mut(b.as_mut_ptr(), b.len());
// read as much as we can and return when the stream or our buffer is exhausted
let read = read!($(@$blocking)? @read $self, b);
// [!] read more than the length of our buffer
debug_assert!(read <= b.len());
// update the len of the read buffer to let the safe world that its okay
// to look at these bytes now
$self.rbuf.advance_mut(read);
}
}
Ok(())
}};
(@blocking @flush $self:ident) => {
$self.flush()?;
};
(@flush $self:ident) => {
$self.flush_async().await?;
};
(@blocking @read $self:ident, $b:ident) => {
$self.stream.read($b)?;
};
(@read $self:ident, $b:ident) => {
$self.stream.read($b).await?;
};
}
#[cfg(feature = "async")]
impl<S> BufStream<S>
where
@ -72,43 +135,29 @@ where
self.wbuf_offset += self.stream.write(&self.wbuf[self.wbuf_offset..]).await?;
}
// fully written buffer, move cursor back to the beginning
self.wbuf_offset = 0;
self.wbuf.clear();
self.clear_wbuf();
Ok(())
}
pub async fn read_async(&mut self, n: usize) -> crate::Result<()> {
// before waiting to receive data; ensure that the write buffer is flushed
if !self.wbuf.is_empty() {
self.flush_async().await?;
}
// while our read buffer is too small to satisfy the requested amount
while self.rbuf.len() < n {
// ensure that there is room in the read buffer
self.rbuf.reserve(n.max(128));
#[allow(unsafe_code)]
unsafe {
// prepare a chunk of uninitialized memory to write to
// this is UB if the Read impl of the stream reads from the write buffer
let b = self.rbuf.chunk_mut();
let b = from_raw_parts_mut(b.as_mut_ptr(), b.len());
// read as much as we can and return when the stream or our buffer is exhausted
let n = self.stream.read(b).await?;
// [!] read more than the length of our buffer
debug_assert!(n <= b.len());
// update the len of the read buffer to let the safe world that its okay
// to look at these bytes now
self.rbuf.advance_mut(n);
}
}
Ok(())
pub async fn read_async(&mut self, offset: usize, n: usize) -> crate::Result<()> {
read!(self, offset, n)
}
}
#[cfg(feature = "blocking")]
impl<S> BufStream<S>
where
S: Read + Write,
{
pub fn flush(&mut self) -> crate::Result<()> {
self.stream.write_all(&self.wbuf[self.wbuf_offset..])?;
self.clear_wbuf();
Ok(())
}
pub fn read(&mut self, offset: usize, n: usize) -> crate::Result<()> {
read!(@blocking self, offset, n)
}
}