sqlite improvements (#1965)

* use direct blocking calls for SQLite in `sqlx_macros`
    * this also ensures the database is closed properly, cleaning up tempfiles
* don't send `PRAGMA journal_mode` unless set
    * this previously defaulted to WAL mode which is a permanent setting
      on databases which doesn't necessarily apply to all use-cases
    * changing into or out of WAL mode acquires an exclusive lock on the database
      that can't be waited on by `sqlite3_busy_timeout()`
    * for consistency, `sqlx-cli` commands that create databases will still
      create SQLite databases in WAL mode; added a flag to disable this.
* in general, don't send `PRAGMA`s unless different than default
    * we were sending a bunch of `PRAGMA`s with their default values just to enforce
      an execution order on them, but we can also do this by inserting empty slots
      for their keys into the `IndexMap`
* add error code to `SqliteError` printout
* document why `u64` is not supported
This commit is contained in:
Austin Bonander 2022-07-12 13:59:37 -07:00 committed by GitHub
parent d9fd21c94e
commit bc3e70545b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 234 additions and 131 deletions

View File

@ -11,6 +11,12 @@ pub async fn create(connect_opts: &ConnectOpts) -> anyhow::Result<()> {
let exists = crate::retry_connect_errors(connect_opts, Any::database_exists).await?;
if !exists {
#[cfg(feature = "sqlite")]
sqlx::sqlite::CREATE_DB_WAL.store(
connect_opts.sqlite_create_db_wal,
std::sync::atomic::Ordering::Release,
);
Any::create_database(&connect_opts.database_url).await?;
}

View File

@ -221,6 +221,18 @@ pub struct ConnectOpts {
/// returning an error.
#[clap(long, default_value = "10")]
pub connect_timeout: u64,
/// Set whether or not to create SQLite databases in Write-Ahead Log (WAL) mode:
/// https://www.sqlite.org/wal.html
///
/// WAL mode is enabled by default for SQLite databases created by `sqlx-cli`.
///
/// However, if your application sets a `journal_mode` on `SqliteConnectOptions` to something
/// other than `Wal`, then it will have to take the database file out of WAL mode on connecting,
/// which requires an exclusive lock and may return a `database is locked` (`SQLITE_BUSY`) error.
#[cfg(feature = "sqlite")]
#[clap(long, action = clap::ArgAction::Set, default_value = "true")]
pub sqlite_create_db_wal: bool,
}
/// Argument for automatic confirmation.

View File

@ -8,7 +8,7 @@ use crate::sqlite::{Sqlite, SqliteColumn};
use either::Either;
use std::convert::identity;
pub(super) fn describe(conn: &mut ConnectionState, query: &str) -> Result<Describe<Sqlite>, Error> {
pub(crate) fn describe(conn: &mut ConnectionState, query: &str) -> Result<Describe<Sqlite>, Error> {
// describing a statement from SQLite can be involved
// each SQLx statement is comprised of multiple SQL statements

View File

@ -53,6 +53,16 @@ fn bind(
Ok(n)
}
impl ExecuteIter<'_> {
pub fn finish(&mut self) -> Result<(), Error> {
for res in self {
let _ = res?;
}
Ok(())
}
}
impl Iterator for ExecuteIter<'_> {
type Item = Result<Either<SqliteQueryResult, SqliteRow>, Error>;

View File

@ -19,9 +19,9 @@ use crate::sqlite::{Sqlite, SqliteConnectOptions};
use crate::transaction::Transaction;
pub(crate) mod collation;
mod describe;
mod establish;
mod execute;
pub(crate) mod describe;
pub(crate) mod establish;
pub(crate) mod execute;
mod executor;
mod explain;
mod handle;

View File

@ -39,7 +39,11 @@ impl SqliteError {
impl Display for SqliteError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.pad(&self.message)
// We include the code as some produce ambiguous messages:
// SQLITE_BUSY: "database is locked"
// SQLITE_LOCKED: "database table is locked"
// Sadly there's no function to get the string label back from an error code.
write!(f, "(code: {}) {}", self.code, self.message)
}
}

View File

@ -7,20 +7,32 @@ use crate::migrate::{Migrate, MigrateDatabase};
use crate::query::query;
use crate::query_as::query_as;
use crate::query_scalar::query_scalar;
use crate::sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection};
use crate::sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection, SqliteJournalMode};
use futures_core::future::BoxFuture;
use sqlx_rt::fs;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
impl MigrateDatabase for Sqlite {
fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let mut opts = SqliteConnectOptions::from_str(url)?.create_if_missing(true);
// Since it doesn't make sense to include this flag in the connection URL,
// we just use an `AtomicBool` to pass it.
if super::CREATE_DB_WAL.load(Ordering::Acquire) {
opts = opts.journal_mode(SqliteJournalMode::Wal);
}
// Opening a connection to sqlite creates the database
let _ = SqliteConnectOptions::from_str(url)?
.create_if_missing(true)
.connect()
.await?
// Ensure WAL mode tempfiles are cleaned up
.close()
.await?;
Ok(())

View File

@ -16,11 +16,15 @@ pub use options::{
pub use query_result::SqliteQueryResult;
pub use row::SqliteRow;
pub use statement::SqliteStatement;
use std::sync::atomic::AtomicBool;
pub use transaction::SqliteTransactionManager;
pub use type_info::SqliteTypeInfo;
pub use value::{SqliteValue, SqliteValueRef};
use crate::describe::Describe;
use crate::error::Error;
use crate::executor::Executor;
use crate::sqlite::connection::establish::EstablishParams;
mod arguments;
mod column;
@ -60,3 +64,24 @@ impl_into_maybe_pool!(Sqlite, SqliteConnection);
// required because some databases have a different handling of NULL
impl_encode_for_option!(Sqlite);
/// UNSTABLE: for use by `sqlx-cli` only.
#[doc(hidden)]
pub static CREATE_DB_WAL: AtomicBool = AtomicBool::new(true);
/// UNSTABLE: for use by `sqlite_macros` only.
#[doc(hidden)]
pub fn describe_blocking(
opts: &SqliteConnectOptions,
query: &str,
) -> Result<Describe<Sqlite>, Error> {
let params = EstablishParams::from_options(opts)?;
let mut conn = params.establish()?;
// Execute any ancillary `PRAGMA`s
connection::execute::iter(&mut conn, &opts.pragma_string(), None, false)?.finish()?;
connection::describe::describe(&mut conn, query)
// SQLite database is closed immediately when `conn` is dropped
}

View File

@ -17,25 +17,8 @@ impl ConnectOptions for SqliteConnectOptions {
Box::pin(async move {
let mut conn = SqliteConnection::establish(self).await?;
// send an initial sql statement comprised of options
let mut init = String::new();
// This is a special case for sqlcipher. When the `key` pragma
// is set, we have to make sure it's executed first in order.
if let Some(pragma_key_password) = self.pragmas.get("key") {
write!(init, "PRAGMA key = {}; ", pragma_key_password).ok();
}
for (key, value) in &self.pragmas {
// Since we've already written the possible `key` pragma
// above, we shall skip it now.
if key == "key" {
continue;
}
write!(init, "PRAGMA {} = {}; ", key, value).ok();
}
conn.execute(&*init).await?;
// Execute PRAGMAs
conn.execute(&*self.pragma_string()).await?;
if !self.collations.is_empty() {
let mut locked = conn.lock_handle().await?;
@ -59,3 +42,18 @@ impl ConnectOptions for SqliteConnectOptions {
self
}
}
impl SqliteConnectOptions {
/// Collect all `PRAMGA` commands into a single string
pub(crate) fn pragma_string(&self) -> String {
let mut string = String::new();
for (key, opt_value) in &self.pragmas {
if let Some(value) = opt_value {
write!(string, "PRAGMA {} = {}; ", key, value).ok();
}
}
string
}
}

View File

@ -63,7 +63,8 @@ pub struct SqliteConnectOptions {
pub(crate) busy_timeout: Duration,
pub(crate) log_settings: LogSettings,
pub(crate) immutable: bool,
pub(crate) pragmas: IndexMap<Cow<'static, str>, Cow<'static, str>>,
pub(crate) pragmas: IndexMap<Cow<'static, str>, Option<Cow<'static, str>>>,
pub(crate) command_channel_size: usize,
pub(crate) row_channel_size: usize,
@ -85,32 +86,44 @@ impl SqliteConnectOptions {
///
/// See the source of this method for the current defaults.
pub fn new() -> Self {
// set default pragmas
let mut pragmas: IndexMap<Cow<'static, str>, Cow<'static, str>> = IndexMap::new();
let mut pragmas: IndexMap<Cow<'static, str>, Option<Cow<'static, str>>> = IndexMap::new();
let locking_mode: SqliteLockingMode = Default::default();
let auto_vacuum: SqliteAutoVacuum = Default::default();
// Standard pragmas
//
// Most of these don't actually need to be sent because they would be set to their
// default values anyway. See the SQLite documentation for default values of these PRAGMAs:
// https://www.sqlite.org/pragma.html
//
// However, by inserting into the map here, we can ensure that they're set in the proper
// order, even if they're overwritten later by their respective setters or
// directly by `pragma()`
// page_size must be set before any other action on the database.
pragmas.insert("page_size".into(), "4096".into());
// SQLCipher special case: if the `key` pragma is set, it must be executed first.
pragmas.insert("key".into(), None);
// Note that locking_mode should be set before journal_mode; see
// https://www.sqlite.org/wal.html#use_of_wal_without_shared_memory .
pragmas.insert("locking_mode".into(), locking_mode.as_str().into());
// Normally, page_size must be set before any other action on the database.
// Defaults to 4096 for new databases.
pragmas.insert("page_size".into(), None);
pragmas.insert(
"journal_mode".into(),
SqliteJournalMode::Wal.as_str().into(),
);
// locking_mode should be set before journal_mode:
// https://www.sqlite.org/wal.html#use_of_wal_without_shared_memory
pragmas.insert("locking_mode".into(), None);
pragmas.insert("foreign_keys".into(), "ON".into());
// Don't set `journal_mode` unless the user requested it.
// WAL mode is a permanent setting for created databases and changing into or out of it
// requires an exclusive lock that can't be waited on with `sqlite3_busy_timeout()`.
// https://github.com/launchbadge/sqlx/pull/1930#issuecomment-1168165414
pragmas.insert("journal_mode".into(), None);
pragmas.insert(
"synchronous".into(),
SqliteSynchronous::Full.as_str().into(),
);
// We choose to enable foreign key enforcement by default, though SQLite normally
// leaves it off for backward compatibility: https://www.sqlite.org/foreignkeys.html#fk_enable
pragmas.insert("foreign_keys".into(), Some("ON".into()));
pragmas.insert("auto_vacuum".into(), auto_vacuum.as_str().into());
// The `synchronous` pragma defaults to FULL
// https://www.sqlite.org/compile.html#default_synchronous.
pragmas.insert("synchronous".into(), None);
pragmas.insert("auto_vacuum".into(), None);
Self {
filename: Cow::Borrowed(Path::new(":memory:")),
@ -139,13 +152,10 @@ impl SqliteConnectOptions {
/// Set the enforcement of [foreign key constraints](https://www.sqlite.org/pragma.html#pragma_foreign_keys).
///
/// By default, this is enabled.
/// SQLx chooses to enable this by default so that foreign keys function as expected,
/// compared to other database flavors.
pub fn foreign_keys(mut self, on: bool) -> Self {
self.pragmas.insert(
"foreign_keys".into(),
(if on { "ON" } else { "OFF" }).into(),
);
self
self.pragma("foreign_keys", if on { "ON" } else { "OFF" })
}
/// Set the [`SQLITE_OPEN_SHAREDCACHE` flag](https://sqlite.org/sharedcache.html).
@ -158,21 +168,34 @@ impl SqliteConnectOptions {
/// Sets the [journal mode](https://www.sqlite.org/pragma.html#pragma_journal_mode) for the database connection.
///
/// The default journal mode is WAL. For most use cases this can be significantly faster but
/// there are [disadvantages](https://www.sqlite.org/wal.html).
/// Journal modes are ephemeral per connection, with the exception of the
/// [Write-Ahead Log (WAL) mode](https://www.sqlite.org/wal.html).
///
/// A database created in WAL mode retains the setting and will apply it to all connections
/// opened against it that don't set a `journal_mode`.
///
/// Opening a connection to a database created in WAL mode with a different `journal_mode` will
/// erase the setting on the database, requiring an exclusive lock to do so.
/// You may get a `database is locked` (corresponding to `SQLITE_BUSY`) error if another
/// connection is accessing the database file at the same time.
///
/// SQLx does not set a journal mode by default, to avoid unintentionally changing a database
/// into or out of WAL mode.
///
/// The default journal mode for non-WAL databases is `DELETE`, or `MEMORY` for in-memory
/// databases.
///
/// For consistency, any commands in `sqlx-cli` which create a SQLite database will create it
/// in WAL mode.
pub fn journal_mode(mut self, mode: SqliteJournalMode) -> Self {
self.pragmas
.insert("journal_mode".into(), mode.as_str().into());
self
self.pragma("journal_mode", mode.as_str())
}
/// Sets the [locking mode](https://www.sqlite.org/pragma.html#pragma_locking_mode) for the database connection.
///
/// The default locking mode is NORMAL.
pub fn locking_mode(mut self, mode: SqliteLockingMode) -> Self {
self.pragmas
.insert("locking_mode".into(), mode.as_str().into());
self
self.pragma("locking_mode", mode.as_str())
}
/// Sets the [access mode](https://www.sqlite.org/c3ref/open.html) to open the database
@ -185,7 +208,7 @@ impl SqliteConnectOptions {
/// Sets the [access mode](https://www.sqlite.org/c3ref/open.html) to create the database file
/// if the file does not exist.
///
/// By default, a new file **will not be** created if one is not found.
/// By default, a new file **will not be created** if one is not found.
pub fn create_if_missing(mut self, create: bool) -> Self {
self.create_if_missing = create;
self
@ -216,27 +239,28 @@ impl SqliteConnectOptions {
/// The default synchronous settings is FULL. However, if durability is not a concern,
/// then NORMAL is normally all one needs in WAL mode.
pub fn synchronous(mut self, synchronous: SqliteSynchronous) -> Self {
self.pragmas
.insert("synchronous".into(), synchronous.as_str().into());
self
self.pragma("synchronous", synchronous.as_str())
}
/// Sets the [auto_vacuum](https://www.sqlite.org/pragma.html#pragma_auto_vacuum) setting for the database connection.
///
/// The default auto_vacuum setting is NONE.
///
/// For existing databases, a change to this value does not take effect unless a
/// [`VACUUM` command](https://www.sqlite.org/lang_vacuum.html) is executed.
pub fn auto_vacuum(mut self, auto_vacuum: SqliteAutoVacuum) -> Self {
self.pragmas
.insert("auto_vacuum".into(), auto_vacuum.as_str().into());
self
self.pragma("auto_vacuum", auto_vacuum.as_str())
}
/// Sets the [page_size](https://www.sqlite.org/pragma.html#pragma_page_size) setting for the database connection.
///
/// The default page_size setting is 4096.
///
/// For existing databases, a change to this value does not take effect unless a
/// [`VACUUM` command](https://www.sqlite.org/lang_vacuum.html) is executed.
/// However, it cannot be changed in WAL mode.
pub fn page_size(mut self, page_size: u32) -> Self {
self.pragmas
.insert("page_size".into(), page_size.to_string().into());
self
self.pragma("page_size", page_size.to_string())
}
/// Sets custom initial pragma for the database connection.
@ -245,7 +269,7 @@ impl SqliteConnectOptions {
K: Into<Cow<'static, str>>,
V: Into<Cow<'static, str>>,
{
self.pragmas.insert(key.into(), value.into());
self.pragmas.insert(key.into(), Some(value.into()));
self
}
@ -294,7 +318,8 @@ impl SqliteConnectOptions {
/// Sets the [threading mode](https://www.sqlite.org/threadsafe.html) for the database connection.
///
/// The default setting is `false` corersponding to using `OPEN_NOMUTEX`, if `true` then `OPEN_FULLMUTEX`.
/// The default setting is `false` corresponding to using `OPEN_NOMUTEX`.
/// If set to `true` then `OPEN_FULLMUTEX`.
///
/// See [open](https://www.sqlite.org/c3ref/open.html) for more details.
///

View File

@ -12,12 +12,25 @@
//! | `u8` | INTEGER |
//! | `u16` | INTEGER |
//! | `u32` | INTEGER |
//! | `u64` | BIGINT, INT8 |
//! | `f32` | REAL |
//! | `f64` | REAL |
//! | `&str`, [`String`] | TEXT |
//! | `&[u8]`, `Vec<u8>` | BLOB |
//!
//! #### Note: Unsigned Integers
//! The unsigned integer types `u8`, `u16` and `u32` are implemented by zero-extending to the
//! next-larger signed type. So `u8` becomes `i16`, `u16` becomes `i32`, and `u32` becomes `i64`
//! while still retaining their semantic values.
//!
//! Similarly, decoding performs a checked truncation to ensure that overflow does not occur.
//!
//! SQLite stores integers in a variable-width encoding and always handles them in memory as 64-bit
//! signed values, so no space is wasted by this implicit widening.
//!
//! However, there is no corresponding larger type for `u64` in SQLite (it would require a `i128`),
//! and so it is not supported. Bit-casting it to `i64` or storing it as `REAL`, `BLOB` or `TEXT`
//! would change the semantics of the value in SQL and so violates the principle of least surprise.
//!
//! ### [`chrono`](https://crates.io/crates/chrono)
//!
//! Requires the `chrono` Cargo feature flag.

View File

@ -24,12 +24,16 @@ impl<DB: Database> QueryData<DB> {
conn: impl Executor<'_, Database = DB>,
query: &str,
) -> crate::Result<Self> {
Ok(QueryData {
Ok(Self::from_describe(query, conn.describe(query).await?))
}
pub fn from_describe(query: &str, describe: Describe<DB>) -> Self {
QueryData {
query: query.into(),
describe: conn.describe(query).await?,
describe,
#[cfg(feature = "offline")]
hash: offline::hash_string(query),
})
}
}
}

View File

@ -1,5 +1,6 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(feature = "offline")]
use std::sync::{Arc, Mutex};
@ -187,68 +188,61 @@ pub fn expand_input(input: QueryMacroInput) -> crate::Result<TokenStream> {
feature = "sqlite"
))]
fn expand_from_db(input: QueryMacroInput, db_url: &str) -> crate::Result<TokenStream> {
use sqlx_core::any::AnyConnection;
use sqlx_core::any::{AnyConnectOptions, AnyConnection};
use std::str::FromStr;
let maybe_expanded: crate::Result<TokenStream> = block_on(async {
let parsed_db_url = Url::parse(db_url)?;
let connect_opts = AnyConnectOptions::from_str(db_url)?;
match parsed_db_url.scheme() {
// SQLite is not used in the connection cache due to issues with newly created
// databases seemingly being locked for several seconds when journaling is off. This
// isn't a huge issue since the intent of the connection cache was to make connections
// to remote databases much faster. Relevant links:
// - https://github.com/launchbadge/sqlx/pull/1782#issuecomment-1089226716
// - https://github.com/launchbadge/sqlx/issues/1929
#[cfg(feature = "sqlite")]
"sqlite" => {
use sqlx_core::connection::ConnectOptions;
use sqlx_core::sqlite::SqliteConnectOptions;
use std::str::FromStr;
// SQLite is not used in the connection cache due to issues with newly created
// databases seemingly being locked for several seconds when journaling is off. This
// isn't a huge issue since the intent of the connection cache was to make connections
// to remote databases much faster. Relevant links:
// - https://github.com/launchbadge/sqlx/pull/1782#issuecomment-1089226716
// - https://github.com/launchbadge/sqlx/issues/1929
#[cfg(feature = "sqlite")]
if let Some(sqlite_opts) = connect_opts.as_sqlite() {
// Since proc-macros don't benefit from async, we can make a describe call directly
// which also ensures that the database is closed afterwards, regardless of errors.
let describe = sqlx_core::sqlite::describe_blocking(sqlite_opts, &input.sql)?;
let data = QueryData::from_describe(&input.sql, describe);
return expand_with_data(input, data, false);
}
let mut conn = SqliteConnectOptions::from_str(db_url)?.connect().await?;
let data = QueryData::from_db(&mut conn, &input.sql).await?;
conn.close().await?;
block_on(async {
static CONNECTION_CACHE: Lazy<AsyncMutex<BTreeMap<String, AnyConnection>>> =
Lazy::new(|| AsyncMutex::new(BTreeMap::new()));
let mut cache = CONNECTION_CACHE.lock().await;
if !cache.contains_key(db_url) {
let conn = AnyConnection::connect_with(&connect_opts).await?;
let _ = cache.insert(db_url.to_owned(), conn);
}
let conn_item = cache.get_mut(db_url).expect("Item was just inserted");
match conn_item.private_get_mut() {
#[cfg(feature = "postgres")]
sqlx_core::any::AnyConnectionKind::Postgres(conn) => {
let data = QueryData::from_db(conn, &input.sql).await?;
expand_with_data(input, data, false)
}
_ => {
static CONNECTION_CACHE: Lazy<AsyncMutex<BTreeMap<String, AnyConnection>>> =
Lazy::new(|| AsyncMutex::new(BTreeMap::new()));
let mut cache = CONNECTION_CACHE.lock().await;
if !cache.contains_key(db_url) {
let conn = AnyConnection::connect(db_url).await?;
let _ = cache.insert(db_url.to_owned(), conn);
}
let conn_item = cache.get_mut(db_url).expect("Item was just inserted");
match conn_item.private_get_mut() {
#[cfg(feature = "postgres")]
sqlx_core::any::AnyConnectionKind::Postgres(conn) => {
let data = QueryData::from_db(conn, &input.sql).await?;
expand_with_data(input, data, false)
}
#[cfg(feature = "mssql")]
sqlx_core::any::AnyConnectionKind::Mssql(conn) => {
let data = QueryData::from_db(conn, &input.sql).await?;
expand_with_data(input, data, false)
}
#[cfg(feature = "mysql")]
sqlx_core::any::AnyConnectionKind::MySql(conn) => {
let data = QueryData::from_db(conn, &input.sql).await?;
expand_with_data(input, data, false)
}
// Variants depend on feature flags
#[allow(unreachable_patterns)]
item => {
return Err(format!("Missing expansion needed for: {:?}", item).into());
}
}
#[cfg(feature = "mssql")]
sqlx_core::any::AnyConnectionKind::Mssql(conn) => {
let data = QueryData::from_db(conn, &input.sql).await?;
expand_with_data(input, data, false)
}
#[cfg(feature = "mysql")]
sqlx_core::any::AnyConnectionKind::MySql(conn) => {
let data = QueryData::from_db(conn, &input.sql).await?;
expand_with_data(input, data, false)
}
// Variants depend on feature flags
#[allow(unreachable_patterns)]
item => {
return Err(format!("Missing expansion needed for: {:?}", item).into());
}
}
});
maybe_expanded.map_err(Into::into)
})
}
#[cfg(feature = "offline")]

View File

@ -234,7 +234,7 @@ async fn it_fails_to_parse() -> anyhow::Result<()> {
let err = res.unwrap_err().to_string();
assert_eq!(
"error returned from database: near \"SEELCT\": syntax error",
"error returned from database: (code: 1) near \"SEELCT\": syntax error",
err
);