mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-10-27 03:25:08 +00:00
211 lines
6.5 KiB
Rust
211 lines
6.5 KiB
Rust
use crate::connection::{ConnectOptions, Connection};
|
|
use crate::error::Error;
|
|
use crate::executor::Executor;
|
|
use crate::fs;
|
|
use crate::migrate::MigrateError;
|
|
use crate::migrate::{AppliedMigration, Migration};
|
|
use crate::migrate::{Migrate, MigrateDatabase};
|
|
use crate::query::query;
|
|
use crate::query_as::query_as;
|
|
use crate::{Sqlite, SqliteConnectOptions, SqliteConnection, SqliteJournalMode};
|
|
use futures_core::future::BoxFuture;
|
|
use std::str::FromStr;
|
|
use std::sync::atomic::Ordering;
|
|
use std::time::Duration;
|
|
use std::time::Instant;
|
|
|
|
pub(crate) use sqlx_core::migrate::*;
|
|
|
|
impl MigrateDatabase for Sqlite {
|
|
fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
|
|
Box::pin(async move {
|
|
let mut opts = SqliteConnectOptions::from_str(url)?.create_if_missing(true);
|
|
|
|
// Since it doesn't make sense to include this flag in the connection URL,
|
|
// we just use an `AtomicBool` to pass it.
|
|
if super::CREATE_DB_WAL.load(Ordering::Acquire) {
|
|
opts = opts.journal_mode(SqliteJournalMode::Wal);
|
|
}
|
|
|
|
// Opening a connection to sqlite creates the database
|
|
let _ = opts
|
|
.connect()
|
|
.await?
|
|
// Ensure WAL mode tempfiles are cleaned up
|
|
.close()
|
|
.await?;
|
|
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
|
|
Box::pin(async move {
|
|
let options = SqliteConnectOptions::from_str(url)?;
|
|
|
|
if options.in_memory {
|
|
Ok(true)
|
|
} else {
|
|
Ok(options.filename.exists())
|
|
}
|
|
})
|
|
}
|
|
|
|
fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
|
|
Box::pin(async move {
|
|
let options = SqliteConnectOptions::from_str(url)?;
|
|
|
|
if !options.in_memory {
|
|
fs::remove_file(&*options.filename).await?;
|
|
}
|
|
|
|
Ok(())
|
|
})
|
|
}
|
|
}
|
|
|
|
impl Migrate for SqliteConnection {
|
|
fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
|
|
Box::pin(async move {
|
|
// language=SQLite
|
|
self.execute(
|
|
r#"
|
|
CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
|
version BIGINT PRIMARY KEY,
|
|
description TEXT NOT NULL,
|
|
installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
success BOOLEAN NOT NULL,
|
|
checksum BLOB NOT NULL,
|
|
execution_time BIGINT NOT NULL
|
|
);
|
|
"#,
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
|
|
Box::pin(async move {
|
|
// language=SQLite
|
|
let row: Option<(i64,)> = query_as(
|
|
"SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
|
|
)
|
|
.fetch_optional(self)
|
|
.await?;
|
|
|
|
Ok(row.map(|r| r.0))
|
|
})
|
|
}
|
|
|
|
fn list_applied_migrations(
|
|
&mut self,
|
|
) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
|
|
Box::pin(async move {
|
|
// language=SQLite
|
|
let rows: Vec<(i64, Vec<u8>)> =
|
|
query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
|
|
.fetch_all(self)
|
|
.await?;
|
|
|
|
let migrations = rows
|
|
.into_iter()
|
|
.map(|(version, checksum)| AppliedMigration {
|
|
version,
|
|
checksum: checksum.into(),
|
|
})
|
|
.collect();
|
|
|
|
Ok(migrations)
|
|
})
|
|
}
|
|
|
|
fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
|
|
Box::pin(async move { Ok(()) })
|
|
}
|
|
|
|
fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
|
|
Box::pin(async move { Ok(()) })
|
|
}
|
|
|
|
fn apply<'e: 'm, 'm>(
|
|
&'e mut self,
|
|
migration: &'m Migration,
|
|
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
|
Box::pin(async move {
|
|
let mut tx = self.begin().await?;
|
|
let start = Instant::now();
|
|
|
|
// Use a single transaction for the actual migration script and the essential bookeeping so we never
|
|
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
|
|
// The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
|
|
// data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
|
|
// and update it once the actual transaction completed.
|
|
let _ = tx.execute(&*migration.sql).await?;
|
|
|
|
// language=SQL
|
|
let _ = query(
|
|
r#"
|
|
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
|
|
VALUES ( ?1, ?2, TRUE, ?3, -1 )
|
|
"#,
|
|
)
|
|
.bind(migration.version)
|
|
.bind(&*migration.description)
|
|
.bind(&*migration.checksum)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
tx.commit().await?;
|
|
|
|
// Update `elapsed_time`.
|
|
// NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
|
|
// this small risk since this value is not super important.
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
// language=SQL
|
|
let _ = query(
|
|
r#"
|
|
UPDATE _sqlx_migrations
|
|
SET execution_time = ?1
|
|
WHERE version = ?2
|
|
"#,
|
|
)
|
|
.bind(elapsed.as_nanos() as i64)
|
|
.bind(migration.version)
|
|
.execute(self)
|
|
.await?;
|
|
|
|
Ok(elapsed)
|
|
})
|
|
}
|
|
|
|
fn revert<'e: 'm, 'm>(
|
|
&'e mut self,
|
|
migration: &'m Migration,
|
|
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
|
Box::pin(async move {
|
|
// Use a single transaction for the actual migration script and the essential bookeeping so we never
|
|
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
|
|
let mut tx = self.begin().await?;
|
|
let start = Instant::now();
|
|
|
|
let _ = tx.execute(&*migration.sql).await?;
|
|
|
|
// language=SQL
|
|
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#)
|
|
.bind(migration.version)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
|
|
tx.commit().await?;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
Ok(elapsed)
|
|
})
|
|
}
|
|
}
|