feat: re-introduce Transaction

* Transaction now wraps `&mut Connection` instead of `Connection`
This commit is contained in:
Ryan Leckey 2020-05-30 02:03:14 -07:00
parent 9d2a0141cb
commit cc9d443434
No known key found for this signature in database
GPG Key ID: BBDFC5595030E7D3
21 changed files with 407 additions and 14 deletions

View File

@ -22,6 +22,7 @@ pub trait IntoArguments<'q, DB: HasArguments<'q>>: Sized + Send {
}
// NOTE: required due to lack of lazy normalization
#[allow(unused_macros)]
macro_rules! impl_into_arguments_for_arguments {
($Arguments:path) => {
impl<'q>

View File

@ -1,12 +1,14 @@
use std::str::FromStr;
use futures_core::future::BoxFuture;
use futures_core::Future;
use crate::database::Database;
use crate::error::{BoxDynError, Error};
use crate::transaction::Transaction;
/// Represents a single database connection.
pub trait Connection: Send + 'static {
pub trait Connection: Send {
type Database: Database;
/// Explicitly close this database connection.
@ -18,11 +20,63 @@ pub trait Connection: Send + 'static {
/// Checks if a connection to the database is still valid.
fn ping(&mut self) -> BoxFuture<Result<(), Error>>;
/// Begin a new transaction or establish a savepoint within the active transaction.
///
/// Returns a [`Transaction`] for controlling and tracking the new transaction.
fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self>, Error>> {
Transaction::begin(self)
}
/// Execute the function inside a transaction.
///
/// If the function returns an error, the transaction will be rolled back. If it does not
/// return an error, the transaction will be committed.
fn transaction<'c: 'f, 'f, T, E, F, Fut>(&'c mut self, f: F) -> BoxFuture<'f, Result<T, E>>
where
T: Send,
F: FnOnce(&mut <Self::Database as Database>::Connection) -> Fut + Send + 'f,
E: From<Error> + Send,
Fut: Future<Output = Result<T, E>> + Send,
{
Box::pin(async move {
let mut tx = self.begin().await?;
match f(tx.get_mut()).await {
Ok(r) => {
// no error occurred, commit the transaction
tx.commit().await?;
Ok(r)
}
Err(e) => {
// an error occurred, rollback the transaction
tx.rollback().await?;
Err(e)
}
}
})
}
#[doc(hidden)]
fn get_ref(&self) -> &<Self::Database as Database>::Connection;
#[doc(hidden)]
fn get_mut(&mut self) -> &mut <Self::Database as Database>::Connection;
#[doc(hidden)]
fn transaction_depth(&self) -> usize {
// connections are not normally transactions, a zero depth implies there is no
// active transaction
0
}
}
/// Represents a type that can directly establish a new connection.
pub trait Connect: Sized + Connection {
type Options: FromStr<Err = BoxDynError> + Send + Sync;
type Options: FromStr<Err = BoxDynError> + Send + Sync + 'static;
/// Establish a new database connection.
///

View File

@ -3,6 +3,7 @@ use std::fmt::Debug;
use crate::arguments::Arguments;
use crate::connection::Connect;
use crate::row::Row;
use crate::transaction::TransactionManager;
use crate::type_info::TypeInfo;
use crate::value::{Value, ValueRef};
@ -21,6 +22,10 @@ pub trait Database:
/// The concrete `Connection` implementation for this database.
type Connection: Connect<Database = Self>;
/// The concrete `TransactionManager` implementation for this database.
#[doc(hidden)]
type TransactionManager: TransactionManager<Database = Self>;
/// The concrete `Row` implementation for this database.
type Row: Row<Database = Self>;
@ -39,7 +44,7 @@ pub trait Database:
/// The upcoming Rust feature, [Generic Associated Types], should obviate
/// the need for this trait.
///
/// [Generic Associated Types]: https://www.google.com/search?q=generic+associated+types+rust&oq=generic+associated+types+rust&aqs=chrome..69i57j0l5.3327j0j7&sourceid=chrome&ie=UTF-8
/// [Generic Associated Types]: https://github.com/rust-lang/rust/issues/44265
pub trait HasValueRef<'r> {
type Database: Database;
@ -55,7 +60,7 @@ pub trait HasValueRef<'r> {
/// The upcoming Rust feature, [Generic Associated Types], should obviate
/// the need for this trait.
///
/// [Generic Associated Types]: https://www.google.com/search?q=generic+associated+types+rust&oq=generic+associated+types+rust&aqs=chrome..69i57j0l5.3327j0j7&sourceid=chrome&ie=UTF-8
/// [Generic Associated Types]: https://github.com/rust-lang/rust/issues/44265
pub trait HasArguments<'q> {
type Database: Database;

View File

@ -38,6 +38,7 @@ pub mod query;
pub mod query_as;
pub mod query_scalar;
pub mod row;
pub mod transaction;
pub mod type_info;
pub mod types;
pub mod value;

View File

@ -69,6 +69,16 @@ impl Connection for MySqlConnection {
Ok(())
})
}
#[doc(hidden)]
fn get_ref(&self) -> &Self {
self
}
#[doc(hidden)]
fn get_mut(&mut self) -> &mut Self {
self
}
}
impl Connect for MySqlConnection {

View File

@ -1,6 +1,8 @@
use crate::database::{Database, HasArguments, HasValueRef};
use crate::mysql::value::{MySqlValue, MySqlValueRef};
use crate::mysql::{MySqlArguments, MySqlConnection, MySqlRow, MySqlTypeInfo};
use crate::mysql::{
MySqlArguments, MySqlConnection, MySqlRow, MySqlTransactionManager, MySqlTypeInfo,
};
/// PostgreSQL database driver.
#[derive(Debug)]
@ -9,6 +11,8 @@ pub struct MySql;
impl Database for MySql {
type Connection = MySqlConnection;
type TransactionManager = MySqlTransactionManager;
type Row = MySqlRow;
type TypeInfo = MySqlTypeInfo;

View File

@ -8,6 +8,7 @@ mod io;
mod options;
mod protocol;
mod row;
mod transaction;
mod type_info;
pub mod types;
mod value;
@ -18,6 +19,7 @@ pub use database::MySql;
pub use error::MySqlDatabaseError;
pub use options::{MySqlConnectOptions, MySqlSslMode};
pub use row::MySqlRow;
pub use transaction::MySqlTransactionManager;
pub use type_info::MySqlTypeInfo;
pub use value::{MySqlValue, MySqlValueFormat, MySqlValueRef};

View File

@ -0,0 +1,27 @@
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use crate::error::Error;
use crate::mysql::{MySql, MySqlConnection};
use crate::transaction::{
begin_ansi_transaction, commit_ansi_transaction, rollback_ansi_transaction, TransactionManager,
};
/// Implementation of [`TransactionManager`] for MySQL.
pub struct MySqlTransactionManager;
impl TransactionManager for MySqlTransactionManager {
type Database = MySql;
fn begin(conn: &mut MySqlConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
begin_ansi_transaction(conn, index).boxed()
}
fn commit(conn: &mut MySqlConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
commit_ansi_transaction(conn, index).boxed()
}
fn rollback(conn: &mut MySqlConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
rollback_ansi_transaction(conn, index).boxed()
}
}

View File

@ -8,6 +8,7 @@ use futures_core::future::BoxFuture;
use super::inner::{DecrementSizeGuard, SharedPool};
use crate::connection::{Connect, Connection};
use crate::database::Database;
use crate::error::Error;
/// A connection checked out from [`Pool`][crate::pool::Pool].
@ -86,7 +87,7 @@ where
impl<C> Connection for PoolConnection<C>
where
C: Connect,
C: 'static + Connect,
{
type Database = C::Database;
@ -101,6 +102,14 @@ where
fn ping(&mut self) -> BoxFuture<Result<(), Error>> {
Box::pin(self.deref_mut().ping())
}
fn get_ref(&self) -> &<Self::Database as Database>::Connection {
self.deref().get_ref()
}
fn get_mut(&mut self) -> &mut <Self::Database as Database>::Connection {
self.deref_mut().get_mut()
}
}
/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.

View File

@ -128,7 +128,7 @@ where
impl<C> SharedPool<C>
where
C: Connect,
C: 'static + Connect,
{
pub(super) async fn new_arc(url: &str, options: Options) -> Result<Arc<Self>, Error> {
let mut pool = Self {
@ -288,7 +288,7 @@ where
/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections
fn spawn_reaper<C>(pool: &Arc<SharedPool<C>>)
where
C: Connection,
C: 'static + Connection,
{
let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
(Some(it), None) | (None, Some(it)) => it,

View File

@ -28,7 +28,7 @@ pub struct Pool<C>(pub(crate) Arc<SharedPool<C>>);
impl<C> Pool<C>
where
C: Connect,
C: 'static + Connect,
{
/// Creates a connection pool with the default configuration.
///

View File

@ -13,7 +13,7 @@ pub struct Builder<C> {
impl<C> Builder<C>
where
C: Connect,
C: 'static + Connect,
{
/// Get a new builder with default options.
///
@ -124,7 +124,7 @@ where
impl<C, DB> Default for Builder<C>
where
C: Connect<Database = DB>,
C: 'static + Connect<Database = DB>,
DB: Database<Connection = C>,
{
fn default() -> Self {

View File

@ -119,6 +119,16 @@ impl Connection for PgConnection {
fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.execute("SELECT 1").map_ok(drop).boxed()
}
#[doc(hidden)]
fn get_ref(&self) -> &Self {
self
}
#[doc(hidden)]
fn get_mut(&mut self) -> &mut Self {
self
}
}
impl Connect for PgConnection {

View File

@ -1,7 +1,7 @@
use crate::database::{Database, HasArguments, HasValueRef};
use crate::postgres::arguments::PgArgumentBuffer;
use crate::postgres::value::{PgValue, PgValueRef};
use crate::postgres::{PgArguments, PgConnection, PgRow, PgTypeInfo};
use crate::postgres::{PgArguments, PgConnection, PgRow, PgTransactionManager, PgTypeInfo};
/// PostgreSQL database driver.
#[derive(Debug)]
@ -10,6 +10,8 @@ pub struct Postgres;
impl Database for Postgres {
type Connection = PgConnection;
type TransactionManager = PgTransactionManager;
type Row = PgRow;
type TypeInfo = PgTypeInfo;

View File

@ -9,6 +9,7 @@ mod listener;
mod message;
mod options;
mod row;
mod transaction;
mod type_info;
pub mod types;
mod value;
@ -21,6 +22,7 @@ pub use listener::{PgListener, PgNotification};
pub use message::PgSeverity;
pub use options::{PgConnectOptions, PgSslMode};
pub use row::PgRow;
pub use transaction::PgTransactionManager;
pub use type_info::{PgTypeInfo, PgTypeKind};
pub use value::{PgValue, PgValueFormat, PgValueRef};

View File

@ -0,0 +1,27 @@
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use crate::error::Error;
use crate::postgres::{PgConnection, Postgres};
use crate::transaction::{
begin_ansi_transaction, commit_ansi_transaction, rollback_ansi_transaction, TransactionManager,
};
/// Implementation of [`TransactionManager`] for PostgreSQL.
pub struct PgTransactionManager;
impl TransactionManager for PgTransactionManager {
type Database = Postgres;
fn begin(conn: &mut PgConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
begin_ansi_transaction(conn, index).boxed()
}
fn commit(conn: &mut PgConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
commit_ansi_transaction(conn, index).boxed()
}
fn rollback(conn: &mut PgConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
rollback_ansi_transaction(conn, index).boxed()
}
}

View File

@ -51,6 +51,16 @@ impl Connection for SqliteConnection {
// For SQLite connections, PING does effectively nothing
Box::pin(future::ok(()))
}
#[doc(hidden)]
fn get_ref(&self) -> &Self {
self
}
#[doc(hidden)]
fn get_mut(&mut self) -> &mut Self {
self
}
}
impl Connect for SqliteConnection {

View File

@ -1,7 +1,7 @@
use crate::database::{Database, HasArguments, HasValueRef};
use crate::sqlite::{
SqliteArgumentValue, SqliteArguments, SqliteConnection, SqliteRow, SqliteTypeInfo, SqliteValue,
SqliteValueRef,
SqliteArgumentValue, SqliteArguments, SqliteConnection, SqliteRow, SqliteTransactionManager,
SqliteTypeInfo, SqliteValue, SqliteValueRef,
};
/// Sqlite database driver.
@ -11,6 +11,8 @@ pub struct Sqlite;
impl Database for Sqlite {
type Connection = SqliteConnection;
type TransactionManager = SqliteTransactionManager;
type Row = SqliteRow;
type TypeInfo = SqliteTypeInfo;

View File

@ -12,6 +12,7 @@ mod error;
mod options;
mod row;
mod statement;
mod transaction;
mod type_info;
pub mod types;
mod value;
@ -22,6 +23,7 @@ pub use database::Sqlite;
pub use error::SqliteError;
pub use options::SqliteConnectOptions;
pub use row::SqliteRow;
pub use transaction::SqliteTransactionManager;
pub use type_info::SqliteTypeInfo;
pub use value::{SqliteValue, SqliteValueRef};

View File

@ -0,0 +1,27 @@
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use crate::error::Error;
use crate::sqlite::{Sqlite, SqliteConnection};
use crate::transaction::{
begin_ansi_transaction, commit_ansi_transaction, rollback_ansi_transaction, TransactionManager,
};
/// Implementation of [`TransactionManager`] for SQLite.
pub struct SqliteTransactionManager;
impl TransactionManager for SqliteTransactionManager {
type Database = Sqlite;
fn begin(conn: &mut SqliteConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
begin_ansi_transaction(conn, index).boxed()
}
fn commit(conn: &mut SqliteConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
commit_ansi_transaction(conn, index).boxed()
}
fn rollback(conn: &mut SqliteConnection, index: usize) -> BoxFuture<'_, Result<(), Error>> {
rollback_ansi_transaction(conn, index).boxed()
}
}

View File

@ -0,0 +1,198 @@
use std::ops::{Deref, DerefMut};
use futures_core::future::BoxFuture;
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::executor::Executor;
/// Generic management of database transactions.
///
/// This trait should not be used, except when implementing [`Connection`].
#[doc(hidden)]
pub trait TransactionManager {
type Database: Database;
/// Begin a new transaction or establish a savepoint within the active transaction.
fn begin(
conn: &mut <Self::Database as Database>::Connection,
depth: usize,
) -> BoxFuture<Result<(), Error>>;
/// Commit the active transaction or release the most recent savepoint.
fn commit(
conn: &mut <Self::Database as Database>::Connection,
depth: usize,
) -> BoxFuture<Result<(), Error>>;
/// Abort the active transaction or restore from the most recent savepoint.
fn rollback(
conn: &mut <Self::Database as Database>::Connection,
depth: usize,
) -> BoxFuture<Result<(), Error>>;
}
/// An in-progress database transaction or savepoint.
///
/// A transaction starts with a call to [`Pool::begin`] or [`Connection::begin`].
///
/// A transaction should end with a call to [`commit`] or [`rollback`]. If neither are called
/// before the transaction goes out-of-scope, [`rollback`] is called. In other
/// words, [`rollback`] is called on `drop` if the transaction is still in-progress.
///
/// A savepoint is a special mark inside a transaction that allows all commands that are
/// executed after it was established to be rolled back, restoring the transaction state to
/// what it was at the time of the savepoint.
///
/// [`Connection::begin`]: struct.Connection.html#method.begin
/// [`Pool::begin`]: struct.Pool.html#method.begin
/// [`commit`]: #method.commit
/// [`rollback`]: #method.rollback
pub struct Transaction<'c, C: Connection + ?Sized> {
connection: &'c mut C,
// the depth of ~this~ transaction, depth directly equates to how many times [`begin()`]
// was called without a corresponding [`commit()`] or [`rollback()`]
depth: usize,
}
impl<'c, DB, C> Transaction<'c, C>
where
DB: Database,
C: ?Sized + Connection<Database = DB>,
{
pub(crate) fn begin(conn: &'c mut C) -> BoxFuture<'c, Result<Self, Error>> {
Box::pin(async move {
let depth = conn.transaction_depth();
DB::TransactionManager::begin(conn.get_mut(), depth).await?;
Ok(Self {
depth: depth + 1,
connection: conn,
})
})
}
/// Commits this transaction or savepoint.
pub async fn commit(self) -> Result<(), Error> {
DB::TransactionManager::commit(self.connection.get_mut(), self.depth).await
}
/// Aborts this transaction or savepoint.
pub async fn rollback(self) -> Result<(), Error> {
DB::TransactionManager::rollback(self.connection.get_mut(), self.depth).await
}
}
impl<'c, C: Connection + ?Sized> Connection for Transaction<'c, C> {
type Database = C::Database;
fn close(self) -> BoxFuture<'static, Result<(), Error>> {
unimplemented!()
}
fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.connection.ping()
}
#[doc(hidden)]
fn get_ref(&self) -> &<Self::Database as Database>::Connection {
self.connection.get_ref()
}
#[doc(hidden)]
fn get_mut(&mut self) -> &mut <Self::Database as Database>::Connection {
self.connection.get_mut()
}
#[doc(hidden)]
fn transaction_depth(&self) -> usize {
self.depth
}
}
impl<DB, C> Deref for Transaction<'_, C>
where
DB: Database,
C: ?Sized + Connection<Database = DB>,
{
type Target = DB::Connection;
#[inline]
fn deref(&self) -> &Self::Target {
self.get_ref()
}
}
impl<DB, C> DerefMut for Transaction<'_, C>
where
DB: Database,
C: ?Sized + Connection<Database = DB>,
{
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
self.get_mut()
}
}
impl<C: Connection + ?Sized> Drop for Transaction<'_, C> {
fn drop(&mut self) {
unimplemented!()
}
}
#[allow(dead_code)]
pub(crate) async fn begin_ansi_transaction<'c, C>(
conn: &'c mut C,
index: usize,
) -> Result<(), Error>
where
&'c mut C: Executor<'c>,
{
if index == 0 {
conn.execute("BEGIN").await?;
} else {
conn.execute(&*format!("SAVEPOINT _sqlx_savepoint_{}", index))
.await?;
}
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn commit_ansi_transaction<'c, C>(
conn: &'c mut C,
index: usize,
) -> Result<(), Error>
where
&'c mut C: Executor<'c>,
{
if index == 1 {
conn.execute("COMMIT").await?;
} else {
conn.execute(&*format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", index))
.await?;
}
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn rollback_ansi_transaction<'c, C>(
conn: &'c mut C,
index: usize,
) -> Result<(), Error>
where
&'c mut C: Executor<'c>,
{
if index == 1 {
conn.execute("ROLLBACK").await?;
} else {
conn.execute(&*format!("ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}", index))
.await?;
}
Ok(())
}