mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-12-29 21:00:54 +00:00
feat: teach sqlx-cli about migrate.table-name
This commit is contained in:
parent
1ff6a8a950
commit
45c0b85b4c
@ -130,10 +130,10 @@ pub async fn info(config: &Config, migration_source: &MigrationSourceOpt, connec
|
||||
let migrator = Migrator::new(ResolveWith(Path::new(source), config.migrate.to_resolve_config())).await?;
|
||||
let mut conn = crate::connect(connect_opts).await?;
|
||||
|
||||
conn.ensure_migrations_table().await?;
|
||||
conn.ensure_migrations_table(config.migrate.table_name()).await?;
|
||||
|
||||
let applied_migrations: HashMap<_, _> = conn
|
||||
.list_applied_migrations()
|
||||
.list_applied_migrations(config.migrate.table_name())
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|m| (m.version, m))
|
||||
@ -224,14 +224,14 @@ pub async fn run(
|
||||
|
||||
let mut conn = crate::connect(connect_opts).await?;
|
||||
|
||||
conn.ensure_migrations_table().await?;
|
||||
conn.ensure_migrations_table(config.migrate.table_name()).await?;
|
||||
|
||||
let version = conn.dirty_version().await?;
|
||||
let version = conn.dirty_version(config.migrate.table_name()).await?;
|
||||
if let Some(version) = version {
|
||||
bail!(MigrateError::Dirty(version));
|
||||
}
|
||||
|
||||
let applied_migrations = conn.list_applied_migrations().await?;
|
||||
let applied_migrations = conn.list_applied_migrations(config.migrate.table_name()).await?;
|
||||
validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
|
||||
|
||||
let latest_version = applied_migrations
|
||||
@ -269,7 +269,7 @@ pub async fn run(
|
||||
let elapsed = if dry_run || skip {
|
||||
Duration::new(0, 0)
|
||||
} else {
|
||||
conn.apply(migration).await?
|
||||
conn.apply(config.migrate.table_name(), migration).await?
|
||||
};
|
||||
let text = if skip {
|
||||
"Skipped"
|
||||
@ -319,14 +319,14 @@ pub async fn revert(
|
||||
|
||||
let mut conn = crate::connect(connect_opts).await?;
|
||||
|
||||
conn.ensure_migrations_table().await?;
|
||||
conn.ensure_migrations_table(config.migrate.table_name()).await?;
|
||||
|
||||
let version = conn.dirty_version().await?;
|
||||
let version = conn.dirty_version(config.migrate.table_name()).await?;
|
||||
if let Some(version) = version {
|
||||
bail!(MigrateError::Dirty(version));
|
||||
}
|
||||
|
||||
let applied_migrations = conn.list_applied_migrations().await?;
|
||||
let applied_migrations = conn.list_applied_migrations(config.migrate.table_name()).await?;
|
||||
validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
|
||||
|
||||
let latest_version = applied_migrations
|
||||
@ -360,7 +360,7 @@ pub async fn revert(
|
||||
let elapsed = if dry_run || skip {
|
||||
Duration::new(0, 0)
|
||||
} else {
|
||||
conn.revert(migration).await?
|
||||
conn.revert(config.migrate.table_name(), migration).await?
|
||||
};
|
||||
let text = if skip {
|
||||
"Skipped"
|
||||
|
||||
@ -6,10 +6,12 @@ use std::{
|
||||
fs::remove_file,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use sqlx::_unstable::config::Config;
|
||||
|
||||
pub struct TestDatabase {
|
||||
file_path: PathBuf,
|
||||
migrations: String,
|
||||
config: &'static Config,
|
||||
}
|
||||
|
||||
impl TestDatabase {
|
||||
@ -19,6 +21,7 @@ impl TestDatabase {
|
||||
let ret = Self {
|
||||
file_path,
|
||||
migrations: String::from(migrations_path.to_str().unwrap()),
|
||||
config: Config::from_crate(),
|
||||
};
|
||||
Command::cargo_bin("cargo-sqlx")
|
||||
.unwrap()
|
||||
@ -77,7 +80,7 @@ impl TestDatabase {
|
||||
let mut conn = SqliteConnection::connect(&self.connection_string())
|
||||
.await
|
||||
.unwrap();
|
||||
conn.list_applied_migrations()
|
||||
conn.list_applied_migrations(self.config.migrate.table_name())
|
||||
.await
|
||||
.unwrap()
|
||||
.iter()
|
||||
|
||||
@ -44,18 +44,16 @@ impl MigrateDatabase for Any {
|
||||
}
|
||||
|
||||
impl Migrate for AnyConnection {
|
||||
fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.ensure_migrations_table().await })
|
||||
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.ensure_migrations_table(table_name).await })
|
||||
}
|
||||
|
||||
fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.dirty_version().await })
|
||||
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.dirty_version(table_name).await })
|
||||
}
|
||||
|
||||
fn list_applied_migrations(
|
||||
&mut self,
|
||||
) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.list_applied_migrations().await })
|
||||
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.list_applied_migrations(table_name).await })
|
||||
}
|
||||
|
||||
fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
|
||||
@ -66,17 +64,19 @@ impl Migrate for AnyConnection {
|
||||
Box::pin(async { self.get_migrate()?.unlock().await })
|
||||
}
|
||||
|
||||
fn apply<'e: 'm, 'm>(
|
||||
fn apply<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.apply(migration).await })
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.apply(table_name, migration).await })
|
||||
}
|
||||
|
||||
fn revert<'e: 'm, 'm>(
|
||||
fn revert<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.revert(migration).await })
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
|
||||
Box::pin(async { self.get_migrate()?.revert(table_name, migration).await })
|
||||
}
|
||||
}
|
||||
|
||||
@ -186,6 +186,10 @@ impl Config {
|
||||
self.migrations_dir.as_deref().unwrap_or("migrations")
|
||||
}
|
||||
|
||||
pub fn table_name(&self) -> &str {
|
||||
self.table_name.as_deref().unwrap_or("_sqlx_migrations")
|
||||
}
|
||||
|
||||
pub fn to_resolve_config(&self) -> crate::migrate::ResolveConfig {
|
||||
let mut config = crate::migrate::ResolveConfig::new();
|
||||
config.ignore_chars(self.ignored_chars.iter().copied());
|
||||
|
||||
@ -27,16 +27,14 @@ pub trait MigrateDatabase {
|
||||
pub trait Migrate {
|
||||
// ensure migrations table exists
|
||||
// will create or migrate it if needed
|
||||
fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>>;
|
||||
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>>;
|
||||
|
||||
// Return the version on which the database is dirty or None otherwise.
|
||||
// "dirty" means there is a partially applied migration that failed.
|
||||
fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>>;
|
||||
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>>;
|
||||
|
||||
// Return the ordered list of applied migrations
|
||||
fn list_applied_migrations(
|
||||
&mut self,
|
||||
) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>>;
|
||||
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>>;
|
||||
|
||||
// Should acquire a database lock so that only one migration process
|
||||
// can run at a time. [`Migrate`] will call this function before applying
|
||||
@ -50,16 +48,18 @@ pub trait Migrate {
|
||||
// run SQL from migration in a DDL transaction
|
||||
// insert new row to [_migrations] table on completion (success or failure)
|
||||
// returns the time taking to run the migration SQL
|
||||
fn apply<'e: 'm, 'm>(
|
||||
fn apply<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>>;
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>>;
|
||||
|
||||
// run a revert SQL from migration in a DDL transaction
|
||||
// deletes the row in [_migrations] table with specified migration version on completion (success or failure)
|
||||
// returns the time taking to run the migration SQL
|
||||
fn revert<'e: 'm, 'm>(
|
||||
fn revert<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>>;
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>>;
|
||||
}
|
||||
|
||||
@ -27,25 +27,6 @@ pub struct Migrator {
|
||||
pub table_name: Cow<'static, str>,
|
||||
}
|
||||
|
||||
fn validate_applied_migrations(
|
||||
applied_migrations: &[AppliedMigration],
|
||||
migrator: &Migrator,
|
||||
) -> Result<(), MigrateError> {
|
||||
if migrator.ignore_missing {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect();
|
||||
|
||||
for applied_migration in applied_migrations {
|
||||
if !migrations.contains(&applied_migration.version) {
|
||||
return Err(MigrateError::VersionMissing(applied_migration.version));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl Migrator {
|
||||
#[doc(hidden)]
|
||||
pub const DEFAULT: Migrator = Migrator {
|
||||
@ -156,12 +137,21 @@ impl Migrator {
|
||||
<A::Connection as Deref>::Target: Migrate,
|
||||
{
|
||||
let mut conn = migrator.acquire().await?;
|
||||
self.run_direct(&mut *conn).await
|
||||
self.run_direct(None, &mut *conn).await
|
||||
}
|
||||
|
||||
pub async fn run_to<'a, A>(&self, target: i64, migrator: A) -> Result<(), MigrateError>
|
||||
where
|
||||
A: Acquire<'a>,
|
||||
<A::Connection as Deref>::Target: Migrate,
|
||||
{
|
||||
let mut conn = migrator.acquire().await?;
|
||||
self.run_direct(Some(target), &mut *conn).await
|
||||
}
|
||||
|
||||
// Getting around the annoying "implementation of `Acquire` is not general enough" error
|
||||
#[doc(hidden)]
|
||||
pub async fn run_direct<C>(&self, conn: &mut C) -> Result<(), MigrateError>
|
||||
pub async fn run_direct<C>(&self, target: Option<i64>, conn: &mut C) -> Result<(), MigrateError>
|
||||
where
|
||||
C: Migrate,
|
||||
{
|
||||
@ -172,14 +162,14 @@ impl Migrator {
|
||||
|
||||
// creates [_migrations] table only if needed
|
||||
// eventually this will likely migrate previous versions of the table
|
||||
conn.ensure_migrations_table().await?;
|
||||
conn.ensure_migrations_table(&self.table_name).await?;
|
||||
|
||||
let version = conn.dirty_version().await?;
|
||||
let version = conn.dirty_version(&self.table_name).await?;
|
||||
if let Some(version) = version {
|
||||
return Err(MigrateError::Dirty(version));
|
||||
}
|
||||
|
||||
let applied_migrations = conn.list_applied_migrations().await?;
|
||||
let applied_migrations = conn.list_applied_migrations(&self.table_name).await?;
|
||||
validate_applied_migrations(&applied_migrations, self)?;
|
||||
|
||||
let applied_migrations: HashMap<_, _> = applied_migrations
|
||||
@ -188,6 +178,11 @@ impl Migrator {
|
||||
.collect();
|
||||
|
||||
for migration in self.iter() {
|
||||
if target.is_some_and(|target| target < migration.version) {
|
||||
// Target version reached
|
||||
break;
|
||||
}
|
||||
|
||||
if migration.migration_type.is_down_migration() {
|
||||
continue;
|
||||
}
|
||||
@ -199,7 +194,7 @@ impl Migrator {
|
||||
}
|
||||
}
|
||||
None => {
|
||||
conn.apply(migration).await?;
|
||||
conn.apply(&self.table_name, migration).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -244,14 +239,14 @@ impl Migrator {
|
||||
|
||||
// creates [_migrations] table only if needed
|
||||
// eventually this will likely migrate previous versions of the table
|
||||
conn.ensure_migrations_table().await?;
|
||||
conn.ensure_migrations_table(&self.table_name).await?;
|
||||
|
||||
let version = conn.dirty_version().await?;
|
||||
let version = conn.dirty_version(&self.table_name).await?;
|
||||
if let Some(version) = version {
|
||||
return Err(MigrateError::Dirty(version));
|
||||
}
|
||||
|
||||
let applied_migrations = conn.list_applied_migrations().await?;
|
||||
let applied_migrations = conn.list_applied_migrations(&self.table_name).await?;
|
||||
validate_applied_migrations(&applied_migrations, self)?;
|
||||
|
||||
let applied_migrations: HashMap<_, _> = applied_migrations
|
||||
@ -266,7 +261,7 @@ impl Migrator {
|
||||
.filter(|m| applied_migrations.contains_key(&m.version))
|
||||
.filter(|m| m.version > target)
|
||||
{
|
||||
conn.revert(migration).await?;
|
||||
conn.revert(&self.table_name, migration).await?;
|
||||
}
|
||||
|
||||
// unlock the migrator to allow other migrators to run
|
||||
@ -278,3 +273,22 @@ impl Migrator {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_applied_migrations(
|
||||
applied_migrations: &[AppliedMigration],
|
||||
migrator: &Migrator,
|
||||
) -> Result<(), MigrateError> {
|
||||
if migrator.ignore_missing {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect();
|
||||
|
||||
for applied_migration in applied_migrations {
|
||||
if !migrations.contains(&applied_migration.version) {
|
||||
return Err(MigrateError::VersionMissing(applied_migration.version));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -243,7 +243,7 @@ async fn setup_test_db<DB: Database>(
|
||||
|
||||
if let Some(migrator) = args.migrator {
|
||||
migrator
|
||||
.run_direct(&mut conn)
|
||||
.run_direct(None, &mut conn)
|
||||
.await
|
||||
.expect("failed to apply migrations");
|
||||
}
|
||||
|
||||
@ -4,7 +4,6 @@ use std::time::Instant;
|
||||
|
||||
use futures_core::future::BoxFuture;
|
||||
pub(crate) use sqlx_core::migrate::*;
|
||||
|
||||
use crate::connection::{ConnectOptions, Connection};
|
||||
use crate::error::Error;
|
||||
use crate::executor::Executor;
|
||||
@ -75,12 +74,12 @@ impl MigrateDatabase for MySql {
|
||||
}
|
||||
|
||||
impl Migrate for MySqlConnection {
|
||||
fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
|
||||
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=MySQL
|
||||
self.execute(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
&*format!(r#"
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
version BIGINT PRIMARY KEY,
|
||||
description TEXT NOT NULL,
|
||||
installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
@ -88,7 +87,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
checksum BLOB NOT NULL,
|
||||
execution_time BIGINT NOT NULL
|
||||
);
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@ -96,11 +95,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
|
||||
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=SQL
|
||||
let row: Option<(i64,)> = query_as(
|
||||
"SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
|
||||
&format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"),
|
||||
)
|
||||
.fetch_optional(self)
|
||||
.await?;
|
||||
@ -109,13 +108,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn list_applied_migrations(
|
||||
&mut self,
|
||||
) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
|
||||
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=SQL
|
||||
let rows: Vec<(i64, Vec<u8>)> =
|
||||
query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
|
||||
query_as(&format!("SELECT version, checksum FROM {table_name} ORDER BY version"))
|
||||
.fetch_all(self)
|
||||
.await?;
|
||||
|
||||
@ -167,10 +164,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn apply<'e: 'm, 'm>(
|
||||
fn apply<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// Use a single transaction for the actual migration script and the essential bookeeping so we never
|
||||
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
|
||||
@ -188,10 +186,10 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
//
|
||||
// language=MySQL
|
||||
let _ = query(
|
||||
r#"
|
||||
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
|
||||
&format!(r#"
|
||||
INSERT INTO {table_name} ( version, description, success, checksum, execution_time )
|
||||
VALUES ( ?, ?, FALSE, ?, -1 )
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.bind(migration.version)
|
||||
.bind(&*migration.description)
|
||||
@ -206,11 +204,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
|
||||
// language=MySQL
|
||||
let _ = query(
|
||||
r#"
|
||||
UPDATE _sqlx_migrations
|
||||
&format!(r#"
|
||||
UPDATE {table_name}
|
||||
SET success = TRUE
|
||||
WHERE version = ?
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.bind(migration.version)
|
||||
.execute(&mut *tx)
|
||||
@ -226,11 +224,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
|
||||
#[allow(clippy::cast_possible_truncation)]
|
||||
let _ = query(
|
||||
r#"
|
||||
UPDATE _sqlx_migrations
|
||||
&format!(r#"
|
||||
UPDATE {table_name}
|
||||
SET execution_time = ?
|
||||
WHERE version = ?
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.bind(elapsed.as_nanos() as i64)
|
||||
.bind(migration.version)
|
||||
@ -241,10 +239,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn revert<'e: 'm, 'm>(
|
||||
fn revert<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// Use a single transaction for the actual migration script and the essential bookeeping so we never
|
||||
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
|
||||
@ -259,11 +258,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
//
|
||||
// language=MySQL
|
||||
let _ = query(
|
||||
r#"
|
||||
UPDATE _sqlx_migrations
|
||||
&format!(r#"
|
||||
UPDATE {table_name}
|
||||
SET success = FALSE
|
||||
WHERE version = ?
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.bind(migration.version)
|
||||
.execute(&mut *tx)
|
||||
@ -272,7 +271,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
tx.execute(&*migration.sql).await?;
|
||||
|
||||
// language=SQL
|
||||
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
|
||||
let _ = query(&format!(r#"DELETE FROM {table_name} WHERE version = ?"#))
|
||||
.bind(migration.version)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
@ -111,12 +111,12 @@ impl MigrateDatabase for Postgres {
|
||||
}
|
||||
|
||||
impl Migrate for PgConnection {
|
||||
fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
|
||||
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=SQL
|
||||
self.execute(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
&*format!(r#"
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
version BIGINT PRIMARY KEY,
|
||||
description TEXT NOT NULL,
|
||||
installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
@ -124,7 +124,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
checksum BYTEA NOT NULL,
|
||||
execution_time BIGINT NOT NULL
|
||||
);
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@ -132,11 +132,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
|
||||
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=SQL
|
||||
let row: Option<(i64,)> = query_as(
|
||||
"SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
|
||||
&*format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"),
|
||||
)
|
||||
.fetch_optional(self)
|
||||
.await?;
|
||||
@ -145,13 +145,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn list_applied_migrations(
|
||||
&mut self,
|
||||
) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
|
||||
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=SQL
|
||||
let rows: Vec<(i64, Vec<u8>)> =
|
||||
query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
|
||||
query_as(&*format!("SELECT version, checksum FROM {table_name} ORDER BY version"))
|
||||
.fetch_all(self)
|
||||
.await?;
|
||||
|
||||
@ -203,16 +201,17 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn apply<'e: 'm, 'm>(
|
||||
fn apply<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
let start = Instant::now();
|
||||
|
||||
// execute migration queries
|
||||
if migration.no_tx {
|
||||
execute_migration(self, migration).await?;
|
||||
execute_migration(self, table_name, migration).await?;
|
||||
} else {
|
||||
// Use a single transaction for the actual migration script and the essential bookeeping so we never
|
||||
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
|
||||
@ -220,7 +219,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
// data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
|
||||
// and update it once the actual transaction completed.
|
||||
let mut tx = self.begin().await?;
|
||||
execute_migration(&mut tx, migration).await?;
|
||||
execute_migration(&mut tx, table_name, migration).await?;
|
||||
tx.commit().await?;
|
||||
}
|
||||
|
||||
@ -232,11 +231,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
// language=SQL
|
||||
#[allow(clippy::cast_possible_truncation)]
|
||||
let _ = query(
|
||||
r#"
|
||||
UPDATE _sqlx_migrations
|
||||
&*format!(r#"
|
||||
UPDATE {table_name}
|
||||
SET execution_time = $1
|
||||
WHERE version = $2
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.bind(elapsed.as_nanos() as i64)
|
||||
.bind(migration.version)
|
||||
@ -247,21 +246,22 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn revert<'e: 'm, 'm>(
|
||||
fn revert<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
let start = Instant::now();
|
||||
|
||||
// execute migration queries
|
||||
if migration.no_tx {
|
||||
revert_migration(self, migration).await?;
|
||||
revert_migration(self, table_name, migration).await?;
|
||||
} else {
|
||||
// Use a single transaction for the actual migration script and the essential bookeeping so we never
|
||||
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
|
||||
let mut tx = self.begin().await?;
|
||||
revert_migration(&mut tx, migration).await?;
|
||||
revert_migration(&mut tx, table_name, migration).await?;
|
||||
tx.commit().await?;
|
||||
}
|
||||
|
||||
@ -274,6 +274,7 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
|
||||
async fn execute_migration(
|
||||
conn: &mut PgConnection,
|
||||
table_name: &str,
|
||||
migration: &Migration,
|
||||
) -> Result<(), MigrateError> {
|
||||
let _ = conn
|
||||
@ -283,10 +284,10 @@ async fn execute_migration(
|
||||
|
||||
// language=SQL
|
||||
let _ = query(
|
||||
r#"
|
||||
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
|
||||
&*format!(r#"
|
||||
INSERT INTO {table_name} ( version, description, success, checksum, execution_time )
|
||||
VALUES ( $1, $2, TRUE, $3, -1 )
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.bind(migration.version)
|
||||
.bind(&*migration.description)
|
||||
@ -299,6 +300,7 @@ async fn execute_migration(
|
||||
|
||||
async fn revert_migration(
|
||||
conn: &mut PgConnection,
|
||||
table_name: &str,
|
||||
migration: &Migration,
|
||||
) -> Result<(), MigrateError> {
|
||||
let _ = conn
|
||||
@ -307,7 +309,7 @@ async fn revert_migration(
|
||||
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
|
||||
|
||||
// language=SQL
|
||||
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
|
||||
let _ = query(&*format!(r#"DELETE FROM {table_name} WHERE version = $1"#))
|
||||
.bind(migration.version)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
@ -64,12 +64,11 @@ impl MigrateDatabase for Sqlite {
|
||||
}
|
||||
|
||||
impl Migrate for SqliteConnection {
|
||||
fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
|
||||
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=SQLite
|
||||
self.execute(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
self.execute(&*format!(r#"
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
version BIGINT PRIMARY KEY,
|
||||
description TEXT NOT NULL,
|
||||
installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
@ -77,19 +76,19 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
checksum BLOB NOT NULL,
|
||||
execution_time BIGINT NOT NULL
|
||||
);
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.await?;
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
|
||||
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=SQLite
|
||||
let row: Option<(i64,)> = query_as(
|
||||
"SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
|
||||
&format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"),
|
||||
)
|
||||
.fetch_optional(self)
|
||||
.await?;
|
||||
@ -98,13 +97,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn list_applied_migrations(
|
||||
&mut self,
|
||||
) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
|
||||
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// language=SQLite
|
||||
let rows: Vec<(i64, Vec<u8>)> =
|
||||
query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
|
||||
query_as(&format!("SELECT version, checksum FROM {table_name} ORDER BY version"))
|
||||
.fetch_all(self)
|
||||
.await?;
|
||||
|
||||
@ -128,10 +125,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
Box::pin(async move { Ok(()) })
|
||||
}
|
||||
|
||||
fn apply<'e: 'm, 'm>(
|
||||
fn apply<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
let mut tx = self.begin().await?;
|
||||
let start = Instant::now();
|
||||
@ -148,10 +146,10 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
|
||||
// language=SQL
|
||||
let _ = query(
|
||||
r#"
|
||||
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
|
||||
&format!(r#"
|
||||
INSERT INTO {table_name} ( version, description, success, checksum, execution_time )
|
||||
VALUES ( ?1, ?2, TRUE, ?3, -1 )
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.bind(migration.version)
|
||||
.bind(&*migration.description)
|
||||
@ -170,11 +168,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
// language=SQL
|
||||
#[allow(clippy::cast_possible_truncation)]
|
||||
let _ = query(
|
||||
r#"
|
||||
UPDATE _sqlx_migrations
|
||||
&format!(r#"
|
||||
UPDATE {table_name}
|
||||
SET execution_time = ?1
|
||||
WHERE version = ?2
|
||||
"#,
|
||||
"#),
|
||||
)
|
||||
.bind(elapsed.as_nanos() as i64)
|
||||
.bind(migration.version)
|
||||
@ -185,10 +183,11 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
})
|
||||
}
|
||||
|
||||
fn revert<'e: 'm, 'm>(
|
||||
fn revert<'e>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
table_name: &'e str,
|
||||
migration: &'e Migration,
|
||||
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
// Use a single transaction for the actual migration script and the essential bookeeping so we never
|
||||
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
|
||||
@ -197,8 +196,8 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
||||
|
||||
let _ = tx.execute(&*migration.sql).await?;
|
||||
|
||||
// language=SQL
|
||||
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#)
|
||||
// language=SQLite
|
||||
let _ = query(&format!(r#"DELETE FROM {table_name} WHERE version = ?1"#))
|
||||
.bind(migration.version)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user