Add Connection::begin

This commit is contained in:
Ryan Leckey
2020-03-11 08:24:49 -07:00
parent d269506899
commit c7d416a1c3
2 changed files with 77 additions and 2 deletions

View File

@@ -5,6 +5,7 @@ use futures_core::future::BoxFuture;
use crate::executor::Executor;
use crate::pool::{Pool, PoolConnection};
use crate::url::Url;
use crate::transaction::Transaction;
/// Represents a single database connection rather than a pool of database connections.
///
@@ -15,6 +16,13 @@ where
Self: Send + 'static,
Self: Executor,
{
/// Starts a transaction.
///
/// Returns [`Transaction`](struct.Transaction.html).
fn begin(self) -> BoxFuture<'static, crate::Result<Transaction<Self>>> where Self: Sized {
Box::pin(Transaction::new(0, self))
}
/// Close this database connection.
fn close(self) -> BoxFuture<'static, crate::Result<()>>;

View File

@@ -1,6 +1,6 @@
use futures::TryStreamExt;
use sqlx::postgres::{PgPool, PgRow};
use sqlx::{postgres::PgConnection, Connect, Executor, Row};
use sqlx::postgres::{PgPool, PgRow, PgQueryAs};
use sqlx::{postgres::PgConnection, Connect, Executor, Connection, Row};
use std::time::Duration;
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
@@ -86,6 +86,73 @@ async fn it_can_return_interleaved_nulls_issue_104() -> anyhow::Result<()> {
Ok(())
}
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
async fn it_can_work_with_transactions() -> anyhow::Result<()> {
let mut conn = connect().await?;
conn.execute("CREATE TABLE IF NOT EXISTS _sqlx_users_1922 (id INTEGER PRIMARY KEY)")
.await?;
conn.execute("TRUNCATE _sqlx_users_1922")
.await?;
// begin .. rollback
let mut tx = conn.begin().await?;
sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)")
.bind(10_i32)
.execute(&mut tx)
.await?;
conn = tx.rollback().await?;
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922")
.fetch_one(&mut conn)
.await?;
assert_eq!(count, 0);
// begin .. commit
let mut tx = conn.begin().await?;
sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)")
.bind(10_i32)
.execute(&mut tx)
.await?;
conn = tx.commit().await?;
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922")
.fetch_one(&mut conn)
.await?;
assert_eq!(count, 1);
// begin .. (drop)
{
let mut tx = conn.begin().await?;
sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)")
.bind(20_i32)
.execute(&mut tx)
.await?;
}
conn = connect().await?;
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922")
.fetch_one(&mut conn)
.await?;
assert_eq!(count, 1);
Ok(())
}
// run with `cargo test --features postgres -- --ignored --nocapture pool_smoke_test`
#[ignore]
#[cfg_attr(feature = "runtime-async-std", async_std::test)]