From 45c0b85b4c478f3aa44b8146cafa64dbed7307b1 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Wed, 22 Jan 2025 14:24:18 -0800 Subject: [PATCH] feat: teach `sqlx-cli` about `migrate.table-name` --- sqlx-cli/src/migrate.rs | 20 ++++----- sqlx-cli/tests/common/mod.rs | 5 ++- sqlx-core/src/any/migrate.rs | 32 +++++++------- sqlx-core/src/config/migrate.rs | 4 ++ sqlx-core/src/migrate/migrate.rs | 22 +++++----- sqlx-core/src/migrate/migrator.rs | 72 ++++++++++++++++++------------- sqlx-core/src/testing/mod.rs | 2 +- sqlx-mysql/src/migrate.rs | 59 +++++++++++++------------ sqlx-postgres/src/migrate.rs | 56 ++++++++++++------------ sqlx-sqlite/src/migrate.rs | 51 +++++++++++----------- 10 files changed, 172 insertions(+), 151 deletions(-) diff --git a/sqlx-cli/src/migrate.rs b/sqlx-cli/src/migrate.rs index aabee292..9e011968 100644 --- a/sqlx-cli/src/migrate.rs +++ b/sqlx-cli/src/migrate.rs @@ -130,10 +130,10 @@ pub async fn info(config: &Config, migration_source: &MigrationSourceOpt, connec let migrator = Migrator::new(ResolveWith(Path::new(source), config.migrate.to_resolve_config())).await?; let mut conn = crate::connect(connect_opts).await?; - conn.ensure_migrations_table().await?; + conn.ensure_migrations_table(config.migrate.table_name()).await?; let applied_migrations: HashMap<_, _> = conn - .list_applied_migrations() + .list_applied_migrations(config.migrate.table_name()) .await? .into_iter() .map(|m| (m.version, m)) @@ -224,14 +224,14 @@ pub async fn run( let mut conn = crate::connect(connect_opts).await?; - conn.ensure_migrations_table().await?; + conn.ensure_migrations_table(config.migrate.table_name()).await?; - let version = conn.dirty_version().await?; + let version = conn.dirty_version(config.migrate.table_name()).await?; if let Some(version) = version { bail!(MigrateError::Dirty(version)); } - let applied_migrations = conn.list_applied_migrations().await?; + let applied_migrations = conn.list_applied_migrations(config.migrate.table_name()).await?; validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?; let latest_version = applied_migrations @@ -269,7 +269,7 @@ pub async fn run( let elapsed = if dry_run || skip { Duration::new(0, 0) } else { - conn.apply(migration).await? + conn.apply(config.migrate.table_name(), migration).await? }; let text = if skip { "Skipped" @@ -319,14 +319,14 @@ pub async fn revert( let mut conn = crate::connect(connect_opts).await?; - conn.ensure_migrations_table().await?; + conn.ensure_migrations_table(config.migrate.table_name()).await?; - let version = conn.dirty_version().await?; + let version = conn.dirty_version(config.migrate.table_name()).await?; if let Some(version) = version { bail!(MigrateError::Dirty(version)); } - let applied_migrations = conn.list_applied_migrations().await?; + let applied_migrations = conn.list_applied_migrations(config.migrate.table_name()).await?; validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?; let latest_version = applied_migrations @@ -360,7 +360,7 @@ pub async fn revert( let elapsed = if dry_run || skip { Duration::new(0, 0) } else { - conn.revert(migration).await? + conn.revert(config.migrate.table_name(), migration).await? }; let text = if skip { "Skipped" diff --git a/sqlx-cli/tests/common/mod.rs b/sqlx-cli/tests/common/mod.rs index 43c0dbc1..bb58554f 100644 --- a/sqlx-cli/tests/common/mod.rs +++ b/sqlx-cli/tests/common/mod.rs @@ -6,10 +6,12 @@ use std::{ fs::remove_file, path::{Path, PathBuf}, }; +use sqlx::_unstable::config::Config; pub struct TestDatabase { file_path: PathBuf, migrations: String, + config: &'static Config, } impl TestDatabase { @@ -19,6 +21,7 @@ impl TestDatabase { let ret = Self { file_path, migrations: String::from(migrations_path.to_str().unwrap()), + config: Config::from_crate(), }; Command::cargo_bin("cargo-sqlx") .unwrap() @@ -77,7 +80,7 @@ impl TestDatabase { let mut conn = SqliteConnection::connect(&self.connection_string()) .await .unwrap(); - conn.list_applied_migrations() + conn.list_applied_migrations(self.config.migrate.table_name()) .await .unwrap() .iter() diff --git a/sqlx-core/src/any/migrate.rs b/sqlx-core/src/any/migrate.rs index cb4f72c3..b287ec45 100644 --- a/sqlx-core/src/any/migrate.rs +++ b/sqlx-core/src/any/migrate.rs @@ -44,18 +44,16 @@ impl MigrateDatabase for Any { } impl Migrate for AnyConnection { - fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { - Box::pin(async { self.get_migrate()?.ensure_migrations_table().await }) + fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> { + Box::pin(async { self.get_migrate()?.ensure_migrations_table(table_name).await }) } - fn dirty_version(&mut self) -> BoxFuture<'_, Result, MigrateError>> { - Box::pin(async { self.get_migrate()?.dirty_version().await }) + fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + Box::pin(async { self.get_migrate()?.dirty_version(table_name).await }) } - fn list_applied_migrations( - &mut self, - ) -> BoxFuture<'_, Result, MigrateError>> { - Box::pin(async { self.get_migrate()?.list_applied_migrations().await }) + fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + Box::pin(async { self.get_migrate()?.list_applied_migrations(table_name).await }) } fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { @@ -66,17 +64,19 @@ impl Migrate for AnyConnection { Box::pin(async { self.get_migrate()?.unlock().await }) } - fn apply<'e: 'm, 'm>( + fn apply<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result> { - Box::pin(async { self.get_migrate()?.apply(migration).await }) + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result> { + Box::pin(async { self.get_migrate()?.apply(table_name, migration).await }) } - fn revert<'e: 'm, 'm>( + fn revert<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result> { - Box::pin(async { self.get_migrate()?.revert(migration).await }) + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result> { + Box::pin(async { self.get_migrate()?.revert(table_name, migration).await }) } } diff --git a/sqlx-core/src/config/migrate.rs b/sqlx-core/src/config/migrate.rs index 666ed5bf..a70938b2 100644 --- a/sqlx-core/src/config/migrate.rs +++ b/sqlx-core/src/config/migrate.rs @@ -186,6 +186,10 @@ impl Config { self.migrations_dir.as_deref().unwrap_or("migrations") } + pub fn table_name(&self) -> &str { + self.table_name.as_deref().unwrap_or("_sqlx_migrations") + } + pub fn to_resolve_config(&self) -> crate::migrate::ResolveConfig { let mut config = crate::migrate::ResolveConfig::new(); config.ignore_chars(self.ignored_chars.iter().copied()); diff --git a/sqlx-core/src/migrate/migrate.rs b/sqlx-core/src/migrate/migrate.rs index 0e4448a9..2258f06f 100644 --- a/sqlx-core/src/migrate/migrate.rs +++ b/sqlx-core/src/migrate/migrate.rs @@ -27,16 +27,14 @@ pub trait MigrateDatabase { pub trait Migrate { // ensure migrations table exists // will create or migrate it if needed - fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>>; + fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>>; // Return the version on which the database is dirty or None otherwise. // "dirty" means there is a partially applied migration that failed. - fn dirty_version(&mut self) -> BoxFuture<'_, Result, MigrateError>>; + fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>>; // Return the ordered list of applied migrations - fn list_applied_migrations( - &mut self, - ) -> BoxFuture<'_, Result, MigrateError>>; + fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>>; // Should acquire a database lock so that only one migration process // can run at a time. [`Migrate`] will call this function before applying @@ -50,16 +48,18 @@ pub trait Migrate { // run SQL from migration in a DDL transaction // insert new row to [_migrations] table on completion (success or failure) // returns the time taking to run the migration SQL - fn apply<'e: 'm, 'm>( + fn apply<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result>; + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result>; // run a revert SQL from migration in a DDL transaction // deletes the row in [_migrations] table with specified migration version on completion (success or failure) // returns the time taking to run the migration SQL - fn revert<'e: 'm, 'm>( + fn revert<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result>; + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result>; } diff --git a/sqlx-core/src/migrate/migrator.rs b/sqlx-core/src/migrate/migrator.rs index 42cc3095..aa737ad3 100644 --- a/sqlx-core/src/migrate/migrator.rs +++ b/sqlx-core/src/migrate/migrator.rs @@ -27,25 +27,6 @@ pub struct Migrator { pub table_name: Cow<'static, str>, } -fn validate_applied_migrations( - applied_migrations: &[AppliedMigration], - migrator: &Migrator, -) -> Result<(), MigrateError> { - if migrator.ignore_missing { - return Ok(()); - } - - let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect(); - - for applied_migration in applied_migrations { - if !migrations.contains(&applied_migration.version) { - return Err(MigrateError::VersionMissing(applied_migration.version)); - } - } - - Ok(()) -} - impl Migrator { #[doc(hidden)] pub const DEFAULT: Migrator = Migrator { @@ -156,12 +137,21 @@ impl Migrator { ::Target: Migrate, { let mut conn = migrator.acquire().await?; - self.run_direct(&mut *conn).await + self.run_direct(None, &mut *conn).await + } + + pub async fn run_to<'a, A>(&self, target: i64, migrator: A) -> Result<(), MigrateError> + where + A: Acquire<'a>, + ::Target: Migrate, + { + let mut conn = migrator.acquire().await?; + self.run_direct(Some(target), &mut *conn).await } // Getting around the annoying "implementation of `Acquire` is not general enough" error #[doc(hidden)] - pub async fn run_direct(&self, conn: &mut C) -> Result<(), MigrateError> + pub async fn run_direct(&self, target: Option, conn: &mut C) -> Result<(), MigrateError> where C: Migrate, { @@ -172,14 +162,14 @@ impl Migrator { // creates [_migrations] table only if needed // eventually this will likely migrate previous versions of the table - conn.ensure_migrations_table().await?; + conn.ensure_migrations_table(&self.table_name).await?; - let version = conn.dirty_version().await?; + let version = conn.dirty_version(&self.table_name).await?; if let Some(version) = version { return Err(MigrateError::Dirty(version)); } - let applied_migrations = conn.list_applied_migrations().await?; + let applied_migrations = conn.list_applied_migrations(&self.table_name).await?; validate_applied_migrations(&applied_migrations, self)?; let applied_migrations: HashMap<_, _> = applied_migrations @@ -188,6 +178,11 @@ impl Migrator { .collect(); for migration in self.iter() { + if target.is_some_and(|target| target < migration.version) { + // Target version reached + break; + } + if migration.migration_type.is_down_migration() { continue; } @@ -199,7 +194,7 @@ impl Migrator { } } None => { - conn.apply(migration).await?; + conn.apply(&self.table_name, migration).await?; } } } @@ -244,14 +239,14 @@ impl Migrator { // creates [_migrations] table only if needed // eventually this will likely migrate previous versions of the table - conn.ensure_migrations_table().await?; + conn.ensure_migrations_table(&self.table_name).await?; - let version = conn.dirty_version().await?; + let version = conn.dirty_version(&self.table_name).await?; if let Some(version) = version { return Err(MigrateError::Dirty(version)); } - let applied_migrations = conn.list_applied_migrations().await?; + let applied_migrations = conn.list_applied_migrations(&self.table_name).await?; validate_applied_migrations(&applied_migrations, self)?; let applied_migrations: HashMap<_, _> = applied_migrations @@ -266,7 +261,7 @@ impl Migrator { .filter(|m| applied_migrations.contains_key(&m.version)) .filter(|m| m.version > target) { - conn.revert(migration).await?; + conn.revert(&self.table_name, migration).await?; } // unlock the migrator to allow other migrators to run @@ -278,3 +273,22 @@ impl Migrator { Ok(()) } } + +fn validate_applied_migrations( + applied_migrations: &[AppliedMigration], + migrator: &Migrator, +) -> Result<(), MigrateError> { + if migrator.ignore_missing { + return Ok(()); + } + + let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect(); + + for applied_migration in applied_migrations { + if !migrations.contains(&applied_migration.version) { + return Err(MigrateError::VersionMissing(applied_migration.version)); + } + } + + Ok(()) +} \ No newline at end of file diff --git a/sqlx-core/src/testing/mod.rs b/sqlx-core/src/testing/mod.rs index d82d1a36..9db65e9d 100644 --- a/sqlx-core/src/testing/mod.rs +++ b/sqlx-core/src/testing/mod.rs @@ -243,7 +243,7 @@ async fn setup_test_db( if let Some(migrator) = args.migrator { migrator - .run_direct(&mut conn) + .run_direct(None, &mut conn) .await .expect("failed to apply migrations"); } diff --git a/sqlx-mysql/src/migrate.rs b/sqlx-mysql/src/migrate.rs index 79b55ace..f0d0d6a0 100644 --- a/sqlx-mysql/src/migrate.rs +++ b/sqlx-mysql/src/migrate.rs @@ -4,7 +4,6 @@ use std::time::Instant; use futures_core::future::BoxFuture; pub(crate) use sqlx_core::migrate::*; - use crate::connection::{ConnectOptions, Connection}; use crate::error::Error; use crate::executor::Executor; @@ -75,12 +74,12 @@ impl MigrateDatabase for MySql { } impl Migrate for MySqlConnection { - fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> { Box::pin(async move { // language=MySQL self.execute( - r#" -CREATE TABLE IF NOT EXISTS _sqlx_migrations ( + &*format!(r#" +CREATE TABLE IF NOT EXISTS {table_name} ( version BIGINT PRIMARY KEY, description TEXT NOT NULL, installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -88,7 +87,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( checksum BLOB NOT NULL, execution_time BIGINT NOT NULL ); - "#, + "#), ) .await?; @@ -96,11 +95,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn dirty_version(&mut self) -> BoxFuture<'_, Result, MigrateError>> { + fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQL let row: Option<(i64,)> = query_as( - "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1", + &format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"), ) .fetch_optional(self) .await?; @@ -109,13 +108,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn list_applied_migrations( - &mut self, - ) -> BoxFuture<'_, Result, MigrateError>> { + fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQL let rows: Vec<(i64, Vec)> = - query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version") + query_as(&format!("SELECT version, checksum FROM {table_name} ORDER BY version")) .fetch_all(self) .await?; @@ -167,10 +164,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn apply<'e: 'm, 'm>( + fn apply<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result> { + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result> { Box::pin(async move { // Use a single transaction for the actual migration script and the essential bookeeping so we never // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966. @@ -188,10 +186,10 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( // // language=MySQL let _ = query( - r#" - INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time ) + &format!(r#" + INSERT INTO {table_name} ( version, description, success, checksum, execution_time ) VALUES ( ?, ?, FALSE, ?, -1 ) - "#, + "#), ) .bind(migration.version) .bind(&*migration.description) @@ -206,11 +204,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( // language=MySQL let _ = query( - r#" - UPDATE _sqlx_migrations + &format!(r#" + UPDATE {table_name} SET success = TRUE WHERE version = ? - "#, + "#), ) .bind(migration.version) .execute(&mut *tx) @@ -226,11 +224,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( #[allow(clippy::cast_possible_truncation)] let _ = query( - r#" - UPDATE _sqlx_migrations + &format!(r#" + UPDATE {table_name} SET execution_time = ? WHERE version = ? - "#, + "#), ) .bind(elapsed.as_nanos() as i64) .bind(migration.version) @@ -241,10 +239,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn revert<'e: 'm, 'm>( + fn revert<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result> { + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result> { Box::pin(async move { // Use a single transaction for the actual migration script and the essential bookeeping so we never // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966. @@ -259,11 +258,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( // // language=MySQL let _ = query( - r#" - UPDATE _sqlx_migrations + &format!(r#" + UPDATE {table_name} SET success = FALSE WHERE version = ? - "#, + "#), ) .bind(migration.version) .execute(&mut *tx) @@ -272,7 +271,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( tx.execute(&*migration.sql).await?; // language=SQL - let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#) + let _ = query(&format!(r#"DELETE FROM {table_name} WHERE version = ?"#)) .bind(migration.version) .execute(&mut *tx) .await?; diff --git a/sqlx-postgres/src/migrate.rs b/sqlx-postgres/src/migrate.rs index c37e92f4..26464663 100644 --- a/sqlx-postgres/src/migrate.rs +++ b/sqlx-postgres/src/migrate.rs @@ -111,12 +111,12 @@ impl MigrateDatabase for Postgres { } impl Migrate for PgConnection { - fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> { Box::pin(async move { // language=SQL self.execute( - r#" -CREATE TABLE IF NOT EXISTS _sqlx_migrations ( + &*format!(r#" +CREATE TABLE IF NOT EXISTS {table_name} ( version BIGINT PRIMARY KEY, description TEXT NOT NULL, installed_on TIMESTAMPTZ NOT NULL DEFAULT now(), @@ -124,7 +124,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( checksum BYTEA NOT NULL, execution_time BIGINT NOT NULL ); - "#, + "#), ) .await?; @@ -132,11 +132,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn dirty_version(&mut self) -> BoxFuture<'_, Result, MigrateError>> { + fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQL let row: Option<(i64,)> = query_as( - "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1", + &*format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"), ) .fetch_optional(self) .await?; @@ -145,13 +145,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn list_applied_migrations( - &mut self, - ) -> BoxFuture<'_, Result, MigrateError>> { + fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQL let rows: Vec<(i64, Vec)> = - query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version") + query_as(&*format!("SELECT version, checksum FROM {table_name} ORDER BY version")) .fetch_all(self) .await?; @@ -203,16 +201,17 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn apply<'e: 'm, 'm>( + fn apply<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result> { + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result> { Box::pin(async move { let start = Instant::now(); // execute migration queries if migration.no_tx { - execute_migration(self, migration).await?; + execute_migration(self, table_name, migration).await?; } else { // Use a single transaction for the actual migration script and the essential bookeeping so we never // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966. @@ -220,7 +219,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( // data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1 // and update it once the actual transaction completed. let mut tx = self.begin().await?; - execute_migration(&mut tx, migration).await?; + execute_migration(&mut tx, table_name, migration).await?; tx.commit().await?; } @@ -232,11 +231,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( // language=SQL #[allow(clippy::cast_possible_truncation)] let _ = query( - r#" - UPDATE _sqlx_migrations + &*format!(r#" + UPDATE {table_name} SET execution_time = $1 WHERE version = $2 - "#, + "#), ) .bind(elapsed.as_nanos() as i64) .bind(migration.version) @@ -247,21 +246,22 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn revert<'e: 'm, 'm>( + fn revert<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result> { + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result> { Box::pin(async move { let start = Instant::now(); // execute migration queries if migration.no_tx { - revert_migration(self, migration).await?; + revert_migration(self, table_name, migration).await?; } else { // Use a single transaction for the actual migration script and the essential bookeeping so we never // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966. let mut tx = self.begin().await?; - revert_migration(&mut tx, migration).await?; + revert_migration(&mut tx, table_name, migration).await?; tx.commit().await?; } @@ -274,6 +274,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( async fn execute_migration( conn: &mut PgConnection, + table_name: &str, migration: &Migration, ) -> Result<(), MigrateError> { let _ = conn @@ -283,10 +284,10 @@ async fn execute_migration( // language=SQL let _ = query( - r#" - INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time ) + &*format!(r#" + INSERT INTO {table_name} ( version, description, success, checksum, execution_time ) VALUES ( $1, $2, TRUE, $3, -1 ) - "#, + "#), ) .bind(migration.version) .bind(&*migration.description) @@ -299,6 +300,7 @@ async fn execute_migration( async fn revert_migration( conn: &mut PgConnection, + table_name: &str, migration: &Migration, ) -> Result<(), MigrateError> { let _ = conn @@ -307,7 +309,7 @@ async fn revert_migration( .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?; // language=SQL - let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#) + let _ = query(&*format!(r#"DELETE FROM {table_name} WHERE version = $1"#)) .bind(migration.version) .execute(conn) .await?; diff --git a/sqlx-sqlite/src/migrate.rs b/sqlx-sqlite/src/migrate.rs index b9ce22dc..8b5c2474 100644 --- a/sqlx-sqlite/src/migrate.rs +++ b/sqlx-sqlite/src/migrate.rs @@ -64,12 +64,11 @@ impl MigrateDatabase for Sqlite { } impl Migrate for SqliteConnection { - fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> { Box::pin(async move { // language=SQLite - self.execute( - r#" -CREATE TABLE IF NOT EXISTS _sqlx_migrations ( + self.execute(&*format!(r#" +CREATE TABLE IF NOT EXISTS {table_name} ( version BIGINT PRIMARY KEY, description TEXT NOT NULL, installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -77,19 +76,19 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( checksum BLOB NOT NULL, execution_time BIGINT NOT NULL ); - "#, + "#), ) - .await?; + .await?; Ok(()) }) } - fn dirty_version(&mut self) -> BoxFuture<'_, Result, MigrateError>> { + fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQLite let row: Option<(i64,)> = query_as( - "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1", + &format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"), ) .fetch_optional(self) .await?; @@ -98,13 +97,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn list_applied_migrations( - &mut self, - ) -> BoxFuture<'_, Result, MigrateError>> { + fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQLite let rows: Vec<(i64, Vec)> = - query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version") + query_as(&format!("SELECT version, checksum FROM {table_name} ORDER BY version")) .fetch_all(self) .await?; @@ -128,10 +125,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( Box::pin(async move { Ok(()) }) } - fn apply<'e: 'm, 'm>( + fn apply<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result> { + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result> { Box::pin(async move { let mut tx = self.begin().await?; let start = Instant::now(); @@ -148,10 +146,10 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( // language=SQL let _ = query( - r#" - INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time ) + &format!(r#" + INSERT INTO {table_name} ( version, description, success, checksum, execution_time ) VALUES ( ?1, ?2, TRUE, ?3, -1 ) - "#, + "#), ) .bind(migration.version) .bind(&*migration.description) @@ -170,11 +168,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( // language=SQL #[allow(clippy::cast_possible_truncation)] let _ = query( - r#" - UPDATE _sqlx_migrations + &format!(r#" + UPDATE {table_name} SET execution_time = ?1 WHERE version = ?2 - "#, + "#), ) .bind(elapsed.as_nanos() as i64) .bind(migration.version) @@ -185,10 +183,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( }) } - fn revert<'e: 'm, 'm>( + fn revert<'e>( &'e mut self, - migration: &'m Migration, - ) -> BoxFuture<'m, Result> { + table_name: &'e str, + migration: &'e Migration, + ) -> BoxFuture<'e, Result> { Box::pin(async move { // Use a single transaction for the actual migration script and the essential bookeeping so we never // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966. @@ -197,8 +196,8 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( let _ = tx.execute(&*migration.sql).await?; - // language=SQL - let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#) + // language=SQLite + let _ = query(&format!(r#"DELETE FROM {table_name} WHERE version = ?1"#)) .bind(migration.version) .execute(&mut *tx) .await?;