feat(cli): add --connect-timeout (#1889)

This commit is contained in:
Austin Bonander 2022-06-08 15:48:04 -07:00 committed by GitHub
parent d52f301a94
commit 1f91724927
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 158 additions and 79 deletions

15
Cargo.lock generated
View File

@ -248,6 +248,20 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "backoff"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
dependencies = [
"futures-core",
"getrandom",
"instant",
"pin-project-lite",
"rand",
"tokio",
]
[[package]]
name = "base64"
version = "0.13.0"
@ -2316,6 +2330,7 @@ version = "0.5.12"
dependencies = [
"anyhow",
"async-trait",
"backoff",
"chrono",
"clap 3.1.0",
"console",

View File

@ -47,6 +47,8 @@ openssl = { version = "0.10.38", optional = true }
# workaround for https://github.com/rust-lang/rust/issues/29497
remove_dir_all = "0.7.0"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
[features]
default = ["postgres", "sqlite", "mysql", "native-tls"]
rustls = ["sqlx/runtime-tokio-rustls"]

View File

@ -1,43 +1,58 @@
use crate::migrate;
use crate::opt::ConnectOpts;
use console::style;
use promptly::{prompt, ReadlineError};
use sqlx::any::Any;
use sqlx::migrate::MigrateDatabase;
pub async fn create(uri: &str) -> anyhow::Result<()> {
if !Any::database_exists(uri).await? {
Any::create_database(uri).await?;
pub async fn create(connect_opts: &ConnectOpts) -> anyhow::Result<()> {
// NOTE: only retry the idempotent action.
// We're assuming that if this succeeds, then any following operations should also succeed.
let exists = crate::retry_connect_errors(connect_opts, Any::database_exists).await?;
if !exists {
Any::create_database(&connect_opts.database_url).await?;
}
Ok(())
}
pub async fn drop(uri: &str, confirm: bool) -> anyhow::Result<()> {
if confirm && !ask_to_continue(uri) {
pub async fn drop(connect_opts: &ConnectOpts, confirm: bool) -> anyhow::Result<()> {
if confirm && !ask_to_continue(connect_opts) {
return Ok(());
}
if Any::database_exists(uri).await? {
Any::drop_database(uri).await?;
// NOTE: only retry the idempotent action.
// We're assuming that if this succeeds, then any following operations should also succeed.
let exists = crate::retry_connect_errors(connect_opts, Any::database_exists).await?;
if exists {
Any::drop_database(&connect_opts.database_url).await?;
}
Ok(())
}
pub async fn reset(migration_source: &str, uri: &str, confirm: bool) -> anyhow::Result<()> {
drop(uri, confirm).await?;
setup(migration_source, uri).await
pub async fn reset(
migration_source: &str,
connect_opts: &ConnectOpts,
confirm: bool,
) -> anyhow::Result<()> {
drop(connect_opts, confirm).await?;
setup(migration_source, connect_opts).await
}
pub async fn setup(migration_source: &str, uri: &str) -> anyhow::Result<()> {
create(uri).await?;
migrate::run(migration_source, uri, false, false).await
pub async fn setup(migration_source: &str, connect_opts: &ConnectOpts) -> anyhow::Result<()> {
create(connect_opts).await?;
migrate::run(migration_source, connect_opts, false, false).await
}
fn ask_to_continue(uri: &str) -> bool {
fn ask_to_continue(connect_opts: &ConnectOpts) -> bool {
loop {
let r: Result<String, ReadlineError> =
prompt(format!("Drop database at {}? (y/n)", style(uri).cyan()));
let r: Result<String, ReadlineError> = prompt(format!(
"Drop database at {}? (y/n)",
style(&connect_opts.database_url).cyan()
));
match r {
Ok(response) => {
if response == "n" || response == "N" {

View File

@ -1,6 +1,10 @@
use anyhow::Result;
use futures::{Future, TryFutureExt};
use sqlx::{AnyConnection, Connection};
use std::io;
use std::time::Duration;
use crate::opt::{Command, DatabaseCommand, MigrateCommand};
use crate::opt::{Command, ConnectOpts, DatabaseCommand, MigrateCommand};
mod database;
// mod migration;
@ -23,11 +27,11 @@ pub async fn run(opt: Opt) -> Result<()> {
source,
dry_run,
ignore_missing,
database_url,
connect_opts,
} => {
migrate::run(
source.resolve(&migrate.source),
&database_url,
&connect_opts,
dry_run,
*ignore_missing,
)
@ -37,11 +41,11 @@ pub async fn run(opt: Opt) -> Result<()> {
source,
dry_run,
ignore_missing,
database_url,
connect_opts,
} => {
migrate::revert(
source.resolve(&migrate.source),
&database_url,
&connect_opts,
dry_run,
*ignore_missing,
)
@ -49,44 +53,86 @@ pub async fn run(opt: Opt) -> Result<()> {
}
MigrateCommand::Info {
source,
database_url,
} => migrate::info(source.resolve(&migrate.source), &database_url).await?,
connect_opts,
} => migrate::info(source.resolve(&migrate.source), &connect_opts).await?,
MigrateCommand::BuildScript { source, force } => {
migrate::build_script(source.resolve(&migrate.source), force)?
}
},
Command::Database(database) => match database.command {
DatabaseCommand::Create { database_url } => database::create(&database_url).await?,
DatabaseCommand::Create { connect_opts } => database::create(&connect_opts).await?,
DatabaseCommand::Drop {
confirmation,
database_url,
} => database::drop(&database_url, !confirmation).await?,
connect_opts,
} => database::drop(&connect_opts, !confirmation.yes).await?,
DatabaseCommand::Reset {
confirmation,
source,
database_url,
} => database::reset(&source, &database_url, !confirmation).await?,
connect_opts,
} => database::reset(&source, &connect_opts, !confirmation.yes).await?,
DatabaseCommand::Setup {
source,
database_url,
} => database::setup(&source, &database_url).await?,
connect_opts,
} => database::setup(&source, &connect_opts).await?,
},
Command::Prepare {
check: false,
merged,
args,
database_url,
} => prepare::run(&database_url, merged, args)?,
connect_opts,
} => prepare::run(&connect_opts, merged, args).await?,
Command::Prepare {
check: true,
merged,
args,
database_url,
} => prepare::check(&database_url, merged, args)?,
connect_opts,
} => prepare::check(&connect_opts, merged, args).await?,
};
Ok(())
}
/// Attempt to connect to the database server, retrying up to `ops.connect_timeout`.
async fn connect(opts: &ConnectOpts) -> sqlx::Result<AnyConnection> {
retry_connect_errors(opts, AnyConnection::connect).await
}
/// Attempt an operation that may return errors like `ConnectionRefused`,
/// retrying up until `ops.connect_timeout`.
///
/// The closure is passed `&ops.database_url` for easy composition.
async fn retry_connect_errors<'a, F, Fut, T>(
opts: &'a ConnectOpts,
mut connect: F,
) -> sqlx::Result<T>
where
F: FnMut(&'a str) -> Fut,
Fut: Future<Output = sqlx::Result<T>> + 'a,
{
backoff::future::retry(
backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(Duration::from_secs(opts.connect_timeout)))
.build(),
|| {
connect(&opts.database_url).map_err(|e| -> backoff::Error<sqlx::Error> {
match e {
sqlx::Error::Io(ref ioe) => match ioe.kind() {
io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionReset
| io::ErrorKind::ConnectionAborted => {
return backoff::Error::transient(e);
}
_ => (),
},
_ => (),
}
backoff::Error::permanent(e)
})
},
)
.await
}

View File

@ -1,8 +1,8 @@
use crate::opt::ConnectOpts;
use anyhow::{bail, Context};
use chrono::Utc;
use console::style;
use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator};
use sqlx::{AnyConnection, Connection};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
@ -116,9 +116,9 @@ fn short_checksum(checksum: &[u8]) -> String {
s
}
pub async fn info(migration_source: &str, uri: &str) -> anyhow::Result<()> {
pub async fn info(migration_source: &str, connect_opts: &ConnectOpts) -> anyhow::Result<()> {
let migrator = Migrator::new(Path::new(migration_source)).await?;
let mut conn = AnyConnection::connect(uri).await?;
let mut conn = crate::connect(&connect_opts).await?;
conn.ensure_migrations_table().await?;
@ -190,12 +190,12 @@ fn validate_applied_migrations(
pub async fn run(
migration_source: &str,
uri: &str,
connect_opts: &ConnectOpts,
dry_run: bool,
ignore_missing: bool,
) -> anyhow::Result<()> {
let migrator = Migrator::new(Path::new(migration_source)).await?;
let mut conn = AnyConnection::connect(uri).await?;
let mut conn = crate::connect(connect_opts).await?;
conn.ensure_migrations_table().await?;
@ -249,12 +249,12 @@ pub async fn run(
pub async fn revert(
migration_source: &str,
uri: &str,
connect_opts: &ConnectOpts,
dry_run: bool,
ignore_missing: bool,
) -> anyhow::Result<()> {
let migrator = Migrator::new(Path::new(migration_source)).await?;
let mut conn = AnyConnection::connect(uri).await?;
let mut conn = crate::connect(&connect_opts).await?;
conn.ensure_migrations_table().await?;

View File

@ -38,7 +38,7 @@ pub enum Command {
args: Vec<String>,
#[clap(flatten)]
database_url: DatabaseUrl,
connect_opts: ConnectOpts,
},
#[clap(alias = "mig")]
@ -57,7 +57,7 @@ pub enum DatabaseCommand {
/// Creates the database specified in your DATABASE_URL.
Create {
#[clap(flatten)]
database_url: DatabaseUrl,
connect_opts: ConnectOpts,
},
/// Drops the database specified in your DATABASE_URL.
@ -66,7 +66,7 @@ pub enum DatabaseCommand {
confirmation: Confirmation,
#[clap(flatten)]
database_url: DatabaseUrl,
connect_opts: ConnectOpts,
},
/// Drops the database specified in your DATABASE_URL, re-creates it, and runs any pending migrations.
@ -78,7 +78,7 @@ pub enum DatabaseCommand {
source: Source,
#[clap(flatten)]
database_url: DatabaseUrl,
connect_opts: ConnectOpts,
},
/// Creates the database specified in your DATABASE_URL and runs any pending migrations.
@ -87,7 +87,7 @@ pub enum DatabaseCommand {
source: Source,
#[clap(flatten)]
database_url: DatabaseUrl,
connect_opts: ConnectOpts,
},
}
@ -132,7 +132,7 @@ pub enum MigrateCommand {
ignore_missing: IgnoreMissing,
#[clap(flatten)]
database_url: DatabaseUrl,
connect_opts: ConnectOpts,
},
/// Revert the latest migration with a down file.
@ -148,7 +148,7 @@ pub enum MigrateCommand {
ignore_missing: IgnoreMissing,
#[clap(flatten)]
database_url: DatabaseUrl,
connect_opts: ConnectOpts,
},
/// List all available migrations.
@ -157,7 +157,7 @@ pub enum MigrateCommand {
source: SourceOverride,
#[clap(flatten)]
database_url: DatabaseUrl,
connect_opts: ConnectOpts,
},
/// Generate a `build.rs` to trigger recompilation when a new migration is added.
@ -212,43 +212,24 @@ impl SourceOverride {
/// Argument for the database URL.
#[derive(Args, Debug)]
pub struct DatabaseUrl {
pub struct ConnectOpts {
/// Location of the DB, by default will be read from the DATABASE_URL env var
#[clap(long, short = 'D', env)]
database_url: String,
pub database_url: String,
/// The maximum time, in seconds, to try connecting to the database server before
/// returning an error.
#[clap(long, default_value = "10")]
pub connect_timeout: u64,
}
impl Deref for DatabaseUrl {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.database_url
}
}
/// Argument for automatic confirmantion.
/// Argument for automatic confirmation.
#[derive(Args, Copy, Clone, Debug)]
pub struct Confirmation {
/// Automatic confirmation. Without this option, you will be prompted before dropping
/// your database.
#[clap(short)]
yes: bool,
}
impl Deref for Confirmation {
type Target = bool;
fn deref(&self) -> &Self::Target {
&self.yes
}
}
impl Not for Confirmation {
type Output = bool;
fn not(self) -> Self::Output {
!self.yes
}
pub yes: bool,
}
/// Argument for ignoring applied migrations that were not resolved.

View File

@ -1,8 +1,10 @@
use crate::opt::ConnectOpts;
use anyhow::{bail, Context};
use console::style;
use remove_dir_all::remove_dir_all;
use serde::Deserialize;
use sqlx::any::{AnyConnectOptions, AnyKind};
use sqlx::Connection;
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufReader, BufWriter};
@ -22,7 +24,16 @@ struct DataFile {
data: QueryData,
}
pub fn run(url: &str, merge: bool, cargo_args: Vec<String>) -> anyhow::Result<()> {
pub async fn run(
connect_opts: &ConnectOpts,
merge: bool,
cargo_args: Vec<String>,
) -> anyhow::Result<()> {
// Ensure the database server is available.
crate::connect(connect_opts).await?.close().await?;
let url = &connect_opts.database_url;
let db_kind = get_db_kind(url)?;
let data = run_prepare_step(url, merge, cargo_args)?;
@ -52,7 +63,16 @@ pub fn run(url: &str, merge: bool, cargo_args: Vec<String>) -> anyhow::Result<()
Ok(())
}
pub fn check(url: &str, merge: bool, cargo_args: Vec<String>) -> anyhow::Result<()> {
pub async fn check(
connect_opts: &ConnectOpts,
merge: bool,
cargo_args: Vec<String>,
) -> anyhow::Result<()> {
// Ensure the database server is available.
crate::connect(connect_opts).await?.close().await?;
let url = &connect_opts.database_url;
let db_kind = get_db_kind(url)?;
let data = run_prepare_step(url, merge, cargo_args)?;