refactor: introduce io::Stream (blocking::io::Stream) to encapsulate needed IO methods

- rename sqlx::AsyncRuntime to sqlx::Async
 - don't set default runtime in traits
This commit is contained in:
Ryan Leckey 2021-01-10 10:45:29 -08:00
parent d2b31950cf
commit 769e8aa461
No known key found for this signature in database
GPG Key ID: F8AA68C235AB08C9
30 changed files with 463 additions and 216 deletions

View File

@ -1,9 +1,9 @@
#[cfg(feature = "async")]
use futures_util::future::BoxFuture;
use crate::{Database, DefaultRuntime, Runtime};
use crate::{Database, Runtime};
pub trait Acquire<Rt: Runtime = DefaultRuntime> {
pub trait Acquire<Rt: Runtime> {
type Database: Database<Rt>;
/// Get a connection from the pool, make a new connection, or wait for one to become

View File

@ -1,6 +1,9 @@
//! Types and traits used to implement a database driver with **blocking** I/O.
//!
use std::io::{Read, Result as IoResult, Write};
use std::net::TcpStream;
mod acquire;
mod close;
mod connect;
@ -8,19 +11,71 @@ mod connection;
mod options;
mod runtime;
#[doc(hidden)]
pub mod io;
pub use acquire::Acquire;
pub use close::Close;
pub use connect::Connect;
pub use connection::Connection;
pub use options::ConnectOptions;
pub use runtime::{Blocking, Runtime};
pub use runtime::Runtime;
pub mod prelude {
pub use super::Acquire as _;
pub use super::Close as _;
pub use super::Connect as _;
pub use super::ConnectOptions as _;
pub use super::Connection as _;
pub use super::Runtime as _;
pub use crate::Database as _;
pub use super::Acquire;
pub use super::Close;
pub use super::Connect;
pub use super::ConnectOptions;
pub use super::Connection;
pub use super::Runtime;
pub use crate::Database;
}
/// Uses the `std::net` primitives to implement a blocking runtime for SQLx.
#[derive(Debug)]
pub struct Blocking;
impl crate::Runtime for Blocking {
type TcpStream = TcpStream;
}
impl Runtime for Blocking {
fn connect_tcp(host: &str, port: u16) -> IoResult<Self::TcpStream> {
TcpStream::connect((host, port))
}
}
// 's: stream
impl<'s> crate::io::Stream<'s, Blocking> for TcpStream {
#[cfg(feature = "async")]
type ReadFuture = futures_util::future::BoxFuture<'s, IoResult<usize>>;
#[cfg(feature = "async")]
type WriteFuture = futures_util::future::BoxFuture<'s, IoResult<usize>>;
#[cfg(feature = "async")]
fn read_async(&'s mut self, _buf: &'s mut [u8]) -> Self::ReadFuture {
// UNREACHABLE: [`Blocking`] does not implement the [`Async`] marker
unreachable!()
}
#[cfg(feature = "async")]
fn write_async(&'s mut self, _buf: &'s [u8]) -> Self::WriteFuture {
// UNREACHABLE: [`Blocking`] does not implement the [`Async`] marker
unreachable!()
}
}
// 's: stream
impl<'s> io::Stream<'s, Blocking> for TcpStream {
fn read(&'s mut self, buf: &'s mut [u8]) -> IoResult<usize> {
Read::read(self, buf)
}
fn write(&'s mut self, buf: &'s [u8]) -> IoResult<usize> {
let size = buf.len();
self.write_all(buf)?;
Ok(size)
}
}

View File

@ -1,7 +1,10 @@
use super::{Blocking, Runtime};
use super::Runtime;
use crate::Database;
pub trait Acquire<Rt: Runtime = Blocking>: crate::Acquire<Rt> {
pub trait Acquire<Rt>: crate::Acquire<Rt>
where
Rt: Runtime,
{
/// Get a connection from the pool, make a new connection, or wait for one to become
/// available.
///

View File

@ -1,11 +1,12 @@
use std::io;
use super::{io::Stream, Runtime};
use super::{Blocking, Runtime};
pub trait Close<Rt: Runtime = Blocking>: crate::Close<Rt> {
pub trait Close<Rt>: crate::Close<Rt>
where
Rt: Runtime,
{
fn close(self) -> crate::Result<()>
where
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write;
for<'s> <Rt as crate::Runtime>::TcpStream: Stream<'s, Rt>;
}
// TODO: impl Close for Pool { ... }

View File

@ -1,12 +1,13 @@
use std::io;
use super::{io::Stream, Runtime};
use super::{Blocking, Runtime};
pub trait Connect<Rt: Runtime = Blocking>: crate::Connect<Rt> {
pub trait Connect<Rt>: crate::Connect<Rt>
where
Rt: Runtime,
{
fn connect(url: &str) -> crate::Result<Self>
where
Self: Sized,
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write;
for<'s> <Rt as crate::Runtime>::TcpStream: Stream<'s, Rt>;
}
// TODO: impl Connect for Pool { ... }

View File

@ -1,17 +1,13 @@
use std::io;
use super::{Blocking, Close, Connect, ConnectOptions, Runtime};
use super::{io::Stream, Close, Connect, ConnectOptions, Runtime};
/// A unique connection (session) with a specific database.
///
/// For detailed information, refer to the asynchronous version of
/// this: [`Connection`][crate::Connection].
///
pub trait Connection<Rt: Runtime = Blocking>:
crate::Connection<Rt> + Close<Rt> + Connect<Rt>
pub trait Connection<Rt>: crate::Connection<Rt> + Close<Rt> + Connect<Rt>
where
Rt: Runtime,
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write,
Self::Options: ConnectOptions<Rt>,
{
/// Checks if a connection to the database is still valid.
@ -19,5 +15,7 @@ where
/// For detailed information, refer to the asynchronous version of
/// this: [`ping()`][crate::Connection::ping].
///
fn ping(&mut self) -> crate::Result<()>;
fn ping(&mut self) -> crate::Result<()>
where
for<'s> <Rt as crate::Runtime>::TcpStream: Stream<'s, Rt>;
}

View File

@ -0,0 +1,15 @@
use std::io;
use crate::Runtime;
// 's: stream
pub trait Stream<'s, Rt>: crate::io::Stream<'s, Rt>
where
Rt: Runtime,
{
#[doc(hidden)]
fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result<usize>;
#[doc(hidden)]
fn write(&'s mut self, buf: &'s [u8]) -> io::Result<usize>;
}

View File

@ -1,6 +1,4 @@
use std::io;
use super::{Blocking, Connection, Runtime};
use super::{io::Stream, Connection, Runtime};
/// Options which can be used to configure how a SQL connection is opened.
///
@ -8,9 +6,9 @@ use super::{Blocking, Connection, Runtime};
/// this: [`ConnectOptions`][crate::ConnectOptions].
///
#[allow(clippy::module_name_repetitions)]
pub trait ConnectOptions<Rt: Runtime = Blocking>: crate::ConnectOptions<Rt>
pub trait ConnectOptions<Rt>: crate::ConnectOptions<Rt>
where
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write,
Rt: Runtime,
Self::Connection: crate::Connection<Rt, Options = Self> + Connection<Rt>,
{
/// Establish a connection to the database.
@ -20,5 +18,6 @@ where
///
fn connect(&self) -> crate::Result<Self::Connection>
where
Self::Connection: Sized;
Self::Connection: Sized,
for<'s> <Rt as crate::Runtime>::TcpStream: Stream<'s, Rt>;
}

View File

@ -1,5 +1,4 @@
use std::io;
use std::net::TcpStream;
/// Describes a set of types and functions used to open and manage
/// resources within SQLx.
@ -11,17 +10,3 @@ pub trait Runtime: crate::Runtime {
/// Opens a TCP connection to a remote host at the specified port.
fn connect_tcp(host: &str, port: u16) -> io::Result<Self::TcpStream>;
}
/// Uses the `std::net` primitives to implement a blocking runtime for SQLx.
#[derive(Debug)]
pub struct Blocking;
impl crate::Runtime for Blocking {
type TcpStream = TcpStream;
}
impl Runtime for Blocking {
fn connect_tcp(host: &str, port: u16) -> io::Result<Self::TcpStream> {
TcpStream::connect((host, port))
}
}

View File

@ -1,11 +1,15 @@
use crate::{DefaultRuntime, Runtime};
use crate::Runtime;
pub trait Close<Rt: Runtime = DefaultRuntime> {
// for<'a> &'a mut Rt::TcpStream: crate::io::Stream<'a>,
pub trait Close<Rt>
where
Rt: Runtime,
{
#[cfg(feature = "async")]
fn close(self) -> futures_util::future::BoxFuture<'static, crate::Result<()>>
where
Rt: crate::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin;
Rt: crate::Async,
for<'s> <Rt as Runtime>::TcpStream: crate::io::Stream<'s, Rt>;
}
// TODO: impl Close for Pool { ... }

View File

@ -1,14 +1,17 @@
use crate::{ConnectOptions, DefaultRuntime, Runtime};
use crate::{ConnectOptions, Runtime};
pub trait Connect<Rt: Runtime = DefaultRuntime> {
pub trait Connect<Rt>
where
Rt: Runtime,
{
type Options: ConnectOptions<Rt, Connection = Self>;
#[cfg(feature = "async")]
fn connect(url: &str) -> futures_util::future::BoxFuture<'_, crate::Result<Self>>
where
Self: Sized,
Rt: crate::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin;
Rt: crate::Async,
for<'s> <Rt as Runtime>::TcpStream: crate::io::Stream<'s, Rt>;
}
// TODO: impl Connect for Pool { ... }

View File

@ -1,7 +1,7 @@
#[cfg(feature = "async")]
use futures_util::future::BoxFuture;
use crate::{Close, Connect, Database, DefaultRuntime, Runtime};
use crate::{Close, Connect, Database, Runtime};
/// A unique connection (session) with a specific database.
///
@ -11,8 +11,10 @@ use crate::{Close, Connect, Database, DefaultRuntime, Runtime};
/// SQL statements will be executed and results returned within the context
/// of this single SQL connection.
///
pub trait Connection<Rt: Runtime = DefaultRuntime>:
'static + Send + Connect<Rt> + Close<Rt>
// for<'a> &'a mut Rt::TcpStream: crate::io::Stream<'a>,
pub trait Connection<Rt>: 'static + Send + Connect<Rt> + Close<Rt>
where
Rt: Runtime,
{
type Database: Database<Rt, Connection = Self>;
@ -26,6 +28,6 @@ pub trait Connection<Rt: Runtime = DefaultRuntime>:
#[cfg(feature = "async")]
fn ping(&mut self) -> BoxFuture<'_, crate::Result<()>>
where
Rt: crate::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin;
Rt: crate::Async,
for<'s> <Rt as Runtime>::TcpStream: crate::io::Stream<'s, Rt>;
}

View File

@ -1,14 +1,13 @@
use std::fmt::Debug;
use crate::{Connection, DefaultRuntime, Runtime};
use crate::{Connection, Runtime};
/// A database driver.
///
/// This trait encapsulates a complete set of traits that implement a driver for a
/// specific database (e.g., MySQL, PostgreSQL).
///
// 'x: execution
pub trait Database<Rt = DefaultRuntime>: 'static + Sized + Debug + for<'x> HasOutput<'x>
pub trait Database<Rt>: 'static + Sized + Debug + for<'x> HasOutput<'x>
where
Rt: Runtime,
{

View File

@ -6,6 +6,7 @@ mod database;
pub use database::DatabaseError;
/// `Result` type returned from methods that can have SQLx errors.
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]

View File

@ -2,10 +2,12 @@ mod buf;
mod buf_stream;
mod deserialize;
mod serialize;
mod stream;
mod write;
pub use buf::BufExt;
pub use buf_stream::BufStream;
pub use deserialize::Deserialize;
pub use serialize::Serialize;
pub use stream::Stream;
pub use write::WriteExt;

View File

@ -1,11 +1,6 @@
#[cfg(feature = "blocking")]
use std::io::{Read, Write};
use std::marker::PhantomData;
use bytes::{Bytes, BytesMut};
#[cfg(feature = "async")]
use futures_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "async")]
use futures_util::{AsyncReadExt, AsyncWriteExt};
/// Wraps a stream and buffers input and output to and from it.
///
@ -14,7 +9,9 @@ use futures_util::{AsyncReadExt, AsyncWriteExt};
/// a network interaction). `BufStream` keeps a read and write buffer with infrequent calls
/// to `read` and `write` on the underlying stream.
///
pub struct BufStream<S> {
pub struct BufStream<Rt, S> {
runtime: PhantomData<Rt>,
#[cfg_attr(not(any(feature = "async", feature = "blocking")), allow(unused))]
stream: S,
@ -29,10 +26,11 @@ pub struct BufStream<S> {
wbuf_offset: usize,
}
impl<S> BufStream<S> {
impl<Rt, S> BufStream<Rt, S> {
pub fn with_capacity(stream: S, read: usize, write: usize) -> Self {
Self {
stream,
runtime: PhantomData,
rbuf: BytesMut::with_capacity(read),
wbuf: Vec::with_capacity(write),
#[cfg(feature = "async")]
@ -116,21 +114,18 @@ macro_rules! read {
};
(@read $self:ident, $b:ident) => {
$self.stream.read($b).await?;
$self.stream.read_async($b).await?;
};
}
#[cfg(feature = "async")]
impl<S> BufStream<S>
where
S: AsyncRead + AsyncWrite + Send + Unpin,
{
impl<Rt: crate::Runtime, S: for<'s> crate::io::Stream<'s, Rt>> BufStream<Rt, S> {
pub async fn flush_async(&mut self) -> crate::Result<()> {
// write as much as we can each time and move the cursor as we write from the buffer
// if _this_ future drops, offset will have a record of how much of the wbuf has
// been written
while self.wbuf_offset < self.wbuf.len() {
self.wbuf_offset += self.stream.write(&self.wbuf[self.wbuf_offset..]).await?;
self.wbuf_offset += self.stream.write_async(&self.wbuf[self.wbuf_offset..]).await?;
}
// fully written buffer, move cursor back to the beginning
@ -146,12 +141,9 @@ where
}
#[cfg(feature = "blocking")]
impl<S> BufStream<S>
where
S: Read + Write,
{
impl<Rt: crate::Runtime, S: for<'s> crate::blocking::io::Stream<'s, Rt>> BufStream<Rt, S> {
pub fn flush(&mut self) -> crate::Result<()> {
self.stream.write_all(&self.wbuf)?;
self.stream.write(&self.wbuf)?;
self.wbuf.clear();
Ok(())

View File

@ -0,0 +1,18 @@
use crate::Runtime;
// 's: stream
pub trait Stream<'s, Rt: Runtime>: Send + Sync + Unpin {
#[cfg(feature = "async")]
type ReadFuture: 's + std::future::Future<Output = std::io::Result<usize>> + Send;
#[cfg(feature = "async")]
type WriteFuture: 's + std::future::Future<Output = std::io::Result<usize>> + Send;
#[cfg(feature = "async")]
#[doc(hidden)]
fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture;
#[cfg(feature = "async")]
#[doc(hidden)]
fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture;
}

View File

@ -17,15 +17,6 @@
#![allow(clippy::doc_markdown)]
#![allow(clippy::clippy::missing_errors_doc)]
// crate renames to allow the feature name "tokio" and "async-std" (as features
// can't directly conflict with dependency names)
#[cfg(feature = "async-std")]
extern crate _async_std as async_std;
#[cfg(feature = "tokio")]
extern crate _tokio as tokio;
mod acquire;
mod close;
mod connect;
@ -43,6 +34,8 @@ pub mod io;
pub mod blocking;
pub use acquire::Acquire;
#[cfg(feature = "blocking")]
pub use blocking::Blocking;
pub use close::Close;
pub use connect::Connect;
pub use connection::Connection;
@ -52,57 +45,23 @@ pub use options::ConnectOptions;
pub use pool::Pool;
#[cfg(feature = "actix")]
pub use runtime::Actix;
#[cfg(feature = "async")]
pub use runtime::AsyncRuntime;
#[cfg(feature = "async-std")]
pub use runtime::AsyncStd;
pub use runtime::Runtime;
// #[cfg(feature = "_mock")]
// pub use mock::Mock;
#[cfg(feature = "tokio")]
pub use runtime::Tokio;
#[cfg(feature = "_mock")]
#[doc(hidden)]
pub use runtime::{mock, Mock};
// pick a default runtime
// this is so existing applications in SQLx pre 0.6 work and to
// make it more convenient, if your application only uses 1 runtime (99%+)
// most of the time you won't have to worry about picking the runtime
#[cfg(feature = "async-std")]
pub type DefaultRuntime = AsyncStd;
#[cfg(all(not(feature = "async-std"), feature = "tokio"))]
pub type DefaultRuntime = Tokio;
#[cfg(all(not(all(feature = "async-std", feature = "tokio")), feature = "actix"))]
pub type DefaultRuntime = Actix;
#[cfg(all(
not(any(feature = "async-std", feature = "tokio", feature = "actix")),
feature = "blocking"
))]
pub type DefaultRuntime = blocking::Blocking;
// when there is no async runtime, and the blocking runtime is not present
// the unit type is implemented for Runtime, this is only to allow the
// lib to compile, the lib is mostly useless in this state
#[cfg(not(any(
feature = "async-std",
feature = "actix",
feature = "tokio",
feature = "blocking"
)))]
pub type DefaultRuntime = ();
pub use runtime::{Async, DefaultRuntime, Runtime};
#[cfg(any(feature = "async-std", feature = "tokio", feature = "actix"))]
pub mod prelude {
pub use super::Acquire as _;
pub use super::Close as _;
pub use super::Connect as _;
pub use super::ConnectOptions as _;
pub use super::Connection as _;
pub use super::Database as _;
pub use super::Runtime as _;
pub use super::Acquire;
pub use super::Close;
pub use super::Connect;
pub use super::ConnectOptions;
pub use super::Connection;
pub use super::Database;
pub use super::Runtime;
}
#[cfg(all(

View File

@ -1,11 +1,11 @@
use std::fmt::Debug;
use std::str::FromStr;
use crate::{Connection, DefaultRuntime, Runtime};
use crate::{Connection, Runtime};
/// Options which can be used to configure how a SQL connection is opened.
#[allow(clippy::module_name_repetitions)]
pub trait ConnectOptions<Rt = DefaultRuntime>:
pub trait ConnectOptions<Rt>:
'static + Sized + Send + Sync + Default + Debug + Clone + FromStr<Err = crate::Error>
where
Rt: Runtime,
@ -17,6 +17,6 @@ where
fn connect(&self) -> futures_util::future::BoxFuture<'_, crate::Result<Self::Connection>>
where
Self::Connection: Sized,
Rt: crate::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin;
Rt: crate::Async,
for<'s> Rt::TcpStream: crate::io::Stream<'s, Rt>;
}

View File

@ -3,7 +3,7 @@ use std::marker::PhantomData;
use crate::{Database, DefaultRuntime, Runtime};
/// A connection pool to enable the efficient reuse of a managed pool of SQL connections.
pub struct Pool<Db: Database, Rt: Runtime = DefaultRuntime> {
pub struct Pool<Db: Database<Rt>, Rt: Runtime = DefaultRuntime> {
runtime: PhantomData<Rt>,
database: PhantomData<Db>,
}

View File

@ -1,6 +1,8 @@
#[cfg(feature = "_mock")]
#[doc(hidden)]
pub mod mock;
use crate::io::Stream;
// #[cfg(feature = "_mock")]
// #[doc(hidden)]
// pub mod mock;
#[cfg(feature = "async-std")]
#[path = "runtime/async_std.rs"]
@ -18,31 +20,62 @@ mod tokio_;
pub use actix_::Actix;
#[cfg(feature = "async-std")]
pub use async_std_::AsyncStd;
#[cfg(feature = "_mock")]
pub use mock::Mock;
// #[cfg(feature = "_mock")]
// pub use mock::Mock;
#[cfg(feature = "tokio")]
pub use tokio_::Tokio;
/// Describes a set of types and functions used to open and manage
/// resources within SQLx.
pub trait Runtime: 'static + Send + Sync {
type TcpStream: Send;
/// Describes a set of types and functions used to open and manage IO resources within SQLx.
///
/// In the greater ecosystem we have several choices for an asynchronous runtime (executor) to
/// schedule and interact with our futures. Libraries that wish to be generally available have
/// tended to either pick one (and allow compatibility wrappers to others) or use mutually-exclusive
/// cargo feature flags to pick between runtimes. Each of these approaches have their own
/// problems.
///
/// In SQLx, most types and traits are parameterized with a `Rt: Runtime` bound. Asynchronous
/// implementations of `Runtime` are available for [**async-std**](https://async.rs/),
/// [**Tokio**](https://tokio.rs/), and [**Actix**](https://actix.rs/) (given
/// those crate features are activated).
///
/// - [`AsyncStd`]
/// - [`Tokio`]
/// - [`Actix`]
///
/// Additionally, a `std` blocking runtime is provided. This is intended for use in
/// environments where asynchronous IO either doesn't make sense or isn't available.
///
/// - [`Blocking`][crate::blocking::Blocking]
///
pub trait Runtime: 'static + Send + Sync + Sized {
#[doc(hidden)]
type TcpStream: 'static + Send + Sync;
}
#[cfg(feature = "async")]
#[allow(clippy::module_name_repetitions)]
pub trait AsyncRuntime: Runtime {
/// Opens a TCP connection to a remote host at the specified port.
fn connect_tcp(
/// Marks a [`Runtime`] as being capable of handling asynchronous execution.
// Provided so that attempting to use the asynchronous methods with the
// Blocking runtime will error at compile-time as opposed to runtime.
pub trait Async: Runtime
where
// NOTE: This requires a **pervasive** bound for everything that needs
// to bound on <Runtime>. Remove if you can think of something else that
// allows us to both allow polymorphic read/write on streams across
// runtimes _and_ not depend on <BoxFuture>. When GATs are stabilized,
// we should be able to switch to that and remove this HRTB.
Self::TcpStream: for<'s> Stream<'s, Self>,
{
#[cfg(feature = "async")]
#[doc(hidden)]
fn connect_tcp_async(
host: &str,
port: u16,
) -> futures_util::future::BoxFuture<'_, std::io::Result<Self::TcpStream>>;
}
// when the async feature is not specified, this is an empty trait
// when no runtime is available
// we implement `()` for it to allow the lib to still compile
#[cfg(not(any(
feature = "async_std",
feature = "async-std",
feature = "actix",
feature = "tokio",
feature = "blocking"
@ -50,3 +83,67 @@ pub trait AsyncRuntime: Runtime {
impl Runtime for () {
type TcpStream = ();
}
mod default {
// pick a default runtime
// this is so existing applications in SQLx pre 0.6 work and to
// make it more convenient, if your application only uses 1 runtime (99%+)
// most of the time you won't have to worry about picking the runtime
#[cfg(feature = "async-std")]
pub type Runtime = super::AsyncStd;
#[cfg(all(not(feature = "async-std"), feature = "tokio"))]
pub type Runtime = super::Tokio;
#[cfg(all(not(all(feature = "async-std", feature = "tokio")), feature = "actix"))]
pub type Runtime = super::Actix;
#[cfg(all(
not(any(feature = "async-std", feature = "tokio", feature = "actix")),
feature = "blocking"
))]
pub type Runtime = crate::Blocking;
// when there is no async runtime, and the blocking runtime is not present
// the unit type is implemented for Runtime, this is only to allow the
// lib to compile, the lib is mostly useless in this state
#[cfg(not(any(
feature = "async-std",
feature = "actix",
feature = "tokio",
feature = "blocking"
)))]
pub type Runtime = ();
}
/// The default runtime in use by SQLx when one is unspecified.
///
/// Following the crate features for each runtime are activated, a default is picked
/// by following a priority list. The actual sorting here is mostly arbitrary (what is
/// important is that there _is_ a stable ordering).
///
/// 1. [`AsyncStd`]
/// 2. [`Tokio`]
/// 3. [`Actix`]
/// 4. [`Blocking`]
/// 5. `()` No runtime selected (nothing is possible)
///
/// The intent is to allow the following to cleanly work, regardless of the enabled runtime,
/// if only one runtime is enabled.
///
/// <br>
///
/// ```rust,ignore
/// use sqlx::postgres::{PgConnection, PgConnectOptions};
/// use sqlx::prelude::*;
///
/// // PgConnection<Rt = sqlx::DefaultRuntime>
/// let conn: PgConnection = PgConnectOptions::new()
/// .host("localhost")
/// .username("postgres")
/// .password("password")
/// // .connect()?; // for Blocking runtime
/// .connect().await?; // for Async runtimes
/// ```
///
pub type DefaultRuntime = default::Runtime;

View File

@ -2,9 +2,10 @@ use std::io;
use actix_rt::net::TcpStream;
use async_compat_02::Compat;
use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
use futures_util::io::{Read, Write};
use futures_util::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt};
use crate::{AsyncRuntime, Runtime};
use crate::{io::Stream, Async, Runtime};
/// Actix SQLx runtime. Uses [`actix-rt`][actix_rt] to provide [`Runtime`].
///
@ -17,14 +18,36 @@ use crate::{AsyncRuntime, Runtime};
pub struct Actix;
impl Runtime for Actix {
// NOTE: Compat<_> is used to avoid requiring a Box per read/write call
// https://github.com/tokio-rs/tokio/issues/2723
#[doc(hidden)]
type TcpStream = Compat<TcpStream>;
}
impl AsyncRuntime for Actix
where
Self::TcpStream: futures_io::AsyncRead,
{
fn connect_tcp(host: &str, port: u16) -> BoxFuture<'_, io::Result<Self::TcpStream>> {
impl Async for Actix {
#[doc(hidden)]
fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result<Self::TcpStream>> {
TcpStream::connect((host, port)).map_ok(Compat::new).boxed()
}
}
// 's: stream
impl<'s> Stream<'s, Actix> for Compat<TcpStream> {
#[doc(hidden)]
type ReadFuture = Read<'s, Self>;
#[doc(hidden)]
type WriteFuture = Write<'s, Self>;
#[inline]
#[doc(hidden)]
fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture {
self.read(buf)
}
#[inline]
#[doc(hidden)]
fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture {
self.write(buf)
}
}

View File

@ -1,28 +1,71 @@
use std::io;
use _async_std::net::TcpStream;
use futures_util::io::{Read, Write};
use futures_util::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt};
use async_std::net::TcpStream;
use futures_util::{future::BoxFuture, FutureExt};
#[cfg(feature = "blocking")]
use crate::blocking;
use crate::{io::Stream, Async, Runtime};
use crate::{AsyncRuntime, Runtime};
/// [`async-std`](async_std) implementation of [`Runtime`].
/// Provides [`Runtime`] for [**async-std**][_async_std]. Supports both blocking
/// and non-blocking operation.
///
/// For blocking operation, the equivalent non-blocking methods are called
/// and trivially wrapped in [`task::block_on`][_async_std::task::block_on].
///
#[cfg_attr(doc_cfg, doc(cfg(feature = "async-std")))]
#[derive(Debug)]
pub struct AsyncStd;
impl Runtime for AsyncStd {
#[doc(hidden)]
type TcpStream = TcpStream;
}
impl AsyncRuntime for AsyncStd {
fn connect_tcp(host: &str, port: u16) -> BoxFuture<'_, io::Result<Self::TcpStream>> {
impl Async for AsyncStd {
#[doc(hidden)]
fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, std::io::Result<Self::TcpStream>> {
TcpStream::connect((host, port)).boxed()
}
}
#[cfg(feature = "blocking")]
impl crate::blocking::Runtime for AsyncStd {
fn connect_tcp(host: &str, port: u16) -> io::Result<Self::TcpStream> {
async_std::task::block_on(<AsyncStd as AsyncRuntime>::connect_tcp(host, port))
impl blocking::Runtime for AsyncStd {
fn connect_tcp(host: &str, port: u16) -> std::io::Result<Self::TcpStream> {
_async_std::task::block_on(Self::connect_tcp_async(host, port))
}
}
// 's: stream
impl<'s> Stream<'s, AsyncStd> for TcpStream {
#[doc(hidden)]
type ReadFuture = Read<'s, Self>;
#[doc(hidden)]
type WriteFuture = Write<'s, Self>;
#[inline]
#[doc(hidden)]
fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture {
self.read(buf)
}
#[inline]
#[doc(hidden)]
fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture {
self.write(buf)
}
}
// 's: stream
#[cfg(feature = "blocking")]
impl<'s> blocking::io::Stream<'s, AsyncStd> for TcpStream {
#[doc(hidden)]
fn read(&'s mut self, buf: &'s mut [u8]) -> std::io::Result<usize> {
_async_std::task::block_on(self.read_async(buf))
}
#[doc(hidden)]
fn write(&'s mut self, buf: &'s [u8]) -> std::io::Result<usize> {
_async_std::task::block_on(self.write_async(buf))
}
}

View File

@ -1,10 +1,11 @@
use std::io;
use _tokio::net::TcpStream;
use async_compat::Compat;
use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
use tokio::net::TcpStream;
use futures_util::io::{Read, Write};
use futures_util::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt};
use crate::{AsyncRuntime, Runtime};
use crate::{io::Stream, Async, Runtime};
/// Tokio SQLx runtime. Uses [`tokio`] to provide [`Runtime`].
///
@ -15,11 +16,36 @@ use crate::{AsyncRuntime, Runtime};
pub struct Tokio;
impl Runtime for Tokio {
// NOTE: Compat<_> is used to avoid requiring a Box per read/write call
// https://github.com/tokio-rs/tokio/issues/2723
#[doc(hidden)]
type TcpStream = Compat<TcpStream>;
}
impl AsyncRuntime for Tokio {
fn connect_tcp(host: &str, port: u16) -> BoxFuture<'_, io::Result<Self::TcpStream>> {
impl Async for Tokio {
#[doc(hidden)]
fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result<Self::TcpStream>> {
TcpStream::connect((host, port)).map_ok(Compat::new).boxed()
}
}
// 's: stream
impl<'s> Stream<'s, Tokio> for Compat<TcpStream> {
#[doc(hidden)]
type ReadFuture = Read<'s, Self>;
#[doc(hidden)]
type WriteFuture = Write<'s, Self>;
#[inline]
#[doc(hidden)]
fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture {
self.read(buf)
}
#[inline]
#[doc(hidden)]
fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture {
self.write(buf)
}
}

View File

@ -12,8 +12,11 @@ mod ping;
mod stream;
#[allow(clippy::module_name_repetitions)]
pub struct MySqlConnection<Rt: Runtime = DefaultRuntime> {
stream: BufStream<Rt::TcpStream>,
pub struct MySqlConnection<Rt = DefaultRuntime>
where
Rt: Runtime,
{
stream: BufStream<Rt, Rt::TcpStream>,
connection_id: u32,
// the capability flags are used by the client and server to indicate which
@ -25,7 +28,10 @@ pub struct MySqlConnection<Rt: Runtime = DefaultRuntime> {
sequence_id: u8,
}
impl<Rt: Runtime> MySqlConnection<Rt> {
impl<Rt> MySqlConnection<Rt>
where
Rt: Runtime,
{
pub(crate) fn new(stream: Rt::TcpStream) -> Self {
Self {
stream: BufStream::with_capacity(stream, 4096, 1024),
@ -48,20 +54,26 @@ impl<Rt: Runtime> MySqlConnection<Rt> {
}
}
impl<Rt: Runtime> Debug for MySqlConnection<Rt> {
impl<Rt> Debug for MySqlConnection<Rt>
where
Rt: Runtime,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("MySqlConnection").finish()
}
}
impl<Rt: Runtime> Connection<Rt> for MySqlConnection<Rt> {
impl<Rt> Connection<Rt> for MySqlConnection<Rt>
where
Rt: Runtime,
{
type Database = MySql;
#[cfg(feature = "async")]
fn ping(&mut self) -> futures_util::future::BoxFuture<'_, sqlx_core::Result<()>>
where
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
Rt: sqlx_core::Async,
for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>,
{
Box::pin(self.ping_async())
}
@ -74,8 +86,8 @@ impl<Rt: Runtime> Connect<Rt> for MySqlConnection<Rt> {
fn connect(url: &str) -> futures_util::future::BoxFuture<'_, sqlx_core::Result<Self>>
where
Self: Sized,
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
Rt: sqlx_core::Async,
for<'s> <Rt as Runtime>::TcpStream: sqlx_core::io::Stream<'s, Rt>,
{
use sqlx_core::ConnectOptions;
@ -88,8 +100,8 @@ impl<Rt: Runtime> Close<Rt> for MySqlConnection<Rt> {
#[cfg(feature = "async")]
fn close(self) -> futures_util::future::BoxFuture<'static, sqlx_core::Result<()>>
where
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
Rt: sqlx_core::Async,
for<'s> <Rt as Runtime>::TcpStream: sqlx_core::io::Stream<'s, Rt>,
{
Box::pin(self.close_async())
}
@ -99,9 +111,11 @@ impl<Rt: Runtime> Close<Rt> for MySqlConnection<Rt> {
impl<Rt> sqlx_core::blocking::Connection<Rt> for MySqlConnection<Rt>
where
Rt: sqlx_core::blocking::Runtime,
<Rt as Runtime>::TcpStream: std::io::Read + std::io::Write,
{
fn ping(&mut self) -> sqlx_core::Result<()> {
fn ping(&mut self) -> sqlx_core::Result<()>
where
for<'s> <Rt as Runtime>::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>,
{
<MySqlConnection<Rt>>::ping(self)
}
}
@ -114,7 +128,7 @@ where
fn connect(url: &str) -> sqlx_core::Result<Self>
where
Self: Sized,
<Rt as Runtime>::TcpStream: std::io::Read + std::io::Write,
for<'s> <Rt as Runtime>::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>,
{
Self::connect(&url.parse::<MySqlConnectOptions<Rt>>()?)
}
@ -127,7 +141,7 @@ where
{
fn close(self) -> sqlx_core::Result<()>
where
<Rt as Runtime>::TcpStream: std::io::Read + std::io::Write,
for<'s> <Rt as Runtime>::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>,
{
self.close()
}

View File

@ -9,8 +9,8 @@ where
#[cfg(feature = "async")]
pub(crate) async fn close_async(mut self) -> Result<()>
where
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncWrite + futures_io::AsyncRead + Unpin,
Rt: sqlx_core::Async,
for<'s> <Rt as Runtime>::TcpStream: sqlx_core::io::Stream<'s, Rt>,
{
self.write_packet(&Quit)?;
self.stream.flush_async().await?;
@ -21,7 +21,8 @@ where
#[cfg(feature = "blocking")]
pub(crate) fn close(mut self) -> Result<()>
where
<Rt as Runtime>::TcpStream: std::io::Write + std::io::Read,
Rt: sqlx_core::blocking::Runtime,
for<'s> <Rt as Runtime>::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>,
{
self.write_packet(&Quit)?;
self.stream.flush()?;

View File

@ -11,7 +11,7 @@
//!
//! https://dev.mysql.com/doc/internals/en/connection-phase.html
//!
use sqlx_core::{Result, Runtime};
use sqlx_core::Result;
use crate::protocol::{Auth, AuthResponse, Handshake, HandshakeResponse};
use crate::{MySqlConnectOptions, MySqlConnection};
@ -22,7 +22,7 @@ macro_rules! connect {
};
(@tcp $options:ident) => {
Rt::connect_tcp($options.get_host(), $options.get_port()).await?;
Rt::connect_tcp_async($options.get_host(), $options.get_port()).await?;
};
(@blocking @packet $self:ident) => {
@ -116,10 +116,13 @@ macro_rules! connect {
#[cfg(feature = "async")]
impl<Rt> MySqlConnection<Rt>
where
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: Unpin + futures_io::AsyncWrite + futures_io::AsyncRead,
Rt: sqlx_core::Runtime,
{
pub(crate) async fn connect_async(options: &MySqlConnectOptions<Rt>) -> Result<Self> {
pub(crate) async fn connect_async(options: &MySqlConnectOptions<Rt>) -> Result<Self>
where
Rt: sqlx_core::Async,
for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>,
{
connect!(options)
}
}
@ -128,9 +131,11 @@ where
impl<Rt> MySqlConnection<Rt>
where
Rt: sqlx_core::blocking::Runtime,
<Rt as Runtime>::TcpStream: std::io::Write + std::io::Read,
{
pub(crate) fn connect(options: &MySqlConnectOptions<Rt>) -> Result<Self> {
pub(crate) fn connect(options: &MySqlConnectOptions<Rt>) -> Result<Self>
where
for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>,
{
connect!(@blocking options)
}
}

View File

@ -13,8 +13,8 @@ where
#[cfg(feature = "async")]
pub(crate) async fn ping_async(&mut self) -> Result<()>
where
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncWrite + futures_io::AsyncRead + Unpin,
Rt: sqlx_core::Async,
for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>,
{
self.write_packet(&Ping)?;
@ -26,7 +26,7 @@ where
#[cfg(feature = "blocking")]
pub(crate) fn ping(&mut self) -> Result<()>
where
<Rt as Runtime>::TcpStream: std::io::Write + std::io::Read,
for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>,
{
self.write_packet(&Ping)?;

View File

@ -128,12 +128,13 @@ macro_rules! read_packet {
#[cfg(feature = "async")]
impl<Rt> MySqlConnection<Rt>
where
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: Unpin + futures_io::AsyncWrite + futures_io::AsyncRead,
Rt: Runtime,
{
pub(super) async fn read_packet_async<'de, T>(&'de mut self) -> Result<T>
where
T: Deserialize<'de, Capabilities>,
Rt: sqlx_core::Async,
for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>,
{
read_packet!(self)
}
@ -143,11 +144,11 @@ where
impl<Rt> MySqlConnection<Rt>
where
Rt: Runtime,
<Rt as Runtime>::TcpStream: std::io::Write + std::io::Read,
{
pub(super) fn read_packet<'de, T>(&'de mut self) -> Result<T>
where
T: Deserialize<'de, Capabilities>,
for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>,
{
read_packet!(@blocking self)
}

View File

@ -143,8 +143,8 @@ where
fn connect(&self) -> futures_util::future::BoxFuture<'_, sqlx_core::Result<Self::Connection>>
where
Self::Connection: Sized,
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
Rt: sqlx_core::Async,
for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>,
{
Box::pin(MySqlConnection::connect_async(self))
}
@ -154,11 +154,11 @@ where
impl<Rt> sqlx_core::blocking::ConnectOptions<Rt> for MySqlConnectOptions<Rt>
where
Rt: sqlx_core::blocking::Runtime,
<Rt as Runtime>::TcpStream: std::io::Read + std::io::Write,
{
fn connect(&self) -> sqlx_core::Result<Self::Connection>
where
Self::Connection: Sized,
for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>,
{
<MySqlConnection<Rt>>::connect(self)
}