mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-10-03 07:45:30 +00:00
Fix: nextest cleanup race condition (#3334)
* remove unused trait fn `cleanup_test_dbs` * *wip* solve test cleanup race condition * check for exactly 63 chars in database name * move base64 dependency * change * Use url_safe base64 encoding * Assert quoting for database name * refactor * add mysql support? * borrow * fix borrows * ensure quoting * re-add trait cleanup_test_dbs * fix mysql insert * cargo lock * use actual field * cleanup converted path in sqlite * replace dashes with underscore in db name * refactor: remove redundant path conversion in cleanup_test and add db_name method --------- Co-authored-by: Austin Bonander <austin.bonander@gmail.com>
This commit is contained in:
parent
aae800090b
commit
a83395a360
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -3563,6 +3563,7 @@ version = "0.8.3"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"async-io 1.13.0",
|
"async-io 1.13.0",
|
||||||
"async-std",
|
"async-std",
|
||||||
|
"base64 0.22.0",
|
||||||
"bigdecimal",
|
"bigdecimal",
|
||||||
"bit-vec",
|
"bit-vec",
|
||||||
"bstr",
|
"bstr",
|
||||||
|
@ -54,6 +54,7 @@ mac_address = { workspace = true, optional = true }
|
|||||||
uuid = { workspace = true, optional = true }
|
uuid = { workspace = true, optional = true }
|
||||||
|
|
||||||
async-io = { version = "1.9.0", optional = true }
|
async-io = { version = "1.9.0", optional = true }
|
||||||
|
base64 = { version = "0.22.0", default-features = false, features = ["std"] }
|
||||||
bytes = "1.1.0"
|
bytes = "1.1.0"
|
||||||
chrono = { version = "0.4.34", default-features = false, features = ["clock"], optional = true }
|
chrono = { version = "0.4.34", default-features = false, features = ["clock"], optional = true }
|
||||||
crc = { version = "3", optional = true }
|
crc = { version = "3", optional = true }
|
||||||
|
@ -3,7 +3,9 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use futures_core::future::BoxFuture;
|
use futures_core::future::BoxFuture;
|
||||||
|
|
||||||
|
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
|
||||||
pub use fixtures::FixtureSnapshot;
|
pub use fixtures::FixtureSnapshot;
|
||||||
|
use sha2::{Digest, Sha512};
|
||||||
|
|
||||||
use crate::connection::{ConnectOptions, Connection};
|
use crate::connection::{ConnectOptions, Connection};
|
||||||
use crate::database::Database;
|
use crate::database::Database;
|
||||||
@ -41,6 +43,17 @@ pub trait TestSupport: Database {
|
|||||||
/// This snapshot can then be used to generate test fixtures.
|
/// This snapshot can then be used to generate test fixtures.
|
||||||
fn snapshot(conn: &mut Self::Connection)
|
fn snapshot(conn: &mut Self::Connection)
|
||||||
-> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>>;
|
-> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>>;
|
||||||
|
|
||||||
|
/// Generate a unique database name for the given test path.
|
||||||
|
fn db_name(args: &TestArgs) -> String {
|
||||||
|
let mut hasher = Sha512::new();
|
||||||
|
hasher.update(args.test_path.as_bytes());
|
||||||
|
let hash = hasher.finalize();
|
||||||
|
let hash = URL_SAFE.encode(&hash[..39]);
|
||||||
|
let db_name = format!("_sqlx_test_{}", hash).replace('-', "_");
|
||||||
|
debug_assert!(db_name.len() == 63);
|
||||||
|
db_name
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TestFixture {
|
pub struct TestFixture {
|
||||||
@ -217,7 +230,7 @@ where
|
|||||||
let res = test_fn(test_context.pool_opts, test_context.connect_opts).await;
|
let res = test_fn(test_context.pool_opts, test_context.connect_opts).await;
|
||||||
|
|
||||||
if res.is_success() {
|
if res.is_success() {
|
||||||
if let Err(e) = DB::cleanup_test(&test_context.db_name).await {
|
if let Err(e) = DB::cleanup_test(&DB::db_name(&args)).await {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"failed to delete database {:?}: {}",
|
"failed to delete database {:?}: {}",
|
||||||
test_context.db_name, e
|
test_context.db_name, e
|
||||||
|
@ -1,29 +1,25 @@
|
|||||||
use std::fmt::Write;
|
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::time::Duration;
|
||||||
use std::time::{Duration, SystemTime};
|
|
||||||
|
|
||||||
use futures_core::future::BoxFuture;
|
use futures_core::future::BoxFuture;
|
||||||
|
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
|
use sqlx_core::connection::Connection;
|
||||||
use crate::connection::Connection;
|
use sqlx_core::query_builder::QueryBuilder;
|
||||||
|
use sqlx_core::query_scalar::query_scalar;
|
||||||
|
use std::fmt::Write;
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::executor::Executor;
|
use crate::executor::Executor;
|
||||||
use crate::pool::{Pool, PoolOptions};
|
use crate::pool::{Pool, PoolOptions};
|
||||||
use crate::query::query;
|
use crate::query::query;
|
||||||
use crate::query_builder::QueryBuilder;
|
|
||||||
use crate::query_scalar::query_scalar;
|
|
||||||
use crate::{MySql, MySqlConnectOptions, MySqlConnection};
|
use crate::{MySql, MySqlConnectOptions, MySqlConnection};
|
||||||
|
|
||||||
pub(crate) use sqlx_core::testing::*;
|
pub(crate) use sqlx_core::testing::*;
|
||||||
|
|
||||||
// Using a blocking `OnceCell` here because the critical sections are short.
|
// Using a blocking `OnceCell` here because the critical sections are short.
|
||||||
static MASTER_POOL: OnceCell<Pool<MySql>> = OnceCell::new();
|
static MASTER_POOL: OnceCell<Pool<MySql>> = OnceCell::new();
|
||||||
// Automatically delete any databases created before the start of the test binary.
|
|
||||||
static DO_CLEANUP: AtomicBool = AtomicBool::new(true);
|
|
||||||
|
|
||||||
impl TestSupport for MySql {
|
impl TestSupport for MySql {
|
||||||
fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
|
fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
|
||||||
@ -38,17 +34,7 @@ impl TestSupport for MySql {
|
|||||||
.acquire()
|
.acquire()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let db_id = db_id(db_name);
|
do_cleanup(&mut conn, db_name).await
|
||||||
|
|
||||||
conn.execute(&format!("drop database if exists {db_name};")[..])
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
query("delete from _sqlx_test_databases where db_id = ?")
|
|
||||||
.bind(db_id)
|
|
||||||
.execute(&mut *conn)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,13 +44,55 @@ impl TestSupport for MySql {
|
|||||||
|
|
||||||
let mut conn = MySqlConnection::connect(&url).await?;
|
let mut conn = MySqlConnection::connect(&url).await?;
|
||||||
|
|
||||||
let now = SystemTime::now()
|
let delete_db_names: Vec<String> =
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
query_scalar("select db_name from _sqlx_test_databases")
|
||||||
.unwrap();
|
.fetch_all(&mut conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if delete_db_names.is_empty() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
|
||||||
|
|
||||||
|
let mut command = String::new();
|
||||||
|
|
||||||
|
for db_name in &delete_db_names {
|
||||||
|
command.clear();
|
||||||
|
|
||||||
|
let db_name = format!("_sqlx_test_database_{db_name}");
|
||||||
|
|
||||||
|
writeln!(command, "drop database if exists {db_name:?};").ok();
|
||||||
|
match conn.execute(&*command).await {
|
||||||
|
Ok(_deleted) => {
|
||||||
|
deleted_db_names.push(db_name);
|
||||||
|
}
|
||||||
|
// Assume a database error just means the DB is still in use.
|
||||||
|
Err(Error::Database(dbe)) => {
|
||||||
|
eprintln!("could not clean test database {db_name:?}: {dbe}")
|
||||||
|
}
|
||||||
|
// Bubble up other errors
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if deleted_db_names.is_empty() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut query =
|
||||||
|
QueryBuilder::new("delete from _sqlx_test_databases where db_name in (");
|
||||||
|
|
||||||
|
let mut separated = query.separated(",");
|
||||||
|
|
||||||
|
for db_name in &deleted_db_names {
|
||||||
|
separated.push_bind(db_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
query.push(")").build().execute(&mut conn).await?;
|
||||||
|
|
||||||
let num_deleted = do_cleanup(&mut conn, now).await?;
|
|
||||||
let _ = conn.close().await;
|
let _ = conn.close().await;
|
||||||
Ok(Some(num_deleted))
|
Ok(Some(delete_db_names.len()))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,7 +145,7 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
|
|||||||
conn.execute(
|
conn.execute(
|
||||||
r#"
|
r#"
|
||||||
create table if not exists _sqlx_test_databases (
|
create table if not exists _sqlx_test_databases (
|
||||||
db_id bigint unsigned primary key auto_increment,
|
db_name text primary key,
|
||||||
test_path text not null,
|
test_path text not null,
|
||||||
created_at timestamp not null default current_timestamp
|
created_at timestamp not null default current_timestamp
|
||||||
);
|
);
|
||||||
@ -125,34 +153,19 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Record the current time _before_ we acquire the `DO_CLEANUP` permit. This
|
let db_name = MySql::db_name(args);
|
||||||
// prevents the first test thread from accidentally deleting new test dbs
|
do_cleanup(&mut conn, &db_name).await?;
|
||||||
// created by other test threads if we're a bit slow.
|
|
||||||
let now = SystemTime::now()
|
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Only run cleanup if the test binary just started.
|
query("insert into _sqlx_test_databases(db_name, test_path) values (?, ?)")
|
||||||
if DO_CLEANUP.swap(false, Ordering::SeqCst) {
|
.bind(&db_name)
|
||||||
do_cleanup(&mut conn, now).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
query("insert into _sqlx_test_databases(test_path) values (?)")
|
|
||||||
.bind(args.test_path)
|
.bind(args.test_path)
|
||||||
.execute(&mut *conn)
|
.execute(&mut *conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// MySQL doesn't have `INSERT ... RETURNING`
|
conn.execute(&format!("create database {db_name:?}")[..])
|
||||||
let new_db_id: u64 = query_scalar("select last_insert_id()")
|
|
||||||
.fetch_one(&mut *conn)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let new_db_name = db_name(new_db_id);
|
eprintln!("created database {db_name}");
|
||||||
|
|
||||||
conn.execute(&format!("create database {new_db_name}")[..])
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
eprintln!("created database {new_db_name}");
|
|
||||||
|
|
||||||
Ok(TestContext {
|
Ok(TestContext {
|
||||||
pool_opts: PoolOptions::new()
|
pool_opts: PoolOptions::new()
|
||||||
@ -167,74 +180,18 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
|
|||||||
.connect_options()
|
.connect_options()
|
||||||
.deref()
|
.deref()
|
||||||
.clone()
|
.clone()
|
||||||
.database(&new_db_name),
|
.database(&db_name),
|
||||||
db_name: new_db_name,
|
db_name,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_cleanup(conn: &mut MySqlConnection, created_before: Duration) -> Result<usize, Error> {
|
async fn do_cleanup(conn: &mut MySqlConnection, db_name: &str) -> Result<(), Error> {
|
||||||
// since SystemTime is not monotonic we added a little margin here to avoid race conditions with other threads
|
let delete_db_command = format!("drop database if exists {db_name:?};");
|
||||||
let created_before_as_secs = created_before.as_secs() - 2;
|
conn.execute(&*delete_db_command).await?;
|
||||||
let delete_db_ids: Vec<u64> = query_scalar(
|
query("delete from _sqlx_test.databases where db_name = $1::text")
|
||||||
"select db_id from _sqlx_test_databases \
|
.bind(db_name)
|
||||||
where created_at < from_unixtime(?)",
|
.execute(&mut *conn)
|
||||||
)
|
.await?;
|
||||||
.bind(created_before_as_secs)
|
|
||||||
.fetch_all(&mut *conn)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if delete_db_ids.is_empty() {
|
Ok(())
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut deleted_db_ids = Vec::with_capacity(delete_db_ids.len());
|
|
||||||
|
|
||||||
let mut command = String::new();
|
|
||||||
|
|
||||||
for db_id in delete_db_ids {
|
|
||||||
command.clear();
|
|
||||||
|
|
||||||
let db_name = db_name(db_id);
|
|
||||||
|
|
||||||
writeln!(command, "drop database if exists {db_name}").ok();
|
|
||||||
match conn.execute(&*command).await {
|
|
||||||
Ok(_deleted) => {
|
|
||||||
deleted_db_ids.push(db_id);
|
|
||||||
}
|
|
||||||
// Assume a database error just means the DB is still in use.
|
|
||||||
Err(Error::Database(dbe)) => {
|
|
||||||
eprintln!("could not clean test database {db_id:?}: {dbe}")
|
|
||||||
}
|
|
||||||
// Bubble up other errors
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut query = QueryBuilder::new("delete from _sqlx_test_databases where db_id in (");
|
|
||||||
|
|
||||||
let mut separated = query.separated(",");
|
|
||||||
|
|
||||||
for db_id in &deleted_db_ids {
|
|
||||||
separated.push_bind(db_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
query.push(")").build().execute(&mut *conn).await?;
|
|
||||||
|
|
||||||
Ok(deleted_db_ids.len())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn db_name(id: u64) -> String {
|
|
||||||
format!("_sqlx_test_database_{id}")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn db_id(name: &str) -> u64 {
|
|
||||||
name.trim_start_matches("_sqlx_test_database_")
|
|
||||||
.parse()
|
|
||||||
.unwrap_or_else(|_1| panic!("failed to parse ID from database name {name:?}"))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_db_name_id() {
|
|
||||||
assert_eq!(db_name(12345), "_sqlx_test_database_12345");
|
|
||||||
assert_eq!(db_id("_sqlx_test_database_12345"), 12345);
|
|
||||||
}
|
}
|
||||||
|
@ -1,20 +1,18 @@
|
|||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::time::Duration;
|
||||||
use std::time::{Duration, SystemTime};
|
|
||||||
|
|
||||||
use futures_core::future::BoxFuture;
|
use futures_core::future::BoxFuture;
|
||||||
|
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
|
use sqlx_core::connection::Connection;
|
||||||
use crate::connection::Connection;
|
use sqlx_core::query_scalar::query_scalar;
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::executor::Executor;
|
use crate::executor::Executor;
|
||||||
use crate::pool::{Pool, PoolOptions};
|
use crate::pool::{Pool, PoolOptions};
|
||||||
use crate::query::query;
|
use crate::query::query;
|
||||||
use crate::query_scalar::query_scalar;
|
|
||||||
use crate::{PgConnectOptions, PgConnection, Postgres};
|
use crate::{PgConnectOptions, PgConnection, Postgres};
|
||||||
|
|
||||||
pub(crate) use sqlx_core::testing::*;
|
pub(crate) use sqlx_core::testing::*;
|
||||||
@ -22,7 +20,6 @@ pub(crate) use sqlx_core::testing::*;
|
|||||||
// Using a blocking `OnceCell` here because the critical sections are short.
|
// Using a blocking `OnceCell` here because the critical sections are short.
|
||||||
static MASTER_POOL: OnceCell<Pool<Postgres>> = OnceCell::new();
|
static MASTER_POOL: OnceCell<Pool<Postgres>> = OnceCell::new();
|
||||||
// Automatically delete any databases created before the start of the test binary.
|
// Automatically delete any databases created before the start of the test binary.
|
||||||
static DO_CLEANUP: AtomicBool = AtomicBool::new(true);
|
|
||||||
|
|
||||||
impl TestSupport for Postgres {
|
impl TestSupport for Postgres {
|
||||||
fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
|
fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
|
||||||
@ -37,15 +34,7 @@ impl TestSupport for Postgres {
|
|||||||
.acquire()
|
.acquire()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
conn.execute(&format!("drop database if exists {db_name:?};")[..])
|
do_cleanup(&mut conn, db_name).await
|
||||||
.await?;
|
|
||||||
|
|
||||||
query("delete from _sqlx_test.databases where db_name = $1")
|
|
||||||
.bind(db_name)
|
|
||||||
.execute(&mut *conn)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,13 +44,42 @@ impl TestSupport for Postgres {
|
|||||||
|
|
||||||
let mut conn = PgConnection::connect(&url).await?;
|
let mut conn = PgConnection::connect(&url).await?;
|
||||||
|
|
||||||
let now = SystemTime::now()
|
let delete_db_names: Vec<String> =
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
query_scalar("select db_name from _sqlx_test.databases")
|
||||||
.unwrap();
|
.fetch_all(&mut conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if delete_db_names.is_empty() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
|
||||||
|
|
||||||
|
let mut command = String::new();
|
||||||
|
|
||||||
|
for db_name in &delete_db_names {
|
||||||
|
command.clear();
|
||||||
|
writeln!(command, "drop database if exists {db_name:?};").ok();
|
||||||
|
match conn.execute(&*command).await {
|
||||||
|
Ok(_deleted) => {
|
||||||
|
deleted_db_names.push(db_name);
|
||||||
|
}
|
||||||
|
// Assume a database error just means the DB is still in use.
|
||||||
|
Err(Error::Database(dbe)) => {
|
||||||
|
eprintln!("could not clean test database {db_name:?}: {dbe}")
|
||||||
|
}
|
||||||
|
// Bubble up other errors
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
query("delete from _sqlx_test.databases where db_name = any($1::text[])")
|
||||||
|
.bind(&deleted_db_names)
|
||||||
|
.execute(&mut conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let num_deleted = do_cleanup(&mut conn, now).await?;
|
|
||||||
let _ = conn.close().await;
|
let _ = conn.close().await;
|
||||||
Ok(Some(num_deleted))
|
Ok(Some(delete_db_names.len()))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -135,31 +153,22 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Record the current time _before_ we acquire the `DO_CLEANUP` permit. This
|
let db_name = Postgres::db_name(args);
|
||||||
// prevents the first test thread from accidentally deleting new test dbs
|
do_cleanup(&mut conn, &db_name).await?;
|
||||||
// created by other test threads if we're a bit slow.
|
|
||||||
let now = SystemTime::now()
|
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Only run cleanup if the test binary just started.
|
query(
|
||||||
if DO_CLEANUP.swap(false, Ordering::SeqCst) {
|
|
||||||
do_cleanup(&mut conn, now).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let new_db_name: String = query_scalar(
|
|
||||||
r#"
|
r#"
|
||||||
insert into _sqlx_test.databases(db_name, test_path)
|
insert into _sqlx_test.databases(db_name, test_path) values ($1, $2)
|
||||||
select '_sqlx_test_' || nextval('_sqlx_test.database_ids'), $1
|
|
||||||
returning db_name
|
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
|
.bind(&db_name)
|
||||||
.bind(args.test_path)
|
.bind(args.test_path)
|
||||||
.fetch_one(&mut *conn)
|
.execute(&mut *conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
conn.execute(&format!("create database {new_db_name:?}")[..])
|
let create_command = format!("create database {db_name:?}");
|
||||||
.await?;
|
debug_assert!(create_command.starts_with("create database \""));
|
||||||
|
conn.execute(&(create_command)[..]).await?;
|
||||||
|
|
||||||
Ok(TestContext {
|
Ok(TestContext {
|
||||||
pool_opts: PoolOptions::new()
|
pool_opts: PoolOptions::new()
|
||||||
@ -174,52 +183,18 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
|
|||||||
.connect_options()
|
.connect_options()
|
||||||
.deref()
|
.deref()
|
||||||
.clone()
|
.clone()
|
||||||
.database(&new_db_name),
|
.database(&db_name),
|
||||||
db_name: new_db_name,
|
db_name,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_cleanup(conn: &mut PgConnection, created_before: Duration) -> Result<usize, Error> {
|
async fn do_cleanup(conn: &mut PgConnection, db_name: &str) -> Result<(), Error> {
|
||||||
// since SystemTime is not monotonic we added a little margin here to avoid race conditions with other threads
|
let delete_db_command = format!("drop database if exists {db_name:?};");
|
||||||
let created_before = i64::try_from(created_before.as_secs()).unwrap() - 2;
|
conn.execute(&*delete_db_command).await?;
|
||||||
|
query("delete from _sqlx_test.databases where db_name = $1::text")
|
||||||
let delete_db_names: Vec<String> = query_scalar(
|
.bind(db_name)
|
||||||
"select db_name from _sqlx_test.databases \
|
|
||||||
where created_at < (to_timestamp($1) at time zone 'UTC')",
|
|
||||||
)
|
|
||||||
.bind(created_before)
|
|
||||||
.fetch_all(&mut *conn)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if delete_db_names.is_empty() {
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
|
|
||||||
let delete_db_names = delete_db_names.into_iter();
|
|
||||||
|
|
||||||
let mut command = String::new();
|
|
||||||
|
|
||||||
for db_name in delete_db_names {
|
|
||||||
command.clear();
|
|
||||||
writeln!(command, "drop database if exists {db_name:?};").ok();
|
|
||||||
match conn.execute(&*command).await {
|
|
||||||
Ok(_deleted) => {
|
|
||||||
deleted_db_names.push(db_name);
|
|
||||||
}
|
|
||||||
// Assume a database error just means the DB is still in use.
|
|
||||||
Err(Error::Database(dbe)) => {
|
|
||||||
eprintln!("could not clean test database {db_name:?}: {dbe}")
|
|
||||||
}
|
|
||||||
// Bubble up other errors
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
query("delete from _sqlx_test.databases where db_name = any($1::text[])")
|
|
||||||
.bind(&deleted_db_names)
|
|
||||||
.execute(&mut *conn)
|
.execute(&mut *conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(deleted_db_names.len())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,10 @@ impl TestSupport for Sqlite {
|
|||||||
) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
|
) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn db_name(args: &TestArgs) -> String {
|
||||||
|
convert_path(args.test_path)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn test_context(args: &TestArgs) -> Result<TestContext<Sqlite>, Error> {
|
async fn test_context(args: &TestArgs) -> Result<TestContext<Sqlite>, Error> {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user