rename cargo-sqlx -> sqlx-cli

edit README
This commit is contained in:
Austin Bonander
2020-05-15 20:07:57 -07:00
committed by Ryan Leckey
parent a44e29c84c
commit 7dae3dbf57
17 changed files with 106 additions and 51 deletions

3
sqlx-cli/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/target
Cargo.lock
.env

49
sqlx-cli/Cargo.toml Normal file
View File

@@ -0,0 +1,49 @@
[package]
name = "sqlx-cli"
version = "0.0.1"
description = "Command-line utility for SQLx, the Rust SQL toolkit."
authors = [
"Jesper Axelsson <jesperaxe@gmail.com>",
"Austin Bonander <austin.bonander@gmail.com>" # austin@launchbadge.com
]
edition = "2018"
readme = "README.md"
homepage = "https://github.com/launchbadge/sqlx"
repository = "https://github.com/launchbadge/sqlx"
keywords = ["database", "postgres", "database-management", "migration"]
categories = ["database", "command-line-utilities"]
default-run = "sqlx"
[[bin]]
name = "sqlx"
path = "src/bin/sqlx.rs"
# enables invocation as `cargo sqlx`; required for `prepare` subcommand
[[bin]]
name = "cargo-sqlx"
path = "src/bin/cargo-sqlx.rs"
[dependencies]
dotenv = "0.15"
tokio = { version = "0.2", features = ["macros"] }
sqlx = { version = "0.3", path = "..", default-features = false, features = [ "runtime-tokio", "offline" ] }
futures = "0.3"
structopt = "0.3"
chrono = "0.4"
anyhow = "1.0"
url = { version = "2.1.1", default-features = false }
async-trait = "0.1.30"
console = "0.10.0"
dialoguer = "0.5.0"
serde_json = { version = "1.0.53", features = ["preserve_order"] }
serde = "1.0.110"
glob = "0.3.0"
cargo_metadata = "0.10.0"
[features]
default = [ "postgres", "sqlite", "mysql" ]
# database
mysql = [ "sqlx/mysql" ]
postgres = [ "sqlx/postgres" ]
sqlite = [ "sqlx/sqlite" ]

63
sqlx-cli/README.md Normal file
View File

@@ -0,0 +1,63 @@
# SQLx CLI
SQLx's associated command-line utility for managing databases, migrations, and enabling "offline"
mode with `sqlx::query!()` and friends.
### Installation
```bash
$ cargo install sqlx-cli
```
### Commands
All commands require `DATABASE_URL` to be set, either in the environment or in a `.env` file
in the current working directory.
`database` and `migrate` subcommands support only Postgres; MySQL and SQLite are TODO.
```dotenv
# Postgres
DATABASE_URL=postgres://postgres@localhost/my_database
```
#### Create/drop the database at `DATABASE_URL`
```bash
sqlx database create
sqlx database drop
```
#### Create and run migrations
```bash
$ sqlx migrate add <name>
```
Creates a new file in `migrations/<timestamp>-<name>.sql`. Add your database schema changes to
this new file.
---
```bash
$ sqlx migration run
```
Compares the migration history of the running database against the `migrations/` folder and runs
any scripts that are still pending.
##### Note: Down-Migrations
Down-migrations are currently a non-planned feature as their utility seems dubious but we welcome
any contributions (discussions/code) regarding this matter.
#### Enable building in "offline" mode with `query!()`
Note: must be run as `cargo sqlx`.
```bash
cargo sqlx prepare
```
Saves query data to `sqlx-data.json` in the current directory; check this file into version control
and an active database connection will no longer be needed to build your project.
----
```bash
cargo sqlx prepare --check
```
Exits with a nonzero exit status if the data in `sqlx-data.json` is out of date with the current
database schema and queries in the project. Intended for use in Continuous Integration.

View File

@@ -0,0 +1,18 @@
use sqlx_cli::Command;
use structopt::{clap, StructOpt};
use std::env;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// when invoked as `cargo sqlx [...]` the args we see are `[...]/sqlx-cli sqlx prepare`
// so we want to notch out that superfluous "sqlx"
let args = env::args_os().skip(2);
let matches = Command::clap()
.bin_name("cargo sqlx")
.setting(clap::AppSettings::NoBinaryName)
.get_matches_from(args);
sqlx_cli::run(Command::from_clap(&matches)).await
}

8
sqlx-cli/src/bin/sqlx.rs Normal file
View File

@@ -0,0 +1,8 @@
use sqlx_cli::Command;
use structopt::StructOpt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// no special handling here
sqlx_cli::run(Command::from_args()).await
}

57
sqlx-cli/src/db.rs Normal file
View File

@@ -0,0 +1,57 @@
use crate::migrator::DatabaseMigrator;
use dialoguer::Confirmation;
use anyhow::bail;
pub async fn run_create() -> anyhow::Result<()> {
let migrator = crate::migrator::get()?;
if !migrator.can_create_database() {
bail!(
"Database creation is not implemented for {}",
migrator.database_type()
);
}
let db_name = migrator.get_database_name()?;
let db_exists = migrator.check_if_database_exists(&db_name).await?;
if !db_exists {
println!("Creating database: {}", db_name);
Ok(migrator.create_database(&db_name).await?)
} else {
println!("Database already exists, aborting");
Ok(())
}
}
pub async fn run_drop() -> anyhow::Result<()> {
let migrator = crate::migrator::get()?;
if !migrator.can_drop_database() {
bail!(
"Database drop is not implemented for {}",
migrator.database_type()
);
}
let db_name = migrator.get_database_name()?;
let db_exists = migrator.check_if_database_exists(&db_name).await?;
if db_exists {
if !Confirmation::new()
.with_text("\nAre you sure you want to drop the database: {}?")
.default(false)
.interact()?
{
println!("Aborting");
return Ok(());
}
println!("Dropping database: {}", db_name);
Ok(migrator.drop_database(&db_name).await?)
} else {
println!("Database does not exists, aborting");
Ok(())
}
}

85
sqlx-cli/src/lib.rs Normal file
View File

@@ -0,0 +1,85 @@
use dotenv::dotenv;
use structopt::StructOpt;
mod migrator;
mod db;
mod migration;
mod prepare;
/// Sqlx commandline tool
#[derive(StructOpt, Debug)]
#[structopt(name = "Sqlx")]
pub enum Command {
#[structopt(alias = "mig")]
Migrate(MigrationCommand),
#[structopt(alias = "db")]
Database(DatabaseCommand),
/// Enables offline mode for a project utilizing `query!()` and related macros.
/// May only be run as `cargo sqlx prepare`.
///
/// Saves data for all invocations of `query!()` and friends in the project so that it may be
/// built in offline mode, i.e. so compilation does not require connecting to a running database.
/// Outputs to `sqlx-data.json` in the current directory.
///
/// Offline mode can be activated simply by removing `DATABASE_URL` from the environment or
/// building without a `.env` file.
#[structopt(alias = "prep")]
Prepare {
/// If this flag is passed, instead of overwriting `sqlx-data.json` in the current directory,
/// that file is loaded and compared against the current output of the prepare step; if
/// there is a mismatch, an error is reported and the process exits with a nonzero exit code.
///
/// Intended for use in CI.
#[structopt(long)]
check: bool,
},
}
/// Adds and runs migrations. Alias: mig
#[derive(StructOpt, Debug)]
#[structopt(name = "Sqlx migrator")]
pub enum MigrationCommand {
/// Add new migration with name <timestamp>_<migration_name>.sql
Add { name: String },
/// Run all migrations
Run,
/// List all migrations
List,
}
/// Create or drops database depending on your connection string. Alias: db
#[derive(StructOpt, Debug)]
#[structopt(name = "Sqlx migrator")]
pub enum DatabaseCommand {
/// Create database in url
Create,
/// Drop database in url
Drop,
}
pub async fn run(cmd: Command) -> anyhow::Result<()> {
dotenv().ok();
match cmd {
Command::Migrate(migrate) => match migrate {
MigrationCommand::Add { name } => migration::add_file(&name)?,
MigrationCommand::Run => migration::run().await?,
MigrationCommand::List => migration::list().await?,
},
Command::Database(database) => match database {
DatabaseCommand::Create => db::run_create().await?,
DatabaseCommand::Drop => db::run_drop().await?,
},
Command::Prepare { check: false } => prepare::run()?,
Command::Prepare { check: true } => prepare::check()?,
};
Ok(())
}

187
sqlx-cli/src/migration.rs Normal file
View File

@@ -0,0 +1,187 @@
use anyhow::{bail, Context};
use console::style;
use std::fs::{self, File};
use std::io::{Read, Write};
const MIGRATION_FOLDER: &'static str = "migrations";
pub struct Migration {
pub name: String,
pub sql: String,
}
pub fn add_file(name: &str) -> anyhow::Result<()> {
use chrono::prelude::*;
use std::path::PathBuf;
fs::create_dir_all(MIGRATION_FOLDER).context("Unable to create migrations directory")?;
let dt = Utc::now();
let mut file_name = dt.format("%Y-%m-%d_%H-%M-%S").to_string();
file_name.push_str("_");
file_name.push_str(name);
file_name.push_str(".sql");
let mut path = PathBuf::new();
path.push(MIGRATION_FOLDER);
path.push(&file_name);
let mut file = File::create(path).context("Failed to create file")?;
file.write_all(b"-- Add migration script here")
.context("Could not write to file")?;
println!("Created migration: '{}'", file_name);
Ok(())
}
pub async fn run() -> anyhow::Result<()> {
let migrator = crate::migrator::get()?;
if !migrator.can_migrate_database() {
bail!(
"Database migrations not supported for {}",
migrator.database_type()
);
}
migrator.create_migration_table().await?;
let migrations = load_migrations()?;
for mig in migrations.iter() {
let mut tx = migrator.begin_migration().await?;
if tx.check_if_applied(&mig.name).await? {
println!("Already applied migration: '{}'", mig.name);
continue;
}
println!("Applying migration: '{}'", mig.name);
tx.execute_migration(&mig.sql)
.await
.with_context(|| format!("Failed to run migration {:?}", &mig.name))?;
tx.save_applied_migration(&mig.name)
.await
.context("Failed to insert migration")?;
tx.commit().await.context("Failed")?;
}
Ok(())
}
pub async fn list() -> anyhow::Result<()> {
let migrator = crate::migrator::get()?;
if !migrator.can_migrate_database() {
bail!(
"Database migrations not supported for {}",
migrator.database_type()
);
}
let file_migrations = load_migrations()?;
if migrator
.check_if_database_exists(&migrator.get_database_name()?)
.await?
{
let applied_migrations = migrator.get_migrations().await.unwrap_or_else(|_| {
println!("Could not retrive data from migration table");
Vec::new()
});
let mut width = 0;
for mig in file_migrations.iter() {
width = std::cmp::max(width, mig.name.len());
}
for mig in file_migrations.iter() {
let status = if applied_migrations
.iter()
.find(|&m| mig.name == *m)
.is_some()
{
style("Applied").green()
} else {
style("Not Applied").yellow()
};
println!("{:width$}\t{}", mig.name, status, width = width);
}
let orphans = check_for_orphans(file_migrations, applied_migrations);
if let Some(orphans) = orphans {
println!("\nFound migrations applied in the database that does not have a corresponding migration file:");
for name in orphans {
println!("{:width$}\t{}", name, style("Orphan").red(), width = width);
}
}
} else {
println!("No database found, listing migrations");
for mig in file_migrations {
println!("{}", mig.name);
}
}
Ok(())
}
fn load_migrations() -> anyhow::Result<Vec<Migration>> {
let entries = fs::read_dir(&MIGRATION_FOLDER).context("Could not find 'migrations' dir")?;
let mut migrations = Vec::new();
for e in entries {
if let Ok(e) = e {
if let Ok(meta) = e.metadata() {
if !meta.is_file() {
continue;
}
if let Some(ext) = e.path().extension() {
if ext != "sql" {
println!("Wrong ext: {:?}", ext);
continue;
}
} else {
continue;
}
let mut file = File::open(e.path())
.with_context(|| format!("Failed to open: '{:?}'", e.file_name()))?;
let mut contents = String::new();
file.read_to_string(&mut contents)
.with_context(|| format!("Failed to read: '{:?}'", e.file_name()))?;
migrations.push(Migration {
name: e.file_name().to_str().unwrap().to_string(),
sql: contents,
});
}
}
}
migrations.sort_by(|a, b| a.name.partial_cmp(&b.name).unwrap());
Ok(migrations)
}
fn check_for_orphans(
file_migrations: Vec<Migration>,
applied_migrations: Vec<String>,
) -> Option<Vec<String>> {
let orphans: Vec<String> = applied_migrations
.iter()
.filter(|m| !file_migrations.iter().any(|fm| fm.name == **m))
.cloned()
.collect();
if orphans.len() > 0 {
Some(orphans)
} else {
None
}
}

View File

@@ -0,0 +1,75 @@
use anyhow::{bail, Context, Result};
use async_trait::async_trait;
use std::env;
use url::Url;
#[cfg(feature = "mysql")]
mod mysql;
#[cfg(feature = "postgres")]
mod postgres;
#[cfg(feature = "sqlite")]
mod sqlite;
#[async_trait]
pub trait MigrationTransaction {
async fn commit(self: Box<Self>) -> Result<()>;
async fn rollback(self: Box<Self>) -> Result<()>;
async fn check_if_applied(&mut self, migration: &str) -> Result<bool>;
async fn execute_migration(&mut self, migration_sql: &str) -> Result<()>;
async fn save_applied_migration(&mut self, migration_name: &str) -> Result<()>;
}
#[async_trait]
pub trait DatabaseMigrator {
// Misc info
fn database_type(&self) -> String;
fn get_database_name(&self) -> Result<String>;
// Features
fn can_migrate_database(&self) -> bool;
fn can_create_database(&self) -> bool;
fn can_drop_database(&self) -> bool;
// Database creation
async fn check_if_database_exists(&self, db_name: &str) -> Result<bool>;
async fn create_database(&self, db_name: &str) -> Result<()>;
async fn drop_database(&self, db_name: &str) -> Result<()>;
// Migration
async fn create_migration_table(&self) -> Result<()>;
async fn get_migrations(&self) -> Result<Vec<String>>;
async fn begin_migration(&self) -> Result<Box<dyn MigrationTransaction>>;
}
pub fn get() -> Result<Box<dyn DatabaseMigrator>> {
let db_url_raw = env::var("DATABASE_URL").context("Failed to find 'DATABASE_URL'")?;
let db_url = Url::parse(&db_url_raw)?;
// This code is taken from: https://github.com/launchbadge/sqlx/blob/master/sqlx-macros/src/lib.rs#L63
match db_url.scheme() {
#[cfg(feature = "sqlite")]
"sqlite" => Ok(Box::new(self::sqlite::Sqlite::new(db_url_raw ))),
#[cfg(not(feature = "sqlite"))]
"sqlite" => bail!("Not implemented. DATABASE_URL {} has the scheme of a SQLite database but the `sqlite` feature of sqlx was not enabled",
db_url),
#[cfg(feature = "postgres")]
"postgresql" | "postgres" => Ok(Box::new(self::postgres::Postgres::new(db_url_raw))),
#[cfg(not(feature = "postgres"))]
"postgresql" | "postgres" => bail!("DATABASE_URL {} has the scheme of a Postgres database but the `postgres` feature of sqlx was not enabled",
db_url),
#[cfg(feature = "mysql")]
"mysql" | "mariadb" => Ok(Box::new(self::mysql::MySql::new(db_url_raw))),
#[cfg(not(feature = "mysql"))]
"mysql" | "mariadb" => bail!(
"DATABASE_URL {} has the scheme of a MySQL/MariaDB database but the `mysql` feature of sqlx was not enabled",
db_url
),
scheme => bail!("unexpected scheme {:?} in DATABASE_URL {}", scheme, db_url),
}
}

View File

@@ -0,0 +1,203 @@
use sqlx::mysql::MySqlRow;
use sqlx::pool::PoolConnection;
use sqlx::Connect;
use sqlx::Executor;
use sqlx::MySqlConnection;
use sqlx::MySqlPool;
use sqlx::Row;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use super::{DatabaseMigrator, MigrationTransaction};
pub struct MySql {
pub db_url: String,
}
impl MySql {
pub fn new(db_url: String) -> Self {
MySql {
db_url: db_url.clone(),
}
}
}
struct DbUrl<'a> {
base_url: &'a str,
db_name: &'a str,
}
fn get_base_url<'a>(db_url: &'a str) -> Result<DbUrl> {
let split: Vec<&str> = db_url.rsplitn(2, '/').collect();
if split.len() != 2 {
return Err(anyhow!("Failed to find database name in connection string"));
}
let db_name = split[0];
let base_url = split[1];
Ok(DbUrl { base_url, db_name })
}
#[async_trait]
impl DatabaseMigrator for MySql {
fn database_type(&self) -> String {
"MySql".to_string()
}
fn can_migrate_database(&self) -> bool {
true
}
fn can_create_database(&self) -> bool {
true
}
fn can_drop_database(&self) -> bool {
true
}
fn get_database_name(&self) -> Result<String> {
let db_url = get_base_url(&self.db_url)?;
Ok(db_url.db_name.to_string())
}
async fn check_if_database_exists(&self, db_name: &str) -> Result<bool> {
let db_url = get_base_url(&self.db_url)?;
let base_url = db_url.base_url;
let mut conn = MySqlConnection::connect(base_url).await?;
let result: bool = sqlx::query(
"select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)",
)
.bind(db_name)
.try_map(|row: MySqlRow| row.try_get(0))
.fetch_one(&mut conn)
.await
.context("Failed to check if database exists")?;
Ok(result)
}
async fn create_database(&self, db_name: &str) -> Result<()> {
let db_url = get_base_url(&self.db_url)?;
let base_url = db_url.base_url;
let mut conn = MySqlConnection::connect(base_url).await?;
sqlx::query(&format!("CREATE DATABASE `{}`", db_name))
.execute(&mut conn)
.await
.with_context(|| format!("Failed to create database: {}", db_name))?;
Ok(())
}
async fn drop_database(&self, db_name: &str) -> Result<()> {
let db_url = get_base_url(&self.db_url)?;
let base_url = db_url.base_url;
let mut conn = MySqlConnection::connect(base_url).await?;
sqlx::query(&format!("DROP DATABASE `{}`", db_name))
.execute(&mut conn)
.await
.with_context(|| format!("Failed to drop database: {}", db_name))?;
Ok(())
}
async fn create_migration_table(&self) -> Result<()> {
let mut conn = MySqlConnection::connect(&self.db_url).await?;
println!("Create migration table");
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS __migrations (
migration VARCHAR (255) PRIMARY KEY,
created TIMESTAMP NOT NULL DEFAULT current_timestamp
);
"#,
)
.execute(&mut conn)
.await
.context("Failed to create migration table")?;
Ok(())
}
async fn get_migrations(&self) -> Result<Vec<String>> {
let mut conn = MySqlConnection::connect(&self.db_url).await?;
let result = sqlx::query(
r#"
select migration from __migrations order by created
"#,
)
.try_map(|row: MySqlRow| row.try_get(0))
.fetch_all(&mut conn)
.await
.context("Failed to create migration table")?;
Ok(result)
}
async fn begin_migration(&self) -> Result<Box<dyn MigrationTransaction>> {
let pool = MySqlPool::new(&self.db_url)
.await
.context("Failed to connect to pool")?;
let tx = pool.begin().await?;
Ok(Box::new(MySqlMigration { transaction: tx }))
}
}
pub struct MySqlMigration {
transaction: sqlx::Transaction<PoolConnection<MySqlConnection>>,
}
#[async_trait]
impl MigrationTransaction for MySqlMigration {
async fn commit(self: Box<Self>) -> Result<()> {
self.transaction.commit().await?;
Ok(())
}
async fn rollback(self: Box<Self>) -> Result<()> {
self.transaction.rollback().await?;
Ok(())
}
async fn check_if_applied(&mut self, migration_name: &str) -> Result<bool> {
let result =
sqlx::query("select exists(select migration from __migrations where migration = ?)")
.bind(migration_name.to_string())
.try_map(|row: MySqlRow| row.try_get(0))
.fetch_one(&mut self.transaction)
.await
.context("Failed to check migration table")?;
Ok(result)
}
async fn execute_migration(&mut self, migration_sql: &str) -> Result<()> {
self.transaction.execute(migration_sql).await?;
Ok(())
}
async fn save_applied_migration(&mut self, migration_name: &str) -> Result<()> {
sqlx::query("insert into __migrations (migration) values (?)")
.bind(migration_name.to_string())
.execute(&mut self.transaction)
.await
.context("Failed to insert migration")?;
Ok(())
}
}

View File

@@ -0,0 +1,209 @@
use sqlx::pool::PoolConnection;
use sqlx::postgres::PgRow;
use sqlx::Connect;
use sqlx::Executor;
use sqlx::PgConnection;
use sqlx::PgPool;
use sqlx::Row;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use crate::migrator::{DatabaseMigrator, MigrationTransaction};
pub struct Postgres {
pub db_url: String,
}
impl Postgres {
pub fn new(db_url: String) -> Self {
Postgres {
db_url: db_url.clone(),
}
}
}
struct DbUrl<'a> {
base_url: &'a str,
db_name: &'a str,
}
fn get_base_url<'a>(db_url: &'a str) -> Result<DbUrl> {
let split: Vec<&str> = db_url.rsplitn(2, '/').collect();
if split.len() != 2 {
return Err(anyhow!("Failed to find database name in connection string"));
}
let db_name = split[0];
let base_url = split[1];
Ok(DbUrl { base_url, db_name })
}
#[async_trait]
impl DatabaseMigrator for Postgres {
fn database_type(&self) -> String {
"Postgres".to_string()
}
fn can_migrate_database(&self) -> bool {
true
}
fn can_create_database(&self) -> bool {
true
}
fn can_drop_database(&self) -> bool {
true
}
fn get_database_name(&self) -> Result<String> {
let db_url = get_base_url(&self.db_url)?;
Ok(db_url.db_name.to_string())
}
async fn check_if_database_exists(&self, db_name: &str) -> Result<bool> {
let db_url = get_base_url(&self.db_url)?;
let base_url = db_url.base_url;
let mut conn = PgConnection::connect(base_url).await?;
let result: bool =
sqlx::query("select exists(SELECT 1 from pg_database WHERE datname = $1) as exists")
.bind(db_name)
.try_map(|row: PgRow| row.try_get("exists"))
.fetch_one(&mut conn)
.await
.context("Failed to check if database exists")?;
Ok(result)
}
async fn create_database(&self, db_name: &str) -> Result<()> {
let db_url = get_base_url(&self.db_url)?;
let base_url = db_url.base_url;
let mut conn = PgConnection::connect(base_url).await?;
// quote database name (quotes in the name are escaped with additional quotes)
sqlx::query(&format!(
"CREATE DATABASE \"{}\"",
db_name.replace('"', "\"\"")
))
.execute(&mut conn)
.await
.with_context(|| format!("Failed to create database: {}", db_name))?;
Ok(())
}
async fn drop_database(&self, db_name: &str) -> Result<()> {
let db_url = get_base_url(&self.db_url)?;
let base_url = db_url.base_url;
let mut conn = PgConnection::connect(base_url).await?;
sqlx::query(&format!(
"DROP DATABASE \"{}\"",
db_name.replace('"', "\"\"")
))
.execute(&mut conn)
.await
.with_context(|| format!("Failed to drop database: {}", db_name))?;
Ok(())
}
async fn create_migration_table(&self) -> Result<()> {
let mut conn = PgConnection::connect(&self.db_url).await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS __migrations (
migration VARCHAR (255) PRIMARY KEY,
created TIMESTAMP NOT NULL DEFAULT current_timestamp
);
"#,
)
.execute(&mut conn)
.await
.context("Failed to create migration table")?;
Ok(())
}
async fn get_migrations(&self) -> Result<Vec<String>> {
let mut conn = PgConnection::connect(&self.db_url).await?;
let result = sqlx::query(
r#"
select migration from __migrations order by created
"#,
)
.try_map(|row: PgRow| row.try_get(0))
.fetch_all(&mut conn)
.await
.context("Failed to create migration table")?;
Ok(result)
}
async fn begin_migration(&self) -> Result<Box<dyn MigrationTransaction>> {
let pool = PgPool::new(&self.db_url)
.await
.context("Failed to connect to pool")?;
let tx = pool.begin().await?;
Ok(Box::new(PostgresMigration { transaction: tx }))
}
}
pub struct PostgresMigration {
transaction: sqlx::Transaction<PoolConnection<PgConnection>>,
}
#[async_trait]
impl MigrationTransaction for PostgresMigration {
async fn commit(self: Box<Self>) -> Result<()> {
self.transaction.commit().await?;
Ok(())
}
async fn rollback(self: Box<Self>) -> Result<()> {
self.transaction.rollback().await?;
Ok(())
}
async fn check_if_applied(&mut self, migration_name: &str) -> Result<bool> {
let result = sqlx::query(
"select exists(select migration from __migrations where migration = $1) as exists",
)
.bind(migration_name.to_string())
.try_map(|row: PgRow| row.try_get("exists"))
.fetch_one(&mut self.transaction)
.await
.context("Failed to check migration table")?;
Ok(result)
}
async fn execute_migration(&mut self, migration_sql: &str) -> Result<()> {
self.transaction.execute(migration_sql).await?;
Ok(())
}
async fn save_applied_migration(&mut self, migration_name: &str) -> Result<()> {
sqlx::query("insert into __migrations (migration) values ($1)")
.bind(migration_name.to_string())
.execute(&mut self.transaction)
.await
.context("Failed to insert migration")?;
Ok(())
}
}

View File

@@ -0,0 +1,178 @@
// use sqlx::pool::PoolConnection;
use sqlx::sqlite::SqliteRow;
use sqlx::Connect;
use sqlx::Executor;
use sqlx::Row;
use sqlx::SqliteConnection;
// use sqlx::SqlitePool;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use crate::migrator::{DatabaseMigrator, MigrationTransaction};
pub struct Sqlite {
db_url: String,
path: String,
}
impl Sqlite {
pub fn new(db_url: String) -> Self {
let path = crop_letters(&db_url, "sqlite://".len());
Sqlite {
db_url: db_url.clone(),
path: path.to_string(),
}
}
}
fn crop_letters(s: &str, pos: usize) -> &str {
match s.char_indices().skip(pos).next() {
Some((pos, _)) => &s[pos..],
None => "",
}
}
#[async_trait]
impl DatabaseMigrator for Sqlite {
fn database_type(&self) -> String {
"Sqlite".to_string()
}
fn can_migrate_database(&self) -> bool {
true
}
fn can_create_database(&self) -> bool {
true
}
fn can_drop_database(&self) -> bool {
true
}
fn get_database_name(&self) -> Result<String> {
let split: Vec<&str> = self.db_url.rsplitn(2, '/').collect();
if split.len() != 2 {
return Err(anyhow!("Failed to find database name in connection string"));
}
let db_name = split[0];
Ok(db_name.to_string())
}
async fn check_if_database_exists(&self, _db_name: &str) -> Result<bool> {
use std::path::Path;
Ok(Path::new(&self.path).exists())
}
async fn create_database(&self, _db_name: &str) -> Result<()> {
println!("DB {}", self.path);
// Opening a connection to sqlite creates the database.
let _ = SqliteConnection::connect(&self.db_url).await?;
Ok(())
}
async fn drop_database(&self, _db_name: &str) -> Result<()> {
std::fs::remove_file(&self.path)?;
Ok(())
}
async fn create_migration_table(&self) -> Result<()> {
let mut conn = SqliteConnection::connect(&self.db_url).await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS __migrations (
migration TEXT PRIMARY KEY,
created TIMESTAMP NOT NULL DEFAULT current_timestamp
);
"#,
)
.execute(&mut conn)
.await
.context("Failed to create migration table")?;
Ok(())
}
async fn get_migrations(&self) -> Result<Vec<String>> {
let mut conn = SqliteConnection::connect(&self.db_url).await?;
let result = sqlx::query(
r#"
select migration from __migrations order by created
"#,
)
.try_map(|row: SqliteRow| row.try_get(0))
.fetch_all(&mut conn)
.await
.context("Failed to create migration table")?;
Ok(result)
}
async fn begin_migration(&self) -> Result<Box<dyn MigrationTransaction>> {
// let pool = SqlitePool::new(&self.db_url)
// .await
// .context("Failed to connect to pool")?;
// let tx = pool.begin().await?;
// Ok(Box::new(MigrationTransaction { transaction: tx }))
Ok(Box::new(SqliteMigration {
db_url: self.db_url.clone(),
}))
}
}
pub struct SqliteMigration {
db_url: String,
// pub transaction: sqlx::Transaction<PoolConnection<SqliteConnection>>,
}
#[async_trait]
impl MigrationTransaction for SqliteMigration {
async fn commit(self: Box<Self>) -> Result<()> {
// self.transaction.commit().await?;
Ok(())
}
async fn rollback(self: Box<Self>) -> Result<()> {
// self.transaction.rollback().await?;
Ok(())
}
async fn check_if_applied(&mut self, migration_name: &str) -> Result<bool> {
let mut conn = SqliteConnection::connect(&self.db_url).await?;
let result =
sqlx::query("select exists(select migration from __migrations where migration = $1)")
.bind(migration_name.to_string())
.try_map(|row: SqliteRow| row.try_get(0))
.fetch_one(&mut conn)
.await?;
Ok(result)
}
async fn execute_migration(&mut self, migration_sql: &str) -> Result<()> {
let mut conn = SqliteConnection::connect(&self.db_url).await?;
conn.execute(migration_sql).await?;
// self.transaction.execute(migration_sql).await?;
Ok(())
}
async fn save_applied_migration(&mut self, migration_name: &str) -> Result<()> {
let mut conn = SqliteConnection::connect(&self.db_url).await?;
sqlx::query("insert into __migrations (migration) values ($1)")
.bind(migration_name.to_string())
.execute(&mut conn)
.await?;
Ok(())
}
}

141
sqlx-cli/src/prepare.rs Normal file
View File

@@ -0,0 +1,141 @@
use anyhow::{anyhow, bail, Context};
use std::process::Command;
use std::{env, fs};
use cargo_metadata::MetadataCommand;
use std::collections::BTreeMap;
use std::fs::File;
use std::time::SystemTime;
use url::Url;
type QueryData = BTreeMap<String, serde_json::Value>;
type JsonObject = serde_json::Map<String, serde_json::Value>;
pub fn run() -> anyhow::Result<()> {
#[derive(serde::Serialize)]
struct DataFile {
db: &'static str,
#[serde(flatten)]
data: QueryData,
}
let db_kind = get_db_kind()?;
let data = run_prepare_step()?;
serde_json::to_writer_pretty(
File::create("sqlx-data.json").context("failed to create/open `sqlx-data.json`")?,
&DataFile { db: db_kind, data },
)
.context("failed to write to `sqlx-data.json`")?;
println!(
"query data written to `sqlx-data.json` in the current directory; \
please check this into version control"
);
Ok(())
}
pub fn check() -> anyhow::Result<()> {
let db_kind = get_db_kind()?;
let data = run_prepare_step()?;
let data_file = fs::read("sqlx-data.json").context(
"failed to open `sqlx-data.json`; you may need to run `cargo sqlx prepare` first",
)?;
let mut saved_data: QueryData = serde_json::from_slice(&data_file)?;
let expected_db = saved_data
.remove("db")
.context("expected key `db` in data file")?;
let expected_db = expected_db
.as_str()
.context("expected key `db` to be a string")?;
if db_kind != expected_db {
bail!(
"saved prepare data is for {}, not {} (inferred from `DATABASE_URL`)",
expected_db,
db_kind
)
}
if data != saved_data {
bail!("`cargo sqlx prepare` needs to be rerun")
}
Ok(())
}
fn run_prepare_step() -> anyhow::Result<QueryData> {
// path to the Cargo executable
let cargo = env::var("CARGO")
.context("`prepare` subcommand may only be invoked as `cargo sqlx prepare``")?;
let check_status = Command::new(&cargo)
.arg("check")
// set an always-changing env var that the macros depend on via `env!()`
.env(
"__SQLX_RECOMPILE_TRIGGER",
SystemTime::UNIX_EPOCH.elapsed()?.as_millis().to_string(),
)
.status()?;
if !check_status.success() {
bail!("`cargo check` failed with status: {}", check_status);
}
let metadata = MetadataCommand::new()
.cargo_path(cargo)
.exec()
.context("failed to execute `cargo metadata`")?;
let pattern = metadata.target_directory.join("sqlx/query-*.json");
let mut data = BTreeMap::new();
for path in glob::glob(
pattern
.to_str()
.context("CARGO_TARGET_DIR not valid UTF-8")?,
)? {
let path = path?;
let contents = fs::read(&*path)?;
let mut query_data: JsonObject = serde_json::from_slice(&contents)?;
// we lift the `hash` key to the outer map
let hash = query_data
.remove("hash")
.context("expected key `hash` in query data")?;
if let serde_json::Value::String(hash) = hash {
data.insert(hash, serde_json::Value::Object(query_data));
} else {
bail!(
"expected key `hash` in query data to be string, was {:?} instead; file: {}",
hash,
path.display()
)
}
}
Ok(data)
}
fn get_db_kind() -> anyhow::Result<&'static str> {
let db_url = dotenv::var("DATABASE_URL")
.map_err(|_| anyhow!("`DATABASE_URL` must be set to use the `prepare` subcommand"))?;
let db_url = Url::parse(&db_url)?;
// these should match the values of `DatabaseExt::NAME` in `sqlx-macros`
match db_url.scheme() {
"postgres" | "postgresql" => Ok("PostgreSQL"),
"mysql" | "mariadb" => Ok("MySQL/MariaDB"),
"sqlite" => Ok("SQLite"),
_ => bail!("unexpected scheme in database URL: {}", db_url.scheme()),
}
}