feat: introduce migrate.create-schemas

This commit is contained in:
Austin Bonander 2025-01-22 15:32:50 -08:00
parent 45c0b85b4c
commit 3765f67aba
17 changed files with 383 additions and 185 deletions

View File

@ -1,5 +1,5 @@
use crate::{migrate, Config};
use crate::opt::{ConnectOpts, MigrationSourceOpt};
use crate::{migrate, Config};
use console::style;
use promptly::{prompt, ReadlineError};
use sqlx::any::Any;
@ -54,7 +54,11 @@ pub async fn reset(
setup(config, migration_source, connect_opts).await
}
pub async fn setup(config: &Config, migration_source: &MigrationSourceOpt, connect_opts: &ConnectOpts) -> anyhow::Result<()> {
pub async fn setup(
config: &Config,
migration_source: &MigrationSourceOpt,
connect_opts: &ConnectOpts,
) -> anyhow::Result<()> {
create(connect_opts).await?;
migrate::run(config, migration_source, connect_opts, false, false, None).await
}

View File

@ -1,5 +1,5 @@
use std::io;
use std::path::{PathBuf};
use std::path::PathBuf;
use std::time::Duration;
use anyhow::{Context, Result};
@ -28,7 +28,7 @@ pub async fn run(opt: Opt) -> Result<()> {
match opt.command {
Command::Migrate(migrate) => match migrate.command {
MigrateCommand::Add(opts)=> migrate::add(config, opts).await?,
MigrateCommand::Add(opts) => migrate::add(config, opts).await?,
MigrateCommand::Run {
source,
dry_run,
@ -74,15 +74,17 @@ pub async fn run(opt: Opt) -> Result<()> {
connect_opts.populate_db_url(config)?;
migrate::info(config, &source, &connect_opts).await?
},
MigrateCommand::BuildScript { source, force } => migrate::build_script(config, &source, force)?,
}
MigrateCommand::BuildScript { source, force } => {
migrate::build_script(config, &source, force)?
}
},
Command::Database(database) => match database.command {
DatabaseCommand::Create { mut connect_opts } => {
connect_opts.populate_db_url(config)?;
database::create(&connect_opts).await?
},
}
DatabaseCommand::Drop {
confirmation,
mut connect_opts,
@ -90,7 +92,7 @@ pub async fn run(opt: Opt) -> Result<()> {
} => {
connect_opts.populate_db_url(config)?;
database::drop(&connect_opts, !confirmation.yes, force).await?
},
}
DatabaseCommand::Reset {
confirmation,
source,
@ -99,14 +101,14 @@ pub async fn run(opt: Opt) -> Result<()> {
} => {
connect_opts.populate_db_url(config)?;
database::reset(config, &source, &connect_opts, !confirmation.yes, force).await?
},
}
DatabaseCommand::Setup {
source,
mut connect_opts,
} => {
connect_opts.populate_db_url(config)?;
database::setup(config, &source, &connect_opts).await?
},
}
},
Command::Prepare {
@ -118,7 +120,7 @@ pub async fn run(opt: Opt) -> Result<()> {
} => {
connect_opts.populate_db_url(config)?;
prepare::run(check, all, workspace, connect_opts, args).await?
},
}
#[cfg(feature = "completions")]
Command::Completions { shell } => completions::run(shell),
@ -183,6 +185,6 @@ async fn config_from_current_dir() -> anyhow::Result<&'static Config> {
Config::read_with_or_default(move || Ok(path))
})
.await
.context("unexpected error loading config")
.await
.context("unexpected error loading config")
}

View File

@ -1,7 +1,10 @@
use crate::config::Config;
use crate::opt::{AddMigrationOpts, ConnectOpts, MigrationSourceOpt};
use anyhow::{bail, Context};
use console::style;
use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator, ResolveWith};
use sqlx::migrate::{
AppliedMigration, Migrate, MigrateError, MigrationType, Migrator, ResolveWith,
};
use sqlx::Connection;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
@ -9,14 +12,10 @@ use std::fmt::Write;
use std::fs::{self, File};
use std::path::Path;
use std::time::Duration;
use crate::config::Config;
pub async fn add(
config: &Config,
opts: AddMigrationOpts,
) -> anyhow::Result<()> {
pub async fn add(config: &Config, opts: AddMigrationOpts) -> anyhow::Result<()> {
let source = opts.source.resolve(config);
fs::create_dir_all(source).context("Unable to create migrations directory")?;
let migrator = Migrator::new(Path::new(source)).await?;
@ -124,13 +123,27 @@ fn short_checksum(checksum: &[u8]) -> String {
s
}
pub async fn info(config: &Config, migration_source: &MigrationSourceOpt, connect_opts: &ConnectOpts) -> anyhow::Result<()> {
pub async fn info(
config: &Config,
migration_source: &MigrationSourceOpt,
connect_opts: &ConnectOpts,
) -> anyhow::Result<()> {
let source = migration_source.resolve(config);
let migrator = Migrator::new(ResolveWith(Path::new(source), config.migrate.to_resolve_config())).await?;
let migrator = Migrator::new(ResolveWith(
Path::new(source),
config.migrate.to_resolve_config(),
))
.await?;
let mut conn = crate::connect(connect_opts).await?;
conn.ensure_migrations_table(config.migrate.table_name()).await?;
// FIXME: we shouldn't actually be creating anything here
for schema_name in &config.migrate.create_schemas {
conn.create_schema_if_not_exists(schema_name).await?;
}
conn.ensure_migrations_table(config.migrate.table_name())
.await?;
let applied_migrations: HashMap<_, _> = conn
.list_applied_migrations(config.migrate.table_name())
@ -214,7 +227,7 @@ pub async fn run(
target_version: Option<i64>,
) -> anyhow::Result<()> {
let source = migration_source.resolve(config);
let migrator = Migrator::new(Path::new(source)).await?;
if let Some(target_version) = target_version {
if !migrator.version_exists(target_version) {
@ -224,14 +237,21 @@ pub async fn run(
let mut conn = crate::connect(connect_opts).await?;
conn.ensure_migrations_table(config.migrate.table_name()).await?;
for schema_name in &config.migrate.create_schemas {
conn.create_schema_if_not_exists(schema_name).await?;
}
conn.ensure_migrations_table(config.migrate.table_name())
.await?;
let version = conn.dirty_version(config.migrate.table_name()).await?;
if let Some(version) = version {
bail!(MigrateError::Dirty(version));
}
let applied_migrations = conn.list_applied_migrations(config.migrate.table_name()).await?;
let applied_migrations = conn
.list_applied_migrations(config.migrate.table_name())
.await?;
validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
let latest_version = applied_migrations
@ -319,14 +339,22 @@ pub async fn revert(
let mut conn = crate::connect(connect_opts).await?;
conn.ensure_migrations_table(config.migrate.table_name()).await?;
// FIXME: we should not be creating anything here if it doesn't exist
for schema_name in &config.migrate.create_schemas {
conn.create_schema_if_not_exists(schema_name).await?;
}
conn.ensure_migrations_table(config.migrate.table_name())
.await?;
let version = conn.dirty_version(config.migrate.table_name()).await?;
if let Some(version) = version {
bail!(MigrateError::Dirty(version));
}
let applied_migrations = conn.list_applied_migrations(config.migrate.table_name()).await?;
let applied_migrations = conn
.list_applied_migrations(config.migrate.table_name())
.await?;
validate_applied_migrations(&applied_migrations, &migrator, ignore_missing)?;
let latest_version = applied_migrations
@ -397,9 +425,13 @@ pub async fn revert(
Ok(())
}
pub fn build_script(config: &Config, migration_source: &MigrationSourceOpt, force: bool) -> anyhow::Result<()> {
pub fn build_script(
config: &Config,
migration_source: &MigrationSourceOpt,
force: bool,
) -> anyhow::Result<()> {
let source = migration_source.resolve(config);
anyhow::ensure!(
Path::new("Cargo.toml").exists(),
"must be run in a Cargo project root"

View File

@ -1,13 +1,13 @@
use std::env;
use std::ops::{Deref, Not};
use crate::config::migrate::{DefaultMigrationType, DefaultVersioning};
use crate::config::Config;
use anyhow::Context;
use chrono::Utc;
use clap::{Args, Parser};
#[cfg(feature = "completions")]
use clap_complete::Shell;
use crate::config::Config;
use sqlx::migrate::Migrator;
use crate::config::migrate::{DefaultMigrationType, DefaultVersioning};
use std::env;
use std::ops::{Deref, Not};
#[derive(Parser, Debug)]
#[clap(version, about, author)]
@ -129,7 +129,7 @@ pub enum MigrateCommand {
/// Create a new migration with the given description.
///
/// --------------------------------
///
///
/// Migrations may either be simple, or reversible.
///
/// Reversible migrations can be reverted with `sqlx migrate revert`, simple migrations cannot.
@ -152,7 +152,7 @@ pub enum MigrateCommand {
/// It is recommended to always back up the database before running migrations.
///
/// --------------------------------
///
///
/// For convenience, this command attempts to detect if reversible migrations are in-use.
///
/// If the latest existing migration is reversible, the new migration will also be reversible.
@ -164,7 +164,7 @@ pub enum MigrateCommand {
/// The default type to use can also be set in `sqlx.toml`.
///
/// --------------------------------
///
///
/// A version number will be automatically assigned to the migration.
///
/// Migrations are applied in ascending order by version number.
@ -174,9 +174,9 @@ pub enum MigrateCommand {
/// less than _any_ previously applied migration.
///
/// Migrations should only be created with increasing version number.
///
///
/// --------------------------------
///
///
/// For convenience, this command will attempt to detect if sequential versioning is in use,
/// and if so, continue the sequence.
///
@ -290,7 +290,7 @@ pub struct AddMigrationOpts {
#[derive(Args, Debug)]
pub struct MigrationSourceOpt {
/// Path to folder containing migrations.
///
///
/// Defaults to `migrations/` if not specified, but a different default may be set by `sqlx.toml`.
#[clap(long)]
pub source: Option<String>,
@ -301,7 +301,7 @@ impl MigrationSourceOpt {
if let Some(source) = &self.source {
return source;
}
config.migrate.migrations_dir()
}
}
@ -335,7 +335,9 @@ impl ConnectOpts {
/// Require a database URL to be provided, otherwise
/// return an error.
pub fn expect_db_url(&self) -> anyhow::Result<&str> {
self.database_url.as_deref().context("BUG: database_url not populated")
self.database_url
.as_deref()
.context("BUG: database_url not populated")
}
/// Populate `database_url` from the environment, if not set.
@ -359,7 +361,7 @@ impl ConnectOpts {
}
self.database_url = Some(url)
},
}
Err(env::VarError::NotPresent) => {
anyhow::bail!("`--database-url` or `{var}`{context} must be set")
}
@ -407,22 +409,20 @@ impl Not for IgnoreMissing {
impl AddMigrationOpts {
pub fn reversible(&self, config: &Config, migrator: &Migrator) -> bool {
if self.reversible { return true; }
if self.simple { return false; }
if self.reversible {
return true;
}
if self.simple {
return false;
}
match config.migrate.defaults.migration_type {
DefaultMigrationType::Inferred => {
migrator
.iter()
.last()
.is_some_and(|m| m.migration_type.is_reversible())
}
DefaultMigrationType::Simple => {
false
}
DefaultMigrationType::Reversible => {
true
}
DefaultMigrationType::Inferred => migrator
.iter()
.last()
.is_some_and(|m| m.migration_type.is_reversible()),
DefaultMigrationType::Simple => false,
DefaultMigrationType::Reversible => true,
}
}
@ -434,8 +434,7 @@ impl AddMigrationOpts {
}
if self.sequential || matches!(default_versioning, DefaultVersioning::Sequential) {
return next_sequential(migrator)
.unwrap_or_else(|| fmt_sequential(1));
return next_sequential(migrator).unwrap_or_else(|| fmt_sequential(1));
}
next_sequential(migrator).unwrap_or_else(next_timestamp)
@ -455,18 +454,16 @@ fn next_sequential(migrator: &Migrator) -> Option<String> {
match migrations {
[previous, latest] => {
// If the latest two versions differ by 1, infer sequential.
(latest.version - previous.version == 1)
.then_some(latest.version + 1)
},
(latest.version - previous.version == 1).then_some(latest.version + 1)
}
[latest] => {
// If only one migration exists and its version is 0 or 1, infer sequential
matches!(latest.version, 0 | 1)
.then_some(latest.version + 1)
matches!(latest.version, 0 | 1).then_some(latest.version + 1)
}
_ => unreachable!(),
}
});
next_version.map(fmt_sequential)
}

View File

@ -1,12 +1,12 @@
use assert_cmd::{assert::Assert, Command};
use sqlx::_unstable::config::Config;
use sqlx::{migrate::Migrate, Connection, SqliteConnection};
use std::{
env::temp_dir,
fs::remove_file,
path::{Path, PathBuf},
};
use sqlx::_unstable::config::Config;
pub struct TestDatabase {
file_path: PathBuf,

View File

@ -44,16 +44,44 @@ impl MigrateDatabase for Any {
}
impl Migrate for AnyConnection {
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async { self.get_migrate()?.ensure_migrations_table(table_name).await })
fn create_schema_if_not_exists<'e>(
&'e mut self,
schema_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async {
self.get_migrate()?
.create_schema_if_not_exists(schema_name)
.await
})
}
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
fn ensure_migrations_table<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async {
self.get_migrate()?
.ensure_migrations_table(table_name)
.await
})
}
fn dirty_version<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
Box::pin(async { self.get_migrate()?.dirty_version(table_name).await })
}
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
Box::pin(async { self.get_migrate()?.list_applied_migrations(table_name).await })
fn list_applied_migrations<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
Box::pin(async {
self.get_migrate()?
.list_applied_migrations(table_name)
.await
})
}
fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {

View File

@ -19,6 +19,20 @@ use std::collections::BTreeSet;
serde(default, rename_all = "kebab-case")
)]
pub struct Config {
/// Specify the names of schemas to create if they don't already exist.
///
/// This is done before checking the existence of the migrations table
/// (`_sqlx_migrations` or overridden `table_name` below) so that it may be placed in
/// one of these schemas.
///
/// ### Example
/// `sqlx.toml`:
/// ```toml
/// [migrate]
/// create-schemas = ["foo"]
/// ```
pub create_schemas: BTreeSet<Box<str>>,
/// Override the name of the table used to track executed migrations.
///
/// May be schema-qualified and/or contain quotes. Defaults to `_sqlx_migrations`.
@ -185,14 +199,14 @@ impl Config {
pub fn migrations_dir(&self) -> &str {
self.migrations_dir.as_deref().unwrap_or("migrations")
}
pub fn table_name(&self) -> &str {
self.table_name.as_deref().unwrap_or("_sqlx_migrations")
}
pub fn to_resolve_config(&self) -> crate::migrate::ResolveConfig {
let mut config = crate::migrate::ResolveConfig::new();
config.ignore_chars(self.ignored_chars.iter().copied());
config
}
}
}

View File

@ -39,4 +39,7 @@ pub enum MigrateError {
"migration {0} is partially applied; fix and remove row from `_sqlx_migrations` table"
)]
Dirty(i64),
#[error("database driver does not support creation of schemas at migrate time: {0}")]
CreateSchemasNotSupported(String),
}

View File

@ -25,16 +25,31 @@ pub trait MigrateDatabase {
// 'e = Executor
pub trait Migrate {
/// Create a database schema with the given name if it does not already exist.
fn create_schema_if_not_exists<'e>(
&'e mut self,
schema_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>>;
// ensure migrations table exists
// will create or migrate it if needed
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>>;
fn ensure_migrations_table<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>>;
// Return the version on which the database is dirty or None otherwise.
// "dirty" means there is a partially applied migration that failed.
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>>;
fn dirty_version<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Option<i64>, MigrateError>>;
// Return the ordered list of applied migrations
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>>;
fn list_applied_migrations<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>>;
// Should acquire a database lock so that only one migration process
// can run at a time. [`Migrate`] will call this function before applying

View File

@ -25,6 +25,9 @@ pub struct Migrator {
pub no_tx: bool,
#[doc(hidden)]
pub table_name: Cow<'static, str>,
#[doc(hidden)]
pub create_schemas: Cow<'static, [Cow<'static, str>]>,
}
impl Migrator {
@ -35,6 +38,7 @@ impl Migrator {
no_tx: false,
locking: true,
table_name: Cow::Borrowed("_sqlx_migrations"),
create_schemas: Cow::Borrowed(&[]),
};
/// Creates a new instance with the given source.
@ -84,6 +88,19 @@ impl Migrator {
self
}
/// Add a schema name to be created if it does not already exist.
///
/// May be used with [`Self::dangerous_set_table_name()`] to place the migrations table
/// in a new schema without requiring it to exist first.
///
/// ### Note: Support Depends on Database
/// SQLite cannot create new schemas without attaching them to a database file,
/// the path of which must be specified separately in an [`ATTACH DATABASE`](https://www.sqlite.org/lang_attach.html) command.
pub fn create_schema(&mut self, schema_name: impl Into<Cow<'static, str>>) -> &Self {
self.create_schemas.to_mut().push(schema_name.into());
self
}
/// Specify whether applied migrations that are missing from the resolved migrations should be ignored.
pub fn set_ignore_missing(&mut self, ignore_missing: bool) -> &Self {
self.ignore_missing = ignore_missing;
@ -160,6 +177,10 @@ impl Migrator {
conn.lock().await?;
}
for schema_name in self.create_schemas.iter() {
conn.create_schema_if_not_exists(schema_name).await?;
}
// creates [_migrations] table only if needed
// eventually this will likely migrate previous versions of the table
conn.ensure_migrations_table(&self.table_name).await?;
@ -182,7 +203,7 @@ impl Migrator {
// Target version reached
break;
}
if migration.migration_type.is_down_migration() {
continue;
}
@ -291,4 +312,4 @@ fn validate_applied_migrations(
}
Ok(())
}
}

View File

@ -5,10 +5,10 @@ use std::path::{Path, PathBuf};
use proc_macro2::{Span, TokenStream};
use quote::{quote, ToTokens, TokenStreamExt};
use syn::LitStr;
use syn::spanned::Spanned;
use sqlx_core::config::Config;
use sqlx_core::migrate::{Migration, MigrationType, ResolveConfig};
use syn::spanned::Spanned;
use syn::LitStr;
pub const DEFAULT_PATH: &str = "./migrations";
@ -85,7 +85,9 @@ impl ToTokens for QuoteMigration {
}
pub fn default_path(config: &Config) -> &str {
config.migrate.migrations_dir
config
.migrate
.migrations_dir
.as_deref()
.unwrap_or(DEFAULT_PATH)
}
@ -93,12 +95,10 @@ pub fn default_path(config: &Config) -> &str {
pub fn expand(path_arg: Option<LitStr>) -> crate::Result<TokenStream> {
let config = Config::from_crate();
let path = match path_arg {
Some(path_arg) => crate::common::resolve_path(path_arg.value(), path_arg.span())?,
None => {
crate::common::resolve_path(default_path(config), Span::call_site())
}?
};
let path = match path_arg {
Some(path_arg) => crate::common::resolve_path(path_arg.value(), path_arg.span())?,
None => { crate::common::resolve_path(default_path(config), Span::call_site()) }?,
};
expand_with_path(config, &path)
}
@ -130,18 +130,21 @@ pub fn expand_with_path(config: &Config, path: &Path) -> crate::Result<TokenStre
proc_macro::tracked_path::path(path);
}
let table_name = config.migrate.table_name
.as_deref()
.map_or_else(
|| quote! {},
|name| quote! { table_name: Some(::std::borrow::Cow::Borrowed(#name)), }
);
let table_name = config.migrate.table_name.as_deref().map_or_else(
|| quote! {},
|name| quote! { table_name: Some(::std::borrow::Cow::Borrowed(#name)), },
);
let create_schemas = config.migrate.create_schemas.iter().map(|schema_name| {
quote! { ::std::borrow::Cow::Borrowed(#schema_name) }
});
Ok(quote! {
::sqlx::migrate::Migrator {
migrations: ::std::borrow::Cow::Borrowed(&[
#(#migrations),*
]),
create_schemas: ::std::borrow::Cow::Borrowed(&[#(#create_schemas),*]),
#table_name
..::sqlx::migrate::Migrator::DEFAULT
}

View File

@ -151,8 +151,7 @@ fn expand_advanced(args: AttributeArgs, input: syn::ItemFn) -> crate::Result<Tok
MigrationsOpt::InferredPath if !inputs.is_empty() => {
let path = crate::migrate::default_path(config);
let resolved_path =
crate::common::resolve_path(path, proc_macro2::Span::call_site())?;
let resolved_path = crate::common::resolve_path(path, proc_macro2::Span::call_site())?;
if resolved_path.is_dir() {
let migrator = crate::migrate::expand_with_path(config, &resolved_path)?;

View File

@ -2,8 +2,6 @@ use std::str::FromStr;
use std::time::Duration;
use std::time::Instant;
use futures_core::future::BoxFuture;
pub(crate) use sqlx_core::migrate::*;
use crate::connection::{ConnectOptions, Connection};
use crate::error::Error;
use crate::executor::Executor;
@ -11,6 +9,8 @@ use crate::query::query;
use crate::query_as::query_as;
use crate::query_scalar::query_scalar;
use crate::{MySql, MySqlConnectOptions, MySqlConnection};
use futures_core::future::BoxFuture;
pub(crate) use sqlx_core::migrate::*;
fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String), Error> {
let mut options = MySqlConnectOptions::from_str(url)?;
@ -74,11 +74,27 @@ impl MigrateDatabase for MySql {
}
impl Migrate for MySqlConnection {
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> {
fn create_schema_if_not_exists<'e>(
&'e mut self,
schema_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async move {
// language=SQL
self.execute(&*format!(r#"CREATE SCHEMA IF NOT EXISTS {schema_name};"#))
.await?;
Ok(())
})
}
fn ensure_migrations_table<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async move {
// language=MySQL
self.execute(
&*format!(r#"
self.execute(&*format!(
r#"
CREATE TABLE IF NOT EXISTS {table_name} (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
@ -87,20 +103,23 @@ CREATE TABLE IF NOT EXISTS {table_name} (
checksum BLOB NOT NULL,
execution_time BIGINT NOT NULL
);
"#),
)
"#
))
.await?;
Ok(())
})
}
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
fn dirty_version<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
Box::pin(async move {
// language=SQL
let row: Option<(i64,)> = query_as(
&format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"),
)
let row: Option<(i64,)> = query_as(&format!(
"SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"
))
.fetch_optional(self)
.await?;
@ -108,13 +127,17 @@ CREATE TABLE IF NOT EXISTS {table_name} (
})
}
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
fn list_applied_migrations<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
Box::pin(async move {
// language=SQL
let rows: Vec<(i64, Vec<u8>)> =
query_as(&format!("SELECT version, checksum FROM {table_name} ORDER BY version"))
.fetch_all(self)
.await?;
let rows: Vec<(i64, Vec<u8>)> = query_as(&format!(
"SELECT version, checksum FROM {table_name} ORDER BY version"
))
.fetch_all(self)
.await?;
let migrations = rows
.into_iter()
@ -185,12 +208,12 @@ CREATE TABLE IF NOT EXISTS {table_name} (
// `success=FALSE` and later modify the flag.
//
// language=MySQL
let _ = query(
&format!(r#"
let _ = query(&format!(
r#"
INSERT INTO {table_name} ( version, description, success, checksum, execution_time )
VALUES ( ?, ?, FALSE, ?, -1 )
"#),
)
"#
))
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
@ -203,13 +226,13 @@ CREATE TABLE IF NOT EXISTS {table_name} (
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
// language=MySQL
let _ = query(
&format!(r#"
let _ = query(&format!(
r#"
UPDATE {table_name}
SET success = TRUE
WHERE version = ?
"#),
)
"#
))
.bind(migration.version)
.execute(&mut *tx)
.await?;
@ -223,13 +246,13 @@ CREATE TABLE IF NOT EXISTS {table_name} (
let elapsed = start.elapsed();
#[allow(clippy::cast_possible_truncation)]
let _ = query(
&format!(r#"
let _ = query(&format!(
r#"
UPDATE {table_name}
SET execution_time = ?
WHERE version = ?
"#),
)
"#
))
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)
@ -257,13 +280,13 @@ CREATE TABLE IF NOT EXISTS {table_name} (
// `success=FALSE` and later remove the migration altogether.
//
// language=MySQL
let _ = query(
&format!(r#"
let _ = query(&format!(
r#"
UPDATE {table_name}
SET success = FALSE
WHERE version = ?
"#),
)
"#
))
.bind(migration.version)
.execute(&mut *tx)
.await?;

View File

@ -208,18 +208,18 @@ impl PgConnection {
attribute_no: i16,
should_fetch: bool,
) -> Result<ColumnOrigin, Error> {
if let Some(origin) =
self.inner
.cache_table_to_column_names
.get(&relation_id)
.and_then(|table_columns| {
let column_name = table_columns.columns.get(&attribute_no).cloned()?;
if let Some(origin) = self
.inner
.cache_table_to_column_names
.get(&relation_id)
.and_then(|table_columns| {
let column_name = table_columns.columns.get(&attribute_no).cloned()?;
Some(ColumnOrigin::Table(TableColumn {
table: table_columns.table_name.clone(),
name: column_name,
}))
})
Some(ColumnOrigin::Table(TableColumn {
table: table_columns.table_name.clone(),
name: column_name,
}))
})
{
return Ok(origin);
}

View File

@ -149,7 +149,8 @@ impl PgConnection {
cache_type_info: HashMap::new(),
cache_elem_type_to_array: HashMap::new(),
cache_table_to_column_names: HashMap::new(),
log_settings: options.log_settings.clone(),}),
log_settings: options.log_settings.clone(),
}),
})
}
}

View File

@ -111,11 +111,27 @@ impl MigrateDatabase for Postgres {
}
impl Migrate for PgConnection {
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> {
fn create_schema_if_not_exists<'e>(
&'e mut self,
schema_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async move {
// language=SQL
self.execute(
&*format!(r#"
self.execute(&*format!(r#"CREATE SCHEMA IF NOT EXISTS {schema_name};"#))
.await?;
Ok(())
})
}
fn ensure_migrations_table<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async move {
// language=SQL
self.execute(&*format!(
r#"
CREATE TABLE IF NOT EXISTS {table_name} (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
@ -124,20 +140,23 @@ CREATE TABLE IF NOT EXISTS {table_name} (
checksum BYTEA NOT NULL,
execution_time BIGINT NOT NULL
);
"#),
)
"#
))
.await?;
Ok(())
})
}
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
fn dirty_version<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
Box::pin(async move {
// language=SQL
let row: Option<(i64,)> = query_as(
&*format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"),
)
let row: Option<(i64,)> = query_as(&*format!(
"SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"
))
.fetch_optional(self)
.await?;
@ -145,13 +164,17 @@ CREATE TABLE IF NOT EXISTS {table_name} (
})
}
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
fn list_applied_migrations<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
Box::pin(async move {
// language=SQL
let rows: Vec<(i64, Vec<u8>)> =
query_as(&*format!("SELECT version, checksum FROM {table_name} ORDER BY version"))
.fetch_all(self)
.await?;
let rows: Vec<(i64, Vec<u8>)> = query_as(&*format!(
"SELECT version, checksum FROM {table_name} ORDER BY version"
))
.fetch_all(self)
.await?;
let migrations = rows
.into_iter()
@ -230,13 +253,13 @@ CREATE TABLE IF NOT EXISTS {table_name} (
// language=SQL
#[allow(clippy::cast_possible_truncation)]
let _ = query(
&*format!(r#"
let _ = query(&*format!(
r#"
UPDATE {table_name}
SET execution_time = $1
WHERE version = $2
"#),
)
"#
))
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)
@ -283,12 +306,12 @@ async fn execute_migration(
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
// language=SQL
let _ = query(
&*format!(r#"
let _ = query(&*format!(
r#"
INSERT INTO {table_name} ( version, description, success, checksum, execution_time )
VALUES ( $1, $2, TRUE, $3, -1 )
"#),
)
"#
))
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)

View File

@ -15,6 +15,7 @@ use std::time::Duration;
use std::time::Instant;
pub(crate) use sqlx_core::migrate::*;
use sqlx_core::query_scalar::query_scalar;
impl MigrateDatabase for Sqlite {
fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
@ -64,10 +65,35 @@ impl MigrateDatabase for Sqlite {
}
impl Migrate for SqliteConnection {
fn ensure_migrations_table<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<(), MigrateError>> {
fn create_schema_if_not_exists<'e>(
&'e mut self,
schema_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async move {
// Check if the schema already exists; if so, don't error.
let schema_version: Option<i64> =
query_scalar(&format!("PRAGMA {schema_name}.schema_version"))
.fetch_optional(&mut *self)
.await?;
if schema_version.is_some() {
return Ok(());
}
Err(MigrateError::CreateSchemasNotSupported(
format!("cannot create new schema {schema_name}; creation of additional schemas in SQLite requires attaching extra database files"),
))
})
}
fn ensure_migrations_table<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async move {
// language=SQLite
self.execute(&*format!(r#"
self.execute(&*format!(
r#"
CREATE TABLE IF NOT EXISTS {table_name} (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
@ -76,20 +102,23 @@ CREATE TABLE IF NOT EXISTS {table_name} (
checksum BLOB NOT NULL,
execution_time BIGINT NOT NULL
);
"#),
)
.await?;
"#
))
.await?;
Ok(())
})
}
fn dirty_version<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
fn dirty_version<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
Box::pin(async move {
// language=SQLite
let row: Option<(i64,)> = query_as(
&format!("SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"),
)
let row: Option<(i64,)> = query_as(&format!(
"SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"
))
.fetch_optional(self)
.await?;
@ -97,13 +126,17 @@ CREATE TABLE IF NOT EXISTS {table_name} (
})
}
fn list_applied_migrations<'e>(&'e mut self, table_name: &'e str) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
fn list_applied_migrations<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
Box::pin(async move {
// language=SQLite
let rows: Vec<(i64, Vec<u8>)> =
query_as(&format!("SELECT version, checksum FROM {table_name} ORDER BY version"))
.fetch_all(self)
.await?;
let rows: Vec<(i64, Vec<u8>)> = query_as(&format!(
"SELECT version, checksum FROM {table_name} ORDER BY version"
))
.fetch_all(self)
.await?;
let migrations = rows
.into_iter()
@ -145,12 +178,12 @@ CREATE TABLE IF NOT EXISTS {table_name} (
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
// language=SQL
let _ = query(
&format!(r#"
let _ = query(&format!(
r#"
INSERT INTO {table_name} ( version, description, success, checksum, execution_time )
VALUES ( ?1, ?2, TRUE, ?3, -1 )
"#),
)
"#
))
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
@ -167,13 +200,13 @@ CREATE TABLE IF NOT EXISTS {table_name} (
// language=SQL
#[allow(clippy::cast_possible_truncation)]
let _ = query(
&format!(r#"
let _ = query(&format!(
r#"
UPDATE {table_name}
SET execution_time = ?1
WHERE version = ?2
"#),
)
"#
))
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)