mirror of
https://github.com/launchbadge/sqlx.git
synced 2026-03-23 10:38:57 +00:00
Merge remote-tracking branch 'origin/main' into sqlx-toml
# Conflicts: # .github/workflows/examples.yml # sqlx-postgres/src/connection/mod.rs
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
use std::borrow::Cow;
|
||||
|
||||
use crate::{
|
||||
Either, Sqlite, SqliteArgumentValue, SqliteArguments, SqliteColumn, SqliteConnectOptions,
|
||||
SqliteConnection, SqliteQueryResult, SqliteRow, SqliteTransactionManager, SqliteTypeInfo,
|
||||
@@ -38,8 +40,11 @@ impl AnyConnectionBackend for SqliteConnection {
|
||||
Connection::ping(self)
|
||||
}
|
||||
|
||||
fn begin(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
|
||||
SqliteTransactionManager::begin(self)
|
||||
fn begin(
|
||||
&mut self,
|
||||
statement: Option<Cow<'static, str>>,
|
||||
) -> BoxFuture<'_, sqlx_core::Result<()>> {
|
||||
SqliteTransactionManager::begin(self, statement)
|
||||
}
|
||||
|
||||
fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
|
||||
@@ -54,6 +59,10 @@ impl AnyConnectionBackend for SqliteConnection {
|
||||
SqliteTransactionManager::start_rollback(self)
|
||||
}
|
||||
|
||||
fn get_transaction_depth(&self) -> usize {
|
||||
SqliteTransactionManager::get_transaction_depth(self)
|
||||
}
|
||||
|
||||
fn shrink_buffers(&mut self) {
|
||||
// NO-OP.
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ use libsqlite3_sys::{sqlite3_create_collation_v2, SQLITE_OK, SQLITE_UTF8};
|
||||
|
||||
use crate::connection::handle::ConnectionHandle;
|
||||
use crate::error::Error;
|
||||
use crate::SqliteError;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Collation {
|
||||
@@ -67,7 +66,7 @@ impl Collation {
|
||||
} else {
|
||||
// The xDestroy callback is not called if the sqlite3_create_collation_v2() function fails.
|
||||
drop(unsafe { Arc::from_raw(raw_f) });
|
||||
Err(Error::Database(Box::new(SqliteError::new(handle.as_ptr()))))
|
||||
Err(handle.expect_error().into())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -112,7 +111,7 @@ where
|
||||
} else {
|
||||
// The xDestroy callback is not called if the sqlite3_create_collation_v2() function fails.
|
||||
drop(unsafe { Box::from_raw(boxed_f) });
|
||||
Err(Error::Database(Box::new(SqliteError::new(handle.as_ptr()))))
|
||||
Err(handle.expect_error().into())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -204,10 +204,10 @@ impl EstablishParams {
|
||||
|
||||
// SAFE: tested for NULL just above
|
||||
// This allows any returns below to close this handle with RAII
|
||||
let handle = unsafe { ConnectionHandle::new(handle) };
|
||||
let mut handle = unsafe { ConnectionHandle::new(handle) };
|
||||
|
||||
if status != SQLITE_OK {
|
||||
return Err(Error::Database(Box::new(SqliteError::new(handle.as_ptr()))));
|
||||
return Err(Error::Database(Box::new(handle.expect_error())));
|
||||
}
|
||||
|
||||
// Enable extended result codes
|
||||
@@ -226,33 +226,29 @@ impl EstablishParams {
|
||||
for ext in self.extensions.iter() {
|
||||
// `sqlite3_load_extension` is unusual as it returns its errors via an out-pointer
|
||||
// rather than by calling `sqlite3_errmsg`
|
||||
let mut error = null_mut();
|
||||
let mut error_msg = null_mut();
|
||||
status = unsafe {
|
||||
sqlite3_load_extension(
|
||||
handle.as_ptr(),
|
||||
ext.0.as_ptr(),
|
||||
ext.1.as_ref().map_or(null(), |e| e.as_ptr()),
|
||||
addr_of_mut!(error),
|
||||
addr_of_mut!(error_msg),
|
||||
)
|
||||
};
|
||||
|
||||
if status != SQLITE_OK {
|
||||
let mut e = handle.expect_error();
|
||||
|
||||
// SAFETY: We become responsible for any memory allocation at `&error`, so test
|
||||
// for null and take an RAII version for returns
|
||||
let err_msg = if !error.is_null() {
|
||||
unsafe {
|
||||
let e = CStr::from_ptr(error).into();
|
||||
sqlite3_free(error as *mut c_void);
|
||||
e
|
||||
}
|
||||
} else {
|
||||
CString::new("Unknown error when loading extension")
|
||||
.expect("text should be representable as a CString")
|
||||
};
|
||||
return Err(Error::Database(Box::new(SqliteError::extension(
|
||||
handle.as_ptr(),
|
||||
&err_msg,
|
||||
))));
|
||||
if !error_msg.is_null() {
|
||||
e = e.with_message(unsafe {
|
||||
let msg = CStr::from_ptr(error_msg).to_string_lossy().into();
|
||||
sqlite3_free(error_msg as *mut c_void);
|
||||
msg
|
||||
});
|
||||
}
|
||||
return Err(Error::Database(Box::new(e)));
|
||||
}
|
||||
} // Preempt any hypothetical security issues arising from leaving ENABLE_LOAD_EXTENSION
|
||||
// on by disabling the flag again once we've loaded all the requested modules.
|
||||
@@ -271,7 +267,7 @@ impl EstablishParams {
|
||||
// configure a `regexp` function for sqlite, it does not come with one by default
|
||||
let status = crate::regexp::register(handle.as_ptr());
|
||||
if status != SQLITE_OK {
|
||||
return Err(Error::Database(Box::new(SqliteError::new(handle.as_ptr()))));
|
||||
return Err(Error::Database(Box::new(handle.expect_error())));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,13 +282,12 @@ impl EstablishParams {
|
||||
status = unsafe { sqlite3_busy_timeout(handle.as_ptr(), ms) };
|
||||
|
||||
if status != SQLITE_OK {
|
||||
return Err(Error::Database(Box::new(SqliteError::new(handle.as_ptr()))));
|
||||
return Err(Error::Database(Box::new(handle.expect_error())));
|
||||
}
|
||||
|
||||
Ok(ConnectionState {
|
||||
handle,
|
||||
statements: Statements::new(self.statement_cache_capacity),
|
||||
transaction_depth: 0,
|
||||
log_settings: self.log_settings.clone(),
|
||||
progress_handler_callback: None,
|
||||
update_hook_callback: None,
|
||||
|
||||
@@ -46,6 +46,17 @@ impl ConnectionHandle {
|
||||
unsafe { sqlite3_last_insert_rowid(self.as_ptr()) }
|
||||
}
|
||||
|
||||
pub(crate) fn last_error(&mut self) -> Option<SqliteError> {
|
||||
// SAFETY: we have exclusive access to the database handle
|
||||
unsafe { SqliteError::try_new(self.as_ptr()) }
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub(crate) fn expect_error(&mut self) -> SqliteError {
|
||||
self.last_error()
|
||||
.expect("expected error code to be set in current context")
|
||||
}
|
||||
|
||||
pub(crate) fn exec(&mut self, query: impl Into<String>) -> Result<(), Error> {
|
||||
let query = query.into();
|
||||
let query = CString::new(query).map_err(|_| err_protocol!("query contains nul bytes"))?;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::Ordering;
|
||||
use std::ffi::CStr;
|
||||
use std::fmt::Write;
|
||||
@@ -11,8 +12,8 @@ use futures_core::future::BoxFuture;
|
||||
use futures_intrusive::sync::MutexGuard;
|
||||
use futures_util::future;
|
||||
use libsqlite3_sys::{
|
||||
sqlite3, sqlite3_commit_hook, sqlite3_progress_handler, sqlite3_rollback_hook,
|
||||
sqlite3_update_hook, SQLITE_DELETE, SQLITE_INSERT, SQLITE_UPDATE,
|
||||
sqlite3, sqlite3_commit_hook, sqlite3_get_autocommit, sqlite3_progress_handler,
|
||||
sqlite3_rollback_hook, sqlite3_update_hook, SQLITE_DELETE, SQLITE_INSERT, SQLITE_UPDATE,
|
||||
};
|
||||
#[cfg(feature = "preupdate-hook")]
|
||||
pub use preupdate_hook::*;
|
||||
@@ -40,6 +41,7 @@ mod handle;
|
||||
pub(crate) mod intmap;
|
||||
#[cfg(feature = "preupdate-hook")]
|
||||
mod preupdate_hook;
|
||||
pub(crate) mod serialize;
|
||||
|
||||
mod worker;
|
||||
|
||||
@@ -105,9 +107,6 @@ unsafe impl Send for RollbackHookHandler {}
|
||||
pub(crate) struct ConnectionState {
|
||||
pub(crate) handle: ConnectionHandle,
|
||||
|
||||
// transaction status
|
||||
pub(crate) transaction_depth: usize,
|
||||
|
||||
pub(crate) statements: Statements,
|
||||
|
||||
log_settings: LogSettings,
|
||||
@@ -252,14 +251,21 @@ impl Connection for SqliteConnection {
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Transaction::begin(self)
|
||||
Transaction::begin(self, None)
|
||||
}
|
||||
|
||||
fn begin_with(
|
||||
&mut self,
|
||||
statement: impl Into<Cow<'static, str>>,
|
||||
) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Transaction::begin(self, Some(statement.into()))
|
||||
}
|
||||
|
||||
fn cached_statements_size(&self) -> usize {
|
||||
self.worker
|
||||
.shared
|
||||
.cached_statements_size
|
||||
.load(std::sync::atomic::Ordering::Acquire)
|
||||
self.worker.shared.get_cached_statements_size()
|
||||
}
|
||||
|
||||
fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
|
||||
@@ -544,7 +550,12 @@ impl LockedSqliteHandle<'_> {
|
||||
}
|
||||
|
||||
pub fn last_error(&mut self) -> Option<SqliteError> {
|
||||
SqliteError::try_new(self.guard.handle.as_ptr())
|
||||
self.guard.handle.last_error()
|
||||
}
|
||||
|
||||
pub(crate) fn in_transaction(&mut self) -> bool {
|
||||
let ret = unsafe { sqlite3_get_autocommit(self.as_raw_handle().as_ptr()) };
|
||||
ret == 0
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
297
sqlx-sqlite/src/connection/serialize.rs
Normal file
297
sqlx-sqlite/src/connection/serialize.rs
Normal file
@@ -0,0 +1,297 @@
|
||||
use super::ConnectionState;
|
||||
use crate::{error::Error, SqliteConnection, SqliteError};
|
||||
use libsqlite3_sys::{
|
||||
sqlite3_deserialize, sqlite3_free, sqlite3_malloc64, sqlite3_serialize,
|
||||
SQLITE_DESERIALIZE_FREEONCLOSE, SQLITE_DESERIALIZE_READONLY, SQLITE_DESERIALIZE_RESIZEABLE,
|
||||
SQLITE_NOMEM, SQLITE_OK,
|
||||
};
|
||||
use std::ffi::c_char;
|
||||
use std::fmt::Debug;
|
||||
use std::{
|
||||
ops::{Deref, DerefMut},
|
||||
ptr,
|
||||
ptr::NonNull,
|
||||
};
|
||||
|
||||
impl SqliteConnection {
|
||||
/// Serialize the given SQLite database schema using [`sqlite3_serialize()`].
|
||||
///
|
||||
/// The returned buffer is a SQLite managed allocation containing the equivalent data
|
||||
/// as writing the database to disk. It is freed on-drop.
|
||||
///
|
||||
/// To serialize the primary, unqualified schema (`main`), pass `None` for the schema name.
|
||||
///
|
||||
/// # Errors
|
||||
/// * [`Error::InvalidArgument`] if the schema name contains a zero/NUL byte (`\0`).
|
||||
/// * [`Error::Database`] if the schema does not exist or another error occurs.
|
||||
///
|
||||
/// [`sqlite3_serialize()`]: https://sqlite.org/c3ref/serialize.html
|
||||
pub async fn serialize(&mut self, schema: Option<&str>) -> Result<SqliteOwnedBuf, Error> {
|
||||
let schema = schema.map(SchemaName::try_from).transpose()?;
|
||||
|
||||
self.worker.serialize(schema).await
|
||||
}
|
||||
|
||||
/// Deserialize a SQLite database from a buffer into the specified schema using [`sqlite3_deserialize()`].
|
||||
///
|
||||
/// The given schema will be disconnected and re-connected as an in-memory database
|
||||
/// backed by `data`, which should be the serialized form of a database previously returned
|
||||
/// by a call to [`Self::serialize()`], documented as being equivalent to
|
||||
/// the contents of the database file on disk.
|
||||
///
|
||||
/// An error will be returned if a schema with the given name is not already attached.
|
||||
/// You can use `ATTACH ':memory' as "<schema name>"` to create an empty schema first.
|
||||
///
|
||||
/// Pass `None` to deserialize to the primary, unqualified schema (`main`).
|
||||
///
|
||||
/// The SQLite connection will take ownership of `data` and will free it when the connection
|
||||
/// is closed or the schema is detached ([`SQLITE_DESERIALIZE_FREEONCLOSE`][deserialize-flags]).
|
||||
///
|
||||
/// If `read_only` is `true`, the schema is opened as read-only ([`SQLITE_DESERIALIZE_READONLY`][deserialize-flags]).
|
||||
/// If `false`, the schema is marked as resizable ([`SQLITE_DESERIALIZE_RESIZABLE`][deserialize-flags]).
|
||||
///
|
||||
/// If the database is in WAL mode, an error is returned.
|
||||
/// See [`sqlite3_deserialize()`] for details.
|
||||
///
|
||||
/// # Errors
|
||||
/// * [`Error::InvalidArgument`] if the schema name contains a zero/NUL byte (`\0`).
|
||||
/// * [`Error::Database`] if an error occurs during deserialization.
|
||||
///
|
||||
/// [`sqlite3_deserialize()`]: https://sqlite.org/c3ref/deserialize.html
|
||||
/// [deserialize-flags]: https://sqlite.org/c3ref/c_deserialize_freeonclose.html
|
||||
pub async fn deserialize(
|
||||
&mut self,
|
||||
schema: Option<&str>,
|
||||
data: SqliteOwnedBuf,
|
||||
read_only: bool,
|
||||
) -> Result<(), Error> {
|
||||
let schema = schema.map(SchemaName::try_from).transpose()?;
|
||||
|
||||
self.worker.deserialize(schema, data, read_only).await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn serialize(
|
||||
conn: &mut ConnectionState,
|
||||
schema: Option<SchemaName>,
|
||||
) -> Result<SqliteOwnedBuf, Error> {
|
||||
let mut size = 0;
|
||||
|
||||
let buf = unsafe {
|
||||
let ptr = sqlite3_serialize(
|
||||
conn.handle.as_ptr(),
|
||||
schema.as_ref().map_or(ptr::null(), SchemaName::as_ptr),
|
||||
&mut size,
|
||||
0,
|
||||
);
|
||||
|
||||
// looking at the source, `sqlite3_serialize` actually sets `size = -1` on error:
|
||||
// https://github.com/sqlite/sqlite/blob/da5f81387843f92652128087a8f8ecef0b79461d/src/memdb.c#L776
|
||||
usize::try_from(size)
|
||||
.ok()
|
||||
.and_then(|size| SqliteOwnedBuf::from_raw(ptr, size))
|
||||
};
|
||||
|
||||
if let Some(buf) = buf {
|
||||
return Ok(buf);
|
||||
}
|
||||
|
||||
if let Some(error) = conn.handle.last_error() {
|
||||
return Err(error.into());
|
||||
}
|
||||
|
||||
if size > 0 {
|
||||
// If `size` is positive but `sqlite3_serialize` still returned NULL,
|
||||
// the most likely culprit is an out-of-memory condition.
|
||||
return Err(SqliteError::from_code(SQLITE_NOMEM).into());
|
||||
}
|
||||
|
||||
// Otherwise, the schema was probably not found.
|
||||
// We return the equivalent error as when you try to execute `PRAGMA <schema>.page_count`
|
||||
// against a non-existent schema.
|
||||
Err(SqliteError::generic(format!(
|
||||
"database {} does not exist",
|
||||
schema.as_ref().map_or("main", SchemaName::as_str)
|
||||
))
|
||||
.into())
|
||||
}
|
||||
|
||||
pub(crate) fn deserialize(
|
||||
conn: &mut ConnectionState,
|
||||
schema: Option<SchemaName>,
|
||||
data: SqliteOwnedBuf,
|
||||
read_only: bool,
|
||||
) -> Result<(), Error> {
|
||||
// SQLITE_DESERIALIZE_FREEONCLOSE causes SQLite to take ownership of the buffer
|
||||
let mut flags = SQLITE_DESERIALIZE_FREEONCLOSE;
|
||||
if read_only {
|
||||
flags |= SQLITE_DESERIALIZE_READONLY;
|
||||
} else {
|
||||
flags |= SQLITE_DESERIALIZE_RESIZEABLE;
|
||||
}
|
||||
|
||||
let (buf, size) = data.into_raw();
|
||||
|
||||
let rc = unsafe {
|
||||
sqlite3_deserialize(
|
||||
conn.handle.as_ptr(),
|
||||
schema.as_ref().map_or(ptr::null(), SchemaName::as_ptr),
|
||||
buf,
|
||||
i64::try_from(size).unwrap(),
|
||||
i64::try_from(size).unwrap(),
|
||||
flags,
|
||||
)
|
||||
};
|
||||
|
||||
match rc {
|
||||
SQLITE_OK => Ok(()),
|
||||
SQLITE_NOMEM => Err(SqliteError::from_code(SQLITE_NOMEM).into()),
|
||||
// SQLite unfortunately doesn't set any specific message for deserialization errors.
|
||||
_ => Err(SqliteError::generic("an error occurred during deserialization").into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Memory buffer owned and allocated by SQLite. Freed on drop.
|
||||
///
|
||||
/// Intended primarily for use with [`SqliteConnection::serialize()`] and [`SqliteConnection::deserialize()`].
|
||||
///
|
||||
/// Can be created from `&[u8]` using the `TryFrom` impl. The slice must not be empty.
|
||||
#[derive(Debug)]
|
||||
pub struct SqliteOwnedBuf {
|
||||
ptr: NonNull<u8>,
|
||||
size: usize,
|
||||
}
|
||||
|
||||
unsafe impl Send for SqliteOwnedBuf {}
|
||||
unsafe impl Sync for SqliteOwnedBuf {}
|
||||
|
||||
impl Drop for SqliteOwnedBuf {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
sqlite3_free(self.ptr.as_ptr().cast());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SqliteOwnedBuf {
|
||||
/// Uses `sqlite3_malloc` to allocate a buffer and returns a pointer to it.
|
||||
///
|
||||
/// # Safety
|
||||
/// The allocated buffer is uninitialized.
|
||||
unsafe fn with_capacity(size: usize) -> Option<SqliteOwnedBuf> {
|
||||
let ptr = sqlite3_malloc64(u64::try_from(size).unwrap()).cast::<u8>();
|
||||
Self::from_raw(ptr, size)
|
||||
}
|
||||
|
||||
/// Creates a new mem buffer from a pointer that has been created with sqlite_malloc
|
||||
///
|
||||
/// # Safety:
|
||||
/// * The pointer must point to a valid allocation created by `sqlite3_malloc()`, or `NULL`.
|
||||
unsafe fn from_raw(ptr: *mut u8, size: usize) -> Option<Self> {
|
||||
Some(Self {
|
||||
ptr: NonNull::new(ptr)?,
|
||||
size,
|
||||
})
|
||||
}
|
||||
|
||||
fn into_raw(self) -> (*mut u8, usize) {
|
||||
let raw = (self.ptr.as_ptr(), self.size);
|
||||
// this is used in sqlite_deserialize and
|
||||
// underlying buffer must not be freed
|
||||
std::mem::forget(self);
|
||||
raw
|
||||
}
|
||||
}
|
||||
|
||||
/// # Errors
|
||||
/// Returns [`Error::InvalidArgument`] if the slice is empty.
|
||||
impl TryFrom<&[u8]> for SqliteOwnedBuf {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
|
||||
unsafe {
|
||||
// SAFETY: `buf` is not initialized until `ptr::copy_nonoverlapping` completes.
|
||||
let mut buf = Self::with_capacity(bytes.len()).ok_or_else(|| {
|
||||
Error::InvalidArgument("SQLite owned buffer cannot be empty".to_string())
|
||||
})?;
|
||||
ptr::copy_nonoverlapping(bytes.as_ptr(), buf.ptr.as_mut(), buf.size);
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SqliteOwnedBuf {
|
||||
type Target = [u8];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.size) }
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for SqliteOwnedBuf {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
unsafe { std::slice::from_raw_parts_mut(self.ptr.as_mut(), self.size) }
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for SqliteOwnedBuf {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.deref()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8]> for SqliteOwnedBuf {
|
||||
fn as_mut(&mut self) -> &mut [u8] {
|
||||
self.deref_mut()
|
||||
}
|
||||
}
|
||||
|
||||
/// Checked schema name to pass to SQLite.
|
||||
///
|
||||
/// # Safety:
|
||||
/// * Valid UTF-8 (not guaranteed by `CString`)
|
||||
/// * No internal zero bytes (`\0`) (not guaranteed by `String`)
|
||||
/// * Terminated with a zero byte (`\0`) (not guaranteed by `String`)
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SchemaName(Box<str>);
|
||||
|
||||
impl SchemaName {
|
||||
/// Get the schema name as a string without the zero byte terminator.
|
||||
pub fn as_str(&self) -> &str {
|
||||
&self.0[..self.0.len() - 1]
|
||||
}
|
||||
|
||||
/// Get a pointer to the string data, suitable for passing as C's `*const char`.
|
||||
///
|
||||
/// # Safety
|
||||
/// The string data is guaranteed to be terminated with a zero byte.
|
||||
pub fn as_ptr(&self) -> *const c_char {
|
||||
self.0.as_ptr() as *const c_char
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TryFrom<&'a str> for SchemaName {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(name: &'a str) -> Result<Self, Self::Error> {
|
||||
// SAFETY: we must ensure that the string does not contain an internal NULL byte
|
||||
if let Some(pos) = name.as_bytes().iter().position(|&b| b == 0) {
|
||||
return Err(Error::InvalidArgument(format!(
|
||||
"schema name {name:?} contains a zero byte at index {pos}"
|
||||
)));
|
||||
}
|
||||
|
||||
let capacity = name.len().checked_add(1).unwrap();
|
||||
|
||||
let mut s = String::new();
|
||||
// `String::with_capacity()` does not guarantee that it will not overallocate,
|
||||
// which might mean an unnecessary reallocation to make `capacity == len`
|
||||
// in the conversion to `Box<str>`.
|
||||
s.reserve_exact(capacity);
|
||||
|
||||
s.push_str(name);
|
||||
s.push('\0');
|
||||
|
||||
Ok(SchemaName(s.into()))
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,8 @@ use crate::connection::execute;
|
||||
use crate::connection::ConnectionState;
|
||||
use crate::{Sqlite, SqliteArguments, SqliteQueryResult, SqliteRow, SqliteStatement};
|
||||
|
||||
use super::serialize::{deserialize, serialize, SchemaName, SqliteOwnedBuf};
|
||||
|
||||
// Each SQLite connection has a dedicated thread.
|
||||
|
||||
// TODO: Tweak this so that we can use a thread pool per pool of SQLite3 connections to reduce
|
||||
@@ -34,10 +36,21 @@ pub(crate) struct ConnectionWorker {
|
||||
}
|
||||
|
||||
pub(crate) struct WorkerSharedState {
|
||||
pub(crate) cached_statements_size: AtomicUsize,
|
||||
transaction_depth: AtomicUsize,
|
||||
cached_statements_size: AtomicUsize,
|
||||
pub(crate) conn: Mutex<ConnectionState>,
|
||||
}
|
||||
|
||||
impl WorkerSharedState {
|
||||
pub(crate) fn get_transaction_depth(&self) -> usize {
|
||||
self.transaction_depth.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub(crate) fn get_cached_statements_size(&self) -> usize {
|
||||
self.cached_statements_size.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
enum Command {
|
||||
Prepare {
|
||||
query: Box<str>,
|
||||
@@ -54,8 +67,19 @@ enum Command {
|
||||
tx: flume::Sender<Result<Either<SqliteQueryResult, SqliteRow>, Error>>,
|
||||
limit: Option<usize>,
|
||||
},
|
||||
Serialize {
|
||||
schema: Option<SchemaName>,
|
||||
tx: oneshot::Sender<Result<SqliteOwnedBuf, Error>>,
|
||||
},
|
||||
Deserialize {
|
||||
schema: Option<SchemaName>,
|
||||
data: SqliteOwnedBuf,
|
||||
read_only: bool,
|
||||
tx: oneshot::Sender<Result<(), Error>>,
|
||||
},
|
||||
Begin {
|
||||
tx: rendezvous_oneshot::Sender<Result<(), Error>>,
|
||||
statement: Option<Cow<'static, str>>,
|
||||
},
|
||||
Commit {
|
||||
tx: rendezvous_oneshot::Sender<Result<(), Error>>,
|
||||
@@ -93,6 +117,7 @@ impl ConnectionWorker {
|
||||
};
|
||||
|
||||
let shared = Arc::new(WorkerSharedState {
|
||||
transaction_depth: AtomicUsize::new(0),
|
||||
cached_statements_size: AtomicUsize::new(0),
|
||||
// note: must be fair because in `Command::UnlockDb` we unlock the mutex
|
||||
// and then immediately try to relock it; an unfair mutex would immediately
|
||||
@@ -182,13 +207,27 @@ impl ConnectionWorker {
|
||||
|
||||
update_cached_statements_size(&conn, &shared.cached_statements_size);
|
||||
}
|
||||
Command::Begin { tx } => {
|
||||
let depth = conn.transaction_depth;
|
||||
Command::Begin { tx, statement } => {
|
||||
let depth = shared.transaction_depth.load(Ordering::Acquire);
|
||||
|
||||
let statement = match statement {
|
||||
// custom `BEGIN` statements are not allowed if
|
||||
// we're already in a transaction (we need to
|
||||
// issue a `SAVEPOINT` instead)
|
||||
Some(_) if depth > 0 => {
|
||||
if tx.blocking_send(Err(Error::InvalidSavePointStatement)).is_err() {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
},
|
||||
Some(statement) => statement,
|
||||
None => begin_ansi_transaction_sql(depth),
|
||||
};
|
||||
let res =
|
||||
conn.handle
|
||||
.exec(begin_ansi_transaction_sql(depth))
|
||||
.exec(statement)
|
||||
.map(|_| {
|
||||
conn.transaction_depth += 1;
|
||||
shared.transaction_depth.fetch_add(1, Ordering::Release);
|
||||
});
|
||||
let res_ok = res.is_ok();
|
||||
|
||||
@@ -201,7 +240,7 @@ impl ConnectionWorker {
|
||||
.handle
|
||||
.exec(rollback_ansi_transaction_sql(depth + 1))
|
||||
.map(|_| {
|
||||
conn.transaction_depth -= 1;
|
||||
shared.transaction_depth.fetch_sub(1, Ordering::Release);
|
||||
})
|
||||
{
|
||||
// The rollback failed. To prevent leaving the connection
|
||||
@@ -213,13 +252,13 @@ impl ConnectionWorker {
|
||||
}
|
||||
}
|
||||
Command::Commit { tx } => {
|
||||
let depth = conn.transaction_depth;
|
||||
let depth = shared.transaction_depth.load(Ordering::Acquire);
|
||||
|
||||
let res = if depth > 0 {
|
||||
conn.handle
|
||||
.exec(commit_ansi_transaction_sql(depth))
|
||||
.map(|_| {
|
||||
conn.transaction_depth -= 1;
|
||||
shared.transaction_depth.fetch_sub(1, Ordering::Release);
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
@@ -239,13 +278,13 @@ impl ConnectionWorker {
|
||||
continue;
|
||||
}
|
||||
|
||||
let depth = conn.transaction_depth;
|
||||
let depth = shared.transaction_depth.load(Ordering::Acquire);
|
||||
|
||||
let res = if depth > 0 {
|
||||
conn.handle
|
||||
.exec(rollback_ansi_transaction_sql(depth))
|
||||
.map(|_| {
|
||||
conn.transaction_depth -= 1;
|
||||
shared.transaction_depth.fetch_sub(1, Ordering::Release);
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
@@ -263,6 +302,12 @@ impl ConnectionWorker {
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::Serialize { schema, tx } => {
|
||||
tx.send(serialize(&mut conn, schema)).ok();
|
||||
}
|
||||
Command::Deserialize { schema, data, read_only, tx } => {
|
||||
tx.send(deserialize(&mut conn, schema, data, read_only)).ok();
|
||||
}
|
||||
Command::ClearCache { tx } => {
|
||||
conn.statements.clear();
|
||||
update_cached_statements_size(&conn, &shared.cached_statements_size);
|
||||
@@ -333,8 +378,11 @@ impl ConnectionWorker {
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
pub(crate) async fn begin(&mut self) -> Result<(), Error> {
|
||||
self.oneshot_cmd_with_ack(|tx| Command::Begin { tx })
|
||||
pub(crate) async fn begin(
|
||||
&mut self,
|
||||
statement: Option<Cow<'static, str>>,
|
||||
) -> Result<(), Error> {
|
||||
self.oneshot_cmd_with_ack(|tx| Command::Begin { tx, statement })
|
||||
.await?
|
||||
}
|
||||
|
||||
@@ -358,6 +406,29 @@ impl ConnectionWorker {
|
||||
self.oneshot_cmd(|tx| Command::Ping { tx }).await
|
||||
}
|
||||
|
||||
pub(crate) async fn deserialize(
|
||||
&mut self,
|
||||
schema: Option<SchemaName>,
|
||||
data: SqliteOwnedBuf,
|
||||
read_only: bool,
|
||||
) -> Result<(), Error> {
|
||||
self.oneshot_cmd(|tx| Command::Deserialize {
|
||||
schema,
|
||||
data,
|
||||
read_only,
|
||||
tx,
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
pub(crate) async fn serialize(
|
||||
&mut self,
|
||||
schema: Option<SchemaName>,
|
||||
) -> Result<SqliteOwnedBuf, Error> {
|
||||
self.oneshot_cmd(|tx| Command::Serialize { schema, tx })
|
||||
.await?
|
||||
}
|
||||
|
||||
async fn oneshot_cmd<F, T>(&mut self, command: F) -> Result<T, Error>
|
||||
where
|
||||
F: FnOnce(oneshot::Sender<T>) -> Command,
|
||||
|
||||
@@ -2,12 +2,12 @@ use std::error::Error as StdError;
|
||||
use std::ffi::CStr;
|
||||
use std::fmt::{self, Display, Formatter};
|
||||
use std::os::raw::c_int;
|
||||
use std::{borrow::Cow, str::from_utf8_unchecked};
|
||||
use std::{borrow::Cow, str};
|
||||
|
||||
use libsqlite3_sys::{
|
||||
sqlite3, sqlite3_errmsg, sqlite3_extended_errcode, SQLITE_CONSTRAINT_CHECK,
|
||||
sqlite3, sqlite3_errmsg, sqlite3_errstr, sqlite3_extended_errcode, SQLITE_CONSTRAINT_CHECK,
|
||||
SQLITE_CONSTRAINT_FOREIGNKEY, SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY,
|
||||
SQLITE_CONSTRAINT_UNIQUE,
|
||||
SQLITE_CONSTRAINT_UNIQUE, SQLITE_ERROR,
|
||||
};
|
||||
|
||||
pub(crate) use sqlx_core::error::*;
|
||||
@@ -18,15 +18,15 @@ pub(crate) use sqlx_core::error::*;
|
||||
#[derive(Debug)]
|
||||
pub struct SqliteError {
|
||||
code: c_int,
|
||||
message: String,
|
||||
message: Cow<'static, str>,
|
||||
}
|
||||
|
||||
impl SqliteError {
|
||||
pub(crate) fn new(handle: *mut sqlite3) -> Self {
|
||||
pub(crate) unsafe fn new(handle: *mut sqlite3) -> Self {
|
||||
Self::try_new(handle).expect("There should be an error")
|
||||
}
|
||||
|
||||
pub(crate) fn try_new(handle: *mut sqlite3) -> Option<Self> {
|
||||
pub(crate) unsafe fn try_new(handle: *mut sqlite3) -> Option<Self> {
|
||||
// returns the extended result code even when extended result codes are disabled
|
||||
let code: c_int = unsafe { sqlite3_extended_errcode(handle) };
|
||||
|
||||
@@ -39,20 +39,44 @@ impl SqliteError {
|
||||
let msg = sqlite3_errmsg(handle);
|
||||
debug_assert!(!msg.is_null());
|
||||
|
||||
from_utf8_unchecked(CStr::from_ptr(msg).to_bytes())
|
||||
str::from_utf8_unchecked(CStr::from_ptr(msg).to_bytes()).to_owned()
|
||||
};
|
||||
|
||||
Some(Self {
|
||||
code,
|
||||
message: message.to_owned(),
|
||||
message: message.into(),
|
||||
})
|
||||
}
|
||||
|
||||
/// For errors during extension load, the error message is supplied via a separate pointer
|
||||
pub(crate) fn extension(handle: *mut sqlite3, error_msg: &CStr) -> Self {
|
||||
let mut err = Self::new(handle);
|
||||
err.message = unsafe { from_utf8_unchecked(error_msg.to_bytes()).to_owned() };
|
||||
err
|
||||
pub(crate) fn with_message(mut self, error_msg: String) -> Self {
|
||||
self.message = error_msg.into();
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn from_code(code: c_int) -> Self {
|
||||
let message = unsafe {
|
||||
let errstr = sqlite3_errstr(code);
|
||||
|
||||
if !errstr.is_null() {
|
||||
// SAFETY: `errstr` is guaranteed to be UTF-8
|
||||
// The lifetime of the string is "internally managed";
|
||||
// the implementation just selects from an array of static strings.
|
||||
// We copy to an owned buffer in case `libsqlite3` is dynamically loaded somehow.
|
||||
Cow::Owned(str::from_utf8_unchecked(CStr::from_ptr(errstr).to_bytes()).into())
|
||||
} else {
|
||||
Cow::Borrowed("<error message unavailable>")
|
||||
}
|
||||
};
|
||||
|
||||
SqliteError { code, message }
|
||||
}
|
||||
|
||||
pub(crate) fn generic(message: impl Into<Cow<'static, str>>) -> Self {
|
||||
Self {
|
||||
code: SQLITE_ERROR,
|
||||
message: message.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@ use std::sync::atomic::AtomicBool;
|
||||
|
||||
pub use arguments::{SqliteArgumentValue, SqliteArguments};
|
||||
pub use column::SqliteColumn;
|
||||
pub use connection::serialize::SqliteOwnedBuf;
|
||||
#[cfg(feature = "preupdate-hook")]
|
||||
pub use connection::PreupdateHookResult;
|
||||
pub use connection::{LockedSqliteHandle, SqliteConnection, SqliteOperation, UpdateHookResult};
|
||||
|
||||
@@ -84,8 +84,8 @@ impl StatementHandle {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn last_error(&self) -> SqliteError {
|
||||
SqliteError::new(unsafe { self.db_handle() })
|
||||
pub(crate) fn last_error(&mut self) -> SqliteError {
|
||||
unsafe { SqliteError::new(self.db_handle()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
||||
@@ -185,7 +185,7 @@ fn prepare(
|
||||
};
|
||||
|
||||
if status != SQLITE_OK {
|
||||
return Err(SqliteError::new(conn).into());
|
||||
return Err(unsafe { SqliteError::new(conn).into() });
|
||||
}
|
||||
|
||||
// tail should point to the first byte past the end of the first SQL
|
||||
|
||||
@@ -1,17 +1,33 @@
|
||||
use futures_core::future::BoxFuture;
|
||||
use std::borrow::Cow;
|
||||
|
||||
use crate::{Sqlite, SqliteConnection};
|
||||
use sqlx_core::error::Error;
|
||||
use sqlx_core::transaction::TransactionManager;
|
||||
|
||||
use crate::{Sqlite, SqliteConnection};
|
||||
|
||||
/// Implementation of [`TransactionManager`] for SQLite.
|
||||
pub struct SqliteTransactionManager;
|
||||
|
||||
impl TransactionManager for SqliteTransactionManager {
|
||||
type Database = Sqlite;
|
||||
|
||||
fn begin(conn: &mut SqliteConnection) -> BoxFuture<'_, Result<(), Error>> {
|
||||
Box::pin(conn.worker.begin())
|
||||
fn begin<'conn>(
|
||||
conn: &'conn mut SqliteConnection,
|
||||
statement: Option<Cow<'static, str>>,
|
||||
) -> BoxFuture<'conn, Result<(), Error>> {
|
||||
Box::pin(async {
|
||||
let is_custom_statement = statement.is_some();
|
||||
conn.worker.begin(statement).await?;
|
||||
if is_custom_statement {
|
||||
// Check that custom statement actually put the connection into a transaction.
|
||||
let mut handle = conn.lock_handle().await?;
|
||||
if !handle.in_transaction() {
|
||||
return Err(Error::BeginFailed);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn commit(conn: &mut SqliteConnection) -> BoxFuture<'_, Result<(), Error>> {
|
||||
@@ -25,4 +41,8 @@ impl TransactionManager for SqliteTransactionManager {
|
||||
fn start_rollback(conn: &mut SqliteConnection) {
|
||||
conn.worker.start_rollback().ok();
|
||||
}
|
||||
|
||||
fn get_transaction_depth(conn: &SqliteConnection) -> usize {
|
||||
conn.worker.shared.get_transaction_depth()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user