diff --git a/sqlx-cli/src/database.rs b/sqlx-cli/src/database.rs index 1fd8bcc5..a0af55d6 100644 --- a/sqlx-cli/src/database.rs +++ b/sqlx-cli/src/database.rs @@ -1,5 +1,5 @@ -use crate::{migrate, Config}; use crate::opt::{ConnectOpts, MigrationSourceOpt}; +use crate::{migrate, Config}; use console::style; use promptly::{prompt, ReadlineError}; use sqlx::any::Any; @@ -54,7 +54,11 @@ pub async fn reset( setup(config, migration_source, connect_opts).await } -pub async fn setup(config: &Config, migration_source: &MigrationSourceOpt, connect_opts: &ConnectOpts) -> anyhow::Result<()> { +pub async fn setup( + config: &Config, + migration_source: &MigrationSourceOpt, + connect_opts: &ConnectOpts, +) -> anyhow::Result<()> { create(connect_opts).await?; migrate::run(config, migration_source, connect_opts, false, false, None).await } diff --git a/sqlx-cli/src/lib.rs b/sqlx-cli/src/lib.rs index 63257f54..43b301e4 100644 --- a/sqlx-cli/src/lib.rs +++ b/sqlx-cli/src/lib.rs @@ -1,5 +1,5 @@ use std::io; -use std::path::{PathBuf}; +use std::path::PathBuf; use std::time::Duration; use anyhow::{Context, Result}; @@ -28,7 +28,7 @@ pub async fn run(opt: Opt) -> Result<()> { match opt.command { Command::Migrate(migrate) => match migrate.command { - MigrateCommand::Add(opts)=> migrate::add(config, opts).await?, + MigrateCommand::Add(opts) => migrate::add(config, opts).await?, MigrateCommand::Run { source, dry_run, @@ -74,15 +74,17 @@ pub async fn run(opt: Opt) -> Result<()> { connect_opts.populate_db_url(config)?; migrate::info(config, &source, &connect_opts).await? - }, - MigrateCommand::BuildScript { source, force } => migrate::build_script(config, &source, force)?, + } + MigrateCommand::BuildScript { source, force } => { + migrate::build_script(config, &source, force)? + } }, Command::Database(database) => match database.command { DatabaseCommand::Create { mut connect_opts } => { connect_opts.populate_db_url(config)?; database::create(&connect_opts).await? - }, + } DatabaseCommand::Drop { confirmation, mut connect_opts, @@ -90,7 +92,7 @@ pub async fn run(opt: Opt) -> Result<()> { } => { connect_opts.populate_db_url(config)?; database::drop(&connect_opts, !confirmation.yes, force).await? - }, + } DatabaseCommand::Reset { confirmation, source, @@ -99,14 +101,14 @@ pub async fn run(opt: Opt) -> Result<()> { } => { connect_opts.populate_db_url(config)?; database::reset(config, &source, &connect_opts, !confirmation.yes, force).await? - }, + } DatabaseCommand::Setup { source, mut connect_opts, } => { connect_opts.populate_db_url(config)?; database::setup(config, &source, &connect_opts).await? - }, + } }, Command::Prepare { @@ -118,7 +120,7 @@ pub async fn run(opt: Opt) -> Result<()> { } => { connect_opts.populate_db_url(config)?; prepare::run(check, all, workspace, connect_opts, args).await? - }, + } #[cfg(feature = "completions")] Command::Completions { shell } => completions::run(shell), @@ -183,6 +185,6 @@ async fn config_from_current_dir() -> anyhow::Result<&'static Config> { Config::read_with_or_default(move || Ok(path)) }) - .await - .context("unexpected error loading config") + .await + .context("unexpected error loading config") } diff --git a/sqlx-cli/src/migrate.rs b/sqlx-cli/src/migrate.rs index 9e011968..3618fbe7 100644 --- a/sqlx-cli/src/migrate.rs +++ b/sqlx-cli/src/migrate.rs @@ -1,7 +1,10 @@ +use crate::config::Config; use crate::opt::{AddMigrationOpts, ConnectOpts, MigrationSourceOpt}; use anyhow::{bail, Context}; use console::style; -use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator, ResolveWith}; +use sqlx::migrate::{ + AppliedMigration, Migrate, MigrateError, MigrationType, Migrator, ResolveWith, +}; use sqlx::Connection; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; @@ -9,14 +12,10 @@ use std::fmt::Write; use std::fs::{self, File}; use std::path::Path; use std::time::Duration; -use crate::config::Config; -pub async fn add( - config: &Config, - opts: AddMigrationOpts, -) -> anyhow::Result<()> { +pub async fn add(config: &Config, opts: AddMigrationOpts) -> anyhow::Result<()> { let source = opts.source.resolve(config); - + fs::create_dir_all(source).context("Unable to create migrations directory")?; let migrator = Migrator::new(Path::new(source)).await?; @@ -124,13 +123,27 @@ fn short_checksum(checksum: &[u8]) -> String { s } -pub async fn info(config: &Config, migration_source: &MigrationSourceOpt, connect_opts: &ConnectOpts) -> anyhow::Result<()> { +pub async fn info( + config: &Config, + migration_source: &MigrationSourceOpt, + connect_opts: &ConnectOpts, +) -> anyhow::Result<()> { let source = migration_source.resolve(config); - - let migrator = Migrator::new(ResolveWith(Path::new(source), config.migrate.to_resolve_config())).await?; + + let migrator = Migrator::new(ResolveWith( + Path::new(source), + config.migrate.to_resolve_config(), + )) + .await?; let mut conn = crate::connect(connect_opts).await?; - conn.ensure_migrations_table(config.migrate.table_name()).await?; + // FIXME: we shouldn't actually be creating anything here + for schema_name in &config.migrate.create_schemas { + conn.create_schema_if_not_exists(schema_name).await?; + } + + conn.ensure_migrations_table(config.migrate.table_name()) + .await?; let applied_migrations: HashMap<_, _> = conn .list_applied_migrations(config.migrate.table_name()) @@ -214,7 +227,7 @@ pub async fn run( target_version: Option, ) -> anyhow::Result<()> { let source = migration_source.resolve(config); - + let migrator = Migrator::new(Path::new(source)).await?; if let Some(target_version) = target_version { if !migrator.version_exists(target_version) { @@ -224,14 +237,21 @@ pub async fn run( let mut conn = crate::connect(connect_opts).await?; - conn.ensure_migrations_table(config.migrate.table_name()).await?; + for schema_name in &config.migrate.create_schemas { + conn.create_schema_if_not_exists(schema_name).await?; + } + + conn.ensure_migrations_table(config.migrate.table_name()) + .await?; let version = conn.dirty_version(config.migrate.table_name()).await?; if let Some(version) = version { bail!(MigrateError::Dirty(version)); } - let applied_migrations = conn.list_applied_migrations(config.migrate.table_name()).await?; + let applied_migrations = conn + .list_applied_migrations(config.migrate.table_name()) + .await?; validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?; let latest_version = applied_migrations @@ -319,14 +339,22 @@ pub async fn revert( let mut conn = crate::connect(connect_opts).await?; - conn.ensure_migrations_table(config.migrate.table_name()).await?; + // FIXME: we should not be creating anything here if it doesn't exist + for schema_name in &config.migrate.create_schemas { + conn.create_schema_if_not_exists(schema_name).await?; + } + + conn.ensure_migrations_table(config.migrate.table_name()) + .await?; let version = conn.dirty_version(config.migrate.table_name()).await?; if let Some(version) = version { bail!(MigrateError::Dirty(version)); } - let applied_migrations = conn.list_applied_migrations(config.migrate.table_name()).await?; + let applied_migrations = conn + .list_applied_migrations(config.migrate.table_name()) + .await?; validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?; let latest_version = applied_migrations @@ -397,9 +425,13 @@ pub async fn revert( Ok(()) } -pub fn build_script(config: &Config, migration_source: &MigrationSourceOpt, force: bool) -> anyhow::Result<()> { +pub fn build_script( + config: &Config, + migration_source: &MigrationSourceOpt, + force: bool, +) -> anyhow::Result<()> { let source = migration_source.resolve(config); - + anyhow::ensure!( Path::new("Cargo.toml").exists(), "must be run in a Cargo project root" diff --git a/sqlx-cli/src/opt.rs b/sqlx-cli/src/opt.rs index 0b72af65..9716303c 100644 --- a/sqlx-cli/src/opt.rs +++ b/sqlx-cli/src/opt.rs @@ -1,13 +1,13 @@ -use std::env; -use std::ops::{Deref, Not}; +use crate::config::migrate::{DefaultMigrationType, DefaultVersioning}; +use crate::config::Config; use anyhow::Context; use chrono::Utc; use clap::{Args, Parser}; #[cfg(feature = "completions")] use clap_complete::Shell; -use crate::config::Config; use sqlx::migrate::Migrator; -use crate::config::migrate::{DefaultMigrationType, DefaultVersioning}; +use std::env; +use std::ops::{Deref, Not}; #[derive(Parser, Debug)] #[clap(version, about, author)] @@ -129,7 +129,7 @@ pub enum MigrateCommand { /// Create a new migration with the given description. /// /// -------------------------------- - /// + /// /// Migrations may either be simple, or reversible. /// /// Reversible migrations can be reverted with `sqlx migrate revert`, simple migrations cannot. @@ -152,7 +152,7 @@ pub enum MigrateCommand { /// It is recommended to always back up the database before running migrations. /// /// -------------------------------- - /// + /// /// For convenience, this command attempts to detect if reversible migrations are in-use. /// /// If the latest existing migration is reversible, the new migration will also be reversible. @@ -164,7 +164,7 @@ pub enum MigrateCommand { /// The default type to use can also be set in `sqlx.toml`. /// /// -------------------------------- - /// + /// /// A version number will be automatically assigned to the migration. /// /// Migrations are applied in ascending order by version number. @@ -174,9 +174,9 @@ pub enum MigrateCommand { /// less than _any_ previously applied migration. /// /// Migrations should only be created with increasing version number. - /// + /// /// -------------------------------- - /// + /// /// For convenience, this command will attempt to detect if sequential versioning is in use, /// and if so, continue the sequence. /// @@ -290,7 +290,7 @@ pub struct AddMigrationOpts { #[derive(Args, Debug)] pub struct MigrationSourceOpt { /// Path to folder containing migrations. - /// + /// /// Defaults to `migrations/` if not specified, but a different default may be set by `sqlx.toml`. #[clap(long)] pub source: Option, @@ -301,7 +301,7 @@ impl MigrationSourceOpt { if let Some(source) = &self.source { return source; } - + config.migrate.migrations_dir() } } @@ -335,7 +335,9 @@ impl ConnectOpts { /// Require a database URL to be provided, otherwise /// return an error. pub fn expect_db_url(&self) -> anyhow::Result<&str> { - self.database_url.as_deref().context("BUG: database_url not populated") + self.database_url + .as_deref() + .context("BUG: database_url not populated") } /// Populate `database_url` from the environment, if not set. @@ -359,7 +361,7 @@ impl ConnectOpts { } self.database_url = Some(url) - }, + } Err(env::VarError::NotPresent) => { anyhow::bail!("`--database-url` or `{var}`{context} must be set") } @@ -407,22 +409,20 @@ impl Not for IgnoreMissing { impl AddMigrationOpts { pub fn reversible(&self, config: &Config, migrator: &Migrator) -> bool { - if self.reversible { return true; } - if self.simple { return false; } + if self.reversible { + return true; + } + if self.simple { + return false; + } match config.migrate.defaults.migration_type { - DefaultMigrationType::Inferred => { - migrator - .iter() - .last() - .is_some_and(|m| m.migration_type.is_reversible()) - } - DefaultMigrationType::Simple => { - false - } - DefaultMigrationType::Reversible => { - true - } + DefaultMigrationType::Inferred => migrator + .iter() + .last() + .is_some_and(|m| m.migration_type.is_reversible()), + DefaultMigrationType::Simple => false, + DefaultMigrationType::Reversible => true, } } @@ -434,8 +434,7 @@ impl AddMigrationOpts { } if self.sequential || matches!(default_versioning, DefaultVersioning::Sequential) { - return next_sequential(migrator) - .unwrap_or_else(|| fmt_sequential(1)); + return next_sequential(migrator).unwrap_or_else(|| fmt_sequential(1)); } next_sequential(migrator).unwrap_or_else(next_timestamp) @@ -455,18 +454,16 @@ fn next_sequential(migrator: &Migrator) -> Option { match migrations { [previous, latest] => { // If the latest two versions differ by 1, infer sequential. - (latest.version - previous.version == 1) - .then_some(latest.version + 1) - }, + (latest.version - previous.version == 1).then_some(latest.version + 1) + } [latest] => { // If only one migration exists and its version is 0 or 1, infer sequential - matches!(latest.version, 0 | 1) - .then_some(latest.version + 1) + matches!(latest.version, 0 | 1).then_some(latest.version + 1) } _ => unreachable!(), } }); - + next_version.map(fmt_sequential) } diff --git a/sqlx-cli/tests/common/mod.rs b/sqlx-cli/tests/common/mod.rs index bb58554f..26f041d6 100644 --- a/sqlx-cli/tests/common/mod.rs +++ b/sqlx-cli/tests/common/mod.rs @@ -1,12 +1,12 @@ use assert_cmd::{assert::Assert, Command}; +use sqlx::_unstable::config::Config; use sqlx::{migrate::Migrate, Connection, SqliteConnection}; use std::{ env::temp_dir, fs::remove_file, path::{Path, PathBuf}, }; -use sqlx::_unstable::config::Config; pub struct TestDatabase { file_path: PathBuf, diff --git a/sqlx-core/src/any/migrate.rs b/sqlx-core/src/any/migrate.rs index b287ec45..69b5bf6a 100644 --- a/sqlx-core/src/any/migrate.rs +++ b/sqlx-core/src/any/migrate.rs @@ -44,16 +44,44 @@ impl MigrateDatabase for Any { } impl Migrate for AnyConnection { - fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> { - Box::pin(async { self.get_migrate()?.ensure_migrations_table(table_name).await }) + fn create_schema_if_not_exists<'e>( + &'e mut self, + schema_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>> { + Box::pin(async { + self.get_migrate()? + .create_schema_if_not_exists(schema_name) + .await + }) } - fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + fn ensure_migrations_table<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>> { + Box::pin(async { + self.get_migrate()? + .ensure_migrations_table(table_name) + .await + }) + } + + fn dirty_version<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async { self.get_migrate()?.dirty_version(table_name).await }) } - fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { - Box::pin(async { self.get_migrate()?.list_applied_migrations(table_name).await }) + fn list_applied_migrations<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>> { + Box::pin(async { + self.get_migrate()? + .list_applied_migrations(table_name) + .await + }) } fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { diff --git a/sqlx-core/src/config/migrate.rs b/sqlx-core/src/config/migrate.rs index a70938b2..4865e24c 100644 --- a/sqlx-core/src/config/migrate.rs +++ b/sqlx-core/src/config/migrate.rs @@ -19,6 +19,20 @@ use std::collections::BTreeSet; serde(default, rename_all = "kebab-case") )] pub struct Config { + /// Specify the names of schemas to create if they don't already exist. + /// + /// This is done before checking the existence of the migrations table + /// (`_sqlx_migrations` or overridden `table_name` below) so that it may be placed in + /// one of these schemas. + /// + /// ### Example + /// `sqlx.toml`: + /// ```toml + /// [migrate] + /// create-schemas = ["foo"] + /// ``` + pub create_schemas: BTreeSet>, + /// Override the name of the table used to track executed migrations. /// /// May be schema-qualified and/or contain quotes. Defaults to `_sqlx_migrations`. @@ -185,14 +199,14 @@ impl Config { pub fn migrations_dir(&self) -> &str { self.migrations_dir.as_deref().unwrap_or("migrations") } - + pub fn table_name(&self) -> &str { self.table_name.as_deref().unwrap_or("_sqlx_migrations") } - + pub fn to_resolve_config(&self) -> crate::migrate::ResolveConfig { let mut config = crate::migrate::ResolveConfig::new(); config.ignore_chars(self.ignored_chars.iter().copied()); config } -} \ No newline at end of file +} diff --git a/sqlx-core/src/migrate/error.rs b/sqlx-core/src/migrate/error.rs index 608d55b1..a0424396 100644 --- a/sqlx-core/src/migrate/error.rs +++ b/sqlx-core/src/migrate/error.rs @@ -39,4 +39,7 @@ pub enum MigrateError { "migration {0} is partially applied; fix and remove row from `_sqlx_migrations` table" )] Dirty(i64), + + #[error("database driver does not support creation of schemas at migrate time: {0}")] + CreateSchemasNotSupported(String), } diff --git a/sqlx-core/src/migrate/migrate.rs b/sqlx-core/src/migrate/migrate.rs index 2258f06f..841f7759 100644 --- a/sqlx-core/src/migrate/migrate.rs +++ b/sqlx-core/src/migrate/migrate.rs @@ -25,16 +25,31 @@ pub trait MigrateDatabase { // 'e = Executor pub trait Migrate { + /// Create a database schema with the given name if it does not already exist. + fn create_schema_if_not_exists<'e>( + &'e mut self, + schema_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>>; + // ensure migrations table exists // will create or migrate it if needed - fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>>; + fn ensure_migrations_table<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>>; // Return the version on which the database is dirty or None otherwise. // "dirty" means there is a partially applied migration that failed. - fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>>; + fn dirty_version<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>>; // Return the ordered list of applied migrations - fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>>; + fn list_applied_migrations<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>>; // Should acquire a database lock so that only one migration process // can run at a time. [`Migrate`] will call this function before applying diff --git a/sqlx-core/src/migrate/migrator.rs b/sqlx-core/src/migrate/migrator.rs index aa737ad3..0f5cfb3f 100644 --- a/sqlx-core/src/migrate/migrator.rs +++ b/sqlx-core/src/migrate/migrator.rs @@ -25,6 +25,9 @@ pub struct Migrator { pub no_tx: bool, #[doc(hidden)] pub table_name: Cow<'static, str>, + + #[doc(hidden)] + pub create_schemas: Cow<'static, [Cow<'static, str>]>, } impl Migrator { @@ -35,6 +38,7 @@ impl Migrator { no_tx: false, locking: true, table_name: Cow::Borrowed("_sqlx_migrations"), + create_schemas: Cow::Borrowed(&[]), }; /// Creates a new instance with the given source. @@ -84,6 +88,19 @@ impl Migrator { self } + /// Add a schema name to be created if it does not already exist. + /// + /// May be used with [`Self::dangerous_set_table_name()`] to place the migrations table + /// in a new schema without requiring it to exist first. + /// + /// ### Note: Support Depends on Database + /// SQLite cannot create new schemas without attaching them to a database file, + /// the path of which must be specified separately in an [`ATTACH DATABASE`](https://www.sqlite.org/lang_attach.html) command. + pub fn create_schema(&mut self, schema_name: impl Into>) -> &Self { + self.create_schemas.to_mut().push(schema_name.into()); + self + } + /// Specify whether applied migrations that are missing from the resolved migrations should be ignored. pub fn set_ignore_missing(&mut self, ignore_missing: bool) -> &Self { self.ignore_missing = ignore_missing; @@ -160,6 +177,10 @@ impl Migrator { conn.lock().await?; } + for schema_name in self.create_schemas.iter() { + conn.create_schema_if_not_exists(schema_name).await?; + } + // creates [_migrations] table only if needed // eventually this will likely migrate previous versions of the table conn.ensure_migrations_table(&self.table_name).await?; @@ -182,7 +203,7 @@ impl Migrator { // Target version reached break; } - + if migration.migration_type.is_down_migration() { continue; } @@ -291,4 +312,4 @@ fn validate_applied_migrations( } Ok(()) -} \ No newline at end of file +} diff --git a/sqlx-macros-core/src/migrate.rs b/sqlx-macros-core/src/migrate.rs index 0ae2eaeb..2f0e92bc 100644 --- a/sqlx-macros-core/src/migrate.rs +++ b/sqlx-macros-core/src/migrate.rs @@ -5,10 +5,10 @@ use std::path::{Path, PathBuf}; use proc_macro2::{Span, TokenStream}; use quote::{quote, ToTokens, TokenStreamExt}; -use syn::LitStr; -use syn::spanned::Spanned; use sqlx_core::config::Config; use sqlx_core::migrate::{Migration, MigrationType, ResolveConfig}; +use syn::spanned::Spanned; +use syn::LitStr; pub const DEFAULT_PATH: &str = "./migrations"; @@ -85,7 +85,9 @@ impl ToTokens for QuoteMigration { } pub fn default_path(config: &Config) -> &str { - config.migrate.migrations_dir + config + .migrate + .migrations_dir .as_deref() .unwrap_or(DEFAULT_PATH) } @@ -93,12 +95,10 @@ pub fn default_path(config: &Config) -> &str { pub fn expand(path_arg: Option) -> crate::Result { let config = Config::from_crate(); - let path = match path_arg { - Some(path_arg) => crate::common::resolve_path(path_arg.value(), path_arg.span())?, - None => { - crate::common::resolve_path(default_path(config), Span::call_site()) - }? - }; + let path = match path_arg { + Some(path_arg) => crate::common::resolve_path(path_arg.value(), path_arg.span())?, + None => { crate::common::resolve_path(default_path(config), Span::call_site()) }?, + }; expand_with_path(config, &path) } @@ -130,18 +130,21 @@ pub fn expand_with_path(config: &Config, path: &Path) -> crate::Result crate::Result { let path = crate::migrate::default_path(config); - let resolved_path = - crate::common::resolve_path(path, proc_macro2::Span::call_site())?; + let resolved_path = crate::common::resolve_path(path, proc_macro2::Span::call_site())?; if resolved_path.is_dir() { let migrator = crate::migrate::expand_with_path(config, &resolved_path)?; diff --git a/sqlx-mysql/src/migrate.rs b/sqlx-mysql/src/migrate.rs index f0d0d6a0..45ca7d98 100644 --- a/sqlx-mysql/src/migrate.rs +++ b/sqlx-mysql/src/migrate.rs @@ -2,8 +2,6 @@ use std::str::FromStr; use std::time::Duration; use std::time::Instant; -use futures_core::future::BoxFuture; -pub(crate) use sqlx_core::migrate::*; use crate::connection::{ConnectOptions, Connection}; use crate::error::Error; use crate::executor::Executor; @@ -11,6 +9,8 @@ use crate::query::query; use crate::query_as::query_as; use crate::query_scalar::query_scalar; use crate::{MySql, MySqlConnectOptions, MySqlConnection}; +use futures_core::future::BoxFuture; +pub(crate) use sqlx_core::migrate::*; fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String), Error> { let mut options = MySqlConnectOptions::from_str(url)?; @@ -74,11 +74,27 @@ impl MigrateDatabase for MySql { } impl Migrate for MySqlConnection { - fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> { + fn create_schema_if_not_exists<'e>( + &'e mut self, + schema_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>> { + Box::pin(async move { + // language=SQL + self.execute(&*format!(r#"CREATE SCHEMA IF NOT EXISTS {schema_name};"#)) + .await?; + + Ok(()) + }) + } + + fn ensure_migrations_table<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>> { Box::pin(async move { // language=MySQL - self.execute( - &*format!(r#" + self.execute(&*format!( + r#" CREATE TABLE IF NOT EXISTS {table_name} ( version BIGINT PRIMARY KEY, description TEXT NOT NULL, @@ -87,20 +103,23 @@ CREATE TABLE IF NOT EXISTS {table_name} ( checksum BLOB NOT NULL, execution_time BIGINT NOT NULL ); - "#), - ) + "# + )) .await?; Ok(()) }) } - fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + fn dirty_version<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQL - let row: Option<(i64,)> = query_as( - &format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"), - ) + let row: Option<(i64,)> = query_as(&format!( + "SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1" + )) .fetch_optional(self) .await?; @@ -108,13 +127,17 @@ CREATE TABLE IF NOT EXISTS {table_name} ( }) } - fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + fn list_applied_migrations<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQL - let rows: Vec<(i64, Vec)> = - query_as(&format!("SELECT version, checksum FROM {table_name} ORDER BY version")) - .fetch_all(self) - .await?; + let rows: Vec<(i64, Vec)> = query_as(&format!( + "SELECT version, checksum FROM {table_name} ORDER BY version" + )) + .fetch_all(self) + .await?; let migrations = rows .into_iter() @@ -185,12 +208,12 @@ CREATE TABLE IF NOT EXISTS {table_name} ( // `success=FALSE` and later modify the flag. // // language=MySQL - let _ = query( - &format!(r#" + let _ = query(&format!( + r#" INSERT INTO {table_name} ( version, description, success, checksum, execution_time ) VALUES ( ?, ?, FALSE, ?, -1 ) - "#), - ) + "# + )) .bind(migration.version) .bind(&*migration.description) .bind(&*migration.checksum) @@ -203,13 +226,13 @@ CREATE TABLE IF NOT EXISTS {table_name} ( .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?; // language=MySQL - let _ = query( - &format!(r#" + let _ = query(&format!( + r#" UPDATE {table_name} SET success = TRUE WHERE version = ? - "#), - ) + "# + )) .bind(migration.version) .execute(&mut *tx) .await?; @@ -223,13 +246,13 @@ CREATE TABLE IF NOT EXISTS {table_name} ( let elapsed = start.elapsed(); #[allow(clippy::cast_possible_truncation)] - let _ = query( - &format!(r#" + let _ = query(&format!( + r#" UPDATE {table_name} SET execution_time = ? WHERE version = ? - "#), - ) + "# + )) .bind(elapsed.as_nanos() as i64) .bind(migration.version) .execute(self) @@ -257,13 +280,13 @@ CREATE TABLE IF NOT EXISTS {table_name} ( // `success=FALSE` and later remove the migration altogether. // // language=MySQL - let _ = query( - &format!(r#" + let _ = query(&format!( + r#" UPDATE {table_name} SET success = FALSE WHERE version = ? - "#), - ) + "# + )) .bind(migration.version) .execute(&mut *tx) .await?; diff --git a/sqlx-postgres/src/connection/describe.rs b/sqlx-postgres/src/connection/describe.rs index 5b6a2aa0..8119e2e9 100644 --- a/sqlx-postgres/src/connection/describe.rs +++ b/sqlx-postgres/src/connection/describe.rs @@ -208,18 +208,18 @@ impl PgConnection { attribute_no: i16, should_fetch: bool, ) -> Result { - if let Some(origin) = - self.inner - .cache_table_to_column_names - .get(&relation_id) - .and_then(|table_columns| { - let column_name = table_columns.columns.get(&attribute_no).cloned()?; + if let Some(origin) = self + .inner + .cache_table_to_column_names + .get(&relation_id) + .and_then(|table_columns| { + let column_name = table_columns.columns.get(&attribute_no).cloned()?; - Some(ColumnOrigin::Table(TableColumn { - table: table_columns.table_name.clone(), - name: column_name, - })) - }) + Some(ColumnOrigin::Table(TableColumn { + table: table_columns.table_name.clone(), + name: column_name, + })) + }) { return Ok(origin); } diff --git a/sqlx-postgres/src/connection/establish.rs b/sqlx-postgres/src/connection/establish.rs index 684bf265..634b71de 100644 --- a/sqlx-postgres/src/connection/establish.rs +++ b/sqlx-postgres/src/connection/establish.rs @@ -149,7 +149,8 @@ impl PgConnection { cache_type_info: HashMap::new(), cache_elem_type_to_array: HashMap::new(), cache_table_to_column_names: HashMap::new(), - log_settings: options.log_settings.clone(),}), + log_settings: options.log_settings.clone(), + }), }) } } diff --git a/sqlx-postgres/src/migrate.rs b/sqlx-postgres/src/migrate.rs index 26464663..90ebd49a 100644 --- a/sqlx-postgres/src/migrate.rs +++ b/sqlx-postgres/src/migrate.rs @@ -111,11 +111,27 @@ impl MigrateDatabase for Postgres { } impl Migrate for PgConnection { - fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> { + fn create_schema_if_not_exists<'e>( + &'e mut self, + schema_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>> { Box::pin(async move { // language=SQL - self.execute( - &*format!(r#" + self.execute(&*format!(r#"CREATE SCHEMA IF NOT EXISTS {schema_name};"#)) + .await?; + + Ok(()) + }) + } + + fn ensure_migrations_table<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>> { + Box::pin(async move { + // language=SQL + self.execute(&*format!( + r#" CREATE TABLE IF NOT EXISTS {table_name} ( version BIGINT PRIMARY KEY, description TEXT NOT NULL, @@ -124,20 +140,23 @@ CREATE TABLE IF NOT EXISTS {table_name} ( checksum BYTEA NOT NULL, execution_time BIGINT NOT NULL ); - "#), - ) + "# + )) .await?; Ok(()) }) } - fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + fn dirty_version<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQL - let row: Option<(i64,)> = query_as( - &*format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"), - ) + let row: Option<(i64,)> = query_as(&*format!( + "SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1" + )) .fetch_optional(self) .await?; @@ -145,13 +164,17 @@ CREATE TABLE IF NOT EXISTS {table_name} ( }) } - fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + fn list_applied_migrations<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQL - let rows: Vec<(i64, Vec)> = - query_as(&*format!("SELECT version, checksum FROM {table_name} ORDER BY version")) - .fetch_all(self) - .await?; + let rows: Vec<(i64, Vec)> = query_as(&*format!( + "SELECT version, checksum FROM {table_name} ORDER BY version" + )) + .fetch_all(self) + .await?; let migrations = rows .into_iter() @@ -230,13 +253,13 @@ CREATE TABLE IF NOT EXISTS {table_name} ( // language=SQL #[allow(clippy::cast_possible_truncation)] - let _ = query( - &*format!(r#" + let _ = query(&*format!( + r#" UPDATE {table_name} SET execution_time = $1 WHERE version = $2 - "#), - ) + "# + )) .bind(elapsed.as_nanos() as i64) .bind(migration.version) .execute(self) @@ -283,12 +306,12 @@ async fn execute_migration( .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?; // language=SQL - let _ = query( - &*format!(r#" + let _ = query(&*format!( + r#" INSERT INTO {table_name} ( version, description, success, checksum, execution_time ) VALUES ( $1, $2, TRUE, $3, -1 ) - "#), - ) + "# + )) .bind(migration.version) .bind(&*migration.description) .bind(&*migration.checksum) diff --git a/sqlx-sqlite/src/migrate.rs b/sqlx-sqlite/src/migrate.rs index 8b5c2474..e475f703 100644 --- a/sqlx-sqlite/src/migrate.rs +++ b/sqlx-sqlite/src/migrate.rs @@ -15,6 +15,7 @@ use std::time::Duration; use std::time::Instant; pub(crate) use sqlx_core::migrate::*; +use sqlx_core::query_scalar::query_scalar; impl MigrateDatabase for Sqlite { fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { @@ -64,10 +65,35 @@ impl MigrateDatabase for Sqlite { } impl Migrate for SqliteConnection { - fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> { + fn create_schema_if_not_exists<'e>( + &'e mut self, + schema_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>> { + Box::pin(async move { + // Check if the schema already exists; if so, don't error. + let schema_version: Option = + query_scalar(&format!("PRAGMA {schema_name}.schema_version")) + .fetch_optional(&mut *self) + .await?; + + if schema_version.is_some() { + return Ok(()); + } + + Err(MigrateError::CreateSchemasNotSupported( + format!("cannot create new schema {schema_name}; creation of additional schemas in SQLite requires attaching extra database files"), + )) + }) + } + + fn ensure_migrations_table<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result<(), MigrateError>> { Box::pin(async move { // language=SQLite - self.execute(&*format!(r#" + self.execute(&*format!( + r#" CREATE TABLE IF NOT EXISTS {table_name} ( version BIGINT PRIMARY KEY, description TEXT NOT NULL, @@ -76,20 +102,23 @@ CREATE TABLE IF NOT EXISTS {table_name} ( checksum BLOB NOT NULL, execution_time BIGINT NOT NULL ); - "#), - ) - .await?; + "# + )) + .await?; Ok(()) }) } - fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + fn dirty_version<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQLite - let row: Option<(i64,)> = query_as( - &format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"), - ) + let row: Option<(i64,)> = query_as(&format!( + "SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1" + )) .fetch_optional(self) .await?; @@ -97,13 +126,17 @@ CREATE TABLE IF NOT EXISTS {table_name} ( }) } - fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result, MigrateError>> { + fn list_applied_migrations<'e>( + &'e mut self, + table_name: &'e str, + ) -> BoxFuture<'e, Result, MigrateError>> { Box::pin(async move { // language=SQLite - let rows: Vec<(i64, Vec)> = - query_as(&format!("SELECT version, checksum FROM {table_name} ORDER BY version")) - .fetch_all(self) - .await?; + let rows: Vec<(i64, Vec)> = query_as(&format!( + "SELECT version, checksum FROM {table_name} ORDER BY version" + )) + .fetch_all(self) + .await?; let migrations = rows .into_iter() @@ -145,12 +178,12 @@ CREATE TABLE IF NOT EXISTS {table_name} ( .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?; // language=SQL - let _ = query( - &format!(r#" + let _ = query(&format!( + r#" INSERT INTO {table_name} ( version, description, success, checksum, execution_time ) VALUES ( ?1, ?2, TRUE, ?3, -1 ) - "#), - ) + "# + )) .bind(migration.version) .bind(&*migration.description) .bind(&*migration.checksum) @@ -167,13 +200,13 @@ CREATE TABLE IF NOT EXISTS {table_name} ( // language=SQL #[allow(clippy::cast_possible_truncation)] - let _ = query( - &format!(r#" + let _ = query(&format!( + r#" UPDATE {table_name} SET execution_time = ?1 WHERE version = ?2 - "#), - ) + "# + )) .bind(elapsed.as_nanos() as i64) .bind(migration.version) .execute(self)