use futures::TryStreamExt; use sqlx::postgres::PgRow; use sqlx::{postgres::Postgres, Connection, Executor, Row}; use sqlx_core::postgres::{PgDatabaseError, PgErrorPosition, PgSeverity}; use sqlx_test::new; #[sqlx_macros::test] async fn it_connects() -> anyhow::Result<()> { let mut conn = new::().await?; let value = sqlx::query("select 1 + 1") .try_map(|row: PgRow| row.try_get::(0)) .fetch_one(&mut conn) .await?; assert_eq!(2i32, value); Ok(()) } #[sqlx_macros::test] async fn it_pings() -> anyhow::Result<()> { let mut conn = new::().await?; conn.ping().await?; Ok(()) } #[sqlx_macros::test] async fn it_maths() -> anyhow::Result<()> { let mut conn = new::().await?; let value = sqlx::query("select 1 + $1::int") .bind(5_i32) .try_map(|row: PgRow| row.try_get::(0)) .fetch_one(&mut conn) .await?; assert_eq!(6i32, value); Ok(()) } #[sqlx_macros::test] async fn it_can_inspect_errors() -> anyhow::Result<()> { let mut conn = new::().await?; let res: Result = sqlx::query("select f").execute(&mut conn).await; let err = res.unwrap_err(); // can also do [as_database_error] or use `match ..` let err = err.into_database_error().unwrap(); assert_eq!(err.message(), "column \"f\" does not exist"); assert_eq!(err.code().as_deref(), Some("42703")); // can also do [downcast_ref] let err: Box = err.downcast(); assert_eq!(err.severity(), PgSeverity::Error); assert_eq!(err.message(), "column \"f\" does not exist"); assert_eq!(err.code(), "42703"); assert_eq!(err.position(), Some(PgErrorPosition::Original(8))); assert_eq!(err.routine(), Some("errorMissingColumn")); Ok(()) } #[sqlx_macros::test] async fn it_executes() -> anyhow::Result<()> { let mut conn = new::().await?; let _ = conn .execute( r#" CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY); "#, ) .await?; for index in 1..=10_i32 { let cnt = sqlx::query("INSERT INTO users (id) VALUES ($1)") .bind(index) .execute(&mut conn) .await?; assert_eq!(cnt, 1); } let sum: i32 = sqlx::query("SELECT id FROM users") .try_map(|row: PgRow| row.try_get::(0)) .fetch(&mut conn) .try_fold(0_i32, |acc, x| async move { Ok(acc + x) }) .await?; assert_eq!(sum, 55); Ok(()) } // https://github.com/launchbadge/sqlx/issues/104 #[sqlx_macros::test] async fn it_can_return_interleaved_nulls_issue_104() -> anyhow::Result<()> { let mut conn = new::().await?; let tuple = sqlx::query("SELECT NULL, 10::INT, NULL, 20::INT, NULL, 40::INT, NULL, 80::INT") .try_map(|row: PgRow| { Ok(( row.get::, _>(0), row.get::, _>(1), row.get::, _>(2), row.get::, _>(3), row.get::, _>(4), row.get::, _>(5), row.get::, _>(6), row.get::, _>(7), )) }) .fetch_one(&mut conn) .await?; assert_eq!(tuple.0, None); assert_eq!(tuple.1, Some(10)); assert_eq!(tuple.2, None); assert_eq!(tuple.3, Some(20)); assert_eq!(tuple.4, None); assert_eq!(tuple.5, Some(40)); assert_eq!(tuple.6, None); assert_eq!(tuple.7, Some(80)); Ok(()) } #[sqlx_macros::test] async fn it_can_query_scalar() -> anyhow::Result<()> { let mut conn = new::().await?; let scalar: i32 = sqlx::query_scalar("SELECT 42").fetch_one(&mut conn).await?; assert_eq!(scalar, 42); let scalar: Option = sqlx::query_scalar("SELECT 42").fetch_one(&mut conn).await?; assert_eq!(scalar, Some(42)); let scalar: Option = sqlx::query_scalar("SELECT NULL") .fetch_one(&mut conn) .await?; assert_eq!(scalar, None); let scalar: Option = sqlx::query_scalar("SELECT 42::bigint") .fetch_optional(&mut conn) .await?; assert_eq!(scalar, Some(42)); let scalar: Option = sqlx::query_scalar("").fetch_optional(&mut conn).await?; assert_eq!(scalar, None); Ok(()) } #[sqlx_macros::test] /// This is seperate from `it_can_query_scalar` because while implementing it I ran into a /// bug which that prevented `Vec` from compiling but allowed Vec>. async fn it_can_query_all_scalar() -> anyhow::Result<()> { let mut conn = new::().await?; let scalar: Vec = sqlx::query_scalar("SELECT $1") .bind(42) .fetch_all(&mut conn) .await?; assert_eq!(scalar, vec![42]); let scalar: Vec> = sqlx::query_scalar("SELECT $1 UNION ALL SELECT NULL") .bind(42) .fetch_all(&mut conn) .await?; assert_eq!(scalar, vec![Some(42), None]); Ok(()) } #[sqlx_macros::test] async fn it_can_work_with_transactions() -> anyhow::Result<()> { let mut conn = new::().await?; conn.execute("CREATE TABLE IF NOT EXISTS _sqlx_users_1922 (id INTEGER PRIMARY KEY)") .await?; conn.execute("TRUNCATE _sqlx_users_1922").await?; // begin .. rollback let mut tx = conn.begin().await?; sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)") .bind(10_i32) .execute(&mut tx) .await?; tx.rollback().await?; let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922") .fetch_one(&mut conn) .await?; assert_eq!(count, 0); // begin .. commit let mut tx = conn.begin().await?; sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)") .bind(10_i32) .execute(&mut tx) .await?; tx.commit().await?; let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922") .fetch_one(&mut conn) .await?; assert_eq!(count, 1); // begin .. (drop) { let mut tx = conn.begin().await?; sqlx::query("INSERT INTO _sqlx_users_1922 (id) VALUES ($1)") .bind(20_i32) .execute(&mut tx) .await?; } conn = new::().await?; let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_1922") .fetch_one(&mut conn) .await?; assert_eq!(count, 1); Ok(()) } #[sqlx_macros::test] async fn it_can_work_with_nested_transactions() -> anyhow::Result<()> { let mut conn = new::().await?; conn.execute("CREATE TABLE IF NOT EXISTS _sqlx_users_2523 (id INTEGER PRIMARY KEY)") .await?; conn.execute("TRUNCATE _sqlx_users_2523").await?; // begin let mut tx = conn.begin().await?; // insert a user sqlx::query("INSERT INTO _sqlx_users_2523 (id) VALUES ($1)") .bind(50_i32) .execute(&mut tx) .await?; // begin once more let mut tx2 = tx.begin().await?; // insert another user sqlx::query("INSERT INTO _sqlx_users_2523 (id) VALUES ($1)") .bind(10_i32) .execute(&mut tx2) .await?; // never mind, rollback tx2.rollback().await?; // did we really? let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_2523") .fetch_one(&mut tx) .await?; assert_eq!(count, 1); // actually, commit tx.commit().await?; // did we really? let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM _sqlx_users_2523") .fetch_one(&mut conn) .await?; assert_eq!(count, 1); Ok(()) } // // run with `cargo test --features postgres -- --ignored --nocapture pool_smoke_test` // #[ignore] // #[cfg_attr(feature = "runtime-async-std", async_std::test)] // #[cfg_attr(feature = "runtime-tokio", tokio::test)] // async fn pool_smoke_test() -> anyhow::Result<()> { // #[cfg(feature = "runtime-tokio")] // use tokio::{task::spawn, time::delay_for as sleep, time::timeout}; // // #[cfg(feature = "runtime-async-std")] // use async_std::{future::timeout, task::sleep, task::spawn}; // // eprintln!("starting pool"); // // let pool = PgPool::builder() // .connect_timeout(Duration::from_secs(5)) // .min_size(5) // .max_size(10) // .build(&dotenv::var("DATABASE_URL")?) // .await?; // // // spin up more tasks than connections available, and ensure we don't deadlock // for i in 0..20 { // let pool = pool.clone(); // spawn(async move { // loop { // if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await { // eprintln!("pool task {} dying due to {}", i, e); // break; // } // } // }); // } // // for _ in 0..5 { // let pool = pool.clone(); // spawn(async move { // while !pool.is_closed() { // // drop acquire() futures in a hot loop // // https://github.com/launchbadge/sqlx/issues/83 // drop(pool.acquire()); // } // }); // } // // eprintln!("sleeping for 30 seconds"); // // sleep(Duration::from_secs(30)).await; // // assert_eq!(pool.size(), 10); // // eprintln!("closing pool"); // // timeout(Duration::from_secs(30), pool.close()).await?; // // eprintln!("pool closed successfully"); // // Ok(()) // } #[sqlx_macros::test] async fn test_invalid_query() -> anyhow::Result<()> { let mut conn = new::().await?; conn.execute("definitely not a correct query") .await .unwrap_err(); let mut s = conn.fetch("select 1"); let row = s.try_next().await?.unwrap(); assert_eq!(row.get::(0), 1i32); Ok(()) } /// Tests the edge case of executing a completely empty query string. /// /// This gets flagged as an `EmptyQueryResponse` in Postgres. We /// catch this and just return no rows. #[sqlx_macros::test] async fn test_empty_query() -> anyhow::Result<()> { let mut conn = new::().await?; let affected = conn.execute("").await?; assert_eq!(affected, 0); Ok(()) } /// Test a simple select expression. This should return the row. #[sqlx_macros::test] async fn test_select_expression() -> anyhow::Result<()> { let mut conn = new::().await?; let mut s = conn.fetch("SELECT 5"); let row = s.try_next().await?.unwrap(); assert!(5i32 == row.try_get::(0)?); Ok(()) } /// Test that we can interleave reads and writes to the database /// in one simple query. Using the `Cursor` API we should be /// able to fetch from both queries in sequence. #[sqlx_macros::test] async fn test_multi_read_write() -> anyhow::Result<()> { let mut conn = new::().await?; let mut s = conn.fetch( " CREATE TABLE IF NOT EXISTS _sqlx_test_postgres_5112 ( id BIGSERIAL PRIMARY KEY, text TEXT NOT NULL ); SELECT 'Hello World' as _1; INSERT INTO _sqlx_test_postgres_5112 (text) VALUES ('this is a test'); SELECT id, text FROM _sqlx_test_postgres_5112; ", ); let row = s.try_next().await?.unwrap(); assert!("Hello World" == row.try_get::<&str, _>("_1")?); let row = s.try_next().await?.unwrap(); let id: i64 = row.try_get("id")?; let text: &str = row.try_get("text")?; assert_eq!(1_i64, id); assert_eq!("this is a test", text); Ok(()) }