Merge of #3427 (by @mpyw) and #3614 (by @bonsairobo) (#3765)

* feat: Implement `get_transaction_depth` for drivers

* test: Verify `get_transaction_depth()` on postgres

* Refactor: `TransactionManager` delegation without BC

SQLite implementation is currently WIP

* Fix: Avoid breaking changes on `AnyConnectionBackend`

* Refactor: Remove verbose `SqliteConnection` typing

* Feat: Implementation for SQLite

I have included `AtomicUsize` in `WorkerSharedState`. Ideally, it is not desirable to execute `load` and `fetch_add` in two separate steps, but we decided to allow it here since there is only one thread writing. To prevent writing from other threads, the field itself was made private, and a getter method was provided with `pub(crate)`.

* Refactor: Same approach for `cached_statements_size`

ref: a66787d36d

* Fix: Add missing `is_in_transaction` for backend

* Doc: Remove verbose "synchronously" word

* Fix: Remove useless `mut` qualifier

* feat: add Connection::begin_with

This patch completes the plumbing of an optional statement from these methods to
`TransactionManager::begin` without any validation of the provided statement.

There is a new `Error::InvalidSavePoint` which is triggered by any attempt to
call `Connection::begin_with` when we are already inside of a transaction.

* feat: add Pool::begin_with and Pool::try_begin_with

* feat: add Error::BeginFailed and validate that custom "begin" statements are successful

* chore: add tests of Error::BeginFailed

* chore: add tests of Error::InvalidSavePointStatement

* chore: test begin_with works for all SQLite "BEGIN" statements

* chore: improve comment on Connection::begin_with

* feat: add default impl of `Connection::begin_with`

This makes the new method a non-breaking change.

* refactor: combine if statement + unwrap_or_else into one match

* feat: use in-memory SQLite DB to avoid conflicts across tests run in parallel

* feedback: remove public wrapper for sqlite3_txn_state

Move the wrapper directly into the test that uses it instead.

* fix: cache Status on MySqlConnection

* fix: compilation errors

* fix: format

* fix: postgres test

* refactor: delete `Connection::get_transaction_depth`

* fix: tests

---------

Co-authored-by: mpyw <ryosuke_i_628@yahoo.co.jp>
Co-authored-by: Duncan Fairbanks <duncanfairbanks6@gmail.com>
This commit is contained in:
Austin Bonander
2025-03-10 14:29:46 -07:00
committed by GitHub
parent 2f10c29dfd
commit 393b731d5e
29 changed files with 494 additions and 61 deletions

View File

@@ -5,6 +5,7 @@ use crate::{
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt};
use std::borrow::Cow;
use std::{future, pin::pin};
use sqlx_core::any::{
@@ -39,8 +40,11 @@ impl AnyConnectionBackend for PgConnection {
Connection::ping(self)
}
fn begin(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
PgTransactionManager::begin(self)
fn begin(
&mut self,
statement: Option<Cow<'static, str>>,
) -> BoxFuture<'_, sqlx_core::Result<()>> {
PgTransactionManager::begin(self, statement)
}
fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
@@ -55,6 +59,10 @@ impl AnyConnectionBackend for PgConnection {
PgTransactionManager::start_rollback(self)
}
fn get_transaction_depth(&self) -> usize {
PgTransactionManager::get_transaction_depth(self)
}
fn shrink_buffers(&mut self) {
Connection::shrink_buffers(self);
}

View File

@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
@@ -127,6 +128,13 @@ impl PgConnection {
Ok(())
}
pub(crate) fn in_transaction(&self) -> bool {
match self.inner.transaction_status {
TransactionStatus::Transaction => true,
TransactionStatus::Error | TransactionStatus::Idle => false,
}
}
}
impl Debug for PgConnection {
@@ -179,7 +187,17 @@ impl Connection for PgConnection {
where
Self: Sized,
{
Transaction::begin(self)
Transaction::begin(self, None)
}
fn begin_with(
&mut self,
statement: impl Into<Cow<'static, str>>,
) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
where
Self: Sized,
{
Transaction::begin(self, Some(statement.into()))
}
fn cached_statements_size(&self) -> usize {

View File

@@ -1,4 +1,6 @@
use futures_core::future::BoxFuture;
use sqlx_core::database::Database;
use std::borrow::Cow;
use crate::error::Error;
use crate::executor::Executor;
@@ -13,13 +15,27 @@ pub struct PgTransactionManager;
impl TransactionManager for PgTransactionManager {
type Database = Postgres;
fn begin(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
fn begin<'conn>(
conn: &'conn mut PgConnection,
statement: Option<Cow<'static, str>>,
) -> BoxFuture<'conn, Result<(), Error>> {
Box::pin(async move {
let depth = conn.inner.transaction_depth;
let statement = match statement {
// custom `BEGIN` statements are not allowed if we're already in
// a transaction (we need to issue a `SAVEPOINT` instead)
Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
Some(statement) => statement,
None => begin_ansi_transaction_sql(depth),
};
let rollback = Rollback::new(conn);
let query = begin_ansi_transaction_sql(rollback.conn.inner.transaction_depth);
rollback.conn.queue_simple_query(&query)?;
rollback.conn.inner.transaction_depth += 1;
rollback.conn.queue_simple_query(&statement)?;
rollback.conn.wait_until_ready().await?;
if !rollback.conn.in_transaction() {
return Err(Error::BeginFailed);
}
rollback.conn.inner.transaction_depth += 1;
rollback.defuse();
Ok(())
@@ -62,6 +78,10 @@ impl TransactionManager for PgTransactionManager {
conn.inner.transaction_depth -= 1;
}
}
fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
conn.inner.transaction_depth
}
}
struct Rollback<'c> {