diff --git a/Cargo.lock b/Cargo.lock index e01dce274..42d641e04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,6 +302,12 @@ dependencies = [ "serde", ] +[[package]] +name = "build_const" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39092a32794787acd8525ee150305ff051b0aa6cc2abaf193924f5ab05425f39" + [[package]] name = "bumpalo" version = "3.4.0" @@ -394,12 +400,44 @@ dependencies = [ "ansi_term", "atty", "bitflags", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width", "vec_map", ] +[[package]] +name = "clap" +version = "3.0.0-beta.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "860643c53f980f0d38a5e25dfab6c3c93b2cb3aa1fe192643d17a293c6c41936" +dependencies = [ + "atty", + "bitflags", + "clap_derive", + "indexmap", + "lazy_static", + "os_str_bytes", + "strsim 0.10.0", + "termcolor", + "textwrap", + "unicode-width", + "vec_map", +] + +[[package]] +name = "clap_derive" +version = "3.0.0-beta.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb51c9e75b94452505acd21d929323f5a5c6c4735a852adbd39ef5fb1b014f30" +dependencies = [ + "heck", + "proc-macro-error 0.4.12", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "clicolors-control" version = "1.0.1" @@ -456,6 +494,23 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "console" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c0994e656bba7b922d8dd1245db90672ffb701e684e45be58f20719d69abc5a" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "regex", + "terminal_size", + "termios", + "unicode-width", + "winapi 0.3.9", + "winapi-util", +] + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -500,6 +555,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d375c433320f6c5057ae04a04376eef4d04ce2801448cf8863a78da99107be4" +[[package]] +name = "crc" +version = 
"1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" +dependencies = [ + "build_const", +] + [[package]] name = "criterion" version = "0.3.3" @@ -508,7 +572,7 @@ checksum = "70daa7ceec6cf143990669a04c7df13391d55fb27bd4079d252fca774ba244d8" dependencies = [ "atty", "cast", - "clap", + "clap 2.33.1", "criterion-plot", "csv", "itertools", @@ -655,11 +719,11 @@ dependencies = [ [[package]] name = "dialoguer" -version = "0.5.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b5eb0fce3c4f955b8d8d864b131fb8863959138da962026c106ba7a2e3bf7a" +checksum = "f4aa86af7b19b40ef9cbef761ed411a49f0afa06b7b6dcd3dfe2f96a3c546138" dependencies = [ - "console", + "console 0.11.3", "lazy_static", "tempfile", ] @@ -1592,6 +1656,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_str_bytes" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06de47b848347d8c4c94219ad8ecd35eb90231704b067e67e6ae2e36ee023510" + [[package]] name = "parking" version = "1.0.3" @@ -1843,19 +1913,45 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" +[[package]] +name = "proc-macro-error" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" +dependencies = [ + "proc-macro-error-attr 0.4.12", + "proc-macro2", + "quote", + "syn", + "version_check", +] + [[package]] name = "proc-macro-error" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc175e9777c3116627248584e8f8b3e2987405cabe1c0adf7d1dd28f09dc7880" dependencies = [ - "proc-macro-error-attr", + "proc-macro-error-attr 1.0.3", "proc-macro2", "quote", "syn", "version_check", ] +[[package]] +name = 
"proc-macro-error-attr" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "syn-mid", + "version_check", +] + [[package]] name = "proc-macro-error-attr" version = "1.0.3" @@ -2414,7 +2510,8 @@ dependencies = [ "async-trait", "cargo_metadata", "chrono", - "console", + "clap 3.0.0-beta.1", + "console 0.10.3", "dialoguer", "dotenv", "futures 0.3.5", @@ -2422,7 +2519,6 @@ dependencies = [ "serde", "serde_json", "sqlx", - "structopt", "tokio 0.2.21", "url 2.1.1", ] @@ -2438,6 +2534,7 @@ dependencies = [ "byteorder", "bytes 0.5.5", "chrono", + "crc", "crossbeam-channel", "crossbeam-queue", "crossbeam-utils 0.7.2", @@ -2685,13 +2782,19 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "structopt" version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de2f5e239ee807089b62adce73e48c625e0ed80df02c7ab3f068f5db5281065c" dependencies = [ - "clap", + "clap 2.33.1", "lazy_static", "structopt-derive", ] @@ -2703,7 +2806,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "510413f9de616762a4fbeab62509bf15c729603b72d7cd71280fbca431b1c118" dependencies = [ "heck", - "proc-macro-error", + "proc-macro-error 1.0.3", "proc-macro2", "quote", "syn", diff --git a/Cargo.toml b/Cargo.toml index 71f18cee2..f0318ed8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,8 +37,9 @@ features = [ "all" ] rustdoc-args = ["--cfg", "docsrs"] [features] -default = [ "macros", "runtime-async-std" ] +default = [ "macros", "runtime-async-std", 
"migrate" ] macros = [ "sqlx-macros" ] +migrate = [ "sqlx-core/migrate" ] # [deprecated] TLS is not possible to disable due to it being conditional on multiple features # Hopefully Cargo can handle this in the future diff --git a/sqlx-cli/.gitignore b/sqlx-cli/.gitignore deleted file mode 100644 index 3549fae05..000000000 --- a/sqlx-cli/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -/target -Cargo.lock -.env \ No newline at end of file diff --git a/sqlx-cli/Cargo.toml b/sqlx-cli/Cargo.toml index 230af00f2..1540d7f67 100644 --- a/sqlx-cli/Cargo.toml +++ b/sqlx-cli/Cargo.toml @@ -27,15 +27,15 @@ path = "src/bin/cargo-sqlx.rs" [dependencies] dotenv = "0.15" tokio = { version = "0.2", features = ["macros"] } -sqlx = { version = "0.4.0-pre", path = "..", default-features = false, features = [ "runtime-tokio", "offline" ] } +sqlx = { version = "0.4.0-pre", path = "..", default-features = false, features = [ "runtime-async-std", "migrate", "any", "offline" ] } futures = "0.3" -structopt = "0.3" +clap = "3.0.0-beta.1" chrono = "0.4" anyhow = "1.0" url = { version = "2.1.1", default-features = false } async-trait = "0.1.30" console = "0.10.0" -dialoguer = "0.5.0" +dialoguer = "0.6.2" serde_json = { version = "1.0.53", features = ["preserve_order"] } serde = "1.0.110" glob = "0.3.0" @@ -44,7 +44,7 @@ cargo_metadata = "0.10.0" [features] default = [ "postgres", "sqlite", "mysql" ] -# database +# databases mysql = [ "sqlx/mysql" ] postgres = [ "sqlx/postgres" ] sqlite = [ "sqlx/sqlite" ] diff --git a/sqlx-cli/README.md b/sqlx-cli/README.md index 9c16656e6..9318a3d0b 100644 --- a/sqlx-cli/README.md +++ b/sqlx-cli/README.md @@ -5,6 +5,8 @@ mode with `sqlx::query!()` and friends. 
### Install +#### With Rust toolchain + ```bash # supports all databases supported by SQLx $ cargo install sqlx-cli @@ -13,7 +15,7 @@ $ cargo install sqlx-cli $ cargo install sqlx-cli --no-default-features --features postgres ``` -### Commands +### Usage All commands require `DATABASE_URL` to be set, either in the environment or in a `.env` file in the current working directory. @@ -49,10 +51,6 @@ $ sqlx migration run Compares the migration history of the running database against the `migrations/` folder and runs any scripts that are still pending. -##### Note: Down-Migrations -Down-migrations are currently a non-planned feature as their utility seems dubious but we welcome -any contributions (discussions/code) regarding this matter. - #### Enable building in "offline" mode with `query!()` Note: must be run as `cargo sqlx`. diff --git a/sqlx-cli/src/bin/cargo-sqlx.rs b/sqlx-cli/src/bin/cargo-sqlx.rs index 2215c6f7d..5bf7bd1e0 100644 --- a/sqlx-cli/src/bin/cargo-sqlx.rs +++ b/sqlx-cli/src/bin/cargo-sqlx.rs @@ -1,18 +1,21 @@ -use sqlx_cli::Command; -use structopt::{clap, StructOpt}; - +use clap::{crate_version, AppSettings, FromArgMatches, IntoApp}; +use console::style; +use sqlx_cli::Opt; use std::env; #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() { // when invoked as `cargo sqlx [...]` the args we see are `[...]/sqlx-cli sqlx prepare` // so we want to notch out that superfluous "sqlx" let args = env::args_os().skip(2); - let matches = Command::clap() + let matches = Opt::into_app() + .version(crate_version!()) .bin_name("cargo sqlx") - .setting(clap::AppSettings::NoBinaryName) + .setting(AppSettings::NoBinaryName) .get_matches_from(args); - sqlx_cli::run(Command::from_clap(&matches)).await + if let Err(error) = sqlx_cli::run(Opt::from_arg_matches(&matches)).await { + println!("{} {}", style("error:").bold().red(), error); + } } diff --git a/sqlx-cli/src/bin/sqlx.rs b/sqlx-cli/src/bin/sqlx.rs index 2c6997ee4..059353155 100644 --- 
a/sqlx-cli/src/bin/sqlx.rs +++ b/sqlx-cli/src/bin/sqlx.rs @@ -1,8 +1,13 @@ -use sqlx_cli::Command; -use structopt::StructOpt; +use clap::{crate_version, FromArgMatches, IntoApp}; +use console::style; +use sqlx_cli::Opt; #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() { + let matches = Opt::into_app().version(crate_version!()).get_matches(); + // no special handling here - sqlx_cli::run(Command::from_args()).await + if let Err(error) = sqlx_cli::run(Opt::from_arg_matches(&matches)).await { + println!("{} {}", style("error:").bold().red(), error); + } } diff --git a/sqlx-cli/src/database.rs b/sqlx-cli/src/database.rs new file mode 100644 index 000000000..725584817 --- /dev/null +++ b/sqlx-cli/src/database.rs @@ -0,0 +1,32 @@ +use console::style; +use dialoguer::Confirm; +use sqlx::any::Any; +use sqlx::migrate::MigrateDatabase; + +pub async fn create(uri: &str) -> anyhow::Result<()> { + if !Any::database_exists(uri).await? { + Any::create_database(uri).await?; + } + + Ok(()) +} + +pub async fn drop(uri: &str, confirm: bool) -> anyhow::Result<()> { + if confirm + && !Confirm::new() + .with_prompt(format!( + "\nAre you sure you want to drop the database at {}?", + style(uri).cyan() + )) + .default(false) + .interact()? + { + return Ok(()); + } + + if Any::database_exists(uri).await? 
{ + Any::drop_database(uri).await?; + } + + Ok(()) +} diff --git a/sqlx-cli/src/db.rs b/sqlx-cli/src/db.rs deleted file mode 100644 index 4ff30c3a5..000000000 --- a/sqlx-cli/src/db.rs +++ /dev/null @@ -1,59 +0,0 @@ -use dialoguer::Confirmation; - -use anyhow::bail; - -pub async fn run_create() -> anyhow::Result<()> { - let migrator = crate::migrator::get()?; - - if !migrator.can_create_database() { - bail!( - "Database creation is not implemented for {}", - migrator.database_type() - ); - } - - let db_name = migrator.get_database_name()?; - let db_exists = migrator.check_if_database_exists(&db_name).await?; - - if !db_exists { - println!("Creating database: {}", db_name); - Ok(migrator.create_database(&db_name).await?) - } else { - println!("Database already exists, aborting"); - Ok(()) - } -} - -pub async fn run_drop() -> anyhow::Result<()> { - let migrator = crate::migrator::get()?; - - if !migrator.can_drop_database() { - bail!( - "Database drop is not implemented for {}", - migrator.database_type() - ); - } - - let db_name = migrator.get_database_name()?; - let db_exists = migrator.check_if_database_exists(&db_name).await?; - - if db_exists { - if !Confirmation::new() - .with_text(&format!( - "\nAre you sure you want to drop the database: {}?", - db_name - )) - .default(false) - .interact()? - { - println!("Aborting"); - return Ok(()); - } - - println!("Dropping database: {}", db_name); - Ok(migrator.drop_database(&db_name).await?) 
- } else { - println!("Database does not exists, aborting"); - Ok(()) - } -} diff --git a/sqlx-cli/src/lib.rs b/sqlx-cli/src/lib.rs index 6d119e3c1..b0ee8174a 100644 --- a/sqlx-cli/src/lib.rs +++ b/sqlx-cli/src/lib.rs @@ -1,95 +1,38 @@ +use crate::opt::{Command, DatabaseCommand, MigrateCommand}; +use anyhow::anyhow; use dotenv::dotenv; +use std::env; -use structopt::StructOpt; - -mod migrator; - -mod db; -mod migration; +mod database; +// mod migration; +// mod migrator; +mod migrate; +mod opt; mod prepare; -#[derive(StructOpt, Debug)] -pub enum Command { - #[structopt(alias = "mig")] - Migrate(MigrationCommand), +pub use crate::opt::Opt; - #[structopt(alias = "db")] - Database(DatabaseCommand), - - /// Enables offline mode for a project utilizing `query!()` and related macros. - /// May only be run as `cargo sqlx prepare`. - /// - /// Saves data for all invocations of `query!()` and friends in the project so that it may be - /// built in offline mode, i.e. so compilation does not require connecting to a running database. - /// Outputs to `sqlx-data.json` in the current directory, overwriting it if it already exists. - /// - /// Offline mode can be activated simply by removing `DATABASE_URL` from the environment or - /// building without a `.env` file. - #[structopt(alias = "prep")] - Prepare { - /// If this flag is passed, instead of overwriting `sqlx-data.json` in the current directory, - /// that file is loaded and compared against the current output of the prepare step; if - /// there is a mismatch, an error is reported and the process exits with a nonzero exit code. - /// - /// Intended for use in CI. - #[structopt(long)] - check: bool, - - /// Any arguments to pass to `cargo rustc`; - /// Cargo args (preceding `--` in `cargo rustc ... -- ...`) only. 
- #[structopt(name = "Cargo args", last = true)] - cargo_args: Vec, - }, -} - -/// Generate and run migrations -#[derive(StructOpt, Debug)] -pub enum MigrationCommand { - /// Create a new migration with the given name, - /// using the current time as the version - Add { name: String }, - - /// Run all pending migrations - Run, - - /// List all migrations - List, -} - -/// Create or drops database depending on your connection string -#[derive(StructOpt, Debug)] -pub enum DatabaseCommand { - /// Create database in url - Create, - - /// Drop database in url - Drop, -} - -pub async fn run(cmd: Command) -> anyhow::Result<()> { +pub async fn run(opt: Opt) -> anyhow::Result<()> { dotenv().ok(); - match cmd { - Command::Migrate(migrate) => match migrate { - MigrationCommand::Add { name } => migration::add_file(&name)?, - MigrationCommand::Run => migration::run().await?, - MigrationCommand::List => migration::list().await?, + let database_url = env::var("DATABASE_URL") + .map_err(|_| anyhow!("The DATABASE_URL environment variable must be set"))?; + + match opt.command { + Command::Migrate(migrate) => match migrate.command { + MigrateCommand::Add { description } => migrate::add(&description)?, + MigrateCommand::Run => migrate::run(&database_url).await?, + MigrateCommand::Info => migrate::info(&database_url).await?, }, - Command::Database(database) => match database { - DatabaseCommand::Create => db::run_create().await?, - DatabaseCommand::Drop => db::run_drop().await?, + Command::Database(database) => match database.command { + DatabaseCommand::Create => database::create(&database_url).await?, + DatabaseCommand::Drop { yes } => database::drop(&database_url, !yes).await?, }, - Command::Prepare { - check: false, - cargo_args, - } => prepare::run(cargo_args)?, + Command::Prepare { check: false, args } => prepare::run(&database_url, args)?, - Command::Prepare { - check: true, - cargo_args, - } => prepare::check(cargo_args)?, + Command::Prepare { check: true, args } => 
prepare::check(&database_url, args)?, }; Ok(()) diff --git a/sqlx-cli/src/migrate.rs b/sqlx-cli/src/migrate.rs new file mode 100644 index 000000000..45dead33f --- /dev/null +++ b/sqlx-cli/src/migrate.rs @@ -0,0 +1,89 @@ +use anyhow::{bail, Context}; +use console::style; +use sqlx::migrate::{Migrate, MigrateError, Migrator}; +use sqlx::{AnyConnection, Connection}; +use std::fs::{self, File}; +use std::io::Write; +use std::path::Path; + +const MIGRATION_FOLDER: &'static str = "migrations"; + +pub fn add(description: &str) -> anyhow::Result<()> { + use chrono::prelude::*; + use std::path::PathBuf; + + fs::create_dir_all(MIGRATION_FOLDER).context("Unable to create migrations directory")?; + + let dt = Utc::now(); + let mut file_name = dt.format("%Y%m%d%H%M%S").to_string(); + file_name.push_str("_"); + file_name.push_str(&description.replace(' ', "_")); + file_name.push_str(".sql"); + + let mut path = PathBuf::new(); + path.push(MIGRATION_FOLDER); + path.push(&file_name); + + println!("Creating {}", style(path.display()).cyan()); + + let mut file = File::create(&path).context("Failed to create migration file")?; + + file.write_all(b"-- Add migration script here\n")?; + + Ok(()) +} + +pub async fn info(uri: &str) -> anyhow::Result<()> { + let migrator = Migrator::new(Path::new(MIGRATION_FOLDER)).await?; + let mut conn = AnyConnection::connect(uri).await?; + + conn.ensure_migrations_table().await?; + + let (version, _) = conn.version().await?.unwrap_or((0, false)); + + for migration in migrator.iter() { + println!( + "{}/{} {}", + style(migration.version()).cyan(), + if version >= migration.version() { + style("installed").green() + } else { + style("pending").yellow() + }, + migration.description(), + ); + } + + Ok(()) +} + +pub async fn run(uri: &str) -> anyhow::Result<()> { + let migrator = Migrator::new(Path::new(MIGRATION_FOLDER)).await?; + let mut conn = AnyConnection::connect(uri).await?; + + conn.ensure_migrations_table().await?; + + let (version, dirty) = 
conn.version().await?.unwrap_or((0, false)); + + if dirty { + bail!(MigrateError::Dirty(version)); + } + + for migration in migrator.iter() { + if migration.version() > version { + let elapsed = conn.apply(migration).await?; + + println!( + "{}/{} {} {}", + style(migration.version()).cyan(), + style("migrate").green(), + migration.description(), + style(format!("({:?})", elapsed)).dim() + ); + } else { + conn.validate(migration).await?; + } + } + + Ok(()) +} diff --git a/sqlx-cli/src/migrator/mod.rs b/sqlx-cli/src/migrator/mod.rs deleted file mode 100644 index 111ed633e..000000000 --- a/sqlx-cli/src/migrator/mod.rs +++ /dev/null @@ -1,75 +0,0 @@ -use anyhow::{bail, Context, Result}; -use async_trait::async_trait; -use std::env; -use url::Url; - -#[cfg(feature = "mysql")] -mod mysql; - -#[cfg(feature = "postgres")] -mod postgres; - -#[cfg(feature = "sqlite")] -mod sqlite; - -#[async_trait] -pub trait MigrationTransaction { - async fn commit(self: Box) -> Result<()>; - async fn rollback(self: Box) -> Result<()>; - async fn check_if_applied(&mut self, migration: &str) -> Result; - async fn execute_migration(&mut self, migration_sql: &str) -> Result<()>; - async fn save_applied_migration(&mut self, migration_name: &str) -> Result<()>; -} - -#[async_trait] -pub trait DatabaseMigrator { - // Misc info - fn database_type(&self) -> String; - fn get_database_name(&self) -> Result; - - // Features - fn can_migrate_database(&self) -> bool; - fn can_create_database(&self) -> bool; - fn can_drop_database(&self) -> bool; - - // Database creation - async fn check_if_database_exists(&self, db_name: &str) -> Result; - async fn create_database(&self, db_name: &str) -> Result<()>; - async fn drop_database(&self, db_name: &str) -> Result<()>; - - // Migration - async fn create_migration_table(&self) -> Result<()>; - async fn get_migrations(&self) -> Result>; - async fn begin_migration(&self) -> Result>; -} - -pub fn get() -> Result> { - let db_url_raw = 
env::var("DATABASE_URL").context("Failed to find 'DATABASE_URL'")?; - - let db_url = Url::parse(&db_url_raw)?; - - // This code is taken from: https://github.com/launchbadge/sqlx/blob/master/sqlx-macros/src/lib.rs#L63 - match db_url.scheme() { - #[cfg(feature = "sqlite")] - "sqlite" => Ok(Box::new(self::sqlite::Sqlite::new(db_url_raw ))), - #[cfg(not(feature = "sqlite"))] - "sqlite" => bail!("Not implemented. DATABASE_URL {} has the scheme of a SQLite database but the `sqlite` feature of sqlx was not enabled", - db_url), - - #[cfg(feature = "postgres")] - "postgresql" | "postgres" => Ok(Box::new(self::postgres::Postgres::new(db_url_raw))), - #[cfg(not(feature = "postgres"))] - "postgresql" | "postgres" => bail!("DATABASE_URL {} has the scheme of a Postgres database but the `postgres` feature of sqlx was not enabled", - db_url), - - #[cfg(feature = "mysql")] - "mysql" | "mariadb" => Ok(Box::new(self::mysql::MySql::new(db_url_raw))), - #[cfg(not(feature = "mysql"))] - "mysql" | "mariadb" => bail!( - "DATABASE_URL {} has the scheme of a MySQL/MariaDB database but the `mysql` feature of sqlx was not enabled", - db_url - ), - - scheme => bail!("unexpected scheme {:?} in DATABASE_URL {}", scheme, db_url), - } -} diff --git a/sqlx-cli/src/migrator/mysql.rs b/sqlx-cli/src/migrator/mysql.rs deleted file mode 100644 index b39e2c75a..000000000 --- a/sqlx-cli/src/migrator/mysql.rs +++ /dev/null @@ -1,203 +0,0 @@ -use sqlx::mysql::MySqlRow; -use sqlx::pool::PoolConnection; -use sqlx::Connect; -use sqlx::Executor; -use sqlx::MySqlConnection; -use sqlx::MySqlPool; -use sqlx::Row; - -use anyhow::{anyhow, Context, Result}; -use async_trait::async_trait; - -use super::{DatabaseMigrator, MigrationTransaction}; - -pub struct MySql { - pub db_url: String, -} - -impl MySql { - pub fn new(db_url: String) -> Self { - MySql { - db_url: db_url.clone(), - } - } -} - -struct DbUrl<'a> { - base_url: &'a str, - db_name: &'a str, -} - -fn get_base_url<'a>(db_url: &'a str) -> Result { - let 
split: Vec<&str> = db_url.rsplitn(2, '/').collect(); - - if split.len() != 2 { - return Err(anyhow!("Failed to find database name in connection string")); - } - - let db_name = split[0]; - let base_url = split[1]; - - Ok(DbUrl { base_url, db_name }) -} - -#[async_trait] -impl DatabaseMigrator for MySql { - fn database_type(&self) -> String { - "MySql".to_string() - } - - fn can_migrate_database(&self) -> bool { - true - } - - fn can_create_database(&self) -> bool { - true - } - - fn can_drop_database(&self) -> bool { - true - } - - fn get_database_name(&self) -> Result { - let db_url = get_base_url(&self.db_url)?; - Ok(db_url.db_name.to_string()) - } - - async fn check_if_database_exists(&self, db_name: &str) -> Result { - let db_url = get_base_url(&self.db_url)?; - - let base_url = db_url.base_url; - - let mut conn = MySqlConnection::connect(base_url).await?; - - let result: bool = sqlx::query( - "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)", - ) - .bind(db_name) - .try_map(|row: MySqlRow| row.try_get(0)) - .fetch_one(&mut conn) - .await - .context("Failed to check if database exists")?; - - Ok(result) - } - - async fn create_database(&self, db_name: &str) -> Result<()> { - let db_url = get_base_url(&self.db_url)?; - - let base_url = db_url.base_url; - - let mut conn = MySqlConnection::connect(base_url).await?; - - sqlx::query(&format!("CREATE DATABASE `{}`", db_name)) - .execute(&mut conn) - .await - .with_context(|| format!("Failed to create database: {}", db_name))?; - - Ok(()) - } - - async fn drop_database(&self, db_name: &str) -> Result<()> { - let db_url = get_base_url(&self.db_url)?; - - let base_url = db_url.base_url; - - let mut conn = MySqlConnection::connect(base_url).await?; - - sqlx::query(&format!("DROP DATABASE `{}`", db_name)) - .execute(&mut conn) - .await - .with_context(|| format!("Failed to drop database: {}", db_name))?; - - Ok(()) - } - - async fn create_migration_table(&self) -> Result<()> { - let mut conn 
= MySqlConnection::connect(&self.db_url).await?; - println!("Create migration table"); - - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS __migrations ( - migration VARCHAR (255) PRIMARY KEY, - created TIMESTAMP NOT NULL DEFAULT current_timestamp - ); - "#, - ) - .execute(&mut conn) - .await - .context("Failed to create migration table")?; - - Ok(()) - } - - async fn get_migrations(&self) -> Result> { - let mut conn = MySqlConnection::connect(&self.db_url).await?; - - let result = sqlx::query( - r#" - select migration from __migrations order by created - "#, - ) - .try_map(|row: MySqlRow| row.try_get(0)) - .fetch_all(&mut conn) - .await - .context("Failed to create migration table")?; - - Ok(result) - } - - async fn begin_migration(&self) -> Result> { - let pool = MySqlPool::new(&self.db_url) - .await - .context("Failed to connect to pool")?; - - let tx = pool.begin().await?; - - Ok(Box::new(MySqlMigration { transaction: tx })) - } -} - -pub struct MySqlMigration { - transaction: sqlx::Transaction<'static, sqlx::MySql, PoolConnection>, -} - -#[async_trait] -impl MigrationTransaction for MySqlMigration { - async fn commit(self: Box) -> Result<()> { - self.transaction.commit().await?; - Ok(()) - } - - async fn rollback(self: Box) -> Result<()> { - self.transaction.rollback().await?; - Ok(()) - } - - async fn check_if_applied(&mut self, migration_name: &str) -> Result { - let result = - sqlx::query("select exists(select migration from __migrations where migration = ?)") - .bind(migration_name.to_string()) - .try_map(|row: MySqlRow| row.try_get(0)) - .fetch_one(&mut self.transaction) - .await - .context("Failed to check migration table")?; - - Ok(result) - } - - async fn execute_migration(&mut self, migration_sql: &str) -> Result<()> { - self.transaction.execute(migration_sql).await?; - Ok(()) - } - - async fn save_applied_migration(&mut self, migration_name: &str) -> Result<()> { - sqlx::query("insert into __migrations (migration) values (?)") - 
.bind(migration_name.to_string()) - .execute(&mut self.transaction) - .await - .context("Failed to insert migration")?; - Ok(()) - } -} diff --git a/sqlx-cli/src/migrator/postgres.rs b/sqlx-cli/src/migrator/postgres.rs deleted file mode 100644 index 4c952656d..000000000 --- a/sqlx-cli/src/migrator/postgres.rs +++ /dev/null @@ -1,209 +0,0 @@ -use sqlx::pool::PoolConnection; -use sqlx::postgres::PgRow; -use sqlx::Connect; -use sqlx::Executor; -use sqlx::PgConnection; -use sqlx::PgPool; -use sqlx::Row; - -use anyhow::{anyhow, Context, Result}; -use async_trait::async_trait; - -use crate::migrator::{DatabaseMigrator, MigrationTransaction}; - -pub struct Postgres { - pub db_url: String, -} - -impl Postgres { - pub fn new(db_url: String) -> Self { - Postgres { - db_url: db_url.clone(), - } - } -} - -struct DbUrl<'a> { - base_url: &'a str, - db_name: &'a str, -} - -fn get_base_url<'a>(db_url: &'a str) -> Result { - let split: Vec<&str> = db_url.rsplitn(2, '/').collect(); - - if split.len() != 2 { - return Err(anyhow!("Failed to find database name in connection string")); - } - - let db_name = split[0]; - let base_url = split[1]; - - Ok(DbUrl { base_url, db_name }) -} - -#[async_trait] -impl DatabaseMigrator for Postgres { - fn database_type(&self) -> String { - "Postgres".to_string() - } - - fn can_migrate_database(&self) -> bool { - true - } - - fn can_create_database(&self) -> bool { - true - } - - fn can_drop_database(&self) -> bool { - true - } - - fn get_database_name(&self) -> Result { - let db_url = get_base_url(&self.db_url)?; - Ok(db_url.db_name.to_string()) - } - - async fn check_if_database_exists(&self, db_name: &str) -> Result { - let db_url = get_base_url(&self.db_url)?; - - let base_url = db_url.base_url; - - let mut conn = PgConnection::connect(base_url).await?; - - let result: bool = - sqlx::query("select exists(SELECT 1 from pg_database WHERE datname = $1) as exists") - .bind(db_name) - .try_map(|row: PgRow| row.try_get("exists")) - .fetch_one(&mut conn) 
- .await - .context("Failed to check if database exists")?; - - Ok(result) - } - - async fn create_database(&self, db_name: &str) -> Result<()> { - let db_url = get_base_url(&self.db_url)?; - - let base_url = db_url.base_url; - - let mut conn = PgConnection::connect(base_url).await?; - - // quote database name (quotes in the name are escaped with additional quotes) - sqlx::query(&format!( - "CREATE DATABASE \"{}\"", - db_name.replace('"', "\"\"") - )) - .execute(&mut conn) - .await - .with_context(|| format!("Failed to create database: {}", db_name))?; - - Ok(()) - } - - async fn drop_database(&self, db_name: &str) -> Result<()> { - let db_url = get_base_url(&self.db_url)?; - - let base_url = db_url.base_url; - - let mut conn = PgConnection::connect(base_url).await?; - - sqlx::query(&format!( - "DROP DATABASE \"{}\"", - db_name.replace('"', "\"\"") - )) - .execute(&mut conn) - .await - .with_context(|| format!("Failed to drop database: {}", db_name))?; - - Ok(()) - } - - async fn create_migration_table(&self) -> Result<()> { - let mut conn = PgConnection::connect(&self.db_url).await?; - - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS __migrations ( - migration VARCHAR (255) PRIMARY KEY, - created TIMESTAMP NOT NULL DEFAULT current_timestamp - ); - "#, - ) - .execute(&mut conn) - .await - .context("Failed to create migration table")?; - - Ok(()) - } - - async fn get_migrations(&self) -> Result> { - let mut conn = PgConnection::connect(&self.db_url).await?; - - let result = sqlx::query( - r#" - select migration from __migrations order by created - "#, - ) - .try_map(|row: PgRow| row.try_get(0)) - .fetch_all(&mut conn) - .await - .context("Failed to create migration table")?; - - Ok(result) - } - - async fn begin_migration(&self) -> Result> { - let pool = PgPool::new(&self.db_url) - .await - .context("Failed to connect to pool")?; - - let tx = pool.begin().await?; - - Ok(Box::new(PostgresMigration { transaction: tx })) - } -} - -pub struct PostgresMigration { - 
transaction: sqlx::Transaction<'static, sqlx::Postgres, PoolConnection>, -} - -#[async_trait] -impl MigrationTransaction for PostgresMigration { - async fn commit(self: Box) -> Result<()> { - self.transaction.commit().await?; - Ok(()) - } - - async fn rollback(self: Box) -> Result<()> { - self.transaction.rollback().await?; - Ok(()) - } - - async fn check_if_applied(&mut self, migration_name: &str) -> Result { - let result = sqlx::query( - "select exists(select migration from __migrations where migration = $1) as exists", - ) - .bind(migration_name.to_string()) - .try_map(|row: PgRow| row.try_get("exists")) - .fetch_one(&mut self.transaction) - .await - .context("Failed to check migration table")?; - - Ok(result) - } - - async fn execute_migration(&mut self, migration_sql: &str) -> Result<()> { - self.transaction.execute(migration_sql).await?; - Ok(()) - } - - async fn save_applied_migration(&mut self, migration_name: &str) -> Result<()> { - sqlx::query("insert into __migrations (migration) values ($1)") - .bind(migration_name.to_string()) - .execute(&mut self.transaction) - .await - .context("Failed to insert migration")?; - Ok(()) - } -} diff --git a/sqlx-cli/src/migrator/sqlite.rs b/sqlx-cli/src/migrator/sqlite.rs deleted file mode 100644 index 6bc6ab92d..000000000 --- a/sqlx-cli/src/migrator/sqlite.rs +++ /dev/null @@ -1,178 +0,0 @@ -// use sqlx::pool::PoolConnection; -use sqlx::sqlite::SqliteRow; -use sqlx::Connect; -use sqlx::Executor; -use sqlx::Row; -use sqlx::SqliteConnection; -// use sqlx::SqlitePool; - -use anyhow::{anyhow, Context, Result}; -use async_trait::async_trait; - -use crate::migrator::{DatabaseMigrator, MigrationTransaction}; - -pub struct Sqlite { - db_url: String, - path: String, -} - -impl Sqlite { - pub fn new(db_url: String) -> Self { - let path = crop_letters(&db_url, "sqlite://".len()); - Sqlite { - db_url: db_url.clone(), - path: path.to_string(), - } - } -} - -fn crop_letters(s: &str, pos: usize) -> &str { - match 
s.char_indices().skip(pos).next() { - Some((pos, _)) => &s[pos..], - None => "", - } -} - -#[async_trait] -impl DatabaseMigrator for Sqlite { - fn database_type(&self) -> String { - "Sqlite".to_string() - } - - fn can_migrate_database(&self) -> bool { - true - } - - fn can_create_database(&self) -> bool { - true - } - - fn can_drop_database(&self) -> bool { - true - } - - fn get_database_name(&self) -> Result { - let split: Vec<&str> = self.db_url.rsplitn(2, '/').collect(); - - if split.len() != 2 { - return Err(anyhow!("Failed to find database name in connection string")); - } - - let db_name = split[0]; - - Ok(db_name.to_string()) - } - - async fn check_if_database_exists(&self, _db_name: &str) -> Result { - use std::path::Path; - Ok(Path::new(&self.path).exists()) - } - - async fn create_database(&self, _db_name: &str) -> Result<()> { - println!("DB {}", self.path); - - // Opening a connection to sqlite creates the database. - let _ = SqliteConnection::connect(&self.db_url).await?; - - Ok(()) - } - - async fn drop_database(&self, _db_name: &str) -> Result<()> { - std::fs::remove_file(&self.path)?; - Ok(()) - } - - async fn create_migration_table(&self) -> Result<()> { - let mut conn = SqliteConnection::connect(&self.db_url).await?; - - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS __migrations ( - migration TEXT PRIMARY KEY, - created TIMESTAMP NOT NULL DEFAULT current_timestamp - ); - "#, - ) - .execute(&mut conn) - .await - .context("Failed to create migration table")?; - - Ok(()) - } - - async fn get_migrations(&self) -> Result> { - let mut conn = SqliteConnection::connect(&self.db_url).await?; - - let result = sqlx::query( - r#" - select migration from __migrations order by created - "#, - ) - .try_map(|row: SqliteRow| row.try_get(0)) - .fetch_all(&mut conn) - .await - .context("Failed to create migration table")?; - - Ok(result) - } - - async fn begin_migration(&self) -> Result> { - // let pool = SqlitePool::new(&self.db_url) - // .await - // 
.context("Failed to connect to pool")?; - - // let tx = pool.begin().await?; - - // Ok(Box::new(MigrationTransaction { transaction: tx })) - Ok(Box::new(SqliteMigration { - db_url: self.db_url.clone(), - })) - } -} - -pub struct SqliteMigration { - db_url: String, - // pub transaction: sqlx::Transaction<'static, sqlx::Sqlite, PoolConnection>, -} - -#[async_trait] -impl MigrationTransaction for SqliteMigration { - async fn commit(self: Box) -> Result<()> { - // self.transaction.commit().await?; - Ok(()) - } - - async fn rollback(self: Box) -> Result<()> { - // self.transaction.rollback().await?; - Ok(()) - } - - async fn check_if_applied(&mut self, migration_name: &str) -> Result { - let mut conn = SqliteConnection::connect(&self.db_url).await?; - - let result = - sqlx::query("select exists(select migration from __migrations where migration = $1)") - .bind(migration_name.to_string()) - .try_map(|row: SqliteRow| row.try_get(0)) - .fetch_one(&mut conn) - .await?; - - Ok(result) - } - - async fn execute_migration(&mut self, migration_sql: &str) -> Result<()> { - let mut conn = SqliteConnection::connect(&self.db_url).await?; - conn.execute(migration_sql).await?; - // self.transaction.execute(migration_sql).await?; - Ok(()) - } - - async fn save_applied_migration(&mut self, migration_name: &str) -> Result<()> { - let mut conn = SqliteConnection::connect(&self.db_url).await?; - sqlx::query("insert into __migrations (migration) values ($1)") - .bind(migration_name.to_string()) - .execute(&mut conn) - .await?; - Ok(()) - } -} diff --git a/sqlx-cli/src/opt.rs b/sqlx-cli/src/opt.rs new file mode 100644 index 000000000..2b77d73bd --- /dev/null +++ b/sqlx-cli/src/opt.rs @@ -0,0 +1,77 @@ +use clap::Clap; + +#[derive(Clap, Debug)] +pub struct Opt { + #[clap(subcommand)] + pub command: Command, +} + +#[derive(Clap, Debug)] +pub enum Command { + #[clap(alias = "db")] + Database(DatabaseOpt), + + /// Generate query metadata to support offline compile-time verification. 
+ /// + /// Saves metadata for all invocations of `query!` and related macros to `sqlx-data.json` + /// in the current directory, overwriting if needed. + /// + /// During project compilation, the absence of the `DATABASE_URL` environment variable or + /// the presence of `SQLX_OFFLINE` will constrain the compile-time verification to only + /// read from the cached query metadata. + #[clap(alias = "prep")] + Prepare { + /// Run in 'check' mode. Exits with 0 if the query metadata is up-to-date. Exits with + /// 1 if the query metadata needs updating. + #[clap(long)] + check: bool, + + /// Arguments to be passed to `cargo rustc ...`. + #[clap(last = true)] + args: Vec, + }, + + #[clap(alias = "mig")] + Migrate(MigrateOpt), +} + +/// Group of commands for creating and dropping your database. +#[derive(Clap, Debug)] +pub struct DatabaseOpt { + #[clap(subcommand)] + pub command: DatabaseCommand, +} + +#[derive(Clap, Debug)] +pub enum DatabaseCommand { + /// Creates the database specified in your DATABASE_URL. + Create, + + /// Drops the database specified in your DATABASE_URL. + Drop { + /// Automatic confirmation. Without this option, you will be prompted before dropping + /// your database. + #[clap(short)] + yes: bool, + }, +} + +/// Group of commands for creating and running migrations. +#[derive(Clap, Debug)] +pub struct MigrateOpt { + #[clap(subcommand)] + pub command: MigrateCommand, +} + +#[derive(Clap, Debug)] +pub enum MigrateCommand { + /// Create a new migration with the given description, + /// and the current time as the version. + Add { description: String }, + + /// Run all pending migrations. + Run, + + /// List all available migrations. 
+ Info, +} diff --git a/sqlx-cli/src/prepare.rs b/sqlx-cli/src/prepare.rs index f1e491a74..15fcf8bda 100644 --- a/sqlx-cli/src/prepare.rs +++ b/sqlx-cli/src/prepare.rs @@ -1,18 +1,17 @@ -use anyhow::{anyhow, bail, Context}; -use std::process::Command; -use std::{env, fs}; - +use anyhow::{bail, Context}; use cargo_metadata::MetadataCommand; +use sqlx::any::{AnyConnectOptions, AnyKind}; use std::collections::BTreeMap; use std::fs::File; - +use std::process::Command; +use std::str::FromStr; use std::time::SystemTime; -use url::Url; +use std::{env, fs}; type QueryData = BTreeMap; type JsonObject = serde_json::Map; -pub fn run(cargo_args: Vec) -> anyhow::Result<()> { +pub fn run(url: &str, cargo_args: Vec) -> anyhow::Result<()> { #[derive(serde::Serialize)] struct DataFile { db: &'static str, @@ -20,7 +19,7 @@ pub fn run(cargo_args: Vec) -> anyhow::Result<()> { data: QueryData, } - let db_kind = get_db_kind()?; + let db_kind = get_db_kind(url)?; let data = run_prepare_step(cargo_args)?; serde_json::to_writer_pretty( @@ -37,8 +36,8 @@ pub fn run(cargo_args: Vec) -> anyhow::Result<()> { Ok(()) } -pub fn check(cargo_args: Vec) -> anyhow::Result<()> { - let db_kind = get_db_kind()?; +pub fn check(url: &str, cargo_args: Vec) -> anyhow::Result<()> { + let db_kind = get_db_kind(url)?; let data = run_prepare_step(cargo_args)?; let data_file = fs::read("sqlx-data.json").context( @@ -133,17 +132,21 @@ fn run_prepare_step(cargo_args: Vec) -> anyhow::Result { Ok(data) } -fn get_db_kind() -> anyhow::Result<&'static str> { - let db_url = dotenv::var("DATABASE_URL") - .map_err(|_| anyhow!("`DATABASE_URL` must be set to use the `prepare` subcommand"))?; - - let db_url = Url::parse(&db_url)?; +fn get_db_kind(url: &str) -> anyhow::Result<&'static str> { + let options = AnyConnectOptions::from_str(&url)?; // these should match the values of `DatabaseExt::NAME` in `sqlx-macros` - match db_url.scheme() { - "postgres" | "postgresql" => Ok("PostgreSQL"), - "mysql" | "mariadb" => 
Ok("MySQL/MariaDB"), - "sqlite" => Ok("SQLite"), - _ => bail!("unexpected scheme in database URL: {}", db_url.scheme()), + match options.kind() { + #[cfg(feature = "postgres")] + AnyKind::Postgres => Ok("PostgreSQL"), + + #[cfg(feature = "mysql")] + AnyKind::MySql => Ok("MySQL"), + + #[cfg(feature = "sqlite")] + AnyKind::Sqlite => Ok("SQLite"), + + #[cfg(feature = "mssql")] + AnyKind::Mssql => Ok("MSSQL"), } } diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 89b0c6b26..c0b098217 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -14,6 +14,7 @@ authors = [ [features] default = [ "runtime-async-std" ] +migrate = [ "sha2", "crc" ] # databases all-databases = [ "postgres", "mysql", "sqlite", "mssql", "any" ] @@ -48,6 +49,7 @@ bitflags = { version = "1.2.1", default-features = false } bytes = "0.5.4" byteorder = { version = "1.3.4", default-features = false, features = [ "std" ] } chrono = { version = "0.4.11", default-features = false, features = [ "clock" ], optional = true } +crc = { version = "1.8.1", optional = true } crossbeam-queue = "0.2.1" crossbeam-channel = "0.4.2" crossbeam-utils = { version = "0.7.2", default-features = false } diff --git a/sqlx-core/src/any/kind.rs b/sqlx-core/src/any/kind.rs new file mode 100644 index 000000000..8d5454ed4 --- /dev/null +++ b/sqlx-core/src/any/kind.rs @@ -0,0 +1,67 @@ +use crate::error::Error; +use std::str::FromStr; + +#[derive(Debug)] +pub enum AnyKind { + #[cfg(feature = "postgres")] + Postgres, + + #[cfg(feature = "mysql")] + MySql, + + #[cfg(feature = "sqlite")] + Sqlite, + + #[cfg(feature = "mssql")] + Mssql, +} + +impl FromStr for AnyKind { + type Err = Error; + + fn from_str(uri: &str) -> Result { + match uri { + #[cfg(feature = "postgres")] + _ if uri.starts_with("postgres:") || uri.starts_with("postgresql:") => { + Ok(AnyKind::Postgres) + } + + #[cfg(not(feature = "postgres"))] + _ if uri.starts_with("postgres:") || uri.starts_with("postgresql:") => { + 
Err(Error::Configuration("database URL has the scheme of a PostgreSQL database but the `postgres` feature is not enabled".into())) + } + + #[cfg(feature = "mysql")] + _ if uri.starts_with("mysql:") || uri.starts_with("mariadb:") => { + Ok(AnyKind::MySql) + } + + #[cfg(not(feature = "mysql"))] + _ if uri.starts_with("mysql:") || uri.starts_with("mariadb:") => { + Err(Error::Configuration("database URL has the scheme of a MySQL database but the `mysql` feature is not enabled".into())) + } + + #[cfg(feature = "sqlite")] + _ if uri.starts_with("sqlite:") => { + Ok(AnyKind::Sqlite) + } + + #[cfg(not(feature = "sqlite"))] + _ if uri.starts_with("sqlite:") => { + Err(Error::Configuration("database URL has the scheme of a SQLite database but the `sqlite` feature is not enabled".into())) + } + + #[cfg(feature = "mssql")] + _ if uri.starts_with("mssql:") || uri.starts_with("sqlserver:") => { + Ok(AnyKind::Mssql) + } + + #[cfg(not(feature = "mssql"))] + _ if uri.starts_with("mssql:") || uri.starts_with("sqlserver:") => { + Err(Error::Configuration("database URL has the scheme of a MSSQL database but the `mssql` feature is not enabled".into())) + } + + _ => Err(Error::Configuration(format!("unrecognized database url: {:?}", uri).into())) + } + } +} diff --git a/sqlx-core/src/any/migrate.rs b/sqlx-core/src/any/migrate.rs new file mode 100644 index 000000000..5bb575404 --- /dev/null +++ b/sqlx-core/src/any/migrate.rs @@ -0,0 +1,168 @@ +use crate::any::connection::AnyConnectionKind; +use crate::any::kind::AnyKind; +use crate::any::{Any, AnyConnection}; +use crate::error::Error; +use crate::migrate::{Migrate, MigrateDatabase, MigrateError, Migration}; +use futures_core::future::BoxFuture; +use std::str::FromStr; +use std::time::Duration; + +impl MigrateDatabase for Any { + fn create_database(uri: &str) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + match AnyKind::from_str(uri)? 
{ + #[cfg(feature = "postgres")] + AnyKind::Postgres => crate::postgres::Postgres::create_database(uri).await, + + #[cfg(feature = "sqlite")] + AnyKind::Sqlite => crate::sqlite::Sqlite::create_database(uri).await, + + #[cfg(feature = "mysql")] + AnyKind::MySql => crate::mysql::MySql::create_database(uri).await, + + #[cfg(feature = "mssql")] + AnyKind::Mssql => crate::mssql::Mssql::create_database(uri).await, + } + }) + } + + fn database_exists(uri: &str) -> BoxFuture<'_, Result> { + Box::pin(async move { + match AnyKind::from_str(uri)? { + #[cfg(feature = "postgres")] + AnyKind::Postgres => crate::postgres::Postgres::database_exists(uri).await, + + #[cfg(feature = "sqlite")] + AnyKind::Sqlite => crate::sqlite::Sqlite::database_exists(uri).await, + + #[cfg(feature = "mysql")] + AnyKind::MySql => crate::mysql::MySql::database_exists(uri).await, + + #[cfg(feature = "mssql")] + AnyKind::Mssql => crate::mssql::Mssql::database_exists(uri).await, + } + }) + } + + fn drop_database(uri: &str) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + match AnyKind::from_str(uri)? 
{ + #[cfg(feature = "postgres")] + AnyKind::Postgres => crate::postgres::Postgres::drop_database(uri).await, + + #[cfg(feature = "sqlite")] + AnyKind::Sqlite => crate::sqlite::Sqlite::drop_database(uri).await, + + #[cfg(feature = "mysql")] + AnyKind::MySql => crate::mysql::MySql::drop_database(uri).await, + + #[cfg(feature = "mssql")] + AnyKind::Mssql => crate::mssql::Mssql::drop_database(uri).await, + } + }) + } +} + +impl Migrate for AnyConnection { + fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + match &mut self.0 { + #[cfg(feature = "postgres")] + AnyConnectionKind::Postgres(conn) => conn.ensure_migrations_table(), + + #[cfg(feature = "sqlite")] + AnyConnectionKind::Sqlite(conn) => conn.ensure_migrations_table(), + + #[cfg(feature = "mysql")] + AnyConnectionKind::MySql(conn) => conn.ensure_migrations_table(), + + #[cfg(feature = "mssql")] + AnyConnectionKind::Mssql(conn) => conn.ensure_migrations_table(), + } + } + + fn version(&mut self) -> BoxFuture<'_, Result, MigrateError>> { + match &mut self.0 { + #[cfg(feature = "postgres")] + AnyConnectionKind::Postgres(conn) => conn.version(), + + #[cfg(feature = "sqlite")] + AnyConnectionKind::Sqlite(conn) => conn.version(), + + #[cfg(feature = "mysql")] + AnyConnectionKind::MySql(conn) => conn.version(), + + #[cfg(feature = "mssql")] + AnyConnectionKind::Mssql(conn) => conn.version(), + } + } + + fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + match &mut self.0 { + #[cfg(feature = "postgres")] + AnyConnectionKind::Postgres(conn) => conn.lock(), + + #[cfg(feature = "sqlite")] + AnyConnectionKind::Sqlite(conn) => conn.lock(), + + #[cfg(feature = "mysql")] + AnyConnectionKind::MySql(conn) => conn.lock(), + + #[cfg(feature = "mssql")] + AnyConnectionKind::Mssql(conn) => conn.lock(), + } + } + + fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + match &mut self.0 { + #[cfg(feature = "postgres")] + AnyConnectionKind::Postgres(conn) => conn.unlock(), + 
+ #[cfg(feature = "sqlite")] + AnyConnectionKind::Sqlite(conn) => conn.unlock(), + + #[cfg(feature = "mysql")] + AnyConnectionKind::MySql(conn) => conn.unlock(), + + #[cfg(feature = "mssql")] + AnyConnectionKind::Mssql(conn) => conn.unlock(), + } + } + + fn validate<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result<(), MigrateError>> { + match &mut self.0 { + #[cfg(feature = "postgres")] + AnyConnectionKind::Postgres(conn) => conn.validate(migration), + + #[cfg(feature = "sqlite")] + AnyConnectionKind::Sqlite(conn) => conn.validate(migration), + + #[cfg(feature = "mysql")] + AnyConnectionKind::MySql(conn) => conn.validate(migration), + + #[cfg(feature = "mssql")] + AnyConnectionKind::Mssql(conn) => conn.validate(migration), + } + } + + fn apply<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + match &mut self.0 { + #[cfg(feature = "postgres")] + AnyConnectionKind::Postgres(conn) => conn.apply(migration), + + #[cfg(feature = "sqlite")] + AnyConnectionKind::Sqlite(conn) => conn.apply(migration), + + #[cfg(feature = "mysql")] + AnyConnectionKind::MySql(conn) => conn.apply(migration), + + #[cfg(feature = "mssql")] + AnyConnectionKind::Mssql(conn) => conn.apply(migration), + } + } +} diff --git a/sqlx-core/src/any/mod.rs b/sqlx-core/src/any/mod.rs index 1502560c9..a02706db5 100644 --- a/sqlx-core/src/any/mod.rs +++ b/sqlx-core/src/any/mod.rs @@ -11,6 +11,7 @@ mod arguments; pub(crate) mod column; mod connection; mod database; +mod kind; mod options; pub(crate) mod row; mod transaction; @@ -18,12 +19,16 @@ pub(crate) mod type_info; pub mod types; pub(crate) mod value; +#[cfg(feature = "migrate")] +mod migrate; + pub use arguments::{AnyArgumentBuffer, AnyArguments}; pub use column::AnyColumn; pub use connection::AnyConnection; pub use database::Any; pub use decode::AnyDecode; pub use encode::AnyEncode; +pub use kind::AnyKind; pub use options::AnyConnectOptions; pub use r#type::AnyType; pub use 
row::AnyRow; diff --git a/sqlx-core/src/any/options.rs b/sqlx-core/src/any/options.rs index 35a8eb776..0978b5778 100644 --- a/sqlx-core/src/any/options.rs +++ b/sqlx-core/src/any/options.rs @@ -13,6 +13,7 @@ use crate::mysql::MySqlConnectOptions; #[cfg(feature = "sqlite")] use crate::sqlite::SqliteConnectOptions; +use crate::any::kind::AnyKind; #[cfg(feature = "mssql")] use crate::mssql::MssqlConnectOptions; @@ -26,6 +27,24 @@ use crate::mssql::MssqlConnectOptions; #[derive(Debug)] pub struct AnyConnectOptions(pub(crate) AnyConnectOptionsKind); +impl AnyConnectOptions { + pub fn kind(&self) -> AnyKind { + match &self.0 { + #[cfg(feature = "postgres")] + AnyConnectOptionsKind::Postgres(_) => AnyKind::Postgres, + + #[cfg(feature = "mysql")] + AnyConnectOptionsKind::MySql(_) => AnyKind::MySql, + + #[cfg(feature = "sqlite")] + AnyConnectOptionsKind::Sqlite(_) => AnyKind::Sqlite, + + #[cfg(feature = "mssql")] + AnyConnectOptionsKind::Mssql(_) => AnyKind::Mssql, + } + } +} + #[derive(Debug)] pub(crate) enum AnyConnectOptionsKind { #[cfg(feature = "postgres")] @@ -45,49 +64,24 @@ impl FromStr for AnyConnectOptions { type Err = Error; fn from_str(url: &str) -> Result { - match url { + match AnyKind::from_str(url)? 
{ #[cfg(feature = "postgres")] - _ if url.starts_with("postgres:") || url.starts_with("postgresql:") => { + AnyKind::Postgres => { PgConnectOptions::from_str(url).map(AnyConnectOptionsKind::Postgres) } - #[cfg(not(feature = "postgres"))] - _ if url.starts_with("postgres:") || url.starts_with("postgresql:") => { - Err("database URL has the scheme of a PostgreSQL database but the `postgres` feature is not enabled".into()) - } - #[cfg(feature = "mysql")] - _ if url.starts_with("mysql:") || url.starts_with("mariadb:") => { - MySqlConnectOptions::from_str(url).map(AnyConnectOptionsKind::MySql) - } - - #[cfg(not(feature = "mysql"))] - _ if url.starts_with("mysql:") || url.starts_with("mariadb:") => { - Err("database URL has the scheme of a MySQL database but the `mysql` feature is not enabled".into()) - } + AnyKind::MySql => MySqlConnectOptions::from_str(url).map(AnyConnectOptionsKind::MySql), #[cfg(feature = "sqlite")] - _ if url.starts_with("sqlite:") => { + AnyKind::Sqlite => { SqliteConnectOptions::from_str(url).map(AnyConnectOptionsKind::Sqlite) } - #[cfg(not(feature = "sqlite"))] - _ if url.starts_with("sqlite:") => { - Err("database URL has the scheme of a SQLite database but the `sqlite` feature is not enabled".into()) - } - #[cfg(feature = "mssql")] - _ if url.starts_with("mssql:") || url.starts_with("sqlserver:") => { - MssqlConnectOptions::from_str(url).map(AnyConnectOptionsKind::Mssql) - } - - #[cfg(not(feature = "mssql"))] - _ if url.starts_with("mssql:") || url.starts_with("sqlserver:") => { - Err("database URL has the scheme of a MSSQL database but the `mssql` feature is not enabled".into()) - } - - _ => Err(Error::Configuration(format!("unrecognized database url: {:?}", url).into())) - }.map(AnyConnectOptions) + AnyKind::Mssql => MssqlConnectOptions::from_str(url).map(AnyConnectOptionsKind::Mssql), + } + .map(AnyConnectOptions) } } diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs index c3685d3a0..d2554f208 100644 --- a/sqlx-core/src/error.rs 
+++ b/sqlx-core/src/error.rs @@ -91,6 +91,10 @@ pub enum Error { /// [`Pool::close`]: crate::pool::Pool::close #[error("attempted to acquire a connection on a closed pool")] PoolClosed, + + #[cfg(feature = "migrate")] + #[error("{0}")] + Migrate(#[source] Box), } impl Error { @@ -221,6 +225,14 @@ where } } +#[cfg(feature = "migrate")] +impl From for Error { + #[inline] + fn from(error: crate::migrate::MigrateError) -> Self { + Error::Migrate(Box::new(error)) + } +} + // Format an error message as a `Protocol` error macro_rules! err_protocol { ($expr:expr) => { diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index 4da7a38ec..d5d384e85 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -63,6 +63,9 @@ pub mod statement; pub mod type_info; pub mod value; +#[cfg(feature = "migrate")] +pub mod migrate; + #[cfg(all( any( feature = "postgres", diff --git a/sqlx-core/src/migrate/error.rs b/sqlx-core/src/migrate/error.rs new file mode 100644 index 000000000..e9689e43d --- /dev/null +++ b/sqlx-core/src/migrate/error.rs @@ -0,0 +1,23 @@ +use crate::error::{BoxDynError, Error}; + +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum MigrateError { + #[error("while executing migrations: {0}")] + Execute(#[from] Error), + + #[error("while resolving migrations: {0}")] + Source(#[source] BoxDynError), + + #[error("migration {0} was previously applied but is missing in the resolved migrations")] + VersionMissing(i64), + + #[error("migration {0} was previously applied but has been modified")] + VersionMismatch(i64), + + // NOTE: this will only happen with a database that does not have transactional DDL (e.g., MySQL or Oracle) + #[error( + "migration {0} is partially applied; fix and remove row from `_sqlx_migrations` table" + )] + Dirty(i64), +} diff --git a/sqlx-core/src/migrate/migrate.rs b/sqlx-core/src/migrate/migrate.rs new file mode 100644 index 000000000..c434c20cd --- /dev/null +++ b/sqlx-core/src/migrate/migrate.rs @@ -0,0 +1,53 @@ +use 
crate::error::Error; +use crate::migrate::{MigrateError, Migration}; +use futures_core::future::BoxFuture; +use std::time::Duration; + +pub trait MigrateDatabase { + // create database in uri + // uses a maintenance database depending on driver + fn create_database(uri: &str) -> BoxFuture<'_, Result<(), Error>>; + + // check if the database in uri exists + // uses a maintenance database depending on driver + fn database_exists(uri: &str) -> BoxFuture<'_, Result>; + + // drop database in uri + // uses a maintenance database depending on driver + fn drop_database(uri: &str) -> BoxFuture<'_, Result<(), Error>>; +} + +// 'e = Executor +pub trait Migrate { + // ensure migrations table exists + // will create or migrate it if needed + fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>>; + + // Return the current version and if the database is "dirty". + // "dirty" means there is a partially applied migration that failed. + fn version(&mut self) -> BoxFuture<'_, Result, MigrateError>>; + + // Should acquire a database lock so that only one migration process + // can run at a time. [`Migrate`] will call this function before applying + // any migrations. + fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>>; + + // Should release the lock. [`Migrate`] will call this function after all + // migrations have been run. 
+ fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>>; + + // validate the migration + // checks that it does exist on the database and that the checksum matches + fn validate<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result<(), MigrateError>>; + + // run SQL from migration in a DDL transaction + // insert new row to [_sqlx_migrations] table on completion (success or failure) + // returns the time taken to run the migration SQL + fn apply<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result>; +} diff --git a/sqlx-core/src/migrate/migration.rs b/sqlx-core/src/migrate/migration.rs new file mode 100644 index 000000000..52396a116 --- /dev/null +++ b/sqlx-core/src/migrate/migration.rs @@ -0,0 +1,19 @@ +use std::borrow::Cow; + +#[derive(Debug, Clone)] +pub struct Migration { + pub(crate) version: i64, + pub(crate) description: Cow<'static, str>, + pub(crate) sql: Cow<'static, str>, + pub(crate) checksum: Cow<'static, [u8]>, +} + +impl Migration { + pub fn version(&self) -> i64 { + self.version + } + + pub fn description(&self) -> &str { + &*self.description + } +} diff --git a/sqlx-core/src/migrate/migrator.rs b/sqlx-core/src/migrate/migrator.rs new file mode 100644 index 000000000..75b9e10aa --- /dev/null +++ b/sqlx-core/src/migrate/migrator.rs @@ -0,0 +1,78 @@ +use crate::acquire::Acquire; +use crate::migrate::{Migrate, MigrateError, Migration, MigrationSource}; +use std::ops::Deref; +use std::slice; + +#[derive(Debug)] +pub struct Migrator { + migrations: Vec, +} + +impl Migrator { + /// Creates a new instance with the given source. 
+ /// + /// # Examples + /// + /// ```rust,no_run + /// # fn main() { + /// # sqlx_rt::block_on(async move { + /// # use sqlx_core::migrate::Migrator; + /// use std::path::Path; + /// + /// // Read migrations from a local folder: ./migrations + /// let m = Migrator::new(Path::new("./migrations")).await?; + /// # Ok(()) + /// # }).unwrap(); + /// # } + /// ``` + pub async fn new<'s, S>(source: S) -> Result + where + S: MigrationSource<'s>, + { + Ok(Self { + migrations: source.resolve().await.map_err(MigrateError::Source)?, + }) + } + + /// Get an iterator over all known migrations. + pub fn iter(&self) -> slice::Iter<'_, Migration> { + self.migrations.iter() + } + + /// Run any pending migrations against the database; and, validate previously applied migrations + /// against the current migration source to detect accidental changes in previously-applied migrations. + pub async fn run<'a, A>(&self, migrator: A) -> Result<(), MigrateError> + where + A: Acquire<'a>, + ::Target: Migrate, + { + let mut conn = migrator.acquire().await?; + + // lock the database for exclusive access by the migrator + conn.lock().await?; + + // creates [_migrations] table only if needed + // eventually this will likely migrate previous versions of the table + conn.ensure_migrations_table().await?; + + let (version, dirty) = conn.version().await?.unwrap_or((0, false)); + + if dirty { + return Err(MigrateError::Dirty(version)); + } + + for migration in self.iter() { + if migration.version() > version { + conn.apply(migration).await?; + } else { + conn.validate(migration).await?; + } + } + + // unlock the migrator to allow other migrators to run + // but do nothing as we already migrated + conn.unlock().await?; + + Ok(()) + } +} diff --git a/sqlx-core/src/migrate/mod.rs b/sqlx-core/src/migrate/mod.rs new file mode 100644 index 000000000..04231733e --- /dev/null +++ b/sqlx-core/src/migrate/mod.rs @@ -0,0 +1,11 @@ +mod error; +mod migrate; +mod migration; +mod migrator; +mod source; + +pub use 
error::MigrateError; +pub use migrate::{Migrate, MigrateDatabase}; +pub use migration::Migration; +pub use migrator::Migrator; +pub use source::MigrationSource; diff --git a/sqlx-core/src/migrate/source.rs b/sqlx-core/src/migrate/source.rs new file mode 100644 index 000000000..608513502 --- /dev/null +++ b/sqlx-core/src/migrate/source.rs @@ -0,0 +1,69 @@ +use crate::error::BoxDynError; +use crate::migrate::Migration; +use futures_core::future::BoxFuture; +use futures_util::TryStreamExt; +use sha2::{Digest, Sha384}; +use sqlx_rt::fs; +use std::borrow::Cow; +use std::fmt::Debug; +use std::path::{Path, PathBuf}; + +pub trait MigrationSource<'s>: Debug { + fn resolve(self) -> BoxFuture<'s, Result, BoxDynError>>; +} + +impl<'s> MigrationSource<'s> for &'s Path { + fn resolve(self) -> BoxFuture<'s, Result, BoxDynError>> { + Box::pin(async move { + let mut s = fs::read_dir(self.canonicalize()?).await?; + let mut migrations = Vec::new(); + + while let Some(entry) = s.try_next().await? { + if !entry.metadata().await?.is_file() { + // not a file; ignore + continue; + } + + let file_name = entry.file_name(); + let file_name = file_name.to_string_lossy(); + + let parts = file_name.splitn(2, '_').collect::>(); + + if parts.len() != 2 || !parts[1].ends_with(".sql") { + // not of the format: _.sql; ignore + continue; + } + + let version: i64 = parts[0].parse()?; + + // remove the `.sql` and replace `_` with ` ` + let description = parts[1] + .trim_end_matches(".sql") + .replace('_', " ") + .to_owned(); + + let sql = fs::read_to_string(&entry.path()).await?; + + let checksum = Vec::from(Sha384::digest(sql.as_bytes()).as_slice()); + + migrations.push(Migration { + version, + description: Cow::Owned(description), + sql: Cow::Owned(sql), + checksum: Cow::Owned(checksum), + }) + } + + // ensure that we are sorted by `VERSION ASC` + migrations.sort_by_key(|m| m.version); + + Ok(migrations) + }) + } +} + +impl MigrationSource<'static> for PathBuf { + fn resolve(self) -> 
BoxFuture<'static, Result, BoxDynError>> { + Box::pin(async move { self.as_path().resolve().await }) + } +} diff --git a/sqlx-core/src/mysql/migrate.rs b/sqlx-core/src/mysql/migrate.rs new file mode 100644 index 000000000..1476ef041 --- /dev/null +++ b/sqlx-core/src/mysql/migrate.rs @@ -0,0 +1,218 @@ +use crate::connection::ConnectOptions; +use crate::error::Error; +use crate::executor::Executor; +use crate::migrate::MigrateError; +use crate::migrate::Migration; +use crate::migrate::{Migrate, MigrateDatabase}; +use crate::mysql::{MySql, MySqlConnectOptions, MySqlConnection}; +use crate::query::query; +use crate::query_as::query_as; +use crate::query_scalar::query_scalar; +use crc::crc32; +use futures_core::future::BoxFuture; +use std::str::FromStr; +use std::time::Duration; +use std::time::Instant; + +fn parse_for_maintenance(uri: &str) -> Result<(MySqlConnectOptions, String), Error> { + let mut options = MySqlConnectOptions::from_str(uri)?; + + let database = if let Some(database) = &options.database { + database.to_owned() + } else { + return Err(Error::Configuration( + "DATABASE_URL does not specify a database".into(), + )); + }; + + // switch us to database for create/drop commands + options.database = None; + + Ok((options, database)) +} + +impl MigrateDatabase for MySql { + fn create_database(uri: &str) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + let (options, database) = parse_for_maintenance(uri)?; + let mut conn = options.connect().await?; + + let _ = conn + .execute(&*format!("CREATE DATABASE `{}`", database)) + .await?; + + Ok(()) + }) + } + + fn database_exists(uri: &str) -> BoxFuture<'_, Result> { + Box::pin(async move { + let (options, database) = parse_for_maintenance(uri)?; + let mut conn = options.connect().await?; + + let exists: bool = query_scalar( + "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)", + ) + .bind(database) + .fetch_one(&mut conn) + .await?; + + Ok(exists) + }) + } + + fn 
drop_database(uri: &str) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + let (options, database) = parse_for_maintenance(uri)?; + let mut conn = options.connect().await?; + + let _ = conn + .execute(&*format!("DROP DATABASE IF EXISTS `{}`", database,)) + .await?; + + Ok(()) + }) + } +} + +impl Migrate for MySqlConnection { + fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { + // language=MySQL + self.execute( + r#" +CREATE TABLE IF NOT EXISTS _sqlx_migrations ( + version BIGINT PRIMARY KEY, + description TEXT NOT NULL, + installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + success BOOLEAN NOT NULL, + checksum BLOB NOT NULL, + execution_time BIGINT NOT NULL +); + "#, + ) + .await?; + + Ok(()) + }) + } + + fn version(&mut self) -> BoxFuture<'_, Result, MigrateError>> { + Box::pin(async move { + // language=SQL + let row = query_as( + "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1", + ) + .fetch_optional(self) + .await?; + + Ok(row) + }) + } + + fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { + let database_name = current_database(self).await?; + let lock_id = generate_lock_id(&database_name); + + // create an application lock over the database + // this function will not return until the lock is acquired + + // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS + // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE + + // language=MySQL + let _ = query("SELECT GET_LOCK(?, -1)") + .bind(lock_id) + .execute(self) + .await?; + + Ok(()) + }) + } + + fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { + let database_name = current_database(self).await?; + let lock_id = generate_lock_id(&database_name); + + // language=MySQL + let _ = query("SELECT RELEASE_LOCK(?)") + .bind(lock_id) + .execute(self) + .await?; + + 
Ok(()) + }) + } + + fn validate<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result<(), MigrateError>> { + Box::pin(async move { + // language=SQL + let checksum: Option> = + query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?") + .bind(migration.version) + .fetch_optional(self) + .await?; + + if let Some(checksum) = checksum { + return if checksum == &*migration.checksum { + Ok(()) + } else { + Err(MigrateError::VersionMismatch(migration.version)) + }; + } else { + Err(MigrateError::VersionMissing(migration.version)) + } + }) + } + + fn apply<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + Box::pin(async move { + let start = Instant::now(); + + let res = self.execute(&*migration.sql).await; + + let elapsed = start.elapsed(); + + // language=MySQL + let _ = query( + r#" + INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time ) + VALUES ( ?, ?, ?, ?, ? ) + "#, + ) + .bind(migration.version) + .bind(&*migration.description) + .bind(res.is_ok()) + .bind(&*migration.checksum) + .bind(elapsed.as_nanos() as i64) + .execute(self) + .await?; + + res?; + + Ok(elapsed) + }) + } +} + +async fn current_database(conn: &mut MySqlConnection) -> Result { + // language=MySQL + Ok(query_scalar("SELECT DATABASE()").fetch_one(conn).await?) 
+} + +// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308 +fn generate_lock_id(database_name: &str) -> String { + // 0x3d32ad9e chosen by fair dice roll + format!( + "{:x}", + 0x3d32ad9e * (crc32::checksum_ieee(database_name.as_bytes()) as i64) + ) +} diff --git a/sqlx-core/src/mysql/mod.rs b/sqlx-core/src/mysql/mod.rs index 583f1ea4b..bb5baec68 100644 --- a/sqlx-core/src/mysql/mod.rs +++ b/sqlx-core/src/mysql/mod.rs @@ -6,6 +6,7 @@ mod connection; mod database; mod error; mod io; +mod migrate; mod options; mod protocol; mod row; diff --git a/sqlx-core/src/pool/maybe.rs b/sqlx-core/src/pool/maybe.rs index 8ae57d709..43c7f3457 100644 --- a/sqlx-core/src/pool/maybe.rs +++ b/sqlx-core/src/pool/maybe.rs @@ -3,6 +3,7 @@ use crate::pool::PoolConnection; use std::ops::{Deref, DerefMut}; pub(crate) enum MaybePoolConnection<'c, DB: Database> { + #[allow(dead_code)] Connection(&'c mut DB::Connection), PoolConnection(PoolConnection), } diff --git a/sqlx-core/src/postgres/migrate.rs b/sqlx-core/src/postgres/migrate.rs new file mode 100644 index 000000000..ef80eb3b3 --- /dev/null +++ b/sqlx-core/src/postgres/migrate.rs @@ -0,0 +1,227 @@ +use crate::connection::{ConnectOptions, Connection}; +use crate::error::Error; +use crate::executor::Executor; +use crate::migrate::MigrateError; +use crate::migrate::Migration; +use crate::migrate::{Migrate, MigrateDatabase}; +use crate::postgres::{PgConnectOptions, PgConnection, Postgres}; +use crate::query::query; +use crate::query_as::query_as; +use crate::query_scalar::query_scalar; +use crc::crc32; +use futures_core::future::BoxFuture; +use std::str::FromStr; +use std::time::Duration; +use std::time::Instant; + +fn parse_for_maintenance(uri: &str) -> Result<(PgConnectOptions, String), Error> { + let mut options = PgConnectOptions::from_str(uri)?; + + // pull out the name of the database to create + let database = options + .database + 
.as_deref() + .unwrap_or(&options.username) + .to_owned(); + + // switch us to the maintenance database + // use `postgres` _unless_ the current user is postgres, in which case, use `template1` + // this matches the behavior of the `createdb` util + options.database = if options.username == "postgres" { + Some("template1".into()) + } else { + Some("postgres".into()) + }; + + Ok((options, database)) +} + +impl MigrateDatabase for Postgres { + fn create_database(uri: &str) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + let (options, database) = parse_for_maintenance(uri)?; + let mut conn = options.connect().await?; + + let _ = conn + .execute(&*format!( + "CREATE DATABASE \"{}\"", + database.replace('"', "\"\"") + )) + .await?; + + Ok(()) + }) + } + + fn database_exists(uri: &str) -> BoxFuture<'_, Result> { + Box::pin(async move { + let (options, database) = parse_for_maintenance(uri)?; + let mut conn = options.connect().await?; + + let exists: bool = + query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)") + .bind(database) + .fetch_one(&mut conn) + .await?; + + Ok(exists) + }) + } + + fn drop_database(uri: &str) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + let (options, database) = parse_for_maintenance(uri)?; + let mut conn = options.connect().await?; + + let _ = conn + .execute(&*format!( + "DROP DATABASE IF EXISTS \"{}\"", + database.replace('"', "\"\"") + )) + .await?; + + Ok(()) + }) + } +} + +impl Migrate for PgConnection { + fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { + // language=SQL + self.execute( + r#" +CREATE TABLE IF NOT EXISTS _sqlx_migrations ( + version BIGINT PRIMARY KEY, + description TEXT NOT NULL, + installed_on TIMESTAMPTZ NOT NULL DEFAULT now(), + success BOOLEAN NOT NULL, + checksum BYTEA NOT NULL, + execution_time BIGINT NOT NULL +); + "#, + ) + .await?; + + Ok(()) + }) + } + + fn version(&mut self) -> BoxFuture<'_, 
Result, MigrateError>> { + Box::pin(async move { + // language=SQL + let row = query_as( + "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1", + ) + .fetch_optional(self) + .await?; + + Ok(row) + }) + } + + fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { + let database_name = current_database(self).await?; + let lock_id = generate_lock_id(&database_name); + + // create an application lock over the database + // this function will not return until the lock is acquired + + // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS + // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE + + // language=SQL + let _ = query("SELECT pg_advisory_lock($1)") + .bind(lock_id) + .execute(self) + .await?; + + Ok(()) + }) + } + + fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { + let database_name = current_database(self).await?; + let lock_id = generate_lock_id(&database_name); + + // language=SQL + let _ = query("SELECT pg_advisory_unlock($1)") + .bind(lock_id) + .execute(self) + .await?; + + Ok(()) + }) + } + + fn validate<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result<(), MigrateError>> { + Box::pin(async move { + // language=SQL + let checksum: Option> = + query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = $1") + .bind(migration.version) + .fetch_optional(self) + .await?; + + if let Some(checksum) = checksum { + return if checksum == &*migration.checksum { + Ok(()) + } else { + Err(MigrateError::VersionMismatch(migration.version)) + }; + } else { + Err(MigrateError::VersionMissing(migration.version)) + } + }) + } + + fn apply<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + Box::pin(async move { + let mut tx = self.begin().await?; + let start = Instant::now(); + + let _ = tx.execute(&*migration.sql).await?; 
+ + tx.commit().await?; + + let elapsed = start.elapsed(); + + // language=SQL + let _ = query( + r#" + INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time ) + VALUES ( $1, $2, TRUE, $3, $4 ) + "#, + ) + .bind(migration.version) + .bind(&*migration.description) + .bind(&*migration.checksum) + .bind(elapsed.as_nanos() as i64) + .execute(self) + .await?; + + Ok(elapsed) + }) + } +} + +async fn current_database(conn: &mut PgConnection) -> Result { + // language=SQL + Ok(query_scalar("SELECT current_database()") + .fetch_one(conn) + .await?) +} + +// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308 +fn generate_lock_id(database_name: &str) -> i64 { + // 0x3d32ad9e chosen by fair dice roll + 0x3d32ad9e * (crc32::checksum_ieee(database_name.as_bytes()) as i64) +} diff --git a/sqlx-core/src/postgres/mod.rs b/sqlx-core/src/postgres/mod.rs index 3472bb53d..0c471feb5 100644 --- a/sqlx-core/src/postgres/mod.rs +++ b/sqlx-core/src/postgres/mod.rs @@ -8,6 +8,7 @@ mod error; mod io; mod listener; mod message; +mod migrate; mod options; mod row; mod transaction; diff --git a/sqlx-core/src/sqlite/migrate.rs b/sqlx-core/src/sqlite/migrate.rs new file mode 100644 index 000000000..3572a99ac --- /dev/null +++ b/sqlx-core/src/sqlite/migrate.rs @@ -0,0 +1,153 @@ +use crate::connection::{ConnectOptions, Connection}; +use crate::error::Error; +use crate::executor::Executor; +use crate::migrate::MigrateError; +use crate::migrate::Migration; +use crate::migrate::{Migrate, MigrateDatabase}; +use crate::query::query; +use crate::query_as::query_as; +use crate::query_scalar::query_scalar; +use crate::sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection}; +use futures_core::future::BoxFuture; +use sqlx_rt::fs; +use std::str::FromStr; +use std::time::Duration; +use std::time::Instant; + +impl MigrateDatabase for Sqlite { + fn create_database(uri: &str) -> 
BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + // Opening a connection to sqlite creates the database + let _ = SqliteConnectOptions::from_str(uri)? + .create_if_missing(true) + .connect() + .await?; + + Ok(()) + }) + } + + fn database_exists(uri: &str) -> BoxFuture<'_, Result> { + Box::pin(async move { + let options = SqliteConnectOptions::from_str(uri)?; + + if options.in_memory { + Ok(true) + } else { + Ok(options.filename.exists()) + } + }) + } + + fn drop_database(uri: &str) -> BoxFuture<'_, Result<(), Error>> { + Box::pin(async move { + let options = SqliteConnectOptions::from_str(uri)?; + + if !options.in_memory { + fs::remove_file(&*options.filename).await?; + } + + Ok(()) + }) + } +} + +impl Migrate for SqliteConnection { + fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { + // language=SQLite + self.execute( + r#" +CREATE TABLE IF NOT EXISTS _sqlx_migrations ( + version BIGINT PRIMARY KEY, + description TEXT NOT NULL, + installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + success BOOLEAN NOT NULL, + checksum BLOB NOT NULL, + execution_time BIGINT NOT NULL +); + "#, + ) + .await?; + + Ok(()) + }) + } + + fn version(&mut self) -> BoxFuture<'_, Result, MigrateError>> { + Box::pin(async move { + // language=SQLite + let row = query_as( + "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1", + ) + .fetch_optional(self) + .await?; + + Ok(row) + }) + } + + fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { Ok(()) }) + } + + fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { + Box::pin(async move { Ok(()) }) + } + + fn validate<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result<(), MigrateError>> { + Box::pin(async move { + // language=SQL + let checksum: Option> = + query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?1") + .bind(migration.version) + 
.fetch_optional(self) + .await?; + + if let Some(checksum) = checksum { + return if checksum == &*migration.checksum { + Ok(()) + } else { + Err(MigrateError::VersionMismatch(migration.version)) + }; + } else { + Err(MigrateError::VersionMissing(migration.version)) + } + }) + } + + fn apply<'e: 'm, 'm>( + &'e mut self, + migration: &'m Migration, + ) -> BoxFuture<'m, Result> { + Box::pin(async move { + let mut tx = self.begin().await?; + let start = Instant::now(); + + let _ = tx.execute(&*migration.sql).await?; + + tx.commit().await?; + + let elapsed = start.elapsed(); + + // language=SQL + let _ = query( + r#" + INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time ) + VALUES ( ?1, ?2, TRUE, ?3, ?4 ) + "#, + ) + .bind(migration.version) + .bind(&*migration.description) + .bind(&*migration.checksum) + .bind(elapsed.as_nanos() as i64) + .execute(self) + .await?; + + Ok(elapsed) + }) + } +} diff --git a/sqlx-core/src/sqlite/mod.rs b/sqlx-core/src/sqlite/mod.rs index 6ebcd0f4e..35bbcb6bb 100644 --- a/sqlx-core/src/sqlite/mod.rs +++ b/sqlx-core/src/sqlite/mod.rs @@ -18,6 +18,9 @@ mod type_info; pub mod types; mod value; +#[cfg(feature = "migrate")] +mod migrate; + pub use arguments::{SqliteArgumentValue, SqliteArguments}; pub use column::SqliteColumn; pub use connection::SqliteConnection;