From 769e8aa461bd0b74925d2a98fd6da908d6e3bee0 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Sun, 10 Jan 2021 10:45:29 -0800 Subject: [PATCH] refactor: introduce io::Stream (blocking::io::Stream) to encapsulate needed IO methods - rename sqlx::AsyncRuntime to sqlx::Async - don't set default runtime in traits --- sqlx-core/src/acquire.rs | 4 +- sqlx-core/src/blocking.rs | 71 +++++++++++++-- sqlx-core/src/blocking/acquire.rs | 7 +- sqlx-core/src/blocking/close.rs | 11 +-- sqlx-core/src/blocking/connect.rs | 11 +-- sqlx-core/src/blocking/connection.rs | 12 ++- sqlx-core/src/blocking/io.rs | 15 ++++ sqlx-core/src/blocking/options.rs | 11 ++- sqlx-core/src/blocking/runtime.rs | 15 ---- sqlx-core/src/close.rs | 12 ++- sqlx-core/src/connect.rs | 11 ++- sqlx-core/src/connection.rs | 12 +-- sqlx-core/src/database.rs | 5 +- sqlx-core/src/error.rs | 1 + sqlx-core/src/io.rs | 2 + sqlx-core/src/io/buf_stream.rs | 30 +++---- sqlx-core/src/io/stream.rs | 18 ++++ sqlx-core/src/lib.rs | 65 +++----------- sqlx-core/src/options.rs | 8 +- sqlx-core/src/pool.rs | 2 +- sqlx-core/src/runtime.rs | 129 +++++++++++++++++++++++---- sqlx-core/src/runtime/actix.rs | 37 ++++++-- sqlx-core/src/runtime/async_std.rs | 65 +++++++++++--- sqlx-core/src/runtime/tokio.rs | 36 ++++++-- sqlx-mysql/src/connection.rs | 44 +++++---- sqlx-mysql/src/connection/close.rs | 7 +- sqlx-mysql/src/connection/connect.rs | 19 ++-- sqlx-mysql/src/connection/ping.rs | 6 +- sqlx-mysql/src/connection/stream.rs | 7 +- sqlx-mysql/src/options.rs | 6 +- 30 files changed, 463 insertions(+), 216 deletions(-) create mode 100644 sqlx-core/src/blocking/io.rs create mode 100644 sqlx-core/src/io/stream.rs diff --git a/sqlx-core/src/acquire.rs b/sqlx-core/src/acquire.rs index 7e3ccdcf..6abe5b5a 100644 --- a/sqlx-core/src/acquire.rs +++ b/sqlx-core/src/acquire.rs @@ -1,9 +1,9 @@ #[cfg(feature = "async")] use futures_util::future::BoxFuture; -use crate::{Database, DefaultRuntime, Runtime}; +use crate::{Database, Runtime}; -pub trait Acquire { +pub trait Acquire { type Database: Database; /// Get a connection from the pool, make a new connection, or wait for one to become diff --git a/sqlx-core/src/blocking.rs b/sqlx-core/src/blocking.rs index 99f6c57b..633e7098 100644 --- a/sqlx-core/src/blocking.rs +++ b/sqlx-core/src/blocking.rs @@ -1,6 +1,9 @@ //! Types and traits used to implement a database driver with **blocking** I/O. //! +use std::io::{Read, Result as IoResult, Write}; +use std::net::TcpStream; + mod acquire; mod close; mod connect; @@ -8,19 +11,71 @@ mod connection; mod options; mod runtime; +#[doc(hidden)] +pub mod io; + pub use acquire::Acquire; pub use close::Close; pub use connect::Connect; pub use connection::Connection; pub use options::ConnectOptions; -pub use runtime::{Blocking, Runtime}; +pub use runtime::Runtime; pub mod prelude { - pub use super::Acquire as _; - pub use super::Close as _; - pub use super::Connect as _; - pub use super::ConnectOptions as _; - pub use super::Connection as _; - pub use super::Runtime as _; - pub use crate::Database as _; + pub use super::Acquire; + pub use super::Close; + pub use super::Connect; + pub use super::ConnectOptions; + pub use super::Connection; + pub use super::Runtime; + pub use crate::Database; +} + +/// Uses the `std::net` primitives to implement a blocking runtime for SQLx. +#[derive(Debug)] +pub struct Blocking; + +impl crate::Runtime for Blocking { + type TcpStream = TcpStream; +} + +impl Runtime for Blocking { + fn connect_tcp(host: &str, port: u16) -> IoResult { + TcpStream::connect((host, port)) + } +} + +// 's: stream +impl<'s> crate::io::Stream<'s, Blocking> for TcpStream { + #[cfg(feature = "async")] + type ReadFuture = futures_util::future::BoxFuture<'s, IoResult>; + + #[cfg(feature = "async")] + type WriteFuture = futures_util::future::BoxFuture<'s, IoResult>; + + #[cfg(feature = "async")] + fn read_async(&'s mut self, _buf: &'s mut [u8]) -> Self::ReadFuture { + // UNREACHABLE: [`Blocking`] does not implement the [`Async`] marker + unreachable!() + } + + #[cfg(feature = "async")] + fn write_async(&'s mut self, _buf: &'s [u8]) -> Self::WriteFuture { + // UNREACHABLE: [`Blocking`] does not implement the [`Async`] marker + unreachable!() + } +} + +// 's: stream +impl<'s> io::Stream<'s, Blocking> for TcpStream { + fn read(&'s mut self, buf: &'s mut [u8]) -> IoResult { + Read::read(self, buf) + } + + fn write(&'s mut self, buf: &'s [u8]) -> IoResult { + let size = buf.len(); + self.write_all(buf)?; + + Ok(size) + } } diff --git a/sqlx-core/src/blocking/acquire.rs b/sqlx-core/src/blocking/acquire.rs index 69e31743..b03018fa 100644 --- a/sqlx-core/src/blocking/acquire.rs +++ b/sqlx-core/src/blocking/acquire.rs @@ -1,7 +1,10 @@ -use super::{Blocking, Runtime}; +use super::Runtime; use crate::Database; -pub trait Acquire: crate::Acquire { +pub trait Acquire: crate::Acquire +where + Rt: Runtime, +{ /// Get a connection from the pool, make a new connection, or wait for one to become /// available. /// diff --git a/sqlx-core/src/blocking/close.rs b/sqlx-core/src/blocking/close.rs index b0da47c3..8944b3aa 100644 --- a/sqlx-core/src/blocking/close.rs +++ b/sqlx-core/src/blocking/close.rs @@ -1,11 +1,12 @@ -use std::io; +use super::{io::Stream, Runtime}; -use super::{Blocking, Runtime}; - -pub trait Close: crate::Close { +pub trait Close: crate::Close +where + Rt: Runtime, +{ fn close(self) -> crate::Result<()> where - ::TcpStream: io::Read + io::Write; + for<'s> ::TcpStream: Stream<'s, Rt>; } // TODO: impl Close for Pool { ... } diff --git a/sqlx-core/src/blocking/connect.rs b/sqlx-core/src/blocking/connect.rs index 40dbd847..f9c2fe8b 100644 --- a/sqlx-core/src/blocking/connect.rs +++ b/sqlx-core/src/blocking/connect.rs @@ -1,12 +1,13 @@ -use std::io; +use super::{io::Stream, Runtime}; -use super::{Blocking, Runtime}; - -pub trait Connect: crate::Connect { +pub trait Connect: crate::Connect +where + Rt: Runtime, +{ fn connect(url: &str) -> crate::Result where Self: Sized, - ::TcpStream: io::Read + io::Write; + for<'s> ::TcpStream: Stream<'s, Rt>; } // TODO: impl Connect for Pool { ... } diff --git a/sqlx-core/src/blocking/connection.rs b/sqlx-core/src/blocking/connection.rs index 5d0ade51..7420f1fb 100644 --- a/sqlx-core/src/blocking/connection.rs +++ b/sqlx-core/src/blocking/connection.rs @@ -1,17 +1,13 @@ -use std::io; - -use super::{Blocking, Close, Connect, ConnectOptions, Runtime}; +use super::{io::Stream, Close, Connect, ConnectOptions, Runtime}; /// A unique connection (session) with a specific database. /// /// For detailed information, refer to the asynchronous version of /// this: [`Connection`][crate::Connection]. /// -pub trait Connection: - crate::Connection + Close + Connect +pub trait Connection: crate::Connection + Close + Connect where Rt: Runtime, - ::TcpStream: io::Read + io::Write, Self::Options: ConnectOptions, { /// Checks if a connection to the database is still valid. @@ -19,5 +15,7 @@ where /// For detailed information, refer to the asynchronous version of /// this: [`ping()`][crate::Connection::ping]. /// - fn ping(&mut self) -> crate::Result<()>; + fn ping(&mut self) -> crate::Result<()> + where + for<'s> ::TcpStream: Stream<'s, Rt>; } diff --git a/sqlx-core/src/blocking/io.rs b/sqlx-core/src/blocking/io.rs new file mode 100644 index 00000000..f7e93f49 --- /dev/null +++ b/sqlx-core/src/blocking/io.rs @@ -0,0 +1,15 @@ +use std::io; + +use crate::Runtime; + +// 's: stream +pub trait Stream<'s, Rt>: crate::io::Stream<'s, Rt> +where + Rt: Runtime, +{ + #[doc(hidden)] + fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result; + + #[doc(hidden)] + fn write(&'s mut self, buf: &'s [u8]) -> io::Result; +} diff --git a/sqlx-core/src/blocking/options.rs b/sqlx-core/src/blocking/options.rs index d0fdd1be..96e316f1 100644 --- a/sqlx-core/src/blocking/options.rs +++ b/sqlx-core/src/blocking/options.rs @@ -1,6 +1,4 @@ -use std::io; - -use super::{Blocking, Connection, Runtime}; +use super::{io::Stream, Connection, Runtime}; /// Options which can be used to configure how a SQL connection is opened. /// @@ -8,9 +6,9 @@ use super::{Blocking, Connection, Runtime}; /// this: [`ConnectOptions`][crate::ConnectOptions]. /// #[allow(clippy::module_name_repetitions)] -pub trait ConnectOptions: crate::ConnectOptions +pub trait ConnectOptions: crate::ConnectOptions where - ::TcpStream: io::Read + io::Write, + Rt: Runtime, Self::Connection: crate::Connection + Connection, { /// Establish a connection to the database. @@ -20,5 +18,6 @@ where /// fn connect(&self) -> crate::Result where - Self::Connection: Sized; + Self::Connection: Sized, + for<'s> ::TcpStream: Stream<'s, Rt>; } diff --git a/sqlx-core/src/blocking/runtime.rs b/sqlx-core/src/blocking/runtime.rs index 40cd8498..e60d9e5e 100644 --- a/sqlx-core/src/blocking/runtime.rs +++ b/sqlx-core/src/blocking/runtime.rs @@ -1,5 +1,4 @@ use std::io; -use std::net::TcpStream; /// Describes a set of types and functions used to open and manage /// resources within SQLx. @@ -11,17 +10,3 @@ pub trait Runtime: crate::Runtime { /// Opens a TCP connection to a remote host at the specified port. fn connect_tcp(host: &str, port: u16) -> io::Result; } - -/// Uses the `std::net` primitives to implement a blocking runtime for SQLx. -#[derive(Debug)] -pub struct Blocking; - -impl crate::Runtime for Blocking { - type TcpStream = TcpStream; -} - -impl Runtime for Blocking { - fn connect_tcp(host: &str, port: u16) -> io::Result { - TcpStream::connect((host, port)) - } -} diff --git a/sqlx-core/src/close.rs b/sqlx-core/src/close.rs index 1ce9033c..2dfeba02 100644 --- a/sqlx-core/src/close.rs +++ b/sqlx-core/src/close.rs @@ -1,11 +1,15 @@ -use crate::{DefaultRuntime, Runtime}; +use crate::Runtime; -pub trait Close { +// for<'a> &'a mut Rt::TcpStream: crate::io::Stream<'a>, +pub trait Close +where + Rt: Runtime, +{ #[cfg(feature = "async")] fn close(self) -> futures_util::future::BoxFuture<'static, crate::Result<()>> where - Rt: crate::AsyncRuntime, - ::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin; + Rt: crate::Async, + for<'s> ::TcpStream: crate::io::Stream<'s, Rt>; } // TODO: impl Close for Pool { ... } diff --git a/sqlx-core/src/connect.rs b/sqlx-core/src/connect.rs index 26730d9e..e206b348 100644 --- a/sqlx-core/src/connect.rs +++ b/sqlx-core/src/connect.rs @@ -1,14 +1,17 @@ -use crate::{ConnectOptions, DefaultRuntime, Runtime}; +use crate::{ConnectOptions, Runtime}; -pub trait Connect { +pub trait Connect +where + Rt: Runtime, +{ type Options: ConnectOptions; #[cfg(feature = "async")] fn connect(url: &str) -> futures_util::future::BoxFuture<'_, crate::Result> where Self: Sized, - Rt: crate::AsyncRuntime, - ::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin; + Rt: crate::Async, + for<'s> ::TcpStream: crate::io::Stream<'s, Rt>; } // TODO: impl Connect for Pool { ... } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 74417689..31490c8b 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -1,7 +1,7 @@ #[cfg(feature = "async")] use futures_util::future::BoxFuture; -use crate::{Close, Connect, Database, DefaultRuntime, Runtime}; +use crate::{Close, Connect, Database, Runtime}; /// A unique connection (session) with a specific database. /// @@ -11,8 +11,10 @@ use crate::{Close, Connect, Database, DefaultRuntime, Runtime}; /// SQL statements will be executed and results returned within the context /// of this single SQL connection. /// -pub trait Connection: - 'static + Send + Connect + Close +// for<'a> &'a mut Rt::TcpStream: crate::io::Stream<'a>, +pub trait Connection: 'static + Send + Connect + Close +where + Rt: Runtime, { type Database: Database; @@ -26,6 +28,6 @@ pub trait Connection: #[cfg(feature = "async")] fn ping(&mut self) -> BoxFuture<'_, crate::Result<()>> where - Rt: crate::AsyncRuntime, - ::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin; + Rt: crate::Async, + for<'s> ::TcpStream: crate::io::Stream<'s, Rt>; } diff --git a/sqlx-core/src/database.rs b/sqlx-core/src/database.rs index 027686ed..d3938959 100644 --- a/sqlx-core/src/database.rs +++ b/sqlx-core/src/database.rs @@ -1,14 +1,13 @@ use std::fmt::Debug; -use crate::{Connection, DefaultRuntime, Runtime}; +use crate::{Connection, Runtime}; /// A database driver. /// /// This trait encapsulates a complete set of traits that implement a driver for a /// specific database (e.g., MySQL, PostgreSQL). /// -// 'x: execution -pub trait Database: 'static + Sized + Debug + for<'x> HasOutput<'x> +pub trait Database: 'static + Sized + Debug + for<'x> HasOutput<'x> where Rt: Runtime, { diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs index c25726fa..c19c9d56 100644 --- a/sqlx-core/src/error.rs +++ b/sqlx-core/src/error.rs @@ -6,6 +6,7 @@ mod database; pub use database::DatabaseError; +/// `Result` type returned from methods that can have SQLx errors. pub type Result = std::result::Result; #[derive(Debug)] diff --git a/sqlx-core/src/io.rs b/sqlx-core/src/io.rs index 469b67d9..341f194a 100644 --- a/sqlx-core/src/io.rs +++ b/sqlx-core/src/io.rs @@ -2,10 +2,12 @@ mod buf; mod buf_stream; mod deserialize; mod serialize; +mod stream; mod write; pub use buf::BufExt; pub use buf_stream::BufStream; pub use deserialize::Deserialize; pub use serialize::Serialize; +pub use stream::Stream; pub use write::WriteExt; diff --git a/sqlx-core/src/io/buf_stream.rs b/sqlx-core/src/io/buf_stream.rs index 681be40f..4600cf71 100644 --- a/sqlx-core/src/io/buf_stream.rs +++ b/sqlx-core/src/io/buf_stream.rs @@ -1,11 +1,6 @@ -#[cfg(feature = "blocking")] -use std::io::{Read, Write}; +use std::marker::PhantomData; use bytes::{Bytes, BytesMut}; -#[cfg(feature = "async")] -use futures_io::{AsyncRead, AsyncWrite}; -#[cfg(feature = "async")] -use futures_util::{AsyncReadExt, AsyncWriteExt}; /// Wraps a stream and buffers input and output to and from it. /// @@ -14,7 +9,9 @@ use futures_util::{AsyncReadExt, AsyncWriteExt}; /// a network interaction). `BufStream` keeps a read and write buffer with infrequent calls /// to `read` and `write` on the underlying stream. /// -pub struct BufStream { +pub struct BufStream { + runtime: PhantomData, + #[cfg_attr(not(any(feature = "async", feature = "blocking")), allow(unused))] stream: S, @@ -29,10 +26,11 @@ pub struct BufStream { wbuf_offset: usize, } -impl BufStream { +impl BufStream { pub fn with_capacity(stream: S, read: usize, write: usize) -> Self { Self { stream, + runtime: PhantomData, rbuf: BytesMut::with_capacity(read), wbuf: Vec::with_capacity(write), #[cfg(feature = "async")] @@ -116,21 +114,18 @@ macro_rules! read { }; (@read $self:ident, $b:ident) => { - $self.stream.read($b).await?; + $self.stream.read_async($b).await?; }; } #[cfg(feature = "async")] -impl BufStream -where - S: AsyncRead + AsyncWrite + Send + Unpin, -{ +impl crate::io::Stream<'s, Rt>> BufStream { pub async fn flush_async(&mut self) -> crate::Result<()> { // write as much as we can each time and move the cursor as we write from the buffer // if _this_ future drops, offset will have a record of how much of the wbuf has // been written while self.wbuf_offset < self.wbuf.len() { - self.wbuf_offset += self.stream.write(&self.wbuf[self.wbuf_offset..]).await?; + self.wbuf_offset += self.stream.write_async(&self.wbuf[self.wbuf_offset..]).await?; } // fully written buffer, move cursor back to the beginning @@ -146,12 +141,9 @@ where } #[cfg(feature = "blocking")] -impl BufStream -where - S: Read + Write, -{ +impl crate::blocking::io::Stream<'s, Rt>> BufStream { pub fn flush(&mut self) -> crate::Result<()> { - self.stream.write_all(&self.wbuf)?; + self.stream.write(&self.wbuf)?; self.wbuf.clear(); Ok(()) diff --git a/sqlx-core/src/io/stream.rs b/sqlx-core/src/io/stream.rs new file mode 100644 index 00000000..37cb6fb2 --- /dev/null +++ b/sqlx-core/src/io/stream.rs @@ -0,0 +1,18 @@ +use crate::Runtime; + +// 's: stream +pub trait Stream<'s, Rt: Runtime>: Send + Sync + Unpin { + #[cfg(feature = "async")] + type ReadFuture: 's + std::future::Future> + Send; + + #[cfg(feature = "async")] + type WriteFuture: 's + std::future::Future> + Send; + + #[cfg(feature = "async")] + #[doc(hidden)] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture; + + #[cfg(feature = "async")] + #[doc(hidden)] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture; +} diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index d3359bad..bac0b99e 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -17,15 +17,6 @@ #![allow(clippy::doc_markdown)] #![allow(clippy::clippy::missing_errors_doc)] -// crate renames to allow the feature name "tokio" and "async-std" (as features -// can't directly conflict with dependency names) - -#[cfg(feature = "async-std")] -extern crate _async_std as async_std; - -#[cfg(feature = "tokio")] -extern crate _tokio as tokio; - mod acquire; mod close; mod connect; @@ -43,6 +34,8 @@ pub mod io; pub mod blocking; pub use acquire::Acquire; +#[cfg(feature = "blocking")] +pub use blocking::Blocking; pub use close::Close; pub use connect::Connect; pub use connection::Connection; @@ -52,57 +45,23 @@ pub use options::ConnectOptions; pub use pool::Pool; #[cfg(feature = "actix")] pub use runtime::Actix; -#[cfg(feature = "async")] -pub use runtime::AsyncRuntime; #[cfg(feature = "async-std")] pub use runtime::AsyncStd; -pub use runtime::Runtime; +// #[cfg(feature = "_mock")] +// pub use mock::Mock; #[cfg(feature = "tokio")] pub use runtime::Tokio; -#[cfg(feature = "_mock")] -#[doc(hidden)] -pub use runtime::{mock, Mock}; - -// pick a default runtime -// this is so existing applications in SQLx pre 0.6 work and to -// make it more convenient, if your application only uses 1 runtime (99%+) -// most of the time you won't have to worry about picking the runtime - -#[cfg(feature = "async-std")] -pub type DefaultRuntime = AsyncStd; - -#[cfg(all(not(feature = "async-std"), feature = "tokio"))] -pub type DefaultRuntime = Tokio; - -#[cfg(all(not(all(feature = "async-std", feature = "tokio")), feature = "actix"))] -pub type DefaultRuntime = Actix; - -#[cfg(all( - not(any(feature = "async-std", feature = "tokio", feature = "actix")), - feature = "blocking" -))] -pub type DefaultRuntime = blocking::Blocking; - -// when there is no async runtime, and the blocking runtime is not present -// the unit type is implemented for Runtime, this is only to allow the -// lib to compile, the lib is mostly useless in this state -#[cfg(not(any( - feature = "async-std", - feature = "actix", - feature = "tokio", - feature = "blocking" -)))] -pub type DefaultRuntime = (); +pub use runtime::{Async, DefaultRuntime, Runtime}; #[cfg(any(feature = "async-std", feature = "tokio", feature = "actix"))] pub mod prelude { - pub use super::Acquire as _; - pub use super::Close as _; - pub use super::Connect as _; - pub use super::ConnectOptions as _; - pub use super::Connection as _; - pub use super::Database as _; - pub use super::Runtime as _; + pub use super::Acquire; + pub use super::Close; + pub use super::Connect; + pub use super::ConnectOptions; + pub use super::Connection; + pub use super::Database; + pub use super::Runtime; } #[cfg(all( diff --git a/sqlx-core/src/options.rs b/sqlx-core/src/options.rs index 6a5af42a..bbac8ae0 100644 --- a/sqlx-core/src/options.rs +++ b/sqlx-core/src/options.rs @@ -1,11 +1,11 @@ use std::fmt::Debug; use std::str::FromStr; -use crate::{Connection, DefaultRuntime, Runtime}; +use crate::{Connection, Runtime}; /// Options which can be used to configure how a SQL connection is opened. #[allow(clippy::module_name_repetitions)] -pub trait ConnectOptions: +pub trait ConnectOptions: 'static + Sized + Send + Sync + Default + Debug + Clone + FromStr where Rt: Runtime, @@ -17,6 +17,6 @@ where fn connect(&self) -> futures_util::future::BoxFuture<'_, crate::Result> where Self::Connection: Sized, - Rt: crate::AsyncRuntime, - ::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin; + Rt: crate::Async, + for<'s> Rt::TcpStream: crate::io::Stream<'s, Rt>; } diff --git a/sqlx-core/src/pool.rs b/sqlx-core/src/pool.rs index 6dfaeb25..b05097a5 100644 --- a/sqlx-core/src/pool.rs +++ b/sqlx-core/src/pool.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use crate::{Database, DefaultRuntime, Runtime}; /// A connection pool to enable the efficient reuse of a managed pool of SQL connections. -pub struct Pool { +pub struct Pool, Rt: Runtime = DefaultRuntime> { runtime: PhantomData, database: PhantomData, } diff --git a/sqlx-core/src/runtime.rs b/sqlx-core/src/runtime.rs index 172edc3f..260b6996 100644 --- a/sqlx-core/src/runtime.rs +++ b/sqlx-core/src/runtime.rs @@ -1,6 +1,8 @@ -#[cfg(feature = "_mock")] -#[doc(hidden)] -pub mod mock; +use crate::io::Stream; + +// #[cfg(feature = "_mock")] +// #[doc(hidden)] +// pub mod mock; #[cfg(feature = "async-std")] #[path = "runtime/async_std.rs"] @@ -18,31 +20,62 @@ mod tokio_; pub use actix_::Actix; #[cfg(feature = "async-std")] pub use async_std_::AsyncStd; -#[cfg(feature = "_mock")] -pub use mock::Mock; +// #[cfg(feature = "_mock")] +// pub use mock::Mock; #[cfg(feature = "tokio")] pub use tokio_::Tokio; -/// Describes a set of types and functions used to open and manage -/// resources within SQLx. -pub trait Runtime: 'static + Send + Sync { - type TcpStream: Send; +/// Describes a set of types and functions used to open and manage IO resources within SQLx. +/// +/// In the greater ecosystem we have several choices for an asynchronous runtime (executor) to +/// schedule and interact with our futures. Libraries that wish to be generally available have +/// tended to either pick one (and allow compatibility wrappers to others) or use mutually-exclusive +/// cargo feature flags to pick between runtimes. Each of these approaches have their own +/// problems. +/// +/// In SQLx, most types and traits are parameterized with a `Rt: Runtime` bound. Asynchronous +/// implementations of `Runtime` are available for [**async-std**](https://async.rs/), +/// [**Tokio**](https://tokio.rs/), and [**Actix**](https://actix.rs/) (given +/// those crate features are activated). +/// +/// - [`AsyncStd`] +/// - [`Tokio`] +/// - [`Actix`] +/// +/// Additionally, a `std` blocking runtime is provided. This is intended for use in +/// environments where asynchronous IO either doesn't make sense or isn't available. +/// +/// - [`Blocking`][crate::blocking::Blocking] +/// +pub trait Runtime: 'static + Send + Sync + Sized { + #[doc(hidden)] + type TcpStream: 'static + Send + Sync; } -#[cfg(feature = "async")] -#[allow(clippy::module_name_repetitions)] -pub trait AsyncRuntime: Runtime { - /// Opens a TCP connection to a remote host at the specified port. - fn connect_tcp( +/// Marks a [`Runtime`] as being capable of handling asynchronous execution. +// Provided so that attempting to use the asynchronous methods with the +// Blocking runtime will error at compile-time as opposed to runtime. +pub trait Async: Runtime +where + // NOTE: This requires a **pervasive** bound for everything that needs + // to bound on . Remove if you can think of something else that + // allows us to both allow polymorphic read/write on streams across + // runtimes _and_ not depend on . When GATs are stabilized, + // we should be able to switch to that and remove this HRTB. + Self::TcpStream: for<'s> Stream<'s, Self>, +{ + #[cfg(feature = "async")] + #[doc(hidden)] + fn connect_tcp_async( host: &str, port: u16, ) -> futures_util::future::BoxFuture<'_, std::io::Result>; } -// when the async feature is not specified, this is an empty trait +// when no runtime is available // we implement `()` for it to allow the lib to still compile #[cfg(not(any( - feature = "async_std", + feature = "async-std", feature = "actix", feature = "tokio", feature = "blocking" @@ -50,3 +83,67 @@ pub trait AsyncRuntime: Runtime { impl Runtime for () { type TcpStream = (); } + +mod default { + // pick a default runtime + // this is so existing applications in SQLx pre 0.6 work and to + // make it more convenient, if your application only uses 1 runtime (99%+) + // most of the time you won't have to worry about picking the runtime + #[cfg(feature = "async-std")] + pub type Runtime = super::AsyncStd; + + #[cfg(all(not(feature = "async-std"), feature = "tokio"))] + pub type Runtime = super::Tokio; + + #[cfg(all(not(all(feature = "async-std", feature = "tokio")), feature = "actix"))] + pub type Runtime = super::Actix; + + #[cfg(all( + not(any(feature = "async-std", feature = "tokio", feature = "actix")), + feature = "blocking" + ))] + pub type Runtime = crate::Blocking; + + // when there is no async runtime, and the blocking runtime is not present + // the unit type is implemented for Runtime, this is only to allow the + // lib to compile, the lib is mostly useless in this state + #[cfg(not(any( + feature = "async-std", + feature = "actix", + feature = "tokio", + feature = "blocking" + )))] + pub type Runtime = (); +} + +/// The default runtime in use by SQLx when one is unspecified. +/// +/// Following the crate features for each runtime are activated, a default is picked +/// by following a priority list. The actual sorting here is mostly arbitrary (what is +/// important is that there _is_ a stable ordering). +/// +/// 1. [`AsyncStd`] +/// 2. [`Tokio`] +/// 3. [`Actix`] +/// 4. [`Blocking`] +/// 5. `()` – No runtime selected (nothing is possible) +/// +/// The intent is to allow the following to cleanly work, regardless of the enabled runtime, +/// if only one runtime is enabled. +/// +///
+/// +/// ```rust,ignore +/// use sqlx::postgres::{PgConnection, PgConnectOptions}; +/// use sqlx::prelude::*; +/// +/// // PgConnection +/// let conn: PgConnection = PgConnectOptions::new() +/// .host("localhost") +/// .username("postgres") +/// .password("password") +/// // .connect()?; // for Blocking runtime +/// .connect().await?; // for Async runtimes +/// ``` +/// +pub type DefaultRuntime = default::Runtime; diff --git a/sqlx-core/src/runtime/actix.rs b/sqlx-core/src/runtime/actix.rs index da562b99..e0eae901 100644 --- a/sqlx-core/src/runtime/actix.rs +++ b/sqlx-core/src/runtime/actix.rs @@ -2,9 +2,10 @@ use std::io; use actix_rt::net::TcpStream; use async_compat_02::Compat; -use futures_util::{future::BoxFuture, FutureExt, TryFutureExt}; +use futures_util::io::{Read, Write}; +use futures_util::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt}; -use crate::{AsyncRuntime, Runtime}; +use crate::{io::Stream, Async, Runtime}; /// Actix SQLx runtime. Uses [`actix-rt`][actix_rt] to provide [`Runtime`]. /// @@ -17,14 +18,36 @@ use crate::{AsyncRuntime, Runtime}; pub struct Actix; impl Runtime for Actix { + // NOTE: Compat<_> is used to avoid requiring a Box per read/write call + // https://github.com/tokio-rs/tokio/issues/2723 + #[doc(hidden)] type TcpStream = Compat; } -impl AsyncRuntime for Actix -where - Self::TcpStream: futures_io::AsyncRead, -{ - fn connect_tcp(host: &str, port: u16) -> BoxFuture<'_, io::Result> { +impl Async for Actix { + #[doc(hidden)] + fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result> { TcpStream::connect((host, port)).map_ok(Compat::new).boxed() } } + +// 's: stream +impl<'s> Stream<'s, Actix> for Compat { + #[doc(hidden)] + type ReadFuture = Read<'s, Self>; + + #[doc(hidden)] + type WriteFuture = Write<'s, Self>; + + #[inline] + #[doc(hidden)] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { + self.read(buf) + } + + #[inline] + #[doc(hidden)] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { + self.write(buf) + } +} diff --git a/sqlx-core/src/runtime/async_std.rs b/sqlx-core/src/runtime/async_std.rs index 0190dbf2..42ccbb66 100644 --- a/sqlx-core/src/runtime/async_std.rs +++ b/sqlx-core/src/runtime/async_std.rs @@ -1,28 +1,71 @@ -use std::io; +use _async_std::net::TcpStream; +use futures_util::io::{Read, Write}; +use futures_util::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt}; -use async_std::net::TcpStream; -use futures_util::{future::BoxFuture, FutureExt}; +#[cfg(feature = "blocking")] +use crate::blocking; +use crate::{io::Stream, Async, Runtime}; -use crate::{AsyncRuntime, Runtime}; - -/// [`async-std`](async_std) implementation of [`Runtime`]. +/// Provides [`Runtime`] for [**async-std**][_async_std]. Supports both blocking +/// and non-blocking operation. +/// +/// For blocking operation, the equivalent non-blocking methods are called +/// and trivially wrapped in [`task::block_on`][_async_std::task::block_on]. +/// #[cfg_attr(doc_cfg, doc(cfg(feature = "async-std")))] #[derive(Debug)] pub struct AsyncStd; impl Runtime for AsyncStd { + #[doc(hidden)] type TcpStream = TcpStream; } -impl AsyncRuntime for AsyncStd { - fn connect_tcp(host: &str, port: u16) -> BoxFuture<'_, io::Result> { +impl Async for AsyncStd { + #[doc(hidden)] + fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, std::io::Result> { TcpStream::connect((host, port)).boxed() } } #[cfg(feature = "blocking")] -impl crate::blocking::Runtime for AsyncStd { - fn connect_tcp(host: &str, port: u16) -> io::Result { - async_std::task::block_on(::connect_tcp(host, port)) +impl blocking::Runtime for AsyncStd { + fn connect_tcp(host: &str, port: u16) -> std::io::Result { + _async_std::task::block_on(Self::connect_tcp_async(host, port)) + } +} + +// 's: stream +impl<'s> Stream<'s, AsyncStd> for TcpStream { + #[doc(hidden)] + type ReadFuture = Read<'s, Self>; + + #[doc(hidden)] + type WriteFuture = Write<'s, Self>; + + #[inline] + #[doc(hidden)] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { + self.read(buf) + } + + #[inline] + #[doc(hidden)] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { + self.write(buf) + } +} + +// 's: stream +#[cfg(feature = "blocking")] +impl<'s> blocking::io::Stream<'s, AsyncStd> for TcpStream { + #[doc(hidden)] + fn read(&'s mut self, buf: &'s mut [u8]) -> std::io::Result { + _async_std::task::block_on(self.read_async(buf)) + } + + #[doc(hidden)] + fn write(&'s mut self, buf: &'s [u8]) -> std::io::Result { + _async_std::task::block_on(self.write_async(buf)) } } diff --git a/sqlx-core/src/runtime/tokio.rs b/sqlx-core/src/runtime/tokio.rs index 05f6ba1e..e47f02c2 100644 --- a/sqlx-core/src/runtime/tokio.rs +++ b/sqlx-core/src/runtime/tokio.rs @@ -1,10 +1,11 @@ use std::io; +use _tokio::net::TcpStream; use async_compat::Compat; -use futures_util::{future::BoxFuture, FutureExt, TryFutureExt}; -use tokio::net::TcpStream; +use futures_util::io::{Read, Write}; +use futures_util::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt}; -use crate::{AsyncRuntime, Runtime}; +use crate::{io::Stream, Async, Runtime}; /// Tokio SQLx runtime. Uses [`tokio`] to provide [`Runtime`]. /// @@ -15,11 +16,36 @@ use crate::{AsyncRuntime, Runtime}; pub struct Tokio; impl Runtime for Tokio { + // NOTE: Compat<_> is used to avoid requiring a Box per read/write call + // https://github.com/tokio-rs/tokio/issues/2723 + #[doc(hidden)] type TcpStream = Compat; } -impl AsyncRuntime for Tokio { - fn connect_tcp(host: &str, port: u16) -> BoxFuture<'_, io::Result> { +impl Async for Tokio { + #[doc(hidden)] + fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result> { TcpStream::connect((host, port)).map_ok(Compat::new).boxed() } } + +// 's: stream +impl<'s> Stream<'s, Tokio> for Compat { + #[doc(hidden)] + type ReadFuture = Read<'s, Self>; + + #[doc(hidden)] + type WriteFuture = Write<'s, Self>; + + #[inline] + #[doc(hidden)] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { + self.read(buf) + } + + #[inline] + #[doc(hidden)] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { + self.write(buf) + } +} diff --git a/sqlx-mysql/src/connection.rs b/sqlx-mysql/src/connection.rs index 990a2619..f596a660 100644 --- a/sqlx-mysql/src/connection.rs +++ b/sqlx-mysql/src/connection.rs @@ -12,8 +12,11 @@ mod ping; mod stream; #[allow(clippy::module_name_repetitions)] -pub struct MySqlConnection { - stream: BufStream, +pub struct MySqlConnection +where + Rt: Runtime, +{ + stream: BufStream, connection_id: u32, // the capability flags are used by the client and server to indicate which @@ -25,7 +28,10 @@ pub struct MySqlConnection { sequence_id: u8, } -impl MySqlConnection { +impl MySqlConnection +where + Rt: Runtime, +{ pub(crate) fn new(stream: Rt::TcpStream) -> Self { Self { stream: BufStream::with_capacity(stream, 4096, 1024), @@ -48,20 +54,26 @@ impl MySqlConnection { } } -impl Debug for MySqlConnection { +impl Debug for MySqlConnection +where + Rt: Runtime, +{ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("MySqlConnection").finish() } } -impl Connection for MySqlConnection { +impl Connection for MySqlConnection +where + Rt: Runtime, +{ type Database = MySql; #[cfg(feature = "async")] fn ping(&mut self) -> futures_util::future::BoxFuture<'_, sqlx_core::Result<()>> where - Rt: sqlx_core::AsyncRuntime, - ::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin, + Rt: sqlx_core::Async, + for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, { Box::pin(self.ping_async()) } @@ -74,8 +86,8 @@ impl Connect for MySqlConnection { fn connect(url: &str) -> futures_util::future::BoxFuture<'_, sqlx_core::Result> where Self: Sized, - Rt: sqlx_core::AsyncRuntime, - ::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin, + Rt: sqlx_core::Async, + for<'s> ::TcpStream: sqlx_core::io::Stream<'s, Rt>, { use sqlx_core::ConnectOptions; @@ -88,8 +100,8 @@ impl Close for MySqlConnection { #[cfg(feature = "async")] fn close(self) -> futures_util::future::BoxFuture<'static, sqlx_core::Result<()>> where - Rt: sqlx_core::AsyncRuntime, - ::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin, + Rt: sqlx_core::Async, + for<'s> ::TcpStream: sqlx_core::io::Stream<'s, Rt>, { Box::pin(self.close_async()) } @@ -99,9 +111,11 @@ impl Close for MySqlConnection { impl sqlx_core::blocking::Connection for MySqlConnection where Rt: sqlx_core::blocking::Runtime, - ::TcpStream: std::io::Read + std::io::Write, { - fn ping(&mut self) -> sqlx_core::Result<()> { + fn ping(&mut self) -> sqlx_core::Result<()> + where + for<'s> ::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, + { >::ping(self) } } @@ -114,7 +128,7 @@ where fn connect(url: &str) -> sqlx_core::Result where Self: Sized, - ::TcpStream: std::io::Read + std::io::Write, + for<'s> ::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { Self::connect(&url.parse::>()?) } @@ -127,7 +141,7 @@ where { fn close(self) -> sqlx_core::Result<()> where - ::TcpStream: std::io::Read + std::io::Write, + for<'s> ::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { self.close() } diff --git a/sqlx-mysql/src/connection/close.rs b/sqlx-mysql/src/connection/close.rs index 80c43ca4..9eb3566a 100644 --- a/sqlx-mysql/src/connection/close.rs +++ b/sqlx-mysql/src/connection/close.rs @@ -9,8 +9,8 @@ where #[cfg(feature = "async")] pub(crate) async fn close_async(mut self) -> Result<()> where - Rt: sqlx_core::AsyncRuntime, - ::TcpStream: futures_io::AsyncWrite + futures_io::AsyncRead + Unpin, + Rt: sqlx_core::Async, + for<'s> ::TcpStream: sqlx_core::io::Stream<'s, Rt>, { self.write_packet(&Quit)?; self.stream.flush_async().await?; @@ -21,7 +21,8 @@ where #[cfg(feature = "blocking")] pub(crate) fn close(mut self) -> Result<()> where - ::TcpStream: std::io::Write + std::io::Read, + Rt: sqlx_core::blocking::Runtime, + for<'s> ::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { self.write_packet(&Quit)?; self.stream.flush()?; diff --git a/sqlx-mysql/src/connection/connect.rs b/sqlx-mysql/src/connection/connect.rs index 34c135b7..04d953e4 100644 --- a/sqlx-mysql/src/connection/connect.rs +++ b/sqlx-mysql/src/connection/connect.rs @@ -11,7 +11,7 @@ //! //! https://dev.mysql.com/doc/internals/en/connection-phase.html //! -use sqlx_core::{Result, Runtime}; +use sqlx_core::Result; use crate::protocol::{Auth, AuthResponse, Handshake, HandshakeResponse}; use crate::{MySqlConnectOptions, MySqlConnection}; @@ -22,7 +22,7 @@ macro_rules! connect { }; (@tcp $options:ident) => { - Rt::connect_tcp($options.get_host(), $options.get_port()).await?; + Rt::connect_tcp_async($options.get_host(), $options.get_port()).await?; }; (@blocking @packet $self:ident) => { @@ -116,10 +116,13 @@ macro_rules! connect { #[cfg(feature = "async")] impl MySqlConnection where - Rt: sqlx_core::AsyncRuntime, - ::TcpStream: Unpin + futures_io::AsyncWrite + futures_io::AsyncRead, + Rt: sqlx_core::Runtime, { - pub(crate) async fn connect_async(options: &MySqlConnectOptions) -> Result { + pub(crate) async fn connect_async(options: &MySqlConnectOptions) -> Result + where + Rt: sqlx_core::Async, + for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, + { connect!(options) } } @@ -128,9 +131,11 @@ where impl MySqlConnection where Rt: sqlx_core::blocking::Runtime, - ::TcpStream: std::io::Write + std::io::Read, { - pub(crate) fn connect(options: &MySqlConnectOptions) -> Result { + pub(crate) fn connect(options: &MySqlConnectOptions) -> Result + where + for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, + { connect!(@blocking options) } } diff --git a/sqlx-mysql/src/connection/ping.rs b/sqlx-mysql/src/connection/ping.rs index 601025ee..7e202702 100644 --- a/sqlx-mysql/src/connection/ping.rs +++ b/sqlx-mysql/src/connection/ping.rs @@ -13,8 +13,8 @@ where #[cfg(feature = "async")] pub(crate) async fn ping_async(&mut self) -> Result<()> where - Rt: sqlx_core::AsyncRuntime, - ::TcpStream: futures_io::AsyncWrite + futures_io::AsyncRead + Unpin, + Rt: sqlx_core::Async, + for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, { self.write_packet(&Ping)?; @@ -26,7 +26,7 @@ where #[cfg(feature = "blocking")] pub(crate) fn ping(&mut self) -> Result<()> where - ::TcpStream: std::io::Write + std::io::Read, + for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { self.write_packet(&Ping)?; diff --git a/sqlx-mysql/src/connection/stream.rs b/sqlx-mysql/src/connection/stream.rs index aefb506a..2061f4fd 100644 --- a/sqlx-mysql/src/connection/stream.rs +++ b/sqlx-mysql/src/connection/stream.rs @@ -128,12 +128,13 @@ macro_rules! read_packet { #[cfg(feature = "async")] impl MySqlConnection where - Rt: sqlx_core::AsyncRuntime, - ::TcpStream: Unpin + futures_io::AsyncWrite + futures_io::AsyncRead, + Rt: Runtime, { pub(super) async fn read_packet_async<'de, T>(&'de mut self) -> Result where T: Deserialize<'de, Capabilities>, + Rt: sqlx_core::Async, + for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, { read_packet!(self) } @@ -143,11 +144,11 @@ where impl MySqlConnection where Rt: Runtime, - ::TcpStream: std::io::Write + std::io::Read, { pub(super) fn read_packet<'de, T>(&'de mut self) -> Result where T: Deserialize<'de, Capabilities>, + for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { read_packet!(@blocking self) } diff --git a/sqlx-mysql/src/options.rs b/sqlx-mysql/src/options.rs index fd61fbd1..be1d1d9d 100644 --- a/sqlx-mysql/src/options.rs +++ b/sqlx-mysql/src/options.rs @@ -143,8 +143,8 @@ where fn connect(&self) -> futures_util::future::BoxFuture<'_, sqlx_core::Result> where Self::Connection: Sized, - Rt: sqlx_core::AsyncRuntime, - ::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin, + Rt: sqlx_core::Async, + for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, { Box::pin(MySqlConnection::connect_async(self)) } @@ -154,11 +154,11 @@ where impl sqlx_core::blocking::ConnectOptions for MySqlConnectOptions where Rt: sqlx_core::blocking::Runtime, - ::TcpStream: std::io::Read + std::io::Write, { fn connect(&self) -> sqlx_core::Result where Self::Connection: Sized, + for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { >::connect(self) }