diff --git a/sqlx-cli/src/database.rs b/sqlx-cli/src/database.rs index 0ee3ba0b..1970826b 100644 --- a/sqlx-cli/src/database.rs +++ b/sqlx-cli/src/database.rs @@ -40,5 +40,5 @@ pub async fn reset(migration_source: &str, uri: &str, confirm: bool) -> anyhow:: pub async fn setup(migration_source: &str, uri: &str) -> anyhow::Result<()> { create(uri).await?; - migrate::run(migration_source, uri).await + migrate::run(migration_source, uri, false).await } diff --git a/sqlx-cli/src/lib.rs b/sqlx-cli/src/lib.rs index c3ac96d9..9d18bf51 100644 --- a/sqlx-cli/src/lib.rs +++ b/sqlx-cli/src/lib.rs @@ -31,8 +31,16 @@ hint: This command only works in the manifest directory of a Cargo package."# match opt.command { Command::Migrate(migrate) => match migrate.command { - MigrateCommand::Add { description } => migrate::add(&migrate.source, &description)?, - MigrateCommand::Run => migrate::run(&migrate.source, &database_url).await?, + MigrateCommand::Add { + description, + reversible, + } => migrate::add(&migrate.source, &description, reversible).await?, + MigrateCommand::Run { dry_run } => { + migrate::run(&migrate.source, &database_url, dry_run).await? + } + MigrateCommand::Revert { dry_run } => { + migrate::revert(&migrate.source, &database_url, dry_run).await? + } MigrateCommand::Info => migrate::info(&migrate.source, &database_url).await?, }, diff --git a/sqlx-cli/src/migrate.rs b/sqlx-cli/src/migrate.rs index 5ff099a3..54a8c876 100644 --- a/sqlx-cli/src/migrate.rs +++ b/sqlx-cli/src/migrate.rs @@ -1,22 +1,25 @@ use anyhow::{bail, Context}; +use chrono::Utc; use console::style; -use sqlx::migrate::{Migrate, MigrateError, Migrator}; +use sqlx::migrate::{Migrate, MigrateError, MigrationType, Migrator}; use sqlx::{AnyConnection, Connection}; use std::fs::{self, File}; use std::io::Write; use std::path::Path; +use std::time::Duration; -pub fn add(migration_source: &str, description: &str) -> anyhow::Result<()> { - use chrono::prelude::*; +fn create_file( + migration_source: &str, + file_prefix: &str, + description: &str, + migration_type: MigrationType, +) -> anyhow::Result<()> { use std::path::PathBuf; - fs::create_dir_all(migration_source).context("Unable to create migrations directory")?; - - let dt = Utc::now(); - let mut file_name = dt.format("%Y%m%d%H%M%S").to_string(); + let mut file_name = file_prefix.to_string(); file_name.push_str("_"); file_name.push_str(&description.replace(' ', "_")); - file_name.push_str(".sql"); + file_name.push_str(migration_type.suffix()); let mut path = PathBuf::new(); path.push(migration_source); @@ -26,7 +29,49 @@ pub fn add(migration_source: &str, description: &str) -> anyhow::Result<()> { let mut file = File::create(&path).context("Failed to create migration file")?; - file.write_all(b"-- Add migration script here\n")?; + file.write_all(migration_type.file_content().as_bytes())?; + + Ok(()) +} + +pub async fn add( + migration_source: &str, + description: &str, + reversible: bool, +) -> anyhow::Result<()> { + fs::create_dir_all(migration_source).context("Unable to create migrations directory")?; + + let migrator = Migrator::new(Path::new(migration_source)).await?; + // This checks if all existing migrations are of the same type as the reverisble flag passed + for migration in migrator.iter() { + if migration.migration_type.is_reversible() != reversible { + bail!(MigrateError::InvalidMixReversibleAndSimple); + } + } + + let dt = Utc::now(); + let file_prefix = dt.format("%Y%m%d%H%M%S").to_string(); + if reversible { + create_file( + migration_source, + &file_prefix, + description, + MigrationType::ReversibleUp, + )?; + create_file( + migration_source, + &file_prefix, + description, + MigrationType::ReversibleDown, + )?; + } else { + create_file( + migration_source, + &file_prefix, + description, + MigrationType::Simple, + )?; + } Ok(()) } @@ -55,7 +100,7 @@ pub async fn info(migration_source: &str, uri: &str) -> anyhow::Result<()> { Ok(()) } -pub async fn run(migration_source: &str, uri: &str) -> anyhow::Result<()> { +pub async fn run(migration_source: &str, uri: &str, dry_run: bool) -> anyhow::Result<()> { let migrator = Migrator::new(Path::new(migration_source)).await?; let mut conn = AnyConnection::connect(uri).await?; @@ -68,13 +113,23 @@ pub async fn run(migration_source: &str, uri: &str) -> anyhow::Result<()> { } for migration in migrator.iter() { + if migration.migration_type.is_down_migration() { + // Skipping down migrations + continue; + } if migration.version > version { - let elapsed = conn.apply(migration).await?; + let elapsed = if dry_run { + Duration::new(0, 0) + } else { + conn.apply(migration).await? + }; + let text = if dry_run { "Can apply" } else { "Applied" }; println!( - "{}/{} {} {}", + "{} {}/{} {} {}", + text, style(migration.version).cyan(), - style("migrate").green(), + style(migration.migration_type.label()).green(), migration.description, style(format!("({:?})", elapsed)).dim() ); @@ -85,3 +140,54 @@ pub async fn run(migration_source: &str, uri: &str) -> anyhow::Result<()> { Ok(()) } + +pub async fn revert(migration_source: &str, uri: &str, dry_run: bool) -> anyhow::Result<()> { + let migrator = Migrator::new(Path::new(migration_source)).await?; + let mut conn = AnyConnection::connect(uri).await?; + + conn.ensure_migrations_table().await?; + + let (version, dirty) = conn.version().await?.unwrap_or((0, false)); + + if dirty { + bail!(MigrateError::Dirty(version)); + } + + let mut is_applied = false; + for migration in migrator.iter().rev() { + if !migration.migration_type.is_down_migration() { + // Skipping non down migration + // This will skip any standard or up migration file + continue; + } + if migration.version > version { + // Skipping unapplied migrations + continue; + } + + let elapsed = if dry_run { + Duration::new(0, 0) + } else { + conn.revert(migration).await? + }; + let text = if dry_run { "Can apply" } else { "Applied" }; + + println!( + "{} {}/{} {} {}", + text, + style(migration.version).cyan(), + style(migration.migration_type.label()).green(), + migration.description, + style(format!("({:?})", elapsed)).dim() + ); + + is_applied = true; + // Only a single migration will be reverted at a time, so we break + break; + } + if !is_applied { + println!("No migrations available to revert"); + } + + Ok(()) +} diff --git a/sqlx-cli/src/opt.rs b/sqlx-cli/src/opt.rs index d04506cb..9bddee06 100644 --- a/sqlx-cli/src/opt.rs +++ b/sqlx-cli/src/opt.rs @@ -93,10 +93,28 @@ pub struct MigrateOpt { pub enum MigrateCommand { /// Create a new migration with the given description, /// and the current time as the version. - Add { description: String }, + Add { + description: String, + + /// If true, creates a pair of up and down migration files with same version + /// else creates a single sql file + #[clap(short)] + reversible: bool, + }, /// Run all pending migrations. - Run, + Run { + /// List all the migrations to be run without applying + #[clap(long)] + dry_run: bool, + }, + + /// Revert the latest migration with a down file. + Revert { + /// List the migration to be reverted without applying + #[clap(long)] + dry_run: bool, + }, /// List all available migrations. Info, diff --git a/sqlx-core/src/any/migrate.rs b/sqlx-core/src/any/migrate.rs index f87fbab5..d5d7df5b 100644 --- a/sqlx-core/src/any/migrate.rs +++ b/sqlx-core/src/any/migrate.rs @@ -171,4 +171,23 @@ impl Migrate for AnyConnection { } } } + + fn revert<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + match &mut self.0 { + #[cfg(feature = "postgres")] + AnyConnectionKind::Postgres(conn) => conn.revert(migration), + + #[cfg(feature = "sqlite")] + AnyConnectionKind::Sqlite(conn) => conn.revert(migration), + + #[cfg(feature = "mysql")] + AnyConnectionKind::MySql(conn) => conn.revert(migration), + + #[cfg(feature = "mssql")] + AnyConnectionKind::Mssql(conn) => unimplemented!(), + } + } } diff --git a/sqlx-core/src/migrate/error.rs b/sqlx-core/src/migrate/error.rs index e9689e43..36dbe811 100644 --- a/sqlx-core/src/migrate/error.rs +++ b/sqlx-core/src/migrate/error.rs @@ -15,6 +15,9 @@ pub enum MigrateError { #[error("migration {0} was previously applied but has been modified")] VersionMismatch(i64), + #[error("cannot mix reversible migrations with simple migrations. All migrations should be reversible or simple migrations")] + InvalidMixReversibleAndSimple, + // NOTE: this will only happen with a database that does not have transactional DDL (.e.g, MySQL or Oracle) #[error( "migration {0} is partially applied; fix and remove row from `_sqlx_migrations` table" diff --git a/sqlx-core/src/migrate/migrate.rs b/sqlx-core/src/migrate/migrate.rs index c434c20c..31690d92 100644 --- a/sqlx-core/src/migrate/migrate.rs +++ b/sqlx-core/src/migrate/migrate.rs @@ -50,4 +50,12 @@ pub trait Migrate { &'e mut self, migration: &'m Migration, ) -> BoxFuture<'m, Result>; + + // run a revert SQL from migration in a DDL transaction + // deletes the row in [_migrations] table with specified migration version on completion (success or failure) + // returns the time taking to run the migration SQL + fn revert<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result>; } diff --git a/sqlx-core/src/migrate/migration.rs b/sqlx-core/src/migrate/migration.rs index c664d8fa..ed362da2 100644 --- a/sqlx-core/src/migrate/migration.rs +++ b/sqlx-core/src/migrate/migration.rs @@ -2,21 +2,30 @@ use std::borrow::Cow; use sha2::{Digest, Sha384}; +use super::MigrationType; + #[derive(Debug, Clone)] pub struct Migration { pub version: i64, pub description: Cow<'static, str>, + pub migration_type: MigrationType, pub sql: Cow<'static, str>, pub checksum: Cow<'static, [u8]>, } impl Migration { - pub fn new(version: i64, description: Cow<'static, str>, sql: Cow<'static, str>) -> Self { + pub fn new( + version: i64, + description: Cow<'static, str>, + migration_type: MigrationType, + sql: Cow<'static, str>, + ) -> Self { let checksum = Cow::Owned(Vec::from(Sha384::digest(sql.as_bytes()).as_slice())); Migration { version, description, + migration_type, sql, checksum, } diff --git a/sqlx-core/src/migrate/migration_type.rs b/sqlx-core/src/migrate/migration_type.rs new file mode 100644 index 00000000..edabdbed --- /dev/null +++ b/sqlx-core/src/migrate/migration_type.rs @@ -0,0 +1,66 @@ +/// Migration Type represents the type of migration +#[derive(Debug, Copy, Clone)] +pub enum MigrationType { + /// Simple migration are single file migrations with no up / down queries + Simple, + + /// ReversibleUp migrations represents the add or update part of a reversible migrations + /// It is expected the every migration of this type will have a corresponding down file + ReversibleUp, + + /// ReversibleDown migrations represents the delete or downgrade part of a reversible migrations + /// It is expected the every migration of this type will have a corresponding up file + ReversibleDown, +} + +impl MigrationType { + pub fn from_filename(filename: &str) -> Self { + if filename.ends_with(MigrationType::ReversibleUp.suffix()) { + MigrationType::ReversibleUp + } else if filename.ends_with(MigrationType::ReversibleDown.suffix()) { + MigrationType::ReversibleDown + } else { + MigrationType::Simple + } + } + + pub fn is_reversible(&self) -> bool { + match self { + MigrationType::Simple => false, + MigrationType::ReversibleUp => true, + MigrationType::ReversibleDown => true, + } + } + + pub fn is_down_migration(&self) -> bool { + match self { + MigrationType::Simple => false, + MigrationType::ReversibleUp => false, + MigrationType::ReversibleDown => true, + } + } + + pub fn label(&self) -> &'static str { + match self { + MigrationType::Simple => "migrate", + MigrationType::ReversibleUp => "migrate", + MigrationType::ReversibleDown => "revert", + } + } + + pub fn suffix(&self) -> &'static str { + match self { + MigrationType::Simple => ".sql", + MigrationType::ReversibleUp => ".up.sql", + MigrationType::ReversibleDown => ".down.sql", + } + } + + pub fn file_content(&self) -> &'static str { + match self { + MigrationType::Simple => "-- Add migration script here\n", + MigrationType::ReversibleUp => "-- Add up migration script here\n", + MigrationType::ReversibleDown => "-- Add down migration script here\n", + } + } +} diff --git a/sqlx-core/src/migrate/mod.rs b/sqlx-core/src/migrate/mod.rs index 04231733..d569be43 100644 --- a/sqlx-core/src/migrate/mod.rs +++ b/sqlx-core/src/migrate/mod.rs @@ -1,11 +1,13 @@ mod error; mod migrate; mod migration; +mod migration_type; mod migrator; mod source; pub use error::MigrateError; pub use migrate::{Migrate, MigrateDatabase}; pub use migration::Migration; +pub use migration_type::MigrationType; pub use migrator::Migrator; pub use source::MigrationSource; diff --git a/sqlx-core/src/migrate/source.rs b/sqlx-core/src/migrate/source.rs index e337e911..65142f50 100644 --- a/sqlx-core/src/migrate/source.rs +++ b/sqlx-core/src/migrate/source.rs @@ -1,5 +1,5 @@ use crate::error::BoxDynError; -use crate::migrate::Migration; +use crate::migrate::{Migration, MigrationType}; use futures_core::future::BoxFuture; use futures_util::TryStreamExt; use sqlx_rt::fs; @@ -35,9 +35,10 @@ impl<'s> MigrationSource<'s> for &'s Path { let version: i64 = parts[0].parse()?; + let migration_type = MigrationType::from_filename(parts[1]); // remove the `.sql` and replace `_` with ` ` let description = parts[1] - .trim_end_matches(".sql") + .trim_end_matches(migration_type.suffix()) .replace('_', " ") .to_owned(); @@ -46,6 +47,7 @@ impl<'s> MigrationSource<'s> for &'s Path { migrations.push(Migration::new( version, Cow::Owned(description), + migration_type, Cow::Owned(sql), )); } diff --git a/sqlx-core/src/mysql/migrate.rs b/sqlx-core/src/mysql/migrate.rs index 1476ef04..0aedac28 100644 --- a/sqlx-core/src/mysql/migrate.rs +++ b/sqlx-core/src/mysql/migrate.rs @@ -201,6 +201,27 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( Ok(elapsed) }) } + + fn revert<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + Box::pin(async move { + let start = Instant::now(); + + self.execute(&*migration.sql).await?; + + let elapsed = start.elapsed(); + + // language=SQL + let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#) + .bind(migration.version) + .execute(self) + .await?; + + Ok(elapsed) + }) + } } async fn current_database(conn: &mut MySqlConnection) -> Result { diff --git a/sqlx-core/src/postgres/migrate.rs b/sqlx-core/src/postgres/migrate.rs index ef80eb3b..d9adcea1 100644 --- a/sqlx-core/src/postgres/migrate.rs +++ b/sqlx-core/src/postgres/migrate.rs @@ -211,6 +211,30 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( Ok(elapsed) }) } + + fn revert<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + Box::pin(async move { + let mut tx = self.begin().await?; + let start = Instant::now(); + + let _ = tx.execute(&*migration.sql).await?; + + tx.commit().await?; + + let elapsed = start.elapsed(); + + // language=SQL + let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#) + .bind(migration.version) + .execute(self) + .await?; + + Ok(elapsed) + }) + } } async fn current_database(conn: &mut PgConnection) -> Result { diff --git a/sqlx-core/src/sqlite/migrate.rs b/sqlx-core/src/sqlite/migrate.rs index 3572a99a..09f37076 100644 --- a/sqlx-core/src/sqlite/migrate.rs +++ b/sqlx-core/src/sqlite/migrate.rs @@ -150,4 +150,28 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations ( Ok(elapsed) }) } + + fn revert<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + Box::pin(async move { + let mut tx = self.begin().await?; + let start = Instant::now(); + + let _ = tx.execute(&*migration.sql).await?; + + tx.commit().await?; + + let elapsed = start.elapsed(); + + // language=SQL + let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#) + .bind(migration.version) + .execute(self) + .await?; + + Ok(elapsed) + }) + } }