mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-10-07 01:34:35 +00:00

* Include test case for regular subtransactions While using COPY and subtransactions I kept running into errors. This test case documents that error, it currently fails with: Error: encountered unexpected or invalid data: expecting ParseComplete but received CommandComplete * PostgreSQL Copy: Consume ReadyForQuery on error When a COPY statement was in error inside a subtransaction, a Protocol Error used to be raised. By consuming the ReadyForQuery message when there is an error, we no longer have this issue.
1943 lines
54 KiB
Rust
1943 lines
54 KiB
Rust
use futures::{Stream, StreamExt, TryStreamExt};
|
|
|
|
use sqlx::postgres::types::Oid;
|
|
use sqlx::postgres::{
|
|
PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgListener,
|
|
PgPoolOptions, PgRow, PgSeverity, Postgres,
|
|
};
|
|
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
|
|
use sqlx_core::bytes::Bytes;
|
|
use sqlx_test::{new, pool, setup_if_needed};
|
|
use std::env;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_connects() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let value = sqlx::query("select 1 + 1")
|
|
.try_map(|row: PgRow| row.try_get::<i32, _>(0))
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(2i32, value);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_select_void() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
// pg_notify just happens to be a function that returns void
|
|
let _: () = sqlx::query_scalar("select pg_notify('chan', 'message');")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_pings() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
conn.ping().await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_pings_after_suspended_query() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
sqlx::raw_sql("create temporary table processed_row(val int4 primary key)")
|
|
.execute(&mut conn)
|
|
.await?;
|
|
|
|
// This query wants to return 50 rows but we only read the first one.
|
|
// This will return a `SuspendedPortal` that the driver currently ignores.
|
|
let _: i32 = sqlx::query_scalar(
|
|
r#"
|
|
insert into processed_row(val)
|
|
select * from generate_series(1, 50)
|
|
returning val
|
|
"#,
|
|
)
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
// `Sync` closes the current autocommit transaction which presumably includes closing any
|
|
// suspended portals.
|
|
conn.ping().await?;
|
|
|
|
// Make sure that all the values got inserted even though we only read the first one back.
|
|
let count: i64 = sqlx::query_scalar("select count(*) from processed_row")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(count, 50);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_maths() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let value = sqlx::query("select 1 + $1::int")
|
|
.bind(5_i32)
|
|
.try_map(|row: PgRow| row.try_get::<i32, _>(0))
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(6i32, value);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_inspect_errors() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let res: Result<_, sqlx::Error> = sqlx::query("select f").execute(&mut conn).await;
|
|
let err = res.unwrap_err();
|
|
|
|
// can also do [as_database_error] or use `match ..`
|
|
let err = err.into_database_error().unwrap();
|
|
|
|
assert_eq!(err.message(), "column \"f\" does not exist");
|
|
assert_eq!(err.code().as_deref(), Some("42703"));
|
|
|
|
// can also do [downcast_ref]
|
|
let err: Box<PgDatabaseError> = err.downcast();
|
|
|
|
assert_eq!(err.severity(), PgSeverity::Error);
|
|
assert_eq!(err.message(), "column \"f\" does not exist");
|
|
assert_eq!(err.code(), "42703");
|
|
assert_eq!(err.position(), Some(PgErrorPosition::Original(8)));
|
|
assert_eq!(err.routine(), Some("errorMissingColumn"));
|
|
assert_eq!(err.constraint(), None);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_inspect_constraint_errors() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let res: Result<_, sqlx::Error> =
|
|
sqlx::query("INSERT INTO products VALUES (1, 'Product 1', 0);")
|
|
.execute(&mut conn)
|
|
.await;
|
|
let err = res.unwrap_err();
|
|
|
|
// can also do [as_database_error] or use `match ..`
|
|
let err = err.into_database_error().unwrap();
|
|
|
|
assert_eq!(
|
|
err.message(),
|
|
"new row for relation \"products\" violates check constraint \"products_price_check\""
|
|
);
|
|
assert_eq!(err.code().as_deref(), Some("23514"));
|
|
|
|
// can also do [downcast_ref]
|
|
let err: Box<PgDatabaseError> = err.downcast();
|
|
|
|
assert_eq!(err.severity(), PgSeverity::Error);
|
|
assert_eq!(
|
|
err.message(),
|
|
"new row for relation \"products\" violates check constraint \"products_price_check\""
|
|
);
|
|
assert_eq!(err.code(), "23514");
|
|
assert_eq!(err.position(), None);
|
|
assert_eq!(err.routine(), Some("ExecConstraints"));
|
|
assert_eq!(err.constraint(), Some("products_price_check"));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_executes() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let _ = conn
|
|
.execute(
|
|
r#"
|
|
CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY);
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
for index in 1..=10_i32 {
|
|
let done = sqlx::query("INSERT INTO users (id) VALUES ($1)")
|
|
.bind(index)
|
|
.execute(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(done.rows_affected(), 1);
|
|
}
|
|
|
|
let sum: i32 = sqlx::query("SELECT id FROM users")
|
|
.try_map(|row: PgRow| row.try_get::<i32, _>(0))
|
|
.fetch(&mut conn)
|
|
.try_fold(0_i32, |acc, x| async move { Ok(acc + x) })
|
|
.await?;
|
|
|
|
assert_eq!(sum, 55);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_nest_map() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let res = sqlx::query("SELECT 5")
|
|
.map(|row: PgRow| row.get(0))
|
|
.map(|int: i32| int.to_string())
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(res, "5");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(feature = "json")]
|
|
#[sqlx_macros::test]
|
|
async fn it_describes_and_inserts_json_and_jsonb() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let _ = conn
|
|
.execute(
|
|
r#"
|
|
CREATE TEMPORARY TABLE json_stuff (obj json, obj2 jsonb);
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
let query = "INSERT INTO json_stuff (obj, obj2) VALUES ($1, $2)";
|
|
let _ = conn.describe(query).await?;
|
|
|
|
let done = sqlx::query(query)
|
|
.bind(serde_json::json!({ "a": "a" }))
|
|
.bind(serde_json::json!({ "a": "a" }))
|
|
.execute(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(done.rows_affected(), 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_works_with_cache_disabled() -> anyhow::Result<()> {
|
|
setup_if_needed();
|
|
|
|
let mut url = url::Url::parse(&env::var("DATABASE_URL")?)?;
|
|
url.query_pairs_mut()
|
|
.append_pair("statement-cache-capacity", "0");
|
|
|
|
let mut conn = PgConnection::connect(url.as_ref()).await?;
|
|
|
|
for index in 1..=10_i32 {
|
|
let _ = sqlx::query("SELECT $1")
|
|
.bind(index)
|
|
.execute(&mut conn)
|
|
.await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_executes_with_pool() -> anyhow::Result<()> {
|
|
let pool = sqlx_test::pool::<Postgres>().await?;
|
|
|
|
let rows = pool.fetch_all("SELECT 1; SElECT 2").await?;
|
|
|
|
assert_eq!(rows.len(), 2);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// https://github.com/launchbadge/sqlx/issues/104
|
|
#[sqlx_macros::test]
|
|
async fn it_can_return_interleaved_nulls_issue_104() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let tuple = sqlx::query("SELECT NULL, 10::INT, NULL, 20::INT, NULL, 40::INT, NULL, 80::INT")
|
|
.map(|row: PgRow| {
|
|
(
|
|
row.get::<Option<i32>, _>(0),
|
|
row.get::<Option<i32>, _>(1),
|
|
row.get::<Option<i32>, _>(2),
|
|
row.get::<Option<i32>, _>(3),
|
|
row.get::<Option<i32>, _>(4),
|
|
row.get::<Option<i32>, _>(5),
|
|
row.get::<Option<i32>, _>(6),
|
|
row.get::<Option<i32>, _>(7),
|
|
)
|
|
})
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(tuple.0, None);
|
|
assert_eq!(tuple.1, Some(10));
|
|
assert_eq!(tuple.2, None);
|
|
assert_eq!(tuple.3, Some(20));
|
|
assert_eq!(tuple.4, None);
|
|
assert_eq!(tuple.5, Some(40));
|
|
assert_eq!(tuple.6, None);
|
|
assert_eq!(tuple.7, Some(80));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_fail_and_recover() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
for i in 0..10 {
|
|
// make a query that will fail
|
|
let res = conn
|
|
.execute("INSERT INTO not_found (column) VALUES (10)")
|
|
.await;
|
|
|
|
assert!(res.is_err());
|
|
|
|
// now try and use the connection
|
|
let val: i32 = conn.fetch_one(&*format!("SELECT {i}::int4")).await?.get(0);
|
|
|
|
assert_eq!(val, i);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_fail_and_recover_with_pool() -> anyhow::Result<()> {
|
|
let pool = sqlx_test::pool::<Postgres>().await?;
|
|
|
|
for i in 0..10 {
|
|
// make a query that will fail
|
|
let res = pool
|
|
.execute("INSERT INTO not_found (column) VALUES (10)")
|
|
.await;
|
|
|
|
assert!(res.is_err());
|
|
|
|
// now try and use the connection
|
|
let val: i32 = pool.fetch_one(&*format!("SELECT {i}::int4")).await?.get(0);
|
|
|
|
assert_eq!(val, i);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_query_scalar() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let scalar: i32 = sqlx::query_scalar("SELECT 42").fetch_one(&mut conn).await?;
|
|
assert_eq!(scalar, 42);
|
|
|
|
let scalar: Option<i32> = sqlx::query_scalar("SELECT 42").fetch_one(&mut conn).await?;
|
|
assert_eq!(scalar, Some(42));
|
|
|
|
let scalar: Option<i32> = sqlx::query_scalar("SELECT NULL")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
assert_eq!(scalar, None);
|
|
|
|
let scalar: Option<i64> = sqlx::query_scalar("SELECT 42::bigint")
|
|
.fetch_optional(&mut conn)
|
|
.await?;
|
|
assert_eq!(scalar, Some(42));
|
|
|
|
let scalar: Option<i16> = sqlx::query_scalar("").fetch_optional(&mut conn).await?;
|
|
assert_eq!(scalar, None);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
/// This is separate from `it_can_query_scalar` because while implementing it I ran into a
|
|
/// bug which that prevented `Vec<i32>` from compiling but allowed Vec<Option<i32>>.
|
|
async fn it_can_query_all_scalar() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let scalar: Vec<i32> = sqlx::query_scalar("SELECT $1")
|
|
.bind(42)
|
|
.fetch_all(&mut conn)
|
|
.await?;
|
|
assert_eq!(scalar, vec![42]);
|
|
|
|
let scalar: Vec<Option<i32>> = sqlx::query_scalar("SELECT $1 UNION ALL SELECT NULL")
|
|
.bind(42)
|
|
.fetch_all(&mut conn)
|
|
.await?;
|
|
assert_eq!(scalar, vec![Some(42), None]);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn copy_can_work_with_failed_transactions() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
// We're using a (local) statement_timeout to simulate a runtime failure, as opposed to
|
|
// a parse/plan failure.
|
|
let mut tx = conn.begin().await?;
|
|
let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)")
|
|
.bind("statement_timeout")
|
|
.bind("1ms")
|
|
.execute(tx.as_mut())
|
|
.await?;
|
|
|
|
let mut copy_out: Pin<
|
|
Box<dyn Stream<Item = Result<Bytes, sqlx::Error>> + Send>,
|
|
> = (&mut tx)
|
|
.copy_out_raw("COPY (SELECT nspname FROM pg_catalog.pg_namespace WHERE pg_sleep(0.001) IS NULL) TO STDOUT")
|
|
.await?;
|
|
|
|
while copy_out.try_next().await.is_ok() {}
|
|
drop(copy_out);
|
|
|
|
tx.rollback().await?;
|
|
|
|
// conn should be usable again, as we explictly rolled back the transaction
|
|
let got: i32 = sqlx::query_scalar("SELECT 1")
|
|
.fetch_one(conn.as_mut())
|
|
.await?;
|
|
assert_eq!(1, got);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_work_with_failed_transactions() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
// We're using a (local) statement_timeout to simulate a runtime failure, as opposed to
|
|
// a parse/plan failure.
|
|
let mut tx = conn.begin().await?;
|
|
let _ = sqlx::query("SELECT pg_catalog.set_config($1, $2, true)")
|
|
.bind("statement_timeout")
|
|
.bind("1ms")
|
|
.execute(tx.as_mut())
|
|
.await?;
|
|
|
|
assert!(sqlx::query("SELECT 1 WHERE pg_sleep(0.30) IS NULL")
|
|
.fetch_one(tx.as_mut())
|
|
.await
|
|
.is_err());
|
|
tx.rollback().await?;
|
|
|
|
// conn should be usable again, as we explictly rolled back the transaction
|
|
let got: i32 = sqlx::query_scalar("SELECT 1")
|
|
.fetch_one(conn.as_mut())
|
|
.await?;
|
|
assert_eq!(1, got);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_work_with_transactions() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
conn.execute("CREATE TABLE IF NOT EXISTS _sqlx_users_1922 (id INTEGER PRIMARY KEY)")
|
|
.await?;
|
|
|
|
conn.execute("TRUNCATE _sqlx_users_1922").await?;
|
|
|
|
// begin .. rollback
|
|
|
|
let mut tx = conn.begin().await?;
|
|
|
|
sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)")
|
|
.bind(10_i32)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
tx.rollback().await?;
|
|
|
|
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(count, 0);
|
|
|
|
// begin .. commit
|
|
|
|
let mut tx = conn.begin().await?;
|
|
|
|
sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)")
|
|
.bind(10_i32)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
tx.commit().await?;
|
|
|
|
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(count, 1);
|
|
|
|
// begin .. (drop)
|
|
|
|
{
|
|
let mut tx = conn.begin().await?;
|
|
|
|
sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)")
|
|
.bind(20_i32)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
}
|
|
|
|
conn = new::<Postgres>().await?;
|
|
|
|
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(count, 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_work_with_nested_transactions() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
conn.execute("CREATE TABLE IF NOT EXISTS _sqlx_users_2523 (id INTEGER PRIMARY KEY)")
|
|
.await?;
|
|
|
|
conn.execute("TRUNCATE _sqlx_users_2523").await?;
|
|
|
|
// begin
|
|
let mut tx = conn.begin().await?; // transaction
|
|
|
|
// insert a user
|
|
sqlx::query("INSERT INTO _sqlx_users_2523 (id) VALUES ($1)")
|
|
.bind(50_i32)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
// begin once more
|
|
let mut tx2 = tx.begin().await?; // savepoint
|
|
|
|
// insert another user
|
|
sqlx::query("INSERT INTO _sqlx_users_2523 (id) VALUES ($1)")
|
|
.bind(10_i32)
|
|
.execute(&mut *tx2)
|
|
.await?;
|
|
|
|
// never mind, rollback
|
|
tx2.rollback().await?; // roll that one back
|
|
|
|
// did we really?
|
|
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_2523")
|
|
.fetch_one(&mut *tx)
|
|
.await?;
|
|
|
|
assert_eq!(count, 1);
|
|
|
|
// actually, commit
|
|
tx.commit().await?;
|
|
|
|
// did we really?
|
|
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_2523")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(count, 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_drop_multiple_transactions() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
conn.execute("CREATE TABLE IF NOT EXISTS _sqlx_users_3952 (id INTEGER PRIMARY KEY)")
|
|
.await?;
|
|
|
|
conn.execute("TRUNCATE _sqlx_users_3952").await?;
|
|
|
|
// begin .. (drop)
|
|
|
|
// run 2 times to see what happens if we drop transactions repeatedly
|
|
for _ in 0..2 {
|
|
{
|
|
let mut tx = conn.begin().await?;
|
|
|
|
// do actually something before dropping
|
|
let _user = sqlx::query("INSERT INTO _sqlx_users_3952 (id) VALUES ($1) RETURNING id")
|
|
.bind(20_i32)
|
|
.fetch_one(&mut *tx)
|
|
.await?;
|
|
}
|
|
|
|
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_3952")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(count, 0);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// run with `cargo test --features postgres -- --ignored --nocapture pool_smoke_test`
|
|
#[ignore]
|
|
#[sqlx_macros::test]
|
|
async fn pool_smoke_test() -> anyhow::Result<()> {
|
|
use futures::{future, task::Poll, Future};
|
|
|
|
eprintln!("starting pool");
|
|
|
|
let pool = PgPoolOptions::new()
|
|
.acquire_timeout(Duration::from_secs(5))
|
|
.min_connections(1)
|
|
.max_connections(1)
|
|
.connect(&dotenvy::var("DATABASE_URL")?)
|
|
.await?;
|
|
|
|
// spin up more tasks than connections available, and ensure we don't deadlock
|
|
for i in 0..200 {
|
|
let pool = pool.clone();
|
|
sqlx_core::rt::spawn(async move {
|
|
for j in 0.. {
|
|
if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await {
|
|
// normal error at termination of the test
|
|
if matches!(e, sqlx::Error::PoolClosed) {
|
|
eprintln!("pool task {i} exiting normally after {j} iterations");
|
|
} else {
|
|
eprintln!("pool task {i} dying due to {e} after {j} iterations");
|
|
}
|
|
break;
|
|
}
|
|
|
|
// shouldn't be necessary if the pool is fair
|
|
// sqlx_core::rt::yield_now().await;
|
|
}
|
|
});
|
|
}
|
|
|
|
// spawn a bunch of tasks that attempt to acquire but give up to ensure correct handling
|
|
// of cancellations
|
|
for _ in 0..50 {
|
|
let pool = pool.clone();
|
|
sqlx_core::rt::spawn(async move {
|
|
while !pool.is_closed() {
|
|
let acquire = pool.acquire();
|
|
futures::pin_mut!(acquire);
|
|
|
|
// poll the acquire future once to put the waiter in the queue
|
|
future::poll_fn(move |cx| {
|
|
let _ = acquire.as_mut().poll(cx);
|
|
Poll::Ready(())
|
|
})
|
|
.await;
|
|
|
|
// this one is necessary since this is a hot loop,
|
|
// otherwise this task will never be descheduled
|
|
sqlx_core::rt::yield_now().await;
|
|
}
|
|
});
|
|
}
|
|
|
|
eprintln!("sleeping for 30 seconds");
|
|
|
|
sqlx_core::rt::sleep(Duration::from_secs(30)).await;
|
|
|
|
// assert_eq!(pool.size(), 10);
|
|
|
|
eprintln!("closing pool");
|
|
|
|
sqlx_core::rt::timeout(Duration::from_secs(30), pool.close()).await?;
|
|
|
|
eprintln!("pool closed successfully");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_invalid_query() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
conn.execute("definitely not a correct query")
|
|
.await
|
|
.unwrap_err();
|
|
|
|
let mut s = conn.fetch("select 1");
|
|
let row = s.try_next().await?.unwrap();
|
|
|
|
assert_eq!(row.get::<i32, _>(0), 1i32);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Tests the edge case of executing a completely empty query string.
|
|
///
|
|
/// This gets flagged as an `EmptyQueryResponse` in Postgres. We
|
|
/// catch this and just return no rows.
|
|
#[sqlx_macros::test]
|
|
async fn test_empty_query() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
let done = conn.execute("").await?;
|
|
|
|
assert_eq!(done.rows_affected(), 0);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Test a simple select expression. This should return the row.
|
|
#[sqlx_macros::test]
|
|
async fn test_select_expression() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let mut s = conn.fetch("SELECT 5");
|
|
let row = s.try_next().await?.unwrap();
|
|
|
|
assert!(5i32 == row.try_get::<i32, _>(0)?);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Test that we can interleave reads and writes to the database
|
|
/// in one simple query. Using the `Cursor` API we should be
|
|
/// able to fetch from both queries in sequence.
|
|
#[sqlx_macros::test]
|
|
async fn test_multi_read_write() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let mut s = conn.fetch(
|
|
"
|
|
CREATE TABLE IF NOT EXISTS _sqlx_test_postgres_5112 (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
text TEXT NOT NULL
|
|
);
|
|
|
|
SELECT 'Hello World' as _1;
|
|
|
|
INSERT INTO _sqlx_test_postgres_5112 (text) VALUES ('this is a test');
|
|
|
|
SELECT id, text FROM _sqlx_test_postgres_5112;
|
|
",
|
|
);
|
|
|
|
let row = s.try_next().await?.unwrap();
|
|
|
|
assert!("Hello World" == row.try_get::<&str, _>("_1")?);
|
|
|
|
let row = s.try_next().await?.unwrap();
|
|
|
|
let id: i64 = row.try_get("id")?;
|
|
let text: &str = row.try_get("text")?;
|
|
|
|
assert_eq!(1_i64, id);
|
|
assert_eq!("this is a test", text);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_caches_statements() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
for i in 0..2 {
|
|
let row = sqlx::query("SELECT $1 AS val")
|
|
.bind(Oid(i))
|
|
.persistent(true)
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
let val: Oid = row.get("val");
|
|
|
|
assert_eq!(Oid(i), val);
|
|
}
|
|
|
|
assert_eq!(1, conn.cached_statements_size());
|
|
conn.clear_cached_statements().await?;
|
|
assert_eq!(0, conn.cached_statements_size());
|
|
|
|
for i in 0..2 {
|
|
let row = sqlx::query("SELECT $1 AS val")
|
|
.bind(Oid(i))
|
|
.persistent(false)
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
let val: Oid = row.get("val");
|
|
|
|
assert_eq!(Oid(i), val);
|
|
}
|
|
|
|
assert_eq!(0, conn.cached_statements_size());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_closes_statement_from_cache_issue_470() -> anyhow::Result<()> {
|
|
sqlx_test::setup_if_needed();
|
|
|
|
let mut options: PgConnectOptions = env::var("DATABASE_URL")?.parse().unwrap();
|
|
|
|
// a capacity of 1 means that before each statement (after the first)
|
|
// we will close the previous statement
|
|
options = options.statement_cache_capacity(1);
|
|
|
|
let mut conn = PgConnection::connect_with(&options).await?;
|
|
|
|
for i in 0..5 {
|
|
let row = sqlx::query(&*format!("SELECT {i}::int4 AS val"))
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
let val: i32 = row.get("val");
|
|
|
|
assert_eq!(i, val);
|
|
}
|
|
|
|
assert_eq!(1, conn.cached_statements_size());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_sets_application_name() -> anyhow::Result<()> {
|
|
sqlx_test::setup_if_needed();
|
|
|
|
let mut options: PgConnectOptions = env::var("DATABASE_URL")?.parse().unwrap();
|
|
options = options.application_name("some-name");
|
|
|
|
let mut conn = PgConnection::connect_with(&options).await?;
|
|
|
|
let row = sqlx::query("select current_setting('application_name') as app_name")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
let val: String = row.get("app_name");
|
|
|
|
assert_eq!("some-name", &val);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_handle_parameter_status_message_issue_484() -> anyhow::Result<()> {
|
|
new::<Postgres>().await?.execute("SET NAMES 'UTF8'").await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_prepare_then_execute() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
let mut tx = conn.begin().await?;
|
|
|
|
let tweet_id: i64 =
|
|
sqlx::query_scalar("INSERT INTO tweet ( text ) VALUES ( 'Hello, World' ) RETURNING id")
|
|
.fetch_one(&mut *tx)
|
|
.await?;
|
|
|
|
let statement = tx.prepare("SELECT * FROM tweet WHERE id = $1").await?;
|
|
|
|
assert_eq!(statement.column(0).name(), "id");
|
|
assert_eq!(statement.column(1).name(), "created_at");
|
|
assert_eq!(statement.column(2).name(), "text");
|
|
assert_eq!(statement.column(3).name(), "owner_id");
|
|
|
|
assert_eq!(statement.column(0).type_info().name(), "INT8");
|
|
assert_eq!(statement.column(1).type_info().name(), "TIMESTAMPTZ");
|
|
assert_eq!(statement.column(2).type_info().name(), "TEXT");
|
|
assert_eq!(statement.column(3).type_info().name(), "INT8");
|
|
|
|
let row = statement.query().bind(tweet_id).fetch_one(&mut *tx).await?;
|
|
let tweet_text: &str = row.try_get("text")?;
|
|
|
|
assert_eq!(tweet_text, "Hello, World");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// repro is more reliable with the basic scheduler used by `#[tokio::test]`
|
|
#[cfg(feature = "_rt-tokio")]
|
|
#[tokio::test]
|
|
async fn test_issue_622() -> anyhow::Result<()> {
|
|
use std::time::Instant;
|
|
|
|
setup_if_needed();
|
|
|
|
let pool = PgPoolOptions::new()
|
|
.max_connections(1) // also fails with higher counts, e.g. 5
|
|
.connect(&std::env::var("DATABASE_URL").unwrap())
|
|
.await?;
|
|
|
|
println!("pool state: {pool:?}");
|
|
|
|
let mut handles = vec![];
|
|
|
|
// given repro spawned 100 tasks but I found it reliably reproduced with 3
|
|
for i in 0..3 {
|
|
let pool = pool.clone();
|
|
|
|
handles.push(sqlx_core::rt::spawn(async move {
|
|
{
|
|
let mut conn = pool.acquire().await.unwrap();
|
|
|
|
let _ = sqlx::query("SELECT 1").fetch_one(&mut *conn).await.unwrap();
|
|
|
|
// conn gets dropped here and should be returned to the pool
|
|
}
|
|
|
|
// (do some other work here without holding on to a connection)
|
|
// this actually fixes the issue, depending on the timeout used
|
|
// sqlx_core::rt::sleep(Duration::from_millis(500)).await;
|
|
|
|
{
|
|
let start = Instant::now();
|
|
match pool.acquire().await {
|
|
Ok(conn) => {
|
|
println!("{} acquire took {:?}", i, start.elapsed());
|
|
drop(conn);
|
|
}
|
|
Err(e) => panic!("{i} acquire returned error: {e} pool state: {pool:?}"),
|
|
}
|
|
}
|
|
|
|
Result::<(), anyhow::Error>::Ok(())
|
|
}));
|
|
}
|
|
|
|
futures::future::try_join_all(handles).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_describe_outer_join_nullable() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
// test nullability inference for various joins
|
|
|
|
// inner join, nullability should not be overridden
|
|
// language=PostgreSQL
|
|
let describe = conn
|
|
.describe(
|
|
"select tweet.id
|
|
from tweet
|
|
inner join products on products.name = tweet.text",
|
|
)
|
|
.await?;
|
|
|
|
assert_eq!(describe.nullable(0), Some(false));
|
|
|
|
// language=PostgreSQL
|
|
let describe = conn
|
|
.describe(
|
|
"select tweet.id
|
|
from (values (null)) vals(val)
|
|
left join tweet on false",
|
|
)
|
|
.await?;
|
|
|
|
// tweet.id is marked NOT NULL but it's brought in from a left-join here
|
|
// which should make it nullable
|
|
assert_eq!(describe.nullable(0), Some(true));
|
|
|
|
// make sure we don't mis-infer for the outer half of the join
|
|
// language=PostgreSQL
|
|
let describe = conn
|
|
.describe(
|
|
"select tweet1.id, tweet2.id
|
|
from tweet tweet1
|
|
left join tweet tweet2 on false",
|
|
)
|
|
.await?;
|
|
|
|
assert_eq!(describe.nullable(0), Some(false));
|
|
assert_eq!(describe.nullable(1), Some(true));
|
|
|
|
// right join, nullability should be inverted
|
|
// language=PostgreSQL
|
|
let describe = conn
|
|
.describe(
|
|
"select tweet1.id, tweet2.id
|
|
from tweet tweet1
|
|
right join tweet tweet2 on false",
|
|
)
|
|
.await?;
|
|
|
|
assert_eq!(describe.nullable(0), Some(true));
|
|
assert_eq!(describe.nullable(1), Some(false));
|
|
|
|
// full outer join, both tables are nullable
|
|
// language=PostgreSQL
|
|
let describe = conn
|
|
.describe(
|
|
"select tweet1.id, tweet2.id
|
|
from tweet tweet1
|
|
full join tweet tweet2 on false",
|
|
)
|
|
.await?;
|
|
|
|
assert_eq!(describe.nullable(0), Some(true));
|
|
assert_eq!(describe.nullable(1), Some(true));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_listener_cleanup() -> anyhow::Result<()> {
|
|
use sqlx_core::rt::timeout;
|
|
|
|
use sqlx::pool::PoolOptions;
|
|
use sqlx::postgres::PgListener;
|
|
|
|
// Create a connection on which to send notifications
|
|
let mut notify_conn = new::<Postgres>().await?;
|
|
|
|
// Create a pool with exactly one connection so we can
|
|
// deterministically test the cleanup.
|
|
let pool = PoolOptions::<Postgres>::new()
|
|
.min_connections(1)
|
|
.max_connections(1)
|
|
.test_before_acquire(true)
|
|
.connect(&env::var("DATABASE_URL")?)
|
|
.await?;
|
|
|
|
let mut listener = PgListener::connect_with(&pool).await?;
|
|
listener.listen("test_channel").await?;
|
|
|
|
// Checks for a notification on the test channel
|
|
async fn try_recv(listener: &mut PgListener) -> anyhow::Result<bool> {
|
|
match timeout(Duration::from_millis(100), listener.recv()).await {
|
|
Ok(res) => {
|
|
res?;
|
|
Ok(true)
|
|
}
|
|
Err(_) => Ok(false),
|
|
}
|
|
}
|
|
|
|
// Check no notification is received before one is sent
|
|
assert!(!try_recv(&mut listener).await?, "Notification not sent");
|
|
|
|
// Check notification is sent and received
|
|
notify_conn.execute("NOTIFY test_channel").await?;
|
|
assert!(
|
|
try_recv(&mut listener).await?,
|
|
"Notification sent and received"
|
|
);
|
|
assert!(
|
|
!try_recv(&mut listener).await?,
|
|
"Notification is not duplicated"
|
|
);
|
|
|
|
// Test that cleanup stops listening on the channel
|
|
drop(listener);
|
|
let mut listener = PgListener::connect_with(&pool).await?;
|
|
|
|
// Check notification is sent but not received
|
|
notify_conn.execute("NOTIFY test_channel").await?;
|
|
assert!(
|
|
!try_recv(&mut listener).await?,
|
|
"Notification is not received on fresh listener"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_pg_listener_allows_pool_to_close() -> anyhow::Result<()> {
|
|
let pool = pool::<Postgres>().await?;
|
|
|
|
// acquires and holds a connection which would normally prevent the pool from closing
|
|
let mut listener = PgListener::connect_with(&pool).await?;
|
|
|
|
sqlx_core::rt::spawn(async move {
|
|
listener.recv().await.unwrap();
|
|
});
|
|
|
|
// would previously hang forever since `PgListener` had no way to know the pool wanted to close
|
|
pool.close().await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_supports_domain_types_in_composite_domain_types() -> anyhow::Result<()> {
|
|
// Only supported in Postgres 11+
|
|
let mut conn = new::<Postgres>().await?;
|
|
if matches!(conn.server_version_num(), Some(version) if version < 110000) {
|
|
return Ok(());
|
|
}
|
|
|
|
conn.execute(
|
|
r#"
|
|
DROP TABLE IF EXISTS heating_bills;
|
|
DROP DOMAIN IF EXISTS winter_year_month;
|
|
DROP TYPE IF EXISTS year_month;
|
|
DROP DOMAIN IF EXISTS month_id;
|
|
|
|
CREATE DOMAIN month_id AS INT2 CHECK (1 <= value AND value <= 12);
|
|
CREATE TYPE year_month AS (year INT4, month month_id);
|
|
CREATE DOMAIN winter_year_month AS year_month CHECK ((value).month <= 3);
|
|
CREATE TABLE heating_bills (
|
|
month winter_year_month NOT NULL PRIMARY KEY,
|
|
cost INT4 NOT NULL
|
|
);
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
struct MonthId(i16);
|
|
|
|
impl sqlx::Type<Postgres> for MonthId {
|
|
fn type_info() -> sqlx::postgres::PgTypeInfo {
|
|
sqlx::postgres::PgTypeInfo::with_name("month_id")
|
|
}
|
|
|
|
fn compatible(ty: &sqlx::postgres::PgTypeInfo) -> bool {
|
|
*ty == Self::type_info()
|
|
}
|
|
}
|
|
|
|
impl<'r> sqlx::Decode<'r, Postgres> for MonthId {
|
|
fn decode(
|
|
value: sqlx::postgres::PgValueRef<'r>,
|
|
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
|
|
Ok(Self(<i16 as sqlx::Decode<Postgres>>::decode(value)?))
|
|
}
|
|
}
|
|
|
|
impl<'q> sqlx::Encode<'q, Postgres> for MonthId {
|
|
fn encode_by_ref(
|
|
&self,
|
|
buf: &mut sqlx::postgres::PgArgumentBuffer,
|
|
) -> sqlx::encode::IsNull {
|
|
<i16 as sqlx::Encode<Postgres>>::encode(self.0, buf)
|
|
}
|
|
}
|
|
|
|
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
struct WinterYearMonth {
|
|
year: i32,
|
|
month: MonthId,
|
|
}
|
|
|
|
impl sqlx::Type<Postgres> for WinterYearMonth {
|
|
fn type_info() -> sqlx::postgres::PgTypeInfo {
|
|
sqlx::postgres::PgTypeInfo::with_name("winter_year_month")
|
|
}
|
|
|
|
fn compatible(ty: &sqlx::postgres::PgTypeInfo) -> bool {
|
|
*ty == Self::type_info()
|
|
}
|
|
}
|
|
|
|
impl<'r> sqlx::Decode<'r, Postgres> for WinterYearMonth {
|
|
fn decode(
|
|
value: sqlx::postgres::PgValueRef<'r>,
|
|
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
|
|
let mut decoder = sqlx::postgres::types::PgRecordDecoder::new(value)?;
|
|
|
|
let year = decoder.try_decode::<i32>()?;
|
|
let month = decoder.try_decode::<MonthId>()?;
|
|
|
|
Ok(Self { year, month })
|
|
}
|
|
}
|
|
|
|
impl<'q> sqlx::Encode<'q, Postgres> for WinterYearMonth {
|
|
fn encode_by_ref(
|
|
&self,
|
|
buf: &mut sqlx::postgres::PgArgumentBuffer,
|
|
) -> sqlx::encode::IsNull {
|
|
let mut encoder = sqlx::postgres::types::PgRecordEncoder::new(buf);
|
|
encoder.encode(self.year);
|
|
encoder.encode(self.month);
|
|
encoder.finish();
|
|
sqlx::encode::IsNull::No
|
|
}
|
|
}
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let result = sqlx::query("DELETE FROM heating_bills;")
|
|
.execute(&mut conn)
|
|
.await;
|
|
|
|
let result = result.unwrap();
|
|
assert_eq!(result.rows_affected(), 0);
|
|
|
|
let result =
|
|
sqlx::query("INSERT INTO heating_bills(month, cost) VALUES($1::winter_year_month, 100);")
|
|
.bind(WinterYearMonth {
|
|
year: 2021,
|
|
month: MonthId(1),
|
|
})
|
|
.execute(&mut conn)
|
|
.await;
|
|
|
|
let result = result.unwrap();
|
|
assert_eq!(result.rows_affected(), 1);
|
|
|
|
let result = sqlx::query("DELETE FROM heating_bills;")
|
|
.execute(&mut conn)
|
|
.await;
|
|
|
|
let result = result.unwrap();
|
|
assert_eq!(result.rows_affected(), 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_resolves_custom_type_in_array() -> anyhow::Result<()> {
|
|
// Only supported in Postgres 11+
|
|
let mut conn = new::<Postgres>().await?;
|
|
if matches!(conn.server_version_num(), Some(version) if version < 110000) {
|
|
return Ok(());
|
|
}
|
|
|
|
// language=PostgreSQL
|
|
conn.execute(
|
|
r#"
|
|
DROP TABLE IF EXISTS pets;
|
|
DROP TYPE IF EXISTS pet_name_and_race;
|
|
|
|
CREATE TYPE pet_name_and_race AS (
|
|
name TEXT,
|
|
race TEXT
|
|
);
|
|
CREATE TABLE pets (
|
|
owner TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
race TEXT NOT NULL,
|
|
PRIMARY KEY (owner, name)
|
|
);
|
|
INSERT INTO pets(owner, name, race)
|
|
VALUES
|
|
('Alice', 'Foo', 'cat');
|
|
INSERT INTO pets(owner, name, race)
|
|
VALUES
|
|
('Alice', 'Bar', 'dog');
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
struct PetNameAndRace {
|
|
name: String,
|
|
race: String,
|
|
}
|
|
|
|
impl sqlx::Type<Postgres> for PetNameAndRace {
|
|
fn type_info() -> sqlx::postgres::PgTypeInfo {
|
|
sqlx::postgres::PgTypeInfo::with_name("pet_name_and_race")
|
|
}
|
|
}
|
|
|
|
impl<'r> sqlx::Decode<'r, Postgres> for PetNameAndRace {
|
|
fn decode(
|
|
value: sqlx::postgres::PgValueRef<'r>,
|
|
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
|
|
let mut decoder = sqlx::postgres::types::PgRecordDecoder::new(value)?;
|
|
let name = decoder.try_decode::<String>()?;
|
|
let race = decoder.try_decode::<String>()?;
|
|
Ok(Self { name, race })
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
struct PetNameAndRaceArray(Vec<PetNameAndRace>);
|
|
|
|
impl sqlx::Type<Postgres> for PetNameAndRaceArray {
|
|
fn type_info() -> sqlx::postgres::PgTypeInfo {
|
|
// Array type name is the name of the element type prefixed with `_`
|
|
sqlx::postgres::PgTypeInfo::with_name("_pet_name_and_race")
|
|
}
|
|
}
|
|
|
|
impl<'r> sqlx::Decode<'r, Postgres> for PetNameAndRaceArray {
|
|
fn decode(
|
|
value: sqlx::postgres::PgValueRef<'r>,
|
|
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
|
|
Ok(Self(Vec::<PetNameAndRace>::decode(value)?))
|
|
}
|
|
}
|
|
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let row = sqlx::query("select owner, array_agg(row(name, race)::pet_name_and_race) as pets from pets group by owner")
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
let pets: PetNameAndRaceArray = row.get("pets");
|
|
|
|
assert_eq!(pets.0.len(), 2);
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_resolves_custom_types_in_anonymous_records() -> anyhow::Result<()> {
|
|
use sqlx_core::error::Error;
|
|
// This request involves nested records and array types.
|
|
|
|
// Only supported in Postgres 11+
|
|
let mut conn = new::<Postgres>().await?;
|
|
if matches!(conn.server_version_num(), Some(version) if version < 110000) {
|
|
return Ok(());
|
|
}
|
|
|
|
// language=PostgreSQL
|
|
conn.execute(
|
|
r#"
|
|
DROP TABLE IF EXISTS repo_users;
|
|
DROP TABLE IF EXISTS repositories;
|
|
DROP TABLE IF EXISTS repo_memberships;
|
|
DROP TYPE IF EXISTS repo_member;
|
|
|
|
CREATE TABLE repo_users (
|
|
user_id INT4 NOT NULL,
|
|
username TEXT NOT NULL,
|
|
PRIMARY KEY (user_id)
|
|
);
|
|
CREATE TABLE repositories (
|
|
repo_id INT4 NOT NULL,
|
|
repo_name TEXT NOT NULL,
|
|
PRIMARY KEY (repo_id)
|
|
);
|
|
CREATE TABLE repo_memberships (
|
|
repo_id INT4 NOT NULL,
|
|
user_id INT4 NOT NULL,
|
|
permission TEXT NOT NULL,
|
|
PRIMARY KEY (repo_id, user_id)
|
|
);
|
|
CREATE TYPE repo_member AS (
|
|
user_id INT4,
|
|
permission TEXT
|
|
);
|
|
INSERT INTO repo_users(user_id, username)
|
|
VALUES
|
|
(101, 'alice'),
|
|
(102, 'bob'),
|
|
(103, 'charlie');
|
|
INSERT INTO repositories(repo_id, repo_name)
|
|
VALUES
|
|
(201, 'rust'),
|
|
(202, 'sqlx'),
|
|
(203, 'hello-world');
|
|
INSERT INTO repo_memberships(repo_id, user_id, permission)
|
|
VALUES
|
|
(201, 101, 'admin'),
|
|
(201, 102, 'write'),
|
|
(201, 103, 'read'),
|
|
(202, 102, 'admin');
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
struct RepoMember {
|
|
user_id: i32,
|
|
permission: String,
|
|
}
|
|
|
|
impl sqlx::Type<Postgres> for RepoMember {
|
|
fn type_info() -> sqlx::postgres::PgTypeInfo {
|
|
sqlx::postgres::PgTypeInfo::with_name("repo_member")
|
|
}
|
|
}
|
|
|
|
impl<'r> sqlx::Decode<'r, Postgres> for RepoMember {
|
|
fn decode(
|
|
value: sqlx::postgres::PgValueRef<'r>,
|
|
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
|
|
let mut decoder = sqlx::postgres::types::PgRecordDecoder::new(value)?;
|
|
let user_id = decoder.try_decode::<i32>()?;
|
|
let permission = decoder.try_decode::<String>()?;
|
|
Ok(Self {
|
|
user_id,
|
|
permission,
|
|
})
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
struct RepoMemberArray(Vec<RepoMember>);
|
|
|
|
impl sqlx::Type<Postgres> for RepoMemberArray {
|
|
fn type_info() -> sqlx::postgres::PgTypeInfo {
|
|
// Array type name is the name of the element type prefixed with `_`
|
|
sqlx::postgres::PgTypeInfo::with_name("_repo_member")
|
|
}
|
|
}
|
|
|
|
impl<'r> sqlx::Decode<'r, Postgres> for RepoMemberArray {
|
|
fn decode(
|
|
value: sqlx::postgres::PgValueRef<'r>,
|
|
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
|
|
Ok(Self(Vec::<RepoMember>::decode(value)?))
|
|
}
|
|
}
|
|
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
#[derive(Debug, sqlx::FromRow)]
|
|
#[allow(dead_code)] // We don't actually read these fields.
|
|
struct Row {
|
|
count: i64,
|
|
items: Vec<(i32, String, RepoMemberArray)>,
|
|
}
|
|
// language=PostgreSQL
|
|
let row: Result<Row, Error> = sqlx::query_as::<_, Row>(
|
|
r"
|
|
WITH
|
|
members_by_repo AS (
|
|
SELECT repo_id,
|
|
ARRAY_AGG(ROW (user_id, permission)::repo_member) AS members
|
|
FROM repo_memberships
|
|
GROUP BY repo_id
|
|
),
|
|
repos AS (
|
|
SELECT repo_id, repo_name, COALESCE(members, '{}') AS members
|
|
FROM repositories
|
|
LEFT OUTER JOIN members_by_repo USING (repo_id)
|
|
ORDER BY repo_id
|
|
),
|
|
repo_array AS (
|
|
SELECT COALESCE(ARRAY_AGG(repos.*), '{}') AS items
|
|
FROM repos
|
|
),
|
|
repo_count AS (
|
|
SELECT COUNT(*) AS count
|
|
FROM repos
|
|
)
|
|
SELECT count, items
|
|
FROM repo_count, repo_array
|
|
;
|
|
",
|
|
)
|
|
.fetch_one(&mut conn)
|
|
.await;
|
|
|
|
// This test currently tests mitigations for `#1672` (use regular errors
|
|
// instead of panics). Once we fully support custom types, it should be
|
|
// updated accordingly.
|
|
match row {
|
|
Ok(_) => panic!("full support for custom types is not implemented yet"),
|
|
Err(e) => assert!(e
|
|
.to_string()
|
|
.contains("custom types in records are not fully supported yet")),
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn custom_type_resolution_respects_search_path() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
conn.execute(
|
|
r#"
|
|
DROP TYPE IF EXISTS some_enum_type;
|
|
DROP SCHEMA IF EXISTS another CASCADE;
|
|
|
|
CREATE SCHEMA another;
|
|
CREATE TYPE some_enum_type AS ENUM ('a', 'b', 'c');
|
|
CREATE TYPE another.some_enum_type AS ENUM ('d', 'e', 'f');
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
struct SomeEnumType(String);
|
|
|
|
impl sqlx::Type<Postgres> for SomeEnumType {
|
|
fn type_info() -> sqlx::postgres::PgTypeInfo {
|
|
sqlx::postgres::PgTypeInfo::with_name("some_enum_type")
|
|
}
|
|
|
|
fn compatible(ty: &sqlx::postgres::PgTypeInfo) -> bool {
|
|
*ty == Self::type_info()
|
|
}
|
|
}
|
|
|
|
impl<'r> sqlx::Decode<'r, Postgres> for SomeEnumType {
|
|
fn decode(
|
|
value: sqlx::postgres::PgValueRef<'r>,
|
|
) -> Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
|
|
Ok(Self(<String as sqlx::Decode<Postgres>>::decode(value)?))
|
|
}
|
|
}
|
|
|
|
impl<'q> sqlx::Encode<'q, Postgres> for SomeEnumType {
|
|
fn encode_by_ref(
|
|
&self,
|
|
buf: &mut sqlx::postgres::PgArgumentBuffer,
|
|
) -> sqlx::encode::IsNull {
|
|
<String as sqlx::Encode<Postgres>>::encode_by_ref(&self.0, buf)
|
|
}
|
|
}
|
|
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
sqlx::query("set search_path = 'another'")
|
|
.execute(&mut conn)
|
|
.await?;
|
|
|
|
let result = sqlx::query("SELECT 1 WHERE $1::some_enum_type = 'd'::some_enum_type;")
|
|
.bind(SomeEnumType("d".into()))
|
|
.fetch_all(&mut conn)
|
|
.await;
|
|
|
|
let result = result.unwrap();
|
|
assert_eq!(result.len(), 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_pg_server_num() -> anyhow::Result<()> {
|
|
let conn = new::<Postgres>().await?;
|
|
|
|
assert!(conn.server_version_num().is_some());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_copy_in() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
conn.execute(
|
|
r#"
|
|
CREATE TEMPORARY TABLE users (id INTEGER NOT NULL);
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
let mut copy = conn
|
|
.copy_in_raw(
|
|
r#"
|
|
COPY users (id) FROM STDIN WITH (FORMAT CSV, HEADER);
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
copy.send("id\n1\n2\n".as_bytes()).await?;
|
|
let rows = copy.finish().await?;
|
|
assert_eq!(rows, 2);
|
|
|
|
// conn is safe for reuse
|
|
let value = sqlx::query("select 1 + 1")
|
|
.try_map(|row: PgRow| row.try_get::<i32, _>(0))
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(2i32, value);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_abort_copy_in() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
conn.execute(
|
|
r#"
|
|
CREATE TEMPORARY TABLE users (id INTEGER NOT NULL);
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
let copy = conn
|
|
.copy_in_raw(
|
|
r#"
|
|
COPY users (id) FROM STDIN WITH (FORMAT CSV, HEADER);
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
copy.abort("this is only a test").await?;
|
|
|
|
// conn is safe for reuse
|
|
let value = sqlx::query("select 1 + 1")
|
|
.try_map(|row: PgRow| row.try_get::<i32, _>(0))
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(2i32, value);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_can_copy_out() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
{
|
|
let mut copy = conn
|
|
.copy_out_raw(
|
|
"
|
|
COPY (SELECT generate_series(1, 2) AS id) TO STDOUT WITH (FORMAT CSV, HEADER);
|
|
",
|
|
)
|
|
.await?;
|
|
|
|
assert_eq!(copy.next().await.unwrap().unwrap(), "id\n");
|
|
assert_eq!(copy.next().await.unwrap().unwrap(), "1\n");
|
|
assert_eq!(copy.next().await.unwrap().unwrap(), "2\n");
|
|
if copy.next().await.is_some() {
|
|
anyhow::bail!("Unexpected data from COPY");
|
|
}
|
|
}
|
|
|
|
// conn is safe for reuse
|
|
let value = sqlx::query("select 1 + 1")
|
|
.try_map(|row: PgRow| row.try_get::<i32, _>(0))
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(2i32, value);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn it_encodes_custom_array_issue_1504() -> anyhow::Result<()> {
|
|
use sqlx::encode::IsNull;
|
|
use sqlx::postgres::{PgArgumentBuffer, PgTypeInfo};
|
|
use sqlx::{Decode, Encode, Type, ValueRef};
|
|
|
|
#[derive(Debug, PartialEq)]
|
|
enum Value {
|
|
String(String),
|
|
Number(i32),
|
|
Array(Vec<Value>),
|
|
}
|
|
|
|
impl<'r> Decode<'r, Postgres> for Value {
|
|
fn decode(
|
|
value: sqlx::postgres::PgValueRef<'r>,
|
|
) -> std::result::Result<Self, Box<dyn std::error::Error + 'static + Send + Sync>> {
|
|
let typ = value.type_info().into_owned();
|
|
|
|
if typ == PgTypeInfo::with_name("text") {
|
|
let s = <String as Decode<'_, Postgres>>::decode(value)?;
|
|
|
|
Ok(Self::String(s))
|
|
} else if typ == PgTypeInfo::with_name("int4") {
|
|
let n = <i32 as Decode<'_, Postgres>>::decode(value)?;
|
|
|
|
Ok(Self::Number(n))
|
|
} else if typ == PgTypeInfo::with_name("_text") {
|
|
let arr = Vec::<String>::decode(value)?;
|
|
let v = arr.into_iter().map(|s| Value::String(s)).collect();
|
|
|
|
Ok(Self::Array(v))
|
|
} else if typ == PgTypeInfo::with_name("_int4") {
|
|
let arr = Vec::<i32>::decode(value)?;
|
|
let v = arr.into_iter().map(|n| Value::Number(n)).collect();
|
|
|
|
Ok(Self::Array(v))
|
|
} else {
|
|
Err("unknown type".into())
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Encode<'_, Postgres> for Value {
|
|
fn produces(&self) -> Option<PgTypeInfo> {
|
|
match self {
|
|
Self::Array(a) => {
|
|
if a.len() < 1 {
|
|
return Some(PgTypeInfo::with_name("_text"));
|
|
}
|
|
|
|
match a[0] {
|
|
Self::String(_) => Some(PgTypeInfo::with_name("_text")),
|
|
Self::Number(_) => Some(PgTypeInfo::with_name("_int4")),
|
|
Self::Array(_) => None,
|
|
}
|
|
}
|
|
Self::String(_) => Some(PgTypeInfo::with_name("text")),
|
|
Self::Number(_) => Some(PgTypeInfo::with_name("int4")),
|
|
}
|
|
}
|
|
|
|
fn encode_by_ref(&self, buf: &mut PgArgumentBuffer) -> IsNull {
|
|
match self {
|
|
Value::String(s) => <String as Encode<'_, Postgres>>::encode_by_ref(s, buf),
|
|
Value::Number(n) => <i32 as Encode<'_, Postgres>>::encode_by_ref(n, buf),
|
|
Value::Array(arr) => arr.encode(buf),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Type<Postgres> for Value {
|
|
fn type_info() -> PgTypeInfo {
|
|
PgTypeInfo::with_name("unknown")
|
|
}
|
|
|
|
fn compatible(ty: &PgTypeInfo) -> bool {
|
|
[
|
|
PgTypeInfo::with_name("text"),
|
|
PgTypeInfo::with_name("_text"),
|
|
PgTypeInfo::with_name("int4"),
|
|
PgTypeInfo::with_name("_int4"),
|
|
]
|
|
.contains(ty)
|
|
}
|
|
}
|
|
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
let (row,): (Value,) = sqlx::query_as("SELECT $1::text[] as Dummy")
|
|
.bind(Value::Array(vec![
|
|
Value::String("Test 0".to_string()),
|
|
Value::String("Test 1".to_string()),
|
|
]))
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(
|
|
row,
|
|
Value::Array(vec![
|
|
Value::String("Test 0".to_string()),
|
|
Value::String("Test 1".to_string()),
|
|
])
|
|
);
|
|
|
|
let (row,): (Value,) = sqlx::query_as("SELECT $1::int4[] as Dummy")
|
|
.bind(Value::Array(vec![
|
|
Value::Number(3),
|
|
Value::Number(2),
|
|
Value::Number(1),
|
|
]))
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(
|
|
row,
|
|
Value::Array(vec![Value::Number(3), Value::Number(2), Value::Number(1)])
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_issue_1254() -> anyhow::Result<()> {
|
|
#[derive(sqlx::Type)]
|
|
#[sqlx(type_name = "pair")]
|
|
struct Pair {
|
|
one: i32,
|
|
two: i32,
|
|
}
|
|
|
|
// array for custom type is not supported, use wrapper
|
|
#[derive(sqlx::Type)]
|
|
#[sqlx(type_name = "_pair")]
|
|
struct Pairs(Vec<Pair>);
|
|
|
|
let mut conn = new::<Postgres>().await?;
|
|
conn.execute(
|
|
"
|
|
DROP TABLE IF EXISTS issue_1254;
|
|
DROP TYPE IF EXISTS pair;
|
|
|
|
CREATE TYPE pair AS (one INT4, two INT4);
|
|
CREATE TABLE issue_1254 (id INT4 PRIMARY KEY, pairs PAIR[]);
|
|
",
|
|
)
|
|
.await?;
|
|
|
|
let result = sqlx::query("INSERT INTO issue_1254 VALUES($1, $2)")
|
|
.bind(0)
|
|
.bind(Pairs(vec![Pair { one: 94, two: 87 }]))
|
|
.execute(&mut conn)
|
|
.await?;
|
|
assert_eq!(result.rows_affected(), 1);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_advisory_locks() -> anyhow::Result<()> {
|
|
let pool = PgPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&dotenvy::var("DATABASE_URL")?)
|
|
.await?;
|
|
|
|
let lock1 = Arc::new(PgAdvisoryLock::new("sqlx-postgres-tests-1"));
|
|
let lock2 = Arc::new(PgAdvisoryLock::new("sqlx-postgres-tests-2"));
|
|
|
|
let conn1 = pool.acquire().await?;
|
|
let mut conn1_lock1 = lock1.acquire(conn1).await?;
|
|
|
|
// try acquiring a recursive lock through a mutable reference then dropping
|
|
drop(lock1.acquire(&mut conn1_lock1).await?);
|
|
|
|
let conn2 = pool.acquire().await?;
|
|
// leak so we can take it across the task boundary
|
|
let conn2_lock2 = lock2.acquire(conn2).await?.leak();
|
|
|
|
sqlx_core::rt::spawn({
|
|
let lock1 = lock1.clone();
|
|
let lock2 = lock2.clone();
|
|
|
|
async move {
|
|
let conn2_lock2 = lock1.try_acquire(conn2_lock2).await?.right_or_else(|_| {
|
|
panic!(
|
|
"acquired lock but wasn't supposed to! Key: {:?}",
|
|
lock1.key()
|
|
)
|
|
});
|
|
|
|
let (conn2, released) = lock2.force_release(conn2_lock2).await?;
|
|
assert!(released);
|
|
|
|
// acquire both locks but let the pool release them
|
|
let conn2_lock1 = lock1.acquire(conn2).await?;
|
|
let _conn2_lock1and2 = lock2.acquire(conn2_lock1).await?;
|
|
|
|
anyhow::Ok(())
|
|
}
|
|
});
|
|
|
|
// acquire lock2 on conn1, we leak the lock1 guard so we can manually release it before lock2
|
|
let conn1_lock1and2 = lock2.acquire(conn1_lock1.leak()).await?;
|
|
|
|
// release lock1 while holding lock2
|
|
let (conn1_lock2, released) = lock1.force_release(conn1_lock1and2).await?;
|
|
assert!(released);
|
|
|
|
let conn1 = conn1_lock2.release_now().await?;
|
|
|
|
// acquire both locks to be sure they were released
|
|
{
|
|
let conn1_lock1 = lock1.acquire(conn1).await?;
|
|
let _conn1_lock1and2 = lock2.acquire(conn1_lock1).await?;
|
|
}
|
|
|
|
pool.close().await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_postgres_bytea_hex_deserialization_errors() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
conn.execute("SET bytea_output = 'escape';").await?;
|
|
for value in ["", "DEADBEEF"] {
|
|
let query = format!("SELECT '\\x{value}'::bytea");
|
|
let res: sqlx::Result<Vec<u8>> = conn.fetch_one(query.as_str()).await?.try_get(0usize);
|
|
// Deserialization only supports hex format so this should error and definitely not panic.
|
|
res.unwrap_err();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_shrink_buffers() -> anyhow::Result<()> {
|
|
// We don't really have a good way to test that `.shrink_buffers()` functions as expected
|
|
// without exposing a lot of internals, but we can at least be sure it doesn't
|
|
// materially affect the operation of the connection.
|
|
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
// The connection buffer is only 8 KiB by default so this should definitely force it to grow.
|
|
let data = vec![0u8; 32 * 1024];
|
|
|
|
let ret: Vec<u8> = sqlx::query_scalar("SELECT $1::bytea")
|
|
.bind(&data)
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(ret, data);
|
|
|
|
conn.shrink_buffers();
|
|
|
|
let ret: i64 = sqlx::query_scalar("SELECT $1::int8")
|
|
.bind(&12345678i64)
|
|
.fetch_one(&mut conn)
|
|
.await?;
|
|
|
|
assert_eq!(ret, 12345678i64);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
async fn test_error_handling_with_deferred_constraints() -> anyhow::Result<()> {
|
|
let mut conn = new::<Postgres>().await?;
|
|
|
|
sqlx::query("CREATE TABLE IF NOT EXISTS deferred_constraint ( id INTEGER PRIMARY KEY )")
|
|
.execute(&mut conn)
|
|
.await?;
|
|
|
|
sqlx::query("CREATE TABLE IF NOT EXISTS deferred_constraint_fk ( fk INTEGER CONSTRAINT deferred_fk REFERENCES deferred_constraint(id) DEFERRABLE INITIALLY DEFERRED )")
|
|
.execute(&mut conn)
|
|
.await?;
|
|
|
|
let result: sqlx::Result<i32> =
|
|
sqlx::query_scalar("INSERT INTO deferred_constraint_fk VALUES (1) RETURNING fk")
|
|
.fetch_one(&mut conn)
|
|
.await;
|
|
|
|
let err = result.unwrap_err();
|
|
let db_err = err.as_database_error().unwrap();
|
|
assert_eq!(db_err.constraint(), Some("deferred_fk"));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[sqlx_macros::test]
|
|
#[cfg(feature = "bigdecimal")]
|
|
async fn test_issue_3052() {
|
|
use sqlx::types::BigDecimal;
|
|
|
|
// https://github.com/launchbadge/sqlx/issues/3052
|
|
// Previously, attempting to bind a `BigDecimal` would panic if the value was out of range.
|
|
// Now, we rewrite it to a sentinel value so that Postgres will return a range error.
|
|
let too_small: BigDecimal = "1E-65536".parse().unwrap();
|
|
let too_large: BigDecimal = "1E262144".parse().unwrap();
|
|
|
|
let mut conn = new::<Postgres>().await.unwrap();
|
|
|
|
let too_small_res = sqlx::query_scalar::<_, BigDecimal>("SELECT $1::numeric")
|
|
.bind(&too_small)
|
|
.fetch_one(&mut conn)
|
|
.await;
|
|
|
|
match too_small_res {
|
|
Err(sqlx::Error::Database(dbe)) => {
|
|
let dbe = dbe.downcast::<PgDatabaseError>();
|
|
|
|
assert_eq!(dbe.code(), "22P03");
|
|
}
|
|
other => panic!("expected Err(DatabaseError), got {other:?}"),
|
|
}
|
|
|
|
let too_large_res = sqlx::query_scalar::<_, BigDecimal>("SELECT $1::numeric")
|
|
.bind(&too_large)
|
|
.fetch_one(&mut conn)
|
|
.await;
|
|
|
|
match too_large_res {
|
|
Err(sqlx::Error::Database(dbe)) => {
|
|
let dbe = dbe.downcast::<PgDatabaseError>();
|
|
|
|
assert_eq!(dbe.code(), "22P03");
|
|
}
|
|
other => panic!("expected Err(DatabaseError), got {other:?}"),
|
|
}
|
|
}
|