// sqlx/sqlx-sqlite/src/migrate.rs

use crate::connection::{ConnectOptions, Connection};
use crate::error::Error;
use crate::executor::Executor;
use crate::fs;
use crate::migrate::MigrateError;
use crate::migrate::{AppliedMigration, Migration};
use crate::migrate::{Migrate, MigrateDatabase};
use crate::query::query;
use crate::query_as::query_as;
use crate::{Sqlite, SqliteConnectOptions, SqliteConnection, SqliteJournalMode};
use futures_core::future::BoxFuture;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
pub(crate) use sqlx_core::migrate::*;
use sqlx_core::query_scalar::query_scalar;
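
// `MigrateDatabase` for SQLite: a database is just a file on disk (or an
// in-memory database), so create/exists/drop boil down to opening, checking,
// and removing that file.
//
// A minimal usage sketch, assuming a `url` like `sqlite://app.db` (the URL is
// an assumption, not part of this module):
//
//     if !Sqlite::database_exists(url).await? {
//         Sqlite::create_database(url).await?;
//     }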
impl MigrateDatabase for Sqlite {
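    // Creating the database only requires opening a connection with
    // `create_if_missing(true)` and then closing it again.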
    fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            let mut opts = SqliteConnectOptions::from_str(url)?.create_if_missing(true);

            // Since it doesn't make sense to include this flag in the connection URL,
            // we just use an `AtomicBool` to pass it.
            if super::CREATE_DB_WAL.load(Ordering::Acquire) {
                opts = opts.journal_mode(SqliteJournalMode::Wal);
            }

            // Opening a connection to sqlite creates the database
            opts.connect()
                .await?
                // Ensure WAL mode tempfiles are cleaned up
                .close()
                .await?;

            Ok(())
        })
    }
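
    // An in-memory database always "exists"; otherwise the database exists if
    // its file is present on disk.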
    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
        Box::pin(async move {
            let options = SqliteConnectOptions::from_str(url)?;

            if options.in_memory {
                Ok(true)
            } else {
                Ok(options.filename.exists())
            }
        })
    }
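
    // Dropping a SQLite database means deleting its file; an in-memory
    // database has nothing to remove.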
    fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            let options = SqliteConnectOptions::from_str(url)?;

            if !options.in_memory {
                fs::remove_file(&*options.filename).await?;
            }

            Ok(())
        })
    }
}
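
// `Migrate` implements the migration bookkeeping for a single SQLite
// connection: it maintains the migrations table named by `table_name` and
// applies or reverts individual migration scripts.
//
// A rough usage sketch via the facade crate (the migrations path and the
// `conn` variable are assumptions for illustration):
//
//     let migrator = sqlx::migrate!("./migrations");
//     migrator.run(&mut conn).await?;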
impl Migrate for SqliteConnection {
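    // SQLite has no `CREATE SCHEMA`; additional schemas only exist as attached
    // database files. Succeed if the schema is already attached, otherwise
    // report that creating schemas is unsupported.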
    fn create_schema_if_not_exists<'e>(
        &'e mut self,
        schema_name: &'e str,
    ) -> BoxFuture<'e, Result<(), MigrateError>> {
        Box::pin(async move {
            // Check if the schema already exists; if so, don't error.
            let schema_version: Option<i64> =
                query_scalar(&format!("PRAGMA {schema_name}.schema_version"))
                    .fetch_optional(&mut *self)
                    .await?;

            if schema_version.is_some() {
                return Ok(());
            }

            Err(MigrateError::CreateSchemasNotSupported(
                format!("cannot create new schema {schema_name}; creation of additional schemas in SQLite requires attaching extra database files"),
            ))
        })
    }
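
    // Create the migrations bookkeeping table if it does not exist yet.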
    fn ensure_migrations_table<'e>(
        &'e mut self,
        table_name: &'e str,
    ) -> BoxFuture<'e, Result<(), MigrateError>> {
        Box::pin(async move {
            // language=SQLite
            self.execute(&*format!(
                r#"
CREATE TABLE IF NOT EXISTS {table_name} (
    version BIGINT PRIMARY KEY,
    description TEXT NOT NULL,
    installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    success BOOLEAN NOT NULL,
    checksum BLOB NOT NULL,
    execution_time BIGINT NOT NULL
);
                "#
            ))
            .await?;

            Ok(())
        })
    }
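
    // Return the lowest version recorded with `success = false`, if any
    // (a "dirty" migration).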
    fn dirty_version<'e>(
        &'e mut self,
        table_name: &'e str,
    ) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
        Box::pin(async move {
            // language=SQLite
            let row: Option<(i64,)> = query_as(&format!(
                "SELECT version FROM {table_name} WHERE success = false ORDER BY version LIMIT 1"
            ))
            .fetch_optional(self)
            .await?;

            Ok(row.map(|r| r.0))
        })
    }
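
    // Read back the `(version, checksum)` pairs of all recorded migrations,
    // ordered by version.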
    fn list_applied_migrations<'e>(
        &'e mut self,
        table_name: &'e str,
    ) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
        Box::pin(async move {
            // language=SQLite
            let rows: Vec<(i64, Vec<u8>)> = query_as(&format!(
                "SELECT version, checksum FROM {table_name} ORDER BY version"
            ))
            .fetch_all(self)
            .await?;

            let migrations = rows
                .into_iter()
                .map(|(version, checksum)| AppliedMigration {
                    version,
                    checksum: checksum.into(),
                })
                .collect();

            Ok(migrations)
        })
    }
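
    // SQLite offers no cross-connection advisory lock to take here, so locking
    // and unlocking are no-ops.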
    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
        Box::pin(async move { Ok(()) })
    }

    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
        Box::pin(async move { Ok(()) })
    }
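
    // Run the migration script and record it in the migrations table inside a
    // single transaction, then write the measured execution time in a
    // follow-up update.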
    fn apply<'e>(
        &'e mut self,
        table_name: &'e str,
        migration: &'e Migration,
    ) -> BoxFuture<'e, Result<Duration, MigrateError>> {
        Box::pin(async move {
            let mut tx = self.begin().await?;
            let start = Instant::now();

            // Use a single transaction for the actual migration script and the essential bookkeeping so we never
            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
            // The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
            // data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
            // and update it once the actual transaction has completed.
            let _ = tx
                .execute(&*migration.sql)
                .await
                .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;

            // language=SQL
            let _ = query(&format!(
                r#"
    INSERT INTO {table_name} ( version, description, success, checksum, execution_time )
    VALUES ( ?1, ?2, TRUE, ?3, -1 )
                "#
            ))
            .bind(migration.version)
            .bind(&*migration.description)
            .bind(&*migration.checksum)
            .execute(&mut *tx)
            .await?;

            tx.commit().await?;

            // Update `execution_time`.
            // NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
            // this small risk since this value is not super important.
            let elapsed = start.elapsed();

            // language=SQL
            #[allow(clippy::cast_possible_truncation)]
            let _ = query(&format!(
                r#"
    UPDATE {table_name}
    SET execution_time = ?1
    WHERE version = ?2
                "#
            ))
            .bind(elapsed.as_nanos() as i64)
            .bind(migration.version)
            .execute(self)
            .await?;

            Ok(elapsed)
        })
    }
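
    // Run the revert script and delete the bookkeeping row in one transaction.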
    fn revert<'e>(
        &'e mut self,
        table_name: &'e str,
        migration: &'e Migration,
    ) -> BoxFuture<'e, Result<Duration, MigrateError>> {
        Box::pin(async move {
            // Use a single transaction for the actual migration script and the essential bookkeeping so we never
            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
            let mut tx = self.begin().await?;
            let start = Instant::now();

            let _ = tx.execute(&*migration.sql).await?;

            // language=SQLite
            let _ = query(&format!(r#"DELETE FROM {table_name} WHERE version = ?1"#))
                .bind(migration.version)
                .execute(&mut *tx)
                .await?;

            tx.commit().await?;

            let elapsed = start.elapsed();

            Ok(elapsed)
        })
    }
}