From e8ea502cbb37886f34b25ad321f5a61a6d65cded Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Fri, 8 Jan 2021 15:26:06 -0800 Subject: [PATCH] fix(core): blocking IO runtime compiles in isolation --- sqlx-core/src/blocking/connection.rs | 3 +++ sqlx-core/src/blocking/options.rs | 3 +++ sqlx-core/src/io/buf_stream.rs | 26 +++++++++++++------------- sqlx-core/src/lib.rs | 26 +++++++++++++++++++++----- sqlx-core/src/runtime/async_std.rs | 4 ++-- 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/sqlx-core/src/blocking/connection.rs b/sqlx-core/src/blocking/connection.rs index e14d6302..a4b0b080 100644 --- a/sqlx-core/src/blocking/connection.rs +++ b/sqlx-core/src/blocking/connection.rs @@ -1,3 +1,5 @@ +use std::io; + use super::{ConnectOptions, Runtime}; use crate::DefaultRuntime; @@ -9,6 +11,7 @@ use crate::DefaultRuntime; pub trait Connection: crate::Connection where Rt: Runtime, + ::TcpStream: io::Read + io::Write, Self::Options: ConnectOptions, { /// Establish a new database connection. diff --git a/sqlx-core/src/blocking/options.rs b/sqlx-core/src/blocking/options.rs index eed024bb..691000be 100644 --- a/sqlx-core/src/blocking/options.rs +++ b/sqlx-core/src/blocking/options.rs @@ -1,3 +1,5 @@ +use std::io; + use super::{Connection, Runtime}; use crate::DefaultRuntime; @@ -10,6 +12,7 @@ use crate::DefaultRuntime; pub trait ConnectOptions: crate::ConnectOptions where Rt: Runtime, + ::TcpStream: io::Read + io::Write, Self::Connection: crate::Connection + Connection, { /// Establish a connection to the database. diff --git a/sqlx-core/src/io/buf_stream.rs b/sqlx-core/src/io/buf_stream.rs index 907eac34..089055d1 100644 --- a/sqlx-core/src/io/buf_stream.rs +++ b/sqlx-core/src/io/buf_stream.rs @@ -1,8 +1,7 @@ #[cfg(feature = "blocking")] use std::io::{Read, Write}; -use std::slice::from_raw_parts_mut; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Bytes, BytesMut}; #[cfg(feature = "async")] use futures_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "async")] @@ -16,6 +15,7 @@ use futures_util::{AsyncReadExt, AsyncWriteExt}; /// to `read` and `write` on the underlying stream. /// pub struct BufStream { + #[cfg_attr(not(any(feature = "async", feature = "blocking")), allow(unused))] stream: S, // (r)ead buffer @@ -25,6 +25,7 @@ pub struct BufStream { wbuf: Vec, // offset into [wbuf] that a previous write operation has written into + #[cfg(feature = "async")] wbuf_offset: usize, } @@ -34,6 +35,7 @@ impl BufStream { stream, rbuf: BytesMut::with_capacity(read), wbuf: Vec::with_capacity(write), + #[cfg(feature = "async")] wbuf_offset: 0, } } @@ -62,16 +64,13 @@ impl BufStream { pub fn buffer(&mut self) -> &mut Vec { &mut self.wbuf } - - fn clear_wbuf(&mut self) { - // fully written buffer, move cursor back to the beginning - self.wbuf_offset = 0; - self.wbuf.clear(); - } } +#[cfg_attr(not(any(feature = "async", feature = "blocking")), allow(unused))] macro_rules! read { ($(@$blocking:ident)? $self:ident, $offset:ident, $n:ident) => {{ + use bytes::BufMut; + // before waiting to receive data; ensure that the write buffer is flushed if !$self.wbuf.is_empty() { read!($(@$blocking)? @flush $self); @@ -87,7 +86,7 @@ macro_rules! read { // prepare a chunk of uninitialized memory to write to // this is UB if the Read impl of the stream reads from the write buffer let b = $self.rbuf.chunk_mut(); - let b = from_raw_parts_mut(b.as_mut_ptr(), b.len()); + let b = std::slice::from_raw_parts_mut(b.as_mut_ptr(), b.len()); // read as much as we can and return when the stream or our buffer is exhausted let read = read!($(@$blocking)? @read $self, b); @@ -121,7 +120,6 @@ macro_rules! read { }; } - #[cfg(feature = "async")] impl BufStream where @@ -135,7 +133,9 @@ where self.wbuf_offset += self.stream.write(&self.wbuf[self.wbuf_offset..]).await?; } - self.clear_wbuf(); + // fully written buffer, move cursor back to the beginning + self.wbuf_offset = 0; + self.wbuf.clear(); Ok(()) } @@ -151,8 +151,8 @@ where S: Read + Write, { pub fn flush(&mut self) -> crate::Result<()> { - self.stream.write_all(&self.wbuf[self.wbuf_offset..])?; - self.clear_wbuf(); + self.stream.write_all(&self.wbuf)?; + self.wbuf.clear(); Ok(()) } diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index 10bc5f6e..26a542f5 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -53,6 +53,9 @@ pub use runtime::AsyncStd; pub use runtime::Runtime; #[cfg(feature = "tokio")] pub use runtime::Tokio; +#[cfg(feature = "_mock")] +#[doc(hidden)] +pub use runtime::{mock, Mock}; // pick a default runtime // this is so existing applications in SQLx pre 0.6 work and to @@ -68,20 +71,33 @@ pub type DefaultRuntime = Tokio; #[cfg(all(not(all(feature = "async-std", feature = "tokio")), feature = "actix"))] pub type DefaultRuntime = Actix; -#[cfg(all(not(feature = "async"), feature = "blocking"))] +#[cfg(all( + not(any(feature = "async-std", feature = "tokio", feature = "actix")), + feature = "blocking" +))] pub type DefaultRuntime = blocking::Blocking; -// when there is no async runtime and the blocking runtime is not present +// when there is no async runtime, and the blocking runtime is not present // the unit type is implemented for Runtime, this is only to allow the // lib to compile, the lib is mostly useless in this state -#[cfg(not(any(feature = "async", feature = "blocking")))] +#[cfg(not(any( + feature = "async_std", + feature = "actix", + feature = "tokio", + feature = "blocking" +)))] pub type DefaultRuntime = (); +#[cfg(any(feature = "async-std", feature = "tokio", feature = "actix"))] pub mod prelude { - #[cfg(all(not(feature = "async"), feature = "blocking"))] - pub use super::blocking::prelude::*; pub use super::ConnectOptions as _; pub use super::Connection as _; pub use super::Database as _; pub use super::Runtime as _; } + +#[cfg(all( + not(any(feature = "async-std", feature = "tokio", feature = "actix")), + feature = "blocking" +))] +pub use blocking::prelude; diff --git a/sqlx-core/src/runtime/async_std.rs b/sqlx-core/src/runtime/async_std.rs index 8c6e473a..0190dbf2 100644 --- a/sqlx-core/src/runtime/async_std.rs +++ b/sqlx-core/src/runtime/async_std.rs @@ -1,6 +1,6 @@ use std::io; -use async_std::{net::TcpStream, task::block_on}; +use async_std::net::TcpStream; use futures_util::{future::BoxFuture, FutureExt}; use crate::{AsyncRuntime, Runtime}; @@ -23,6 +23,6 @@ impl AsyncRuntime for AsyncStd { #[cfg(feature = "blocking")] impl crate::blocking::Runtime for AsyncStd { fn connect_tcp(host: &str, port: u16) -> io::Result { - block_on(::connect_tcp(host, port)) + async_std::task::block_on(::connect_tcp(host, port)) } }