fix(core): blocking IO runtime compiles in isolation

This commit is contained in:
Ryan Leckey 2021-01-08 15:26:06 -08:00
parent 06a8ed03bc
commit e8ea502cbb
No known key found for this signature in database
GPG Key ID: F8AA68C235AB08C9
5 changed files with 42 additions and 20 deletions

View File

@ -1,3 +1,5 @@
use std::io;
use super::{ConnectOptions, Runtime};
use crate::DefaultRuntime;
@ -9,6 +11,7 @@ use crate::DefaultRuntime;
pub trait Connection<Rt = DefaultRuntime>: crate::Connection<Rt>
where
Rt: Runtime,
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write,
Self::Options: ConnectOptions<Rt>,
{
/// Establish a new database connection.

View File

@ -1,3 +1,5 @@
use std::io;
use super::{Connection, Runtime};
use crate::DefaultRuntime;
@ -10,6 +12,7 @@ use crate::DefaultRuntime;
pub trait ConnectOptions<Rt = DefaultRuntime>: crate::ConnectOptions<Rt>
where
Rt: Runtime,
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write,
Self::Connection: crate::Connection<Rt, Options = Self> + Connection<Rt>,
{
/// Establish a connection to the database.

View File

@ -1,8 +1,7 @@
#[cfg(feature = "blocking")]
use std::io::{Read, Write};
use std::slice::from_raw_parts_mut;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Bytes, BytesMut};
#[cfg(feature = "async")]
use futures_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "async")]
@ -16,6 +15,7 @@ use futures_util::{AsyncReadExt, AsyncWriteExt};
/// to `read` and `write` on the underlying stream.
///
pub struct BufStream<S> {
#[cfg_attr(not(any(feature = "async", feature = "blocking")), allow(unused))]
stream: S,
// (r)ead buffer
@ -25,6 +25,7 @@ pub struct BufStream<S> {
wbuf: Vec<u8>,
// offset into [wbuf] that a previous write operation has written into
#[cfg(feature = "async")]
wbuf_offset: usize,
}
@ -34,6 +35,7 @@ impl<S> BufStream<S> {
stream,
rbuf: BytesMut::with_capacity(read),
wbuf: Vec::with_capacity(write),
#[cfg(feature = "async")]
wbuf_offset: 0,
}
}
@ -62,16 +64,13 @@ impl<S> BufStream<S> {
pub fn buffer(&mut self) -> &mut Vec<u8> {
&mut self.wbuf
}
fn clear_wbuf(&mut self) {
// fully written buffer, move cursor back to the beginning
self.wbuf_offset = 0;
self.wbuf.clear();
}
}
#[cfg_attr(not(any(feature = "async", feature = "blocking")), allow(unused))]
macro_rules! read {
($(@$blocking:ident)? $self:ident, $offset:ident, $n:ident) => {{
use bytes::BufMut;
// before waiting to receive data; ensure that the write buffer is flushed
if !$self.wbuf.is_empty() {
read!($(@$blocking)? @flush $self);
@ -87,7 +86,7 @@ macro_rules! read {
// prepare a chunk of uninitialized memory to write to
// this is UB if the Read impl of the stream reads from the write buffer
let b = $self.rbuf.chunk_mut();
let b = from_raw_parts_mut(b.as_mut_ptr(), b.len());
let b = std::slice::from_raw_parts_mut(b.as_mut_ptr(), b.len());
// read as much as we can and return when the stream or our buffer is exhausted
let read = read!($(@$blocking)? @read $self, b);
@ -121,7 +120,6 @@ macro_rules! read {
};
}
#[cfg(feature = "async")]
impl<S> BufStream<S>
where
@ -135,7 +133,9 @@ where
self.wbuf_offset += self.stream.write(&self.wbuf[self.wbuf_offset..]).await?;
}
self.clear_wbuf();
// fully written buffer, move cursor back to the beginning
self.wbuf_offset = 0;
self.wbuf.clear();
Ok(())
}
@ -151,8 +151,8 @@ where
S: Read + Write,
{
pub fn flush(&mut self) -> crate::Result<()> {
self.stream.write_all(&self.wbuf[self.wbuf_offset..])?;
self.clear_wbuf();
self.stream.write_all(&self.wbuf)?;
self.wbuf.clear();
Ok(())
}

View File

@ -53,6 +53,9 @@ pub use runtime::AsyncStd;
pub use runtime::Runtime;
#[cfg(feature = "tokio")]
pub use runtime::Tokio;
#[cfg(feature = "_mock")]
#[doc(hidden)]
pub use runtime::{mock, Mock};
// pick a default runtime
// this is so existing applications in SQLx pre 0.6 work and to
@ -68,20 +71,33 @@ pub type DefaultRuntime = Tokio;
#[cfg(all(not(all(feature = "async-std", feature = "tokio")), feature = "actix"))]
pub type DefaultRuntime = Actix;
#[cfg(all(not(feature = "async"), feature = "blocking"))]
#[cfg(all(
not(any(feature = "async-std", feature = "tokio", feature = "actix")),
feature = "blocking"
))]
pub type DefaultRuntime = blocking::Blocking;
// when there is no async runtime and the blocking runtime is not present
// when there is no async runtime, and the blocking runtime is not present
// the unit type is implemented for Runtime, this is only to allow the
// lib to compile, the lib is mostly useless in this state
#[cfg(not(any(feature = "async", feature = "blocking")))]
#[cfg(not(any(
feature = "async_std",
feature = "actix",
feature = "tokio",
feature = "blocking"
)))]
pub type DefaultRuntime = ();
#[cfg(any(feature = "async-std", feature = "tokio", feature = "actix"))]
pub mod prelude {
#[cfg(all(not(feature = "async"), feature = "blocking"))]
pub use super::blocking::prelude::*;
pub use super::ConnectOptions as _;
pub use super::Connection as _;
pub use super::Database as _;
pub use super::Runtime as _;
}
#[cfg(all(
not(any(feature = "async-std", feature = "tokio", feature = "actix")),
feature = "blocking"
))]
pub use blocking::prelude;

View File

@ -1,6 +1,6 @@
use std::io;
use async_std::{net::TcpStream, task::block_on};
use async_std::net::TcpStream;
use futures_util::{future::BoxFuture, FutureExt};
use crate::{AsyncRuntime, Runtime};
@ -23,6 +23,6 @@ impl AsyncRuntime for AsyncStd {
#[cfg(feature = "blocking")]
impl crate::blocking::Runtime for AsyncStd {
fn connect_tcp(host: &str, port: u16) -> io::Result<Self::TcpStream> {
block_on(<AsyncStd as AsyncRuntime>::connect_tcp(host, port))
async_std::task::block_on(<AsyncStd as AsyncRuntime>::connect_tcp(host, port))
}
}