refactor: split <Connection> into <Connect>, <Close>, <Acquire>, and <Connection>

This commit is contained in:
Ryan Leckey 2021-01-09 16:37:36 -08:00
parent e0e5b76f79
commit d2b31950cf
No known key found for this signature in database
GPG Key ID: F8AA68C235AB08C9
17 changed files with 250 additions and 1040 deletions

920
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,11 +8,11 @@ authors = [
]
[dependencies]
actix-web = "3.3.2"
#actix-web = "3.3.2"
anyhow = "1.0.36"
async-std = { version = "1.8.0", features = ["attributes"] }
#async-std = { version = "1.8.0", features = ["attributes"] }
tokio = { version = "1.0.1", features = ["rt", "rt-multi-thread", "macros"] }
log = "0.4"
env_logger = "0.8.2"
#sqlx = { path = "../../sqlx", features = ["tokio", "mysql"] }
sqlx = { path = "../../sqlx", features = ["blocking", "mysql"] }
sqlx = { path = "../../sqlx", features = ["blocking", "tokio", "mysql"] }

View File

@ -1,11 +1,19 @@
// use sqlx::prelude::*;
use sqlx::blocking::{prelude::*, Blocking};
use sqlx::mysql::MySqlConnection;
use sqlx::prelude::*;
// #[tokio::main]
// async fn main() -> anyhow::Result<()> {
// env_logger::try_init()?;
//
// let _conn = <MySqlConnection>::connect("mysql://root:password@localhost").await?;
// // connect to the database
// let mut conn = <MySqlConnection>::connect("mysql://root:password@localhost").await?;
//
// // ping, say HAI
// conn.ping().await?;
//
// // , and now close the connection explicitly
// conn.close().await?;
//
// Ok(())
// }
@ -13,7 +21,10 @@ use sqlx::prelude::*;
fn main() -> anyhow::Result<()> {
env_logger::try_init()?;
let _conn = <MySqlConnection>::connect("mysql://root:password@localhost")?;
let mut conn = <MySqlConnection<Blocking>>::connect("mysql://root:password@localhost")?;
conn.ping()?;
conn.close()?;
Ok(())
}

46
sqlx-core/src/acquire.rs Normal file
View File

@ -0,0 +1,46 @@
#[cfg(feature = "async")]
use futures_util::future::BoxFuture;
use crate::{Database, DefaultRuntime, Runtime};
pub trait Acquire<Rt: Runtime = DefaultRuntime> {
type Database: Database<Rt>;
/// Get a connection from the pool, make a new connection, or wait for one to become
/// available.
///
/// Takes exclusive use of the connection until it is released.
///
#[cfg(feature = "async")]
fn acquire(
self,
) -> BoxFuture<'static, crate::Result<<Self::Database as Database<Rt>>::Connection>>
where
<Self::Database as Database<Rt>>::Connection: Sized;
/// Get a connection from the pool, if available.
///
/// Returns `None` immediately if there are no connections available.
///
fn try_acquire(self) -> Option<<Self::Database as Database<Rt>>::Connection>
where
<Self::Database as Database<Rt>>::Connection: Sized;
#[cfg(feature = "async")]
fn begin(
self,
) -> BoxFuture<'static, crate::Result<<Self::Database as Database<Rt>>::Connection>>
where
<Self::Database as Database<Rt>>::Connection: Sized;
#[cfg(feature = "async")]
fn try_begin(
self,
) -> BoxFuture<'static, crate::Result<Option<<Self::Database as Database<Rt>>::Connection>>>
where
<Self::Database as Database<Rt>>::Connection: Sized;
}
// TODO: impl Acquire for &Pool { ... }
// TODO: impl<C: Connection> Acquire for &mut C { ... }
// TODO: impl<A: Acquire> Acquire for &mut &A { ... }

View File

@ -1,15 +1,24 @@
//! Types and traits used to implement a database driver with **blocking** I/O.
//!
mod acquire;
mod close;
mod connect;
mod connection;
mod options;
mod runtime;
pub use acquire::Acquire;
pub use close::Close;
pub use connect::Connect;
pub use connection::Connection;
pub use options::ConnectOptions;
pub use runtime::{Blocking, Runtime};
pub mod prelude {
pub use super::Acquire as _;
pub use super::Close as _;
pub use super::Connect as _;
pub use super::ConnectOptions as _;
pub use super::Connection as _;
pub use super::Runtime as _;

View File

@ -0,0 +1,28 @@
use super::{Blocking, Runtime};
use crate::Database;
pub trait Acquire<Rt: Runtime = Blocking>: crate::Acquire<Rt> {
/// Get a connection from the pool, make a new connection, or wait for one to become
/// available.
///
/// Takes exclusive use of the connection until it is released.
///
/// For detailed information, refer to the asynchronous version of
/// this: [`acquire()`][crate::Acquire::acquire].
///
fn acquire(self) -> crate::Result<<Self::Database as Database<Rt>>::Connection>
where
<Self::Database as Database<Rt>>::Connection: Sized;
fn begin(self) -> crate::Result<<Self::Database as Database<Rt>>::Connection>
where
<Self::Database as Database<Rt>>::Connection: Sized;
fn try_begin(self) -> crate::Result<Option<<Self::Database as Database<Rt>>::Connection>>
where
<Self::Database as Database<Rt>>::Connection: Sized;
}
// TODO: impl Acquire for &Pool { ... }
// TODO: impl<C: Connection> Acquire for &mut C { ... }
// TODO: impl<A: Acquire> Acquire for &mut &A { ... }

View File

@ -0,0 +1,12 @@
use std::io;
use super::{Blocking, Runtime};
pub trait Close<Rt: Runtime = Blocking>: crate::Close<Rt> {
fn close(self) -> crate::Result<()>
where
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write;
}
// TODO: impl Close for Pool { ... }
// TODO: impl<C: Connection> Close for C { ... }

View File

@ -0,0 +1,13 @@
use std::io;
use super::{Blocking, Runtime};
pub trait Connect<Rt: Runtime = Blocking>: crate::Connect<Rt> {
fn connect(url: &str) -> crate::Result<Self>
where
Self: Sized,
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write;
}
// TODO: impl Connect for Pool { ... }
// TODO: impl Connect for PgConnection { ... }

View File

@ -1,38 +1,19 @@
use std::io;
use super::{ConnectOptions, Runtime};
use crate::DefaultRuntime;
use super::{Blocking, Close, Connect, ConnectOptions, Runtime};
/// A unique connection (session) with a specific database.
///
/// For detailed information, refer to the asynchronous version of
/// this: [`Connection`][crate::Connection].
///
pub trait Connection<Rt = DefaultRuntime>: crate::Connection<Rt>
pub trait Connection<Rt: Runtime = Blocking>:
crate::Connection<Rt> + Close<Rt> + Connect<Rt>
where
Rt: Runtime,
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write,
Self::Options: ConnectOptions<Rt>,
{
/// Establish a new database connection.
///
/// For detailed information, refer to the asynchronous version of
/// this: [`connect()`][crate::Connection::connect].
///
fn connect(url: &str) -> crate::Result<Self>
where
Self: Sized,
{
url.parse::<<Self as crate::Connection<Rt>>::Options>()?.connect()
}
/// Explicitly close this database connection.
///
/// For detailed information, refer to the asynchronous version of
/// this: [`close()`][crate::Connection::close].
///
fn close(self) -> crate::Result<()>;
/// Checks if a connection to the database is still valid.
///
/// For detailed information, refer to the asynchronous version of

View File

@ -1,7 +1,6 @@
use std::io;
use super::{Connection, Runtime};
use crate::DefaultRuntime;
use super::{Blocking, Connection, Runtime};
/// Options which can be used to configure how a SQL connection is opened.
///
@ -9,9 +8,8 @@ use crate::DefaultRuntime;
/// this: [`ConnectOptions`][crate::ConnectOptions].
///
#[allow(clippy::module_name_repetitions)]
pub trait ConnectOptions<Rt = DefaultRuntime>: crate::ConnectOptions<Rt>
pub trait ConnectOptions<Rt: Runtime = Blocking>: crate::ConnectOptions<Rt>
where
Rt: Runtime,
<Rt as crate::Runtime>::TcpStream: io::Read + io::Write,
Self::Connection: crate::Connection<Rt, Options = Self> + Connection<Rt>,
{

12
sqlx-core/src/close.rs Normal file
View File

@ -0,0 +1,12 @@
use crate::{DefaultRuntime, Runtime};
pub trait Close<Rt: Runtime = DefaultRuntime> {
#[cfg(feature = "async")]
fn close(self) -> futures_util::future::BoxFuture<'static, crate::Result<()>>
where
Rt: crate::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin;
}
// TODO: impl Close for Pool { ... }
// TODO: impl<C: Connection> Close for C { ... }

15
sqlx-core/src/connect.rs Normal file
View File

@ -0,0 +1,15 @@
use crate::{ConnectOptions, DefaultRuntime, Runtime};
pub trait Connect<Rt: Runtime = DefaultRuntime> {
type Options: ConnectOptions<Rt, Connection = Self>;
#[cfg(feature = "async")]
fn connect(url: &str) -> futures_util::future::BoxFuture<'_, crate::Result<Self>>
where
Self: Sized,
Rt: crate::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin;
}
// TODO: impl Connect for Pool { ... }
// TODO: impl Connect for PgConnection { ... }

View File

@ -1,7 +1,7 @@
#[cfg(feature = "async")]
use futures_util::future::BoxFuture;
use crate::{ConnectOptions, Database, DefaultRuntime, Runtime};
use crate::{Close, Connect, Database, DefaultRuntime, Runtime};
/// A unique connection (session) with a specific database.
///
@ -11,64 +11,11 @@ use crate::{ConnectOptions, Database, DefaultRuntime, Runtime};
/// SQL statements will be executed and results returned within the context
/// of this single SQL connection.
///
pub trait Connection<Rt = DefaultRuntime>: 'static + Send
where
Rt: Runtime,
pub trait Connection<Rt: Runtime = DefaultRuntime>:
'static + Send + Connect<Rt> + Close<Rt>
{
type Database: Database<Rt, Connection = Self>;
type Options: ConnectOptions<Rt, Connection = Self>;
/// Establish a new database connection.
///
/// A value of [`Options`](#associatedtype.Options) is parsed from the provided connection string. This parsing
/// is database-specific.
///
/// ```rust,ignore
/// use sqlx::postgres::PgConnection;
///
/// let mut conn = <PgConnection>::connect(
/// "postgres://postgres:password@localhost/database",
/// ).await?;
/// ```
///
/// You may alternatively build the connection options imperatively.
///
/// ```rust,ignore
/// use sqlx::mysql::MySqlConnectOptions;
/// use sqlx::ConnectOptions;
///
/// let mut conn = <MySqlConnectOptions>::new()
/// .host("localhost")
/// .username("root")
/// .password("password")
/// .connect().await?;
/// ```
///
#[cfg(feature = "async")]
#[must_use]
fn connect(url: &str) -> BoxFuture<'_, crate::Result<Self>>
where
Self: Sized,
Rt: crate::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
{
let options = url.parse::<Self::Options>();
Box::pin(async move { options?.connect().await })
}
/// Explicitly close this database connection.
///
/// This method is **not required** for safe and consistent operation. However, it is
/// recommended to call it instead of letting a connection `drop` as the database backend
/// will be faster at cleaning up resources.
///
#[cfg(feature = "async")]
fn close(self) -> BoxFuture<'static, crate::Result<()>>
where
Rt: crate::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin;
/// Checks if a connection to the database is still valid.
///
/// The method of operation greatly depends on the database driver. In MySQL, there is an

View File

@ -26,10 +26,14 @@ extern crate _async_std as async_std;
#[cfg(feature = "tokio")]
extern crate _tokio as tokio;
mod acquire;
mod close;
mod connect;
mod connection;
mod database;
mod error;
mod options;
mod pool;
mod runtime;
#[doc(hidden)]
@ -38,10 +42,14 @@ pub mod io;
#[cfg(feature = "blocking")]
pub mod blocking;
pub use acquire::Acquire;
pub use close::Close;
pub use connect::Connect;
pub use connection::Connection;
pub use database::{Database, HasOutput};
pub use error::{DatabaseError, Error, Result};
pub use options::ConnectOptions;
pub use pool::Pool;
#[cfg(feature = "actix")]
pub use runtime::Actix;
#[cfg(feature = "async")]
@ -79,7 +87,7 @@ pub type DefaultRuntime = blocking::Blocking;
// the unit type is implemented for Runtime, this is only to allow the
// lib to compile, the lib is mostly useless in this state
#[cfg(not(any(
feature = "async_std",
feature = "async-std",
feature = "actix",
feature = "tokio",
feature = "blocking"
@ -88,6 +96,9 @@ pub type DefaultRuntime = ();
#[cfg(any(feature = "async-std", feature = "tokio", feature = "actix"))]
pub mod prelude {
pub use super::Acquire as _;
pub use super::Close as _;
pub use super::Connect as _;
pub use super::ConnectOptions as _;
pub use super::Connection as _;
pub use super::Database as _;
@ -101,7 +112,7 @@ pub mod prelude {
pub use blocking::prelude;
#[cfg(not(any(
feature = "async_std",
feature = "async-std",
feature = "actix",
feature = "tokio",
feature = "blocking"

View File

@ -12,7 +12,7 @@ where
{
type Connection: Connection<Rt> + ?Sized;
/// Establish a connection to the database.
/// Establish a new connection to the database.
#[cfg(feature = "async")]
fn connect(&self) -> futures_util::future::BoxFuture<'_, crate::Result<Self::Connection>>
where

13
sqlx-core/src/pool.rs Normal file
View File

@ -0,0 +1,13 @@
use std::marker::PhantomData;
use crate::{Database, DefaultRuntime, Runtime};
/// A connection pool to enable the efficient reuse of a managed pool of SQL connections.
pub struct Pool<Db: Database, Rt: Runtime = DefaultRuntime> {
runtime: PhantomData<Rt>,
database: PhantomData<Db>,
}
// TODO: impl Acquire for &Pool
// TODO: impl Connect for Pool
// TODO: impl Close for Pool

View File

@ -1,7 +1,7 @@
use std::fmt::{self, Debug, Formatter};
use sqlx_core::io::BufStream;
use sqlx_core::{Connection, DefaultRuntime, Runtime};
use sqlx_core::{Close, Connect, Connection, DefaultRuntime, Runtime};
use crate::protocol::Capabilities;
use crate::{MySql, MySqlConnectOptions};
@ -12,10 +12,7 @@ mod ping;
mod stream;
#[allow(clippy::module_name_repetitions)]
pub struct MySqlConnection<Rt = DefaultRuntime>
where
Rt: Runtime,
{
pub struct MySqlConnection<Rt: Runtime = DefaultRuntime> {
stream: BufStream<Rt::TcpStream>,
connection_id: u32,
@ -28,10 +25,7 @@ where
sequence_id: u8,
}
impl<Rt> MySqlConnection<Rt>
where
Rt: Runtime,
{
impl<Rt: Runtime> MySqlConnection<Rt> {
pub(crate) fn new(stream: Rt::TcpStream) -> Self {
Self {
stream: BufStream::with_capacity(stream, 4096, 1024),
@ -54,32 +48,15 @@ where
}
}
impl<Rt> Debug for MySqlConnection<Rt>
where
Rt: Runtime,
{
impl<Rt: Runtime> Debug for MySqlConnection<Rt> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("MySqlConnection").finish()
}
}
impl<Rt> Connection<Rt> for MySqlConnection<Rt>
where
Rt: Runtime,
{
impl<Rt: Runtime> Connection<Rt> for MySqlConnection<Rt> {
type Database = MySql;
type Options = MySqlConnectOptions<Rt>;
#[cfg(feature = "async")]
fn close(self) -> futures_util::future::BoxFuture<'static, sqlx_core::Result<()>>
where
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
{
Box::pin(self.close_async())
}
#[cfg(feature = "async")]
fn ping(&mut self) -> futures_util::future::BoxFuture<'_, sqlx_core::Result<()>>
where
@ -90,17 +67,68 @@ where
}
}
impl<Rt: Runtime> Connect<Rt> for MySqlConnection<Rt> {
type Options = MySqlConnectOptions<Rt>;
#[cfg(feature = "async")]
fn connect(url: &str) -> futures_util::future::BoxFuture<'_, sqlx_core::Result<Self>>
where
Self: Sized,
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
{
use sqlx_core::ConnectOptions;
let options = url.parse::<Self::Options>();
Box::pin(async move { options?.connect().await })
}
}
impl<Rt: Runtime> Close<Rt> for MySqlConnection<Rt> {
#[cfg(feature = "async")]
fn close(self) -> futures_util::future::BoxFuture<'static, sqlx_core::Result<()>>
where
Rt: sqlx_core::AsyncRuntime,
<Rt as Runtime>::TcpStream: futures_io::AsyncRead + futures_io::AsyncWrite + Unpin,
{
Box::pin(self.close_async())
}
}
#[cfg(feature = "blocking")]
impl<Rt> sqlx_core::blocking::Connection<Rt> for MySqlConnection<Rt>
where
Rt: sqlx_core::blocking::Runtime,
<Rt as Runtime>::TcpStream: std::io::Read + std::io::Write,
{
fn close(self) -> sqlx_core::Result<()> {
<MySqlConnection<Rt>>::close(self)
}
fn ping(&mut self) -> sqlx_core::Result<()> {
<MySqlConnection<Rt>>::ping(self)
}
}
#[cfg(feature = "blocking")]
impl<Rt> sqlx_core::blocking::Connect<Rt> for MySqlConnection<Rt>
where
Rt: sqlx_core::blocking::Runtime,
{
fn connect(url: &str) -> sqlx_core::Result<Self>
where
Self: Sized,
<Rt as Runtime>::TcpStream: std::io::Read + std::io::Write,
{
Self::connect(&url.parse::<MySqlConnectOptions<Rt>>()?)
}
}
#[cfg(feature = "blocking")]
impl<Rt> sqlx_core::blocking::Close<Rt> for MySqlConnection<Rt>
where
Rt: sqlx_core::blocking::Runtime,
{
fn close(self) -> sqlx_core::Result<()>
where
<Rt as Runtime>::TcpStream: std::io::Read + std::io::Write,
{
self.close()
}
}