refactor(sqlite): adapt to the 0.4.x core refactor

* massive (~20x) performance improvement
This commit is contained in:
Ryan Leckey 2020-05-26 02:17:34 -07:00
parent 757a930e21
commit 37a69e0ac3
No known key found for this signature in database
GPG Key ID: BBDFC5595030E7D3
27 changed files with 1690 additions and 1311 deletions

View File

@ -1,58 +1,30 @@
use core::ffi::c_void;
use core::mem;
use std::borrow::Cow;
use std::os::raw::{c_char, c_int};
use libsqlite3_sys::{
sqlite3_bind_blob, sqlite3_bind_double, sqlite3_bind_int, sqlite3_bind_int64,
sqlite3_bind_null, sqlite3_bind_text, SQLITE_OK, SQLITE_TRANSIENT,
};
use atoi::atoi;
use libsqlite3_sys::SQLITE_OK;
use crate::arguments::Arguments;
use crate::encode::{Encode, IsNull};
use crate::sqlite::statement::Statement;
use crate::error::Error;
use crate::sqlite::statement::{SqliteStatement, StatementHandle};
use crate::sqlite::Sqlite;
use crate::sqlite::SqliteError;
use crate::types::Type;
#[derive(Debug, Clone)]
pub enum SqliteArgumentValue {
pub enum SqliteArgumentValue<'q> {
Null,
// TODO: Take by reference to remove the allocation
Text(String),
// TODO: Take by reference to remove the allocation
Blob(Vec<u8>),
Text(Cow<'q, str>),
Blob(Cow<'q, [u8]>),
Double(f64),
Int(i32),
Int64(i64),
}
#[derive(Default)]
pub struct SqliteArguments {
index: usize,
values: Vec<SqliteArgumentValue>,
pub struct SqliteArguments<'q> {
pub(crate) values: Vec<SqliteArgumentValue<'q>>,
}
impl SqliteArguments {
pub(crate) fn next(&mut self) -> Option<SqliteArgumentValue> {
if self.index >= self.values.len() {
return None;
}
let mut value = SqliteArgumentValue::Null;
mem::swap(&mut value, &mut self.values[self.index]);
self.index += 1;
Some(value)
}
}
impl Arguments for SqliteArguments {
impl<'q> Arguments<'q> for SqliteArguments<'q> {
type Database = Sqlite;
fn reserve(&mut self, len: usize, _size_hint: usize) {
@ -61,68 +33,64 @@ impl Arguments for SqliteArguments {
fn add<T>(&mut self, value: T)
where
T: Encode<Self::Database> + Type<Self::Database>,
T: 'q + Encode<'q, Self::Database>,
{
if let IsNull::Yes = value.encode_nullable(&mut self.values) {
if let IsNull::Yes = value.encode(&mut self.values) {
self.values.push(SqliteArgumentValue::Null);
}
}
}
impl SqliteArgumentValue {
pub(super) fn bind(&self, statement: &mut Statement, index: usize) -> crate::Result<()> {
let handle = unsafe {
if let Some(handle) = statement.handle() {
handle
} else {
// drop all requested bindings for a null/empty statement
// note that this _should_ not happen as argument size for a null statement should be zero
return Ok(());
}
};
impl SqliteArguments<'_> {
pub(super) fn bind(&self, statement: &SqliteStatement) -> Result<(), Error> {
let mut arg_i = 0;
for handle in &statement.handles {
let cnt = handle.bind_parameter_count();
for param_i in 0..cnt {
// figure out the index of this bind parameter into our argument tuple
let n: usize = if let Some(name) = handle.bind_parameter_name(param_i) {
if name.starts_with('?') {
// parameter should have the form ?NNN
atoi(name[1..].as_bytes()).expect("parameter of the form ?NNN")
} else {
return Err(err_protocol!("unsupported SQL parameter format: {}", name));
}
} else {
arg_i += 1;
arg_i
};
// TODO: Handle error of trying to bind too many parameters here
let index = index as c_int;
// https://sqlite.org/c3ref/bind_blob.html
let status: c_int = match self {
SqliteArgumentValue::Blob(value) => {
// TODO: Handle bytes that are too large
let bytes = value.as_slice();
let bytes_ptr = bytes.as_ptr() as *const c_void;
let bytes_len = bytes.len() as i32;
unsafe {
sqlite3_bind_blob(handle, index, bytes_ptr, bytes_len, SQLITE_TRANSIENT())
if n > self.values.len() {
return Err(err_protocol!(
"wrong number of parameters, parameter ?{} requested but have only {}",
n,
self.values.len()
));
}
self.values[n - 1].bind(handle, param_i + 1)?;
}
SqliteArgumentValue::Text(value) => {
// TODO: Handle text that is too large
let bytes = value.as_bytes();
let bytes_ptr = bytes.as_ptr() as *const c_char;
let bytes_len = bytes.len() as i32;
unsafe {
sqlite3_bind_text(handle, index, bytes_ptr, bytes_len, SQLITE_TRANSIENT())
}
}
SqliteArgumentValue::Double(value) => unsafe {
sqlite3_bind_double(handle, index, *value)
},
SqliteArgumentValue::Int(value) => unsafe { sqlite3_bind_int(handle, index, *value) },
SqliteArgumentValue::Int64(value) => unsafe {
sqlite3_bind_int64(handle, index, *value)
},
SqliteArgumentValue::Null => unsafe { sqlite3_bind_null(handle, index) },
};
if status != SQLITE_OK {
return Err(SqliteError::from_connection(statement.connection.0.as_ptr()).into());
}
Ok(())
}
}
impl SqliteArgumentValue<'_> {
fn bind(&self, handle: &StatementHandle, i: usize) -> Result<(), Error> {
use SqliteArgumentValue::*;
let status = match self {
Text(v) => handle.bind_text(i, v),
Blob(v) => handle.bind_blob(i, v),
Int(v) => handle.bind_int(i, *v),
Int64(v) => handle.bind_int64(i, *v),
Double(v) => handle.bind_double(i, *v),
Null => handle.bind_null(i),
};
if status != SQLITE_OK {
return Err(handle.last_error().into());
}
Ok(())

View File

@ -1,161 +0,0 @@
use core::ptr::{null, null_mut, NonNull};
use std::collections::HashMap;
use std::convert::TryInto;
use std::ffi::CString;
use futures_core::future::BoxFuture;
use futures_util::future;
use libsqlite3_sys::{
sqlite3, sqlite3_close, sqlite3_extended_result_codes, sqlite3_open_v2, SQLITE_OK,
SQLITE_OPEN_CREATE, SQLITE_OPEN_NOMUTEX, SQLITE_OPEN_READWRITE, SQLITE_OPEN_SHAREDCACHE,
};
use crate::connection::{Connect, Connection};
use crate::executor::Executor;
use crate::sqlite::statement::Statement;
use crate::sqlite::worker::Worker;
use crate::sqlite::SqliteError;
use crate::url::Url;
/// Thin wrapper around [sqlite3] to impl `Send`.
#[derive(Clone, Copy)]
pub(super) struct SqliteConnectionHandle(pub(super) NonNull<sqlite3>);
/// A connection to a [Sqlite](struct.Sqlite.html) database.
pub struct SqliteConnection {
pub(super) handle: SqliteConnectionHandle,
pub(super) worker: Worker,
// Storage of the most recently prepared, non-persistent statement
pub(super) statement: Option<Statement>,
// Storage of persistent statements
pub(super) statements: Vec<Statement>,
pub(super) statement_by_query: HashMap<String, usize>,
}
// A SQLite3 handle is safe to send between threads, provided not more than
// one is accessing it at the same time. This is upheld as long as [SQLITE_CONFIG_MULTITHREAD] is
// enabled and [SQLITE_THREADSAFE] was enabled when sqlite was compiled. We refuse to work
// if these conditions are not upheld.
// <https://www.sqlite.org/c3ref/threadsafe.html>
// <https://www.sqlite.org/c3ref/c_config_covering_index_scan.html#sqliteconfigmultithread>
unsafe impl Send for SqliteConnectionHandle {}
async fn establish(url: Result<Url, url::ParseError>) -> crate::Result<SqliteConnection> {
let mut worker = Worker::new();
// By default, we connect to an in-memory database.
// TODO: Handle the error when there are internal NULs in the database URL
let filename = CString::new(url?.path_decoded().to_string()).unwrap();
let handle = worker
.run(move || -> crate::Result<SqliteConnectionHandle> {
let mut handle = null_mut();
// [SQLITE_OPEN_NOMUTEX] will instruct [sqlite3_open_v2] to return an error if it
// cannot satisfy our wish for a thread-safe, lock-free connection object
let flags = SQLITE_OPEN_READWRITE
| SQLITE_OPEN_CREATE
| SQLITE_OPEN_NOMUTEX
| SQLITE_OPEN_SHAREDCACHE;
// <https://www.sqlite.org/c3ref/open.html>
let status = unsafe { sqlite3_open_v2(filename.as_ptr(), &mut handle, flags, null()) };
if handle.is_null() {
// Failed to allocate memory
panic!("SQLite is unable to allocate memory to hold the sqlite3 object");
}
if status != SQLITE_OK {
// Close the handle if there was an error here
// https://sqlite.org/c3ref/close.html
unsafe {
let _ = sqlite3_close(handle);
}
return Err(SqliteError::from_connection(handle).into());
}
// Enable extended result codes
// https://www.sqlite.org/c3ref/extended_result_codes.html
unsafe {
sqlite3_extended_result_codes(handle, 1);
}
Ok(SqliteConnectionHandle(NonNull::new(handle).unwrap()))
})
.await?;
Ok(SqliteConnection {
worker,
handle,
statement: None,
statements: Vec::with_capacity(10),
statement_by_query: HashMap::with_capacity(10),
})
}
impl SqliteConnection {
#[inline]
pub(super) fn handle(&mut self) -> *mut sqlite3 {
self.handle.0.as_ptr()
}
}
impl Connect for SqliteConnection {
fn connect<T>(url: T) -> BoxFuture<'static, crate::Result<SqliteConnection>>
where
T: TryInto<Url, Error = url::ParseError>,
Self: Sized,
{
let url = url.try_into();
Box::pin(async move {
let mut conn = establish(url).await?;
// https://www.sqlite.org/wal.html
// language=SQLite
conn.execute(
r#"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
"#,
)
.await?;
Ok(conn)
})
}
}
impl Connection for SqliteConnection {
fn close(self) -> BoxFuture<'static, crate::Result<()>> {
// All necessary behavior is handled on drop
Box::pin(future::ok(()))
}
fn ping(&mut self) -> BoxFuture<crate::Result<()>> {
// For SQLite connections, PING does effectively nothing
Box::pin(future::ok(()))
}
}
impl Drop for SqliteConnection {
fn drop(&mut self) {
// Drop all statements first
self.statements.clear();
drop(self.statement.take());
// Next close the statement
// https://sqlite.org/c3ref/close.html
unsafe {
let _ = sqlite3_close(self.handle());
}
}
}

View File

@ -0,0 +1,81 @@
use std::io;
use std::ptr::{null, null_mut};
use hashbrown::HashMap;
use libsqlite3_sys::{
sqlite3_extended_result_codes, sqlite3_open_v2, SQLITE_OK, SQLITE_OPEN_CREATE,
SQLITE_OPEN_MEMORY, SQLITE_OPEN_NOMUTEX, SQLITE_OPEN_PRIVATECACHE, SQLITE_OPEN_READWRITE,
};
use sqlx_rt::blocking;
use crate::error::Error;
use crate::sqlite::connection::handle::ConnectionHandle;
use crate::sqlite::statement::StatementWorker;
use crate::sqlite::{SqliteConnectOptions, SqliteConnection, SqliteError};
pub(super) async fn establish(options: &SqliteConnectOptions) -> Result<SqliteConnection, Error> {
let mut filename = options
.filename
.to_str()
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"filename passed to SQLite must be valid UTF-8",
)
})?
.to_owned();
filename.push('\0');
// By default, we connect to an in-memory database.
// [SQLITE_OPEN_NOMUTEX] will instruct [sqlite3_open_v2] to return an error if it
// cannot satisfy our wish for a thread-safe, lock-free connection object
let mut flags =
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX | SQLITE_OPEN_PRIVATECACHE;
if options.in_memory {
flags |= SQLITE_OPEN_MEMORY;
}
let handle = blocking!({
let mut handle = null_mut();
// <https://www.sqlite.org/c3ref/open.html>
let status = unsafe {
sqlite3_open_v2(
filename.as_bytes().as_ptr() as *const _,
&mut handle,
flags,
null(),
)
};
if handle.is_null() {
// Failed to allocate memory
panic!("SQLite is unable to allocate memory to hold the sqlite3 object");
}
// SAFE: tested for NULL just above
let handle = unsafe { ConnectionHandle::new(handle) };
if status != SQLITE_OK {
return Err(Error::Database(Box::new(SqliteError::new(handle.as_ptr()))));
}
// Enable extended result codes
// https://www.sqlite.org/c3ref/extended_result_codes.html
unsafe {
sqlite3_extended_result_codes(handle.0.as_ptr(), 1);
}
Ok(handle)
})?;
Ok(SqliteConnection {
handle,
worker: StatementWorker::new(),
statements: HashMap::new(),
statement: None,
scratch_row_column_names: Default::default(),
})
}

View File

@ -0,0 +1,203 @@
use std::sync::Arc;
use async_stream::try_stream;
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::TryStreamExt;
use hashbrown::HashMap;
use crate::describe::{Column, Describe};
use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::ext::ustr::UStr;
use crate::sqlite::connection::ConnectionHandle;
use crate::sqlite::statement::{SqliteStatement, StatementHandle};
use crate::sqlite::{Sqlite, SqliteArguments, SqliteConnection, SqliteRow};
fn prepare<'a>(
conn: &mut ConnectionHandle,
statements: &'a mut HashMap<String, SqliteStatement>,
statement: &'a mut Option<SqliteStatement>,
query: &str,
persistent: bool,
) -> Result<&'a mut SqliteStatement, Error> {
if !persistent {
*statement = Some(SqliteStatement::prepare(conn, query, false)?);
return Ok(statement.as_mut().unwrap());
}
if !statements.contains_key(query) {
let statement = SqliteStatement::prepare(conn, query, false)?;
statements.insert(query.to_owned(), statement);
}
let statement = statements.get_mut(query).unwrap();
// as this statement has been executed before, we reset before continuing
// this also causes any rows that are from the statement to be inflated
statement.reset();
Ok(statement)
}
fn bind(
statement: &mut SqliteStatement,
arguments: Option<SqliteArguments<'_>>,
) -> Result<(), Error> {
if let Some(arguments) = arguments {
arguments.bind(&*statement)?;
}
Ok(())
}
fn emplace_row_metadata(
statement: &StatementHandle,
column_names: &mut HashMap<UStr, usize>,
) -> Result<(), Error> {
column_names.clear();
let num = statement.column_count();
column_names.reserve(num);
for i in 0..num {
let name: UStr = statement.column_name(i).to_owned().into();
column_names.insert(name, i);
}
Ok(())
}
impl<'c> Executor<'c> for &'c mut SqliteConnection {
type Database = Sqlite;
fn fetch_many<'q: 'c, E>(
self,
mut query: E,
) -> BoxStream<'c, Result<Either<u64, SqliteRow>, Error>>
where
E: Execute<'q, Self::Database>,
{
let s = query.query();
let arguments = query.take_arguments();
Box::pin(try_stream! {
let SqliteConnection {
handle: ref mut conn,
ref mut statements,
ref mut statement,
ref worker,
ref mut scratch_row_column_names,
..
} = self;
// prepare statement object (or checkout from cache)
let mut stmt = prepare(conn, statements, statement, s, arguments.is_some())?;
// bind arguments, if any, to the statement
bind(&mut stmt, arguments)?;
{
// let SqliteStatement { ref handles, ref mut last_row_values, .. } = *stmt;
// let mut handle_i = 0;
while let Some((handle, last_row_values)) = stmt.execute()? {
// tell the worker about the new statement
worker.execute(handle);
// wake up the worker if needed
// the worker parks its thread on async-std when not in use
worker.wake();
emplace_row_metadata(
handle,
Arc::make_mut(scratch_row_column_names),
)?;
loop {
// save the rows from the _current_ position on the statement
// and send them to the still-live row object
SqliteRow::inflate_if_needed(handle, last_row_values.take());
match worker.step(handle).await? {
Either::Left(changes) => {
let v = Either::Left(changes);
yield v;
break;
}
Either::Right(()) => {
let (row, weak_values_ref) = SqliteRow::current(
*handle,
scratch_row_column_names
);
let v = Either::Right(row);
*last_row_values = Some(weak_values_ref);
yield v;
}
}
}
}
}
})
}
fn fetch_optional<'q: 'c, E>(self, query: E) -> BoxFuture<'c, Result<Option<SqliteRow>, Error>>
where
E: Execute<'q, Self::Database>,
{
let mut s = self.fetch_many(query);
Box::pin(async move {
while let Some(v) = s.try_next().await? {
if let Either::Right(r) = v {
return Ok(Some(r));
}
}
Ok(None)
})
}
#[doc(hidden)]
fn describe<'q: 'c, E>(self, query: E) -> BoxFuture<'c, Result<Describe<Sqlite>, Error>>
where
E: Execute<'q, Self::Database>,
{
let query = query.query();
let statement = SqliteStatement::prepare(&mut self.handle, query, false);
Box::pin(async move {
let mut params = Vec::new();
let mut columns = Vec::new();
if let Some(statement) = statement?.handles.get(0) {
// NOTE: we can infer *nothing* about parameters apart from the count
params.resize(statement.bind_parameter_count(), None);
let num_columns = statement.column_count();
columns.reserve(num_columns);
for i in 0..num_columns {
let name = statement.column_name(i).to_owned().into();
let type_info = statement.column_decltype(i);
let not_null = statement.column_not_null(i)?;
columns.push(Column {
name,
type_info,
not_null,
})
}
}
Ok(Describe { params, columns })
})
}
}

View File

@ -0,0 +1,47 @@
use std::ptr::NonNull;
use libsqlite3_sys::{sqlite3, sqlite3_close, SQLITE_OK};
use crate::sqlite::SqliteError;
/// Managed handle to the raw SQLite3 database handle.
/// The database handle will be closed when this is dropped.
#[derive(Debug)]
pub(crate) struct ConnectionHandle(pub(super) NonNull<sqlite3>);
// A SQLite3 handle is safe to send between threads, provided not more than
// one is accessing it at the same time. This is upheld as long as [SQLITE_CONFIG_MULTITHREAD] is
// enabled and [SQLITE_THREADSAFE] was enabled when sqlite was compiled. We refuse to work
// if these conditions are not upheld.
// <https://www.sqlite.org/c3ref/threadsafe.html>
// <https://www.sqlite.org/c3ref/c_config_covering_index_scan.html#sqliteconfigmultithread>
unsafe impl Send for ConnectionHandle {}
impl ConnectionHandle {
#[inline]
pub(super) unsafe fn new(ptr: *mut sqlite3) -> Self {
Self(NonNull::new_unchecked(ptr))
}
#[inline]
pub(crate) fn as_ptr(&self) -> *mut sqlite3 {
self.0.as_ptr()
}
}
impl Drop for ConnectionHandle {
fn drop(&mut self) {
unsafe {
// https://sqlite.org/c3ref/close.html
let status = sqlite3_close(self.0.as_ptr());
if status != SQLITE_OK {
// this should *only* happen due to an internal bug in SQLite where we left
// SQLite handles open
panic!("{}", SqliteError::new(self.0.as_ptr()));
}
}
}
}

View File

@ -0,0 +1,81 @@
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use futures_core::future::BoxFuture;
use futures_util::future;
use hashbrown::HashMap;
use crate::connection::{Connect, Connection};
use crate::error::Error;
use crate::ext::ustr::UStr;
use crate::sqlite::connection::establish::establish;
use crate::sqlite::statement::{SqliteStatement, StatementWorker};
use crate::sqlite::{Sqlite, SqliteConnectOptions};
mod establish;
mod executor;
mod handle;
pub(crate) use handle::ConnectionHandle;
/// A connection to a [Sqlite] database.
pub struct SqliteConnection {
pub(crate) handle: ConnectionHandle,
pub(crate) worker: StatementWorker,
// cache of semi-persistent statements
pub(crate) statements: HashMap<String, SqliteStatement>,
// most recent non-persistent statement
pub(crate) statement: Option<SqliteStatement>,
// working memory for the active row's column information
scratch_row_column_names: Arc<HashMap<UStr, usize>>,
}
impl Debug for SqliteConnection {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("SqliteConnection").finish()
}
}
impl Connection for SqliteConnection {
type Database = Sqlite;
fn close(self) -> BoxFuture<'static, Result<(), Error>> {
// nothing explicit to do; connection will close in drop
Box::pin(future::ok(()))
}
fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
// For SQLite connections, PING does effectively nothing
Box::pin(future::ok(()))
}
}
impl Connect for SqliteConnection {
type Options = SqliteConnectOptions;
#[inline]
fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result<Self, Error>> {
Box::pin(async move {
let conn = establish(options).await?;
// TODO: Apply any connection options once we have them defined
Ok(conn)
})
}
}
impl Drop for SqliteConnection {
fn drop(&mut self) {
// before the connection handle is dropped,
// we must explicitly drop the statements as the drop-order in a struct is undefined
self.statements.clear();
self.statement.take();
// we then explicitly close the worker
self.worker.close();
}
}

View File

@ -1,99 +0,0 @@
use futures_core::future::BoxFuture;
use crate::connection::ConnectionSource;
use crate::cursor::Cursor;
use crate::executor::Execute;
use crate::pool::Pool;
use crate::sqlite::statement::Step;
use crate::sqlite::{Sqlite, SqliteArguments, SqliteConnection, SqliteRow};
pub struct SqliteCursor<'c, 'q> {
pub(super) source: ConnectionSource<'c, SqliteConnection>,
query: &'q str,
arguments: Option<SqliteArguments>,
pub(super) statement: Option<Option<usize>>,
}
impl crate::cursor::private::Sealed for SqliteCursor<'_, '_> {}
impl<'c, 'q> Cursor<'c, 'q> for SqliteCursor<'c, 'q> {
type Database = Sqlite;
#[doc(hidden)]
fn from_pool<E>(pool: &Pool<SqliteConnection>, query: E) -> Self
where
Self: Sized,
E: Execute<'q, Sqlite>,
{
let (query, arguments) = query.into_parts();
Self {
source: ConnectionSource::Pool(pool.clone()),
statement: None,
query,
arguments,
}
}
#[doc(hidden)]
fn from_connection<E>(conn: &'c mut SqliteConnection, query: E) -> Self
where
Self: Sized,
E: Execute<'q, Sqlite>,
{
let (query, arguments) = query.into_parts();
Self {
source: ConnectionSource::ConnectionRef(conn),
statement: None,
query,
arguments,
}
}
fn next(&mut self) -> BoxFuture<crate::Result<Option<SqliteRow<'_>>>> {
Box::pin(next(self))
}
}
async fn next<'a, 'c: 'a, 'q: 'a>(
cursor: &'a mut SqliteCursor<'c, 'q>,
) -> crate::Result<Option<SqliteRow<'a>>> {
let conn = cursor.source.resolve().await?;
loop {
if cursor.statement.is_none() {
let key = conn.prepare(&mut cursor.query, cursor.arguments.is_some())?;
if let Some(arguments) = &mut cursor.arguments {
conn.statement_mut(key).bind(arguments)?;
}
cursor.statement = Some(key);
}
let key = cursor.statement.unwrap();
let statement = conn.statement_mut(key);
let step = statement.step().await?;
match step {
Step::Row => {
return Ok(Some(SqliteRow {
values: statement.data_count(),
statement: key,
connection: conn,
}));
}
Step::Done if cursor.query.is_empty() => {
return Ok(None);
}
Step::Done => {
cursor.statement = None;
// continue
}
}
}
}

View File

@ -1,45 +1,33 @@
use crate::cursor::HasCursor;
use crate::database::Database;
use crate::row::HasRow;
use crate::sqlite::error::SqliteError;
use crate::database::{Database, HasArguments, HasValueRef};
use crate::sqlite::{
SqliteArgumentValue, SqliteArguments, SqliteConnection, SqliteCursor, SqliteRow,
SqliteTypeInfo, SqliteValue,
SqliteArgumentValue, SqliteArguments, SqliteConnection, SqliteRow, SqliteTypeInfo, SqliteValue,
SqliteValueRef,
};
use crate::value::HasRawValue;
/// **Sqlite** database driver.
/// Sqlite database driver.
#[derive(Debug)]
pub struct Sqlite;
impl Database for Sqlite {
type Connection = SqliteConnection;
type Arguments = SqliteArguments;
type Row = SqliteRow;
type TypeInfo = SqliteTypeInfo;
type TableId = String;
type RawBuffer = Vec<SqliteArgumentValue>;
type Error = SqliteError;
type Value = SqliteValue;
}
impl<'c> HasRow<'c> for Sqlite {
impl<'r> HasValueRef<'r> for Sqlite {
type Database = Sqlite;
type Row = SqliteRow<'c>;
type ValueRef = SqliteValueRef<'r>;
}
impl<'c, 'q> HasCursor<'c, 'q> for Sqlite {
impl<'q> HasArguments<'q> for Sqlite {
type Database = Sqlite;
type Cursor = SqliteCursor<'c, 'q>;
}
type Arguments = SqliteArguments<'q>;
impl<'c> HasRawValue<'c> for Sqlite {
type Database = Sqlite;
type RawValue = SqliteValue<'c>;
type ArgumentBuffer = Vec<SqliteArgumentValue<'q>>;
}

View File

@ -1,95 +1,53 @@
use crate::error::DatabaseError;
use bitflags::_core::str::from_utf8_unchecked;
use libsqlite3_sys::{sqlite3, sqlite3_errmsg, sqlite3_extended_errcode};
use std::error::Error as StdError;
use std::ffi::CStr;
use std::fmt::{self, Display};
use std::fmt::{self, Display, Formatter};
use std::os::raw::c_int;
use std::str::from_utf8_unchecked;
#[derive(Debug)]
pub struct SqliteError {
code: String,
message: String,
}
use libsqlite3_sys::{sqlite3, sqlite3_errmsg, sqlite3_extended_errcode};
use crate::error::DatabaseError;
// Error Codes And Messages
// https://www.sqlite.org/c3ref/errcode.html
#[derive(Debug)]
pub struct SqliteError {
code: c_int,
message: String,
}
impl SqliteError {
pub(super) fn from_connection(conn: *mut sqlite3) -> Self {
let code: c_int = unsafe { sqlite3_extended_errcode(conn) };
pub(crate) fn new(handle: *mut sqlite3) -> Self {
// returns the extended result code even when extended result codes are disabled
let code: c_int = unsafe { sqlite3_extended_errcode(handle) };
// return English-language text that describes the error
let message = unsafe {
let err = sqlite3_errmsg(conn);
debug_assert!(!err.is_null());
let msg = sqlite3_errmsg(handle);
debug_assert!(!msg.is_null());
from_utf8_unchecked(CStr::from_ptr(err).to_bytes())
from_utf8_unchecked(CStr::from_ptr(msg).to_bytes())
};
Self {
code: code.to_string(),
code,
message: message.to_owned(),
}
}
}
impl Display for SqliteError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad(self.message())
}
}
impl DatabaseError for SqliteError {
fn message(&self) -> &str {
&self.message
}
fn code(&self) -> Option<&str> {
Some(&self.code)
}
fn as_ref_err(&self) -> &(dyn StdError + Send + Sync + 'static) {
self
}
fn as_mut_err(&mut self) -> &mut (dyn StdError + Send + Sync + 'static) {
self
}
fn into_box_err(self: Box<Self>) -> Box<dyn StdError + Send + Sync + 'static> {
self
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.pad(&self.message)
}
}
impl StdError for SqliteError {}
impl From<SqliteError> for crate::Error {
fn from(err: SqliteError) -> Self {
crate::Error::Database(Box::new(err))
impl DatabaseError for SqliteError {
#[inline]
fn message(&self) -> &str {
&self.message
}
}
#[test]
fn test_error_downcasting() {
let error = SqliteError {
code: "SQLITE_ERR_SOMETHING".into(),
message: "Some hypothetical error message".into(),
};
let error = crate::Error::from(error);
let db_err = match error {
crate::Error::Database(db_err) => db_err,
e => panic!("expected Error::Database, got {:?}", e),
};
assert_eq!(
&db_err.downcast_ref::<SqliteError>().code,
"SQLITE_ERR_SOMETHING"
);
assert_eq!(
db_err.downcast::<SqliteError>().code,
"SQLITE_ERR_SOMETHING"
);
}

View File

@ -1,193 +0,0 @@
use futures_core::future::BoxFuture;
use libsqlite3_sys::sqlite3_changes;
use crate::cursor::Cursor;
use crate::describe::{Column, Describe};
use crate::executor::{Execute, Executor, RefExecutor};
use crate::sqlite::cursor::SqliteCursor;
use crate::sqlite::statement::{Statement, Step};
use crate::sqlite::type_info::SqliteType;
use crate::sqlite::{Sqlite, SqliteConnection, SqliteTypeInfo};
impl SqliteConnection {
pub(super) fn prepare(
&mut self,
query: &mut &str,
persistent: bool,
) -> crate::Result<Option<usize>> {
// TODO: Revisit statement caching and allow cache expiration by using a
// generational index
if !persistent {
// A non-persistent query will be immediately prepared and returned,
// regardless of the current state of the cache
self.statement = Some(Statement::new(self, query, false)?);
return Ok(None);
}
if let Some(key) = self.statement_by_query.get(&**query) {
let statement = &mut self.statements[*key];
// Adjust the passed in query string as if [string3_prepare]
// did the tail parsing
*query = &query[statement.tail..];
// As this statement has very likely been used before, we reset
// it to clear the bindings and its program state
statement.reset();
return Ok(Some(*key));
}
// Prepare a new statement object; ensuring to tell SQLite that this will be stored
// for a "long" time and re-used multiple times
let query_key = query.to_owned();
let statement = Statement::new(self, query, true)?;
let key = self.statements.len();
self.statement_by_query.insert(query_key, key);
self.statements.push(statement);
Ok(Some(key))
}
// This is used for [affected_rows] in the public API.
fn changes(&mut self) -> u64 {
// Returns the number of rows modified, inserted or deleted by the most recently
// completed INSERT, UPDATE or DELETE statement.
// https://www.sqlite.org/c3ref/changes.html
let changes = unsafe { sqlite3_changes(self.handle()) };
changes as u64
}
#[inline]
pub(super) fn statement(&self, key: Option<usize>) -> &Statement {
match key {
Some(key) => &self.statements[key],
None => self.statement.as_ref().unwrap(),
}
}
#[inline]
pub(super) fn statement_mut(&mut self, key: Option<usize>) -> &mut Statement {
match key {
Some(key) => &mut self.statements[key],
None => self.statement.as_mut().unwrap(),
}
}
}
impl Executor for SqliteConnection {
type Database = Sqlite;
fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>(
&'c mut self,
query: E,
) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{
log_execution!(query, {
let (mut query, mut arguments) = query.into_parts();
Box::pin(async move {
loop {
let key = self.prepare(&mut query, arguments.is_some())?;
let statement = self.statement_mut(key);
if let Some(arguments) = &mut arguments {
statement.bind(arguments)?;
}
while let Step::Row = statement.step().await? {
// We only care about the rows modified; ignore
}
if query.is_empty() {
break;
}
}
Ok(self.changes())
})
})
}
fn fetch<'q, E>(&mut self, query: E) -> SqliteCursor<'_, 'q>
where
E: Execute<'q, Self::Database>,
{
log_execution!(query, { SqliteCursor::from_connection(self, query) })
}
#[doc(hidden)]
fn describe<'e, 'q, E: 'e>(
&'e mut self,
query: E,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>
where
E: Execute<'q, Self::Database>,
{
Box::pin(async move {
let (mut query, _) = query.into_parts();
let key = self.prepare(&mut query, false)?;
let statement = self.statement_mut(key);
// First let's attempt to describe what we can about parameter types
// Which happens to just be the count, heh
let num_params = statement.params();
let params = vec![None; num_params].into_boxed_slice();
// Next, collect (return) column types and names
let num_columns = statement.column_count();
let mut columns = Vec::with_capacity(num_columns);
for i in 0..num_columns {
let name = statement.column_name(i).to_owned();
let decl = statement.column_decltype(i);
let r#type = match decl {
None => None,
Some(decl) => match &*decl.to_ascii_lowercase() {
"bool" | "boolean" => Some(SqliteType::Boolean),
"clob" | "text" => Some(SqliteType::Text),
"blob" => Some(SqliteType::Blob),
"real" | "double" | "double precision" | "float" => Some(SqliteType::Float),
decl @ _ if decl.contains("int") => Some(SqliteType::Integer),
decl @ _ if decl.contains("char") => Some(SqliteType::Text),
_ => None,
},
};
columns.push(Column {
name: Some(name.into()),
non_null: statement.column_not_null(i)?,
table_id: None,
type_info: r#type.map(|r#type| SqliteTypeInfo {
r#type,
affinity: None,
}),
})
}
Ok(Describe {
param_types: params,
result_columns: columns.into_boxed_slice(),
})
})
}
}
impl<'e> RefExecutor<'e> for &'e mut SqliteConnection {
type Database = Sqlite;
fn fetch_by_ref<'q, E>(self, query: E) -> SqliteCursor<'e, 'q>
where
E: Execute<'q, Self::Database>,
{
log_execution!(query, { SqliteCursor::from_connection(self, query) })
}
}

View File

@ -1,4 +1,4 @@
//! **SQLite** database and connection types.
//! **SQLite** database driver.
// SQLite is a C library. All interactions require FFI which is unsafe.
// All unsafe blocks should have comments pointing to SQLite docs and ensuring that we maintain
@ -7,30 +7,23 @@
mod arguments;
mod connection;
mod cursor;
mod database;
mod error;
mod executor;
mod options;
mod row;
mod statement;
mod type_info;
pub mod types;
mod value;
mod worker;
pub use arguments::{SqliteArgumentValue, SqliteArguments};
pub use connection::SqliteConnection;
pub use cursor::SqliteCursor;
pub use database::Sqlite;
pub use error::SqliteError;
pub use options::SqliteConnectOptions;
pub use row::SqliteRow;
pub use type_info::SqliteTypeInfo;
pub use value::SqliteValue;
pub use value::{SqliteValue, SqliteValueRef};
/// An alias for [`Pool`][crate::pool::Pool], specialized for **Sqlite**.
#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
/// An alias for [`Pool`][crate::pool::Pool], specialized for SQLite.
pub type SqlitePool = crate::pool::Pool<SqliteConnection>;
make_query_as!(SqliteQueryAs, Sqlite, SqliteRow);
impl_map_row_for_row!(Sqlite, SqliteRow);
impl_from_row_for_tuples!(Sqlite, SqliteRow);

View File

@ -0,0 +1,46 @@
use std::path::PathBuf;
use std::str::FromStr;
use crate::error::BoxDynError;
// TODO: Look at go-sqlite for option ideas
// TODO: journal_mode
/// Options and flags which can be used to configure a SQLite connection.
pub struct SqliteConnectOptions {
pub(crate) filename: PathBuf,
pub(crate) in_memory: bool,
}
impl SqliteConnectOptions {
pub fn new() -> Self {
Self {
filename: PathBuf::from(":memory:"),
in_memory: false,
}
}
}
impl FromStr for SqliteConnectOptions {
type Err = BoxDynError;
fn from_str(mut s: &str) -> Result<Self, Self::Err> {
let mut options = Self {
filename: PathBuf::new(),
in_memory: false,
};
// remove scheme
s = s
.trim_start_matches("sqlite://")
.trim_start_matches("sqlite:");
if s == ":memory:" {
options.in_memory = true;
} else {
options.filename = s.parse()?;
}
Ok(options)
}
}

View File

@ -1,68 +1,137 @@
use crate::row::{ColumnIndex, Row};
use crate::sqlite::statement::Statement;
use crate::sqlite::value::SqliteValue;
use crate::sqlite::{Sqlite, SqliteConnection};
use std::ptr::null_mut;
use std::slice;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::{Arc, Weak};
pub struct SqliteRow<'c> {
pub(super) values: usize,
pub(super) statement: Option<usize>,
pub(super) connection: &'c SqliteConnection,
use hashbrown::HashMap;
use crate::error::Error;
use crate::ext::ustr::UStr;
use crate::row::{ColumnIndex, Row};
use crate::sqlite::statement::StatementHandle;
use crate::sqlite::{Sqlite, SqliteValue, SqliteValueRef};
/// Implementation of [`Row`] for SQLite.
pub struct SqliteRow {
// Raw handle of the SQLite statement
// This is valid to access IFF the atomic [values] is null
// The way this works is that the executor retains a weak reference to
// [values] after the Row is created and yielded downstream.
// IF the user drops the Row before iterating the stream (so
// nearly all of our internal stream iterators), the executor moves on; otherwise,
// it actually inflates this row with a list of owned sqlite3 values.
pub(crate) statement: StatementHandle,
pub(crate) values: Arc<AtomicPtr<SqliteValue>>,
pub(crate) num_values: usize,
pub(crate) column_names: Arc<HashMap<UStr, usize>>,
}
impl crate::row::private_row::Sealed for SqliteRow<'_> {}
impl crate::row::private_row::Sealed for SqliteRow {}
// Accessing values from the statement object is
// safe across threads as long as we don't call [sqlite3_step]
// That should not be possible as long as an immutable borrow is held on the connection
unsafe impl Send for SqliteRow<'_> {}
unsafe impl Sync for SqliteRow<'_> {}
// we block ourselves from doing that by only exposing
// a set interface on [StatementHandle]
impl<'c> SqliteRow<'c> {
#[inline]
fn statement(&self) -> &'c Statement {
self.connection.statement(self.statement)
}
}
unsafe impl Send for SqliteRow {}
unsafe impl Sync for SqliteRow {}
impl<'c> Row<'c> for SqliteRow<'c> {
type Database = Sqlite;
impl SqliteRow {
// creates a new row that is internally referencing the **current** state of the statement
// returns a weak reference to an atomic list where the executor should inflate if its going
// to increment the statement with [step]
pub(crate) fn current(
statement: StatementHandle,
column_names: &Arc<HashMap<UStr, usize>>,
) -> (Self, Weak<AtomicPtr<SqliteValue>>) {
let values = Arc::new(AtomicPtr::new(null_mut()));
let weak_values = Arc::downgrade(&values);
let size = statement.column_count();
#[inline]
fn len(&self) -> usize {
self.values
let row = Self {
statement,
values,
num_values: size,
column_names: Arc::clone(column_names),
};
(row, weak_values)
}
#[doc(hidden)]
fn try_get_raw<I>(&self, index: I) -> crate::Result<SqliteValue<'c>>
where
I: ColumnIndex<'c, Self>,
{
Ok(SqliteValue {
statement: self.statement(),
index: index.index(self)? as i32,
})
}
}
// inflates this Row into memory as a list of owned, protected SQLite value objects
// this is called by the
pub(crate) fn inflate(statement: &StatementHandle, values_ref: &AtomicPtr<SqliteValue>) {
let size = statement.column_count();
let mut values = Vec::with_capacity(size);
impl<'c> ColumnIndex<'c, SqliteRow<'c>> for usize {
fn index(&self, row: &SqliteRow<'c>) -> crate::Result<usize> {
let len = Row::len(row);
if *self >= len {
return Err(crate::Error::ColumnIndexOutOfBounds { len, index: *self });
for i in 0..size {
values.push(statement.column_value(i));
}
Ok(*self)
// decay the array signifier and become just a normal, leaked array
let values_ptr = Box::into_raw(values.into_boxed_slice()) as *mut SqliteValue;
// store in the atomic ptr storage
values_ref.store(values_ptr, Ordering::Release);
}
pub(crate) fn inflate_if_needed(
statement: &StatementHandle,
weak_values_ref: Option<Weak<AtomicPtr<SqliteValue>>>,
) {
if let Some(v) = weak_values_ref.and_then(|v| v.upgrade()) {
SqliteRow::inflate(statement, &v);
}
}
}
impl<'c> ColumnIndex<'c, SqliteRow<'c>> for str {
fn index(&self, row: &SqliteRow<'c>) -> crate::Result<usize> {
row.statement()
.columns
.get(self)
.ok_or_else(|| crate::Error::ColumnNotFound((*self).into()))
.map(|&index| index as usize)
impl Row for SqliteRow {
type Database = Sqlite;
fn len(&self) -> usize {
self.num_values
}
fn try_get_raw<I>(&self, index: I) -> Result<SqliteValueRef<'_>, Error>
where
I: ColumnIndex<Self>,
{
let index = index.index(self)?;
let values_ptr = self.values.load(Ordering::Acquire);
if !values_ptr.is_null() {
// we have raw value data, we should use that
let values: &[SqliteValue] =
unsafe { slice::from_raw_parts(values_ptr, self.num_values) };
Ok(SqliteValueRef::value(&values[index]))
} else {
Ok(SqliteValueRef::statement(&self.statement, index))
}
}
}
impl Drop for SqliteRow {
fn drop(&mut self) {
// if there is a non-null pointer stored here, we need to re-load and drop it
let values_ptr = self.values.load(Ordering::Acquire);
if !values_ptr.is_null() {
let values: &mut [SqliteValue] =
unsafe { slice::from_raw_parts_mut(values_ptr, self.num_values) };
let _ = unsafe { Box::from_raw(values) };
}
}
}
impl ColumnIndex<SqliteRow> for &'_ str {
fn index(&self, row: &SqliteRow) -> Result<usize, Error> {
row.column_names
.get(*self)
.ok_or_else(|| Error::ColumnNotFound((*self).into()))
.map(|v| *v)
}
}

View File

@ -1,305 +0,0 @@
#![allow(unsafe_code)]
use core::ptr::{null, null_mut, NonNull};
use std::collections::HashMap;
use std::ffi::CStr;
use std::os::raw::{c_char, c_int};
use std::ptr;
use libsqlite3_sys::{
sqlite3_bind_parameter_count, sqlite3_clear_bindings, sqlite3_column_count,
sqlite3_column_database_name, sqlite3_column_decltype, sqlite3_column_name,
sqlite3_column_origin_name, sqlite3_column_table_name, sqlite3_data_count, sqlite3_finalize,
sqlite3_prepare_v3, sqlite3_reset, sqlite3_step, sqlite3_stmt, sqlite3_table_column_metadata,
SQLITE_DONE, SQLITE_OK, SQLITE_PREPARE_NO_VTAB, SQLITE_PREPARE_PERSISTENT, SQLITE_ROW,
};
use crate::sqlite::connection::SqliteConnectionHandle;
use crate::sqlite::worker::Worker;
use crate::sqlite::SqliteError;
use crate::sqlite::{SqliteArguments, SqliteConnection};
/// Return values from [SqliteStatement::step].
pub(super) enum Step {
/// The statement has finished executing successfully.
Done,
/// Another row of output is available.
Row,
}
/// Thin wrapper around [sqlite3_stmt] to impl `Send`.
#[derive(Clone, Copy)]
pub(super) struct SqliteStatementHandle(NonNull<sqlite3_stmt>);
/// Represents a _single_ SQL statement that has been compiled into binary
/// form and is ready to be evaluated.
///
/// The statement is finalized ( `sqlite3_finalize` ) on drop.
pub(super) struct Statement {
handle: Option<SqliteStatementHandle>,
pub(super) connection: SqliteConnectionHandle,
pub(super) worker: Worker,
pub(super) tail: usize,
pub(super) columns: HashMap<String, usize>,
}
// SQLite3 statement objects are safe to send between threads, but *not* safe
// for general-purpose concurrent access between threads. See more notes
// on [SqliteConnectionHandle].
unsafe impl Send for SqliteStatementHandle {}
impl Statement {
pub(super) fn new(
conn: &mut SqliteConnection,
query: &mut &str,
persistent: bool,
) -> crate::Result<Self> {
// TODO: Error on queries that are too large
let query_ptr = query.as_bytes().as_ptr() as *const c_char;
let query_len = query.len() as i32;
let mut statement_handle: *mut sqlite3_stmt = null_mut();
let mut flags = SQLITE_PREPARE_NO_VTAB;
let mut tail: *const c_char = null();
if persistent {
// SQLITE_PREPARE_PERSISTENT
// The SQLITE_PREPARE_PERSISTENT flag is a hint to the query
// planner that the prepared statement will be retained for a long time
// and probably reused many times.
flags |= SQLITE_PREPARE_PERSISTENT;
}
// <https://www.sqlite.org/c3ref/prepare.html>
let status = unsafe {
sqlite3_prepare_v3(
conn.handle(),
query_ptr,
query_len,
flags as u32,
&mut statement_handle,
&mut tail,
)
};
if status != SQLITE_OK {
return Err(SqliteError::from_connection(conn.handle()).into());
}
// If pzTail is not NULL then *pzTail is made to point to the first byte
// past the end of the first SQL statement in zSql.
let tail = (tail as usize) - (query_ptr as usize);
*query = &query[tail..].trim();
let mut self_ = Self {
worker: conn.worker.clone(),
connection: conn.handle,
handle: NonNull::new(statement_handle).map(SqliteStatementHandle),
columns: HashMap::new(),
tail,
};
// Prepare a column hash map for use in pulling values from a column by name
let count = self_.column_count();
self_.columns.reserve(count);
for i in 0..count {
let name = self_.column_name(i).to_owned();
self_.columns.insert(name, i);
}
Ok(self_)
}
/// Returns a pointer to the raw C pointer backing this statement.
#[inline]
pub(super) unsafe fn handle(&self) -> Option<*mut sqlite3_stmt> {
self.handle.map(|handle| handle.0.as_ptr())
}
pub(super) fn data_count(&mut self) -> usize {
// https://sqlite.org/c3ref/data_count.html
// The sqlite3_data_count(P) interface returns the number of columns
// in the current row of the result set.
// The value is correct only if there was a recent call to
// sqlite3_step that returned SQLITE_ROW.
unsafe { self.handle().map_or(0, |handle| sqlite3_data_count(handle)) as usize }
}
pub(super) fn column_count(&mut self) -> usize {
// https://sqlite.org/c3ref/column_count.html
unsafe {
self.handle()
.map_or(0, |handle| sqlite3_column_count(handle)) as usize
}
}
pub(super) fn column_name(&mut self, index: usize) -> &str {
unsafe {
self.handle()
.map(|handle| {
// https://sqlite.org/c3ref/column_name.html
let ptr = sqlite3_column_name(handle, index as c_int);
debug_assert!(!ptr.is_null());
CStr::from_ptr(ptr)
})
.map_or(Ok(""), |name| name.to_str())
.unwrap()
}
}
pub(super) fn column_decltype(&mut self, index: usize) -> Option<&str> {
unsafe {
self.handle()
.and_then(|handle| {
let ptr = sqlite3_column_decltype(handle, index as c_int);
if ptr.is_null() {
None
} else {
Some(CStr::from_ptr(ptr))
}
})
.map(|name| name.to_str().unwrap())
}
}
pub(super) fn column_not_null(&mut self, index: usize) -> crate::Result<Option<bool>> {
let handle = unsafe {
if let Some(handle) = self.handle() {
handle
} else {
// we do not know the nullablility of a column that doesn't exist on a statement
// that doesn't exist
return Ok(None);
}
};
unsafe {
// https://sqlite.org/c3ref/column_database_name.html
//
// ### Note
// The returned string is valid until the prepared statement is destroyed using
// sqlite3_finalize() or until the statement is automatically reprepared by the
// first call to sqlite3_step() for a particular run or until the same information
// is requested again in a different encoding.
let db_name = sqlite3_column_database_name(handle, index as c_int);
let table_name = sqlite3_column_table_name(handle, index as c_int);
let origin_name = sqlite3_column_origin_name(handle, index as c_int);
if db_name.is_null() || table_name.is_null() || origin_name.is_null() {
return Ok(None);
}
let mut not_null: c_int = 0;
// https://sqlite.org/c3ref/table_column_metadata.html
let status = sqlite3_table_column_metadata(
self.connection.0.as_ptr(),
db_name,
table_name,
origin_name,
// function docs state to provide NULL for return values you don't care about
ptr::null_mut(),
ptr::null_mut(),
&mut not_null,
ptr::null_mut(),
ptr::null_mut(),
);
if status != SQLITE_OK {
// implementation note: the docs for sqlite3_table_column_metadata() specify
// that an error can be returned if the column came from a view; however,
// experimentally we found that the above functions give us the true origin
// for columns in views that came from real tables and so we should never hit this
// error; for view columns that are expressions we are given NULL for their origins
// so we don't need special handling for that case either.
//
// this is confirmed in the `tests/sqlite-macros.rs` integration test
return Err(SqliteError::from_connection(self.connection.0.as_ptr()).into());
}
Ok(Some(not_null != 0))
}
}
pub(super) fn params(&mut self) -> usize {
// https://www.hwaci.com/sw/sqlite/c3ref/bind_parameter_count.html
unsafe {
self.handle()
.map_or(0, |handle| sqlite3_bind_parameter_count(handle)) as usize
}
}
pub(super) fn bind(&mut self, arguments: &mut SqliteArguments) -> crate::Result<()> {
for index in 0..self.params() {
if let Some(value) = arguments.next() {
value.bind(self, index + 1)?;
} else {
break;
}
}
Ok(())
}
pub(super) fn reset(&mut self) {
let handle = unsafe {
if let Some(handle) = self.handle() {
handle
} else {
// nothing to reset if its null
return;
}
};
// https://sqlite.org/c3ref/reset.html
// https://sqlite.org/c3ref/clear_bindings.html
// the status value of reset is ignored because it merely propagates
// the status of the most recently invoked step function
let _ = unsafe { sqlite3_reset(handle) };
let _ = unsafe { sqlite3_clear_bindings(handle) };
}
pub(super) async fn step(&mut self) -> crate::Result<Step> {
// https://sqlite.org/c3ref/step.html
if let Some(handle) = self.handle {
let status = unsafe {
self.worker
.run(move || sqlite3_step(handle.0.as_ptr()))
.await
};
match status {
SQLITE_DONE => Ok(Step::Done),
SQLITE_ROW => Ok(Step::Row),
_ => {
return Err(SqliteError::from_connection(self.connection.0.as_ptr()).into());
}
}
} else {
// An empty (null) query will always emit `Step::Done`
Ok(Step::Done)
}
}
}
impl Drop for Statement {
fn drop(&mut self) {
// https://sqlite.org/c3ref/finalize.html
unsafe {
if let Some(handle) = self.handle() {
let _ = sqlite3_finalize(handle);
}
}
}
}

View File

@ -0,0 +1,254 @@
use std::ffi::c_void;
use std::ffi::CStr;
use std::os::raw::{c_char, c_int};
use std::ptr::NonNull;
use std::str::{from_utf8, from_utf8_unchecked};
use libsqlite3_sys::{
sqlite3, sqlite3_bind_blob64, sqlite3_bind_double, sqlite3_bind_int, sqlite3_bind_int64,
sqlite3_bind_null, sqlite3_bind_parameter_count, sqlite3_bind_parameter_name,
sqlite3_bind_text64, sqlite3_changes, sqlite3_column_blob, sqlite3_column_bytes,
sqlite3_column_count, sqlite3_column_database_name, sqlite3_column_decltype,
sqlite3_column_double, sqlite3_column_int, sqlite3_column_int64, sqlite3_column_name,
sqlite3_column_origin_name, sqlite3_column_table_name, sqlite3_column_type,
sqlite3_column_value, sqlite3_db_handle, sqlite3_stmt, sqlite3_table_column_metadata,
SQLITE_OK, SQLITE_TRANSIENT, SQLITE_UTF8,
};
use crate::error::{BoxDynError, Error};
use crate::sqlite::type_info::DataType;
use crate::sqlite::{SqliteError, SqliteTypeInfo, SqliteValue};
use std::ptr;
use std::slice::from_raw_parts;
#[derive(Debug, Copy, Clone)]
pub(crate) struct StatementHandle(pub(super) NonNull<sqlite3_stmt>);
// access to SQLite3 statement handles are safe to send and share between threads
// as long as the `sqlite3_step` call is serialized.
unsafe impl Send for StatementHandle {}
unsafe impl Sync for StatementHandle {}
impl StatementHandle {
#[inline]
pub(super) unsafe fn db_handle(&self) -> *mut sqlite3 {
// O(c) access to the connection handle for this statement handle
// https://sqlite.org/c3ref/db_handle.html
sqlite3_db_handle(self.0.as_ptr())
}
#[inline]
pub(crate) fn last_error(&self) -> SqliteError {
SqliteError::new(unsafe { self.db_handle() })
}
#[inline]
pub(crate) fn column_count(&self) -> usize {
// https://sqlite.org/c3ref/column_count.html
unsafe { sqlite3_column_count(self.0.as_ptr()) as usize }
}
#[inline]
pub(crate) fn changes(&self) -> u64 {
// returns the number of changes of the *last* statement; not
// necessarily this statement.
// https://sqlite.org/c3ref/changes.html
unsafe { sqlite3_changes(self.db_handle()) as u64 }
}
#[inline]
pub(crate) fn column_name(&self, index: usize) -> &str {
// https://sqlite.org/c3ref/column_name.html
unsafe {
let name = sqlite3_column_name(self.0.as_ptr(), index as c_int);
debug_assert!(!name.is_null());
from_utf8_unchecked(CStr::from_ptr(name).to_bytes())
}
}
#[inline]
pub(crate) fn column_decltype(&self, index: usize) -> Option<SqliteTypeInfo> {
unsafe {
let decl = sqlite3_column_decltype(self.0.as_ptr(), index as c_int);
if decl.is_null() {
// If the Nth column of the result set is an expression or subquery,
// then a NULL pointer is returned.
return None;
}
let decl = from_utf8_unchecked(CStr::from_ptr(decl).to_bytes());
let ty: DataType = decl.parse().ok()?;
Some(SqliteTypeInfo(ty))
}
}
pub(crate) fn column_not_null(&self, index: usize) -> Result<Option<bool>, Error> {
unsafe {
// https://sqlite.org/c3ref/column_database_name.html
//
// ### Note
// The returned string is valid until the prepared statement is destroyed using
// sqlite3_finalize() or until the statement is automatically reprepared by the
// first call to sqlite3_step() for a particular run or until the same information
// is requested again in a different encoding.
let db_name = sqlite3_column_database_name(self.0.as_ptr(), index as c_int);
let table_name = sqlite3_column_table_name(self.0.as_ptr(), index as c_int);
let origin_name = sqlite3_column_origin_name(self.0.as_ptr(), index as c_int);
if db_name.is_null() || table_name.is_null() || origin_name.is_null() {
return Ok(None);
}
let mut not_null: c_int = 0;
// https://sqlite.org/c3ref/table_column_metadata.html
let status = sqlite3_table_column_metadata(
self.db_handle(),
db_name,
table_name,
origin_name,
// function docs state to provide NULL for return values you don't care about
ptr::null_mut(),
ptr::null_mut(),
&mut not_null,
ptr::null_mut(),
ptr::null_mut(),
);
if status != SQLITE_OK {
// implementation note: the docs for sqlite3_table_column_metadata() specify
// that an error can be returned if the column came from a view; however,
// experimentally we found that the above functions give us the true origin
// for columns in views that came from real tables and so we should never hit this
// error; for view columns that are expressions we are given NULL for their origins
// so we don't need special handling for that case either.
//
// this is confirmed in the `tests/sqlite-macros.rs` integration test
return Err(SqliteError::new(self.db_handle()).into());
}
Ok(Some(not_null != 0))
}
}
// Number Of SQL Parameters
#[inline]
pub(crate) fn bind_parameter_count(&self) -> usize {
// https://www.sqlite.org/c3ref/bind_parameter_count.html
unsafe { sqlite3_bind_parameter_count(self.0.as_ptr()) as usize }
}
// Name Of A Host Parameter
#[inline]
pub(crate) fn bind_parameter_name(&self, index: usize) -> Option<&str> {
unsafe {
// https://www.sqlite.org/c3ref/bind_parameter_name.html
let name = sqlite3_bind_parameter_name(self.0.as_ptr(), index as c_int);
if name.is_null() {
return None;
}
Some(from_utf8_unchecked(CStr::from_ptr(name).to_bytes()))
}
}
// Binding Values To Prepared Statements
// https://www.sqlite.org/c3ref/bind_blob.html
#[inline]
pub(crate) fn bind_blob(&self, index: usize, v: &[u8]) -> c_int {
unsafe {
sqlite3_bind_blob64(
self.0.as_ptr(),
index as c_int,
v.as_ptr() as *const c_void,
v.len() as u64,
SQLITE_TRANSIENT(),
)
}
}
#[inline]
pub(crate) fn bind_text(&self, index: usize, v: &str) -> c_int {
unsafe {
sqlite3_bind_text64(
self.0.as_ptr(),
index as c_int,
v.as_ptr() as *const c_char,
v.len() as u64,
SQLITE_TRANSIENT(),
SQLITE_UTF8 as u8,
)
}
}
#[inline]
pub(crate) fn bind_int(&self, index: usize, v: i32) -> c_int {
unsafe { sqlite3_bind_int(self.0.as_ptr(), index as c_int, v as c_int) }
}
#[inline]
pub(crate) fn bind_int64(&self, index: usize, v: i64) -> c_int {
unsafe { sqlite3_bind_int64(self.0.as_ptr(), index as c_int, v) }
}
#[inline]
pub(crate) fn bind_double(&self, index: usize, v: f64) -> c_int {
unsafe { sqlite3_bind_double(self.0.as_ptr(), index as c_int, v) }
}
#[inline]
pub(crate) fn bind_null(&self, index: usize) -> c_int {
unsafe { sqlite3_bind_null(self.0.as_ptr(), index as c_int) }
}
// result values from the query
// https://www.sqlite.org/c3ref/column_blob.html
#[inline]
pub(crate) fn column_type(&self, index: usize) -> c_int {
unsafe { sqlite3_column_type(self.0.as_ptr(), index as c_int) }
}
#[inline]
pub(crate) fn column_int(&self, index: usize) -> i32 {
unsafe { sqlite3_column_int(self.0.as_ptr(), index as c_int) as i32 }
}
#[inline]
pub(crate) fn column_int64(&self, index: usize) -> i64 {
unsafe { sqlite3_column_int64(self.0.as_ptr(), index as c_int) as i64 }
}
#[inline]
pub(crate) fn column_double(&self, index: usize) -> f64 {
unsafe { sqlite3_column_double(self.0.as_ptr(), index as c_int) }
}
#[inline]
pub(crate) fn column_value(&self, index: usize) -> SqliteValue {
unsafe { SqliteValue::new(sqlite3_column_value(self.0.as_ptr(), index as c_int)) }
}
pub(crate) fn column_blob(&self, index: usize) -> &[u8] {
let index = index as c_int;
let len = unsafe { sqlite3_column_bytes(self.0.as_ptr(), index) } as usize;
if len == 0 {
// empty blobs are NULL so just return an empty slice
return &[];
}
let ptr = unsafe { sqlite3_column_blob(self.0.as_ptr(), index) } as *const u8;
debug_assert!(!ptr.is_null());
unsafe { from_raw_parts(ptr, len) }
}
pub(crate) fn column_text(&self, index: usize) -> Result<&str, BoxDynError> {
Ok(from_utf8(self.column_blob(index))?)
}
}

View File

@ -0,0 +1,187 @@
use std::i32;
use std::os::raw::c_char;
use std::ptr::{null, null_mut, NonNull};
use std::sync::{atomic::AtomicPtr, Weak};
use bytes::{Buf, Bytes};
use libsqlite3_sys::{
sqlite3, sqlite3_clear_bindings, sqlite3_finalize, sqlite3_prepare_v3, sqlite3_reset,
sqlite3_stmt, SQLITE_OK, SQLITE_PREPARE_NO_VTAB, SQLITE_PREPARE_PERSISTENT,
};
use smallvec::SmallVec;
use crate::error::Error;
use crate::sqlite::connection::ConnectionHandle;
use crate::sqlite::{SqliteError, SqliteRow, SqliteValue};
mod handle;
mod worker;
pub(crate) use handle::StatementHandle;
pub(crate) use worker::StatementWorker;
// NOTE: Keep query in statement and slowly chop it up
#[derive(Debug)]
pub(crate) struct SqliteStatement {
persistent: bool,
index: usize,
// tail of the most recently prepared SQL statement within this container
tail: Bytes,
// underlying sqlite handles for each inner statement
// a SQL query string in SQLite is broken up into N statements
// we use a [`SmallVec`] to optimize for the most likely case of a single statement
pub(crate) handles: SmallVec<[StatementHandle; 1]>,
// weak reference to the previous row from this connection
// we use the notice of a successful upgrade of this reference as an indicator that the
// row is still around, in which we then inflate the row such that we can let SQLite
// clobber the memory allocation for the row
pub(crate) last_row_values: SmallVec<[Option<Weak<AtomicPtr<SqliteValue>>>; 1]>,
}
fn prepare(
conn: *mut sqlite3,
query: &mut Bytes,
persistent: bool,
) -> Result<Option<StatementHandle>, Error> {
let mut flags = SQLITE_PREPARE_NO_VTAB;
if persistent {
// SQLITE_PREPARE_PERSISTENT
// The SQLITE_PREPARE_PERSISTENT flag is a hint to the query
// planner that the prepared statement will be retained for a long time
// and probably reused many times.
flags |= SQLITE_PREPARE_PERSISTENT;
}
while !query.is_empty() {
let mut statement_handle: *mut sqlite3_stmt = null_mut();
let mut tail: *const c_char = null();
let query_ptr = query.as_ptr() as *const c_char;
let query_len = query.len() as i32;
// <https://www.sqlite.org/c3ref/prepare.html>
let status = unsafe {
sqlite3_prepare_v3(
conn,
query_ptr,
query_len,
flags as u32,
&mut statement_handle,
&mut tail,
)
};
if status != SQLITE_OK {
return Err(SqliteError::new(conn).into());
}
// tail should point to the first byte past the end of the first SQL
// statement in zSql. these routines only compile the first statement,
// so tail is left pointing to what remains un-compiled.
let n = (tail as i32) - (query_ptr as i32);
query.advance(n as usize);
if let Some(handle) = NonNull::new(statement_handle) {
return Ok(Some(StatementHandle(handle)));
}
}
Ok(None)
}
impl SqliteStatement {
pub(crate) fn prepare(
conn: &mut ConnectionHandle,
mut query: &str,
persistent: bool,
) -> Result<Self, Error> {
query = query.trim();
if query.len() > i32::MAX as usize {
return Err(err_protocol!(
"query string must be smaller than {} bytes",
i32::MAX
));
}
let mut handles: SmallVec<[StatementHandle; 1]> = SmallVec::with_capacity(1);
let mut query = Bytes::from(String::from(query));
if let Some(handle) = prepare(conn.as_ptr(), &mut query, persistent)? {
handles.push(handle);
}
Ok(Self {
persistent,
tail: query,
handles,
index: 0,
last_row_values: SmallVec::from([None; 1]),
})
}
// unsafe: caller must ensure that there is at least one handle
unsafe fn connection(&self) -> *mut sqlite3 {
self.handles[0].db_handle()
}
pub(crate) fn execute(
&mut self,
) -> Result<Option<(&StatementHandle, &mut Option<Weak<AtomicPtr<SqliteValue>>>)>, Error> {
while self.handles.len() == self.index {
if self.tail.is_empty() {
return Ok(None);
}
if let Some(handle) =
unsafe { prepare(self.connection(), &mut self.tail, self.persistent)? }
{
self.handles.push(handle);
self.last_row_values.push(None);
}
}
let index = self.index;
self.index += 1;
Ok(Some((
&self.handles[index],
&mut self.last_row_values[index],
)))
}
pub(crate) fn reset(&mut self) {
self.index = 0;
for (i, handle) in self.handles.iter().enumerate() {
SqliteRow::inflate_if_needed(&handle, self.last_row_values[i].take());
unsafe {
// Reset A Prepared Statement Object
// https://www.sqlite.org/c3ref/reset.html
// https://www.sqlite.org/c3ref/clear_bindings.html
sqlite3_reset(handle.0.as_ptr());
sqlite3_clear_bindings(handle.0.as_ptr());
}
}
}
}
impl Drop for SqliteStatement {
fn drop(&mut self) {
for (i, handle) in self.handles.drain(..).enumerate() {
SqliteRow::inflate_if_needed(&handle, self.last_row_values[i].take());
unsafe {
// https://sqlite.org/c3ref/finalize.html
let _ = sqlite3_finalize(handle.0.as_ptr());
}
}
}
}

View File

@ -0,0 +1,180 @@
use std::ptr::null_mut;
use std::sync::atomic::{spin_loop_hint, AtomicI32, AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread::{self, park, spawn, JoinHandle};
use either::Either;
use libsqlite3_sys::{sqlite3_step, sqlite3_stmt, SQLITE_DONE, SQLITE_ROW};
use sqlx_rt::yield_now;
use crate::error::Error;
use crate::sqlite::statement::StatementHandle;
// For async-std and actix, the worker maintains a dedicated thread for each SQLite connection
// All invocations of [sqlite3_step] are run on this thread
// For tokio, the worker is a thin wrapper around an invocation to [block_in_place]
const STATE_CLOSE: i32 = -1;
const STATE_READY: i32 = 0;
const STATE_INITIAL: i32 = 1;
#[cfg(not(feature = "runtime-tokio"))]
pub(crate) struct StatementWorker {
statement: Arc<AtomicPtr<sqlite3_stmt>>,
status: Arc<AtomicI32>,
handle: Option<JoinHandle<()>>,
}
#[cfg(feature = "runtime-tokio")]
pub(crate) struct StatementWorker;
#[cfg(not(feature = "runtime-tokio"))]
impl StatementWorker {
pub(crate) fn new() -> Self {
let statement = Arc::new(AtomicPtr::new(null_mut::<sqlite3_stmt>()));
let status = Arc::new(AtomicI32::new(STATE_INITIAL));
let handle = spawn({
let statement = Arc::clone(&statement);
let status = Arc::clone(&status);
move || {
// wait for the first command
park();
'run: while status.load(Ordering::Acquire) >= 0 {
'statement: loop {
match status.load(Ordering::Acquire) {
STATE_CLOSE => {
// worker has been dropped; get out
break 'run;
}
STATE_READY => {
let statement = statement.load(Ordering::Acquire);
if statement.is_null() {
// we do not have the statement handle yet
thread::yield_now();
continue;
}
let v = unsafe { sqlite3_step(statement) };
status.store(v, Ordering::Release);
if v == SQLITE_DONE {
// when a statement is _done_, we park the thread until
// we need it again
park();
break 'statement;
}
}
_ => {
// waits for the receiving end to be ready to receive the rows
// this should take less than 1 microsecond under most conditions
spin_loop_hint();
}
}
}
}
}
});
Self {
handle: Some(handle),
statement,
status,
}
}
pub(crate) fn wake(&self) {
if let Some(handle) = &self.handle {
handle.thread().unpark();
}
}
pub(crate) fn execute(&self, statement: &StatementHandle) {
// readies the worker to execute the statement
// for async-std, this unparks our dedicated thread
self.statement
.store(statement.0.as_ptr(), Ordering::Release);
}
pub(crate) async fn step(&self, statement: &StatementHandle) -> Result<Either<u64, ()>, Error> {
// storing <0> as a terminal in status releases the worker
// to proceed to the next [sqlite3_step] invocation
self.status.store(STATE_READY, Ordering::Release);
// we then use a spin loop to wait for this to finish
// 99% of the time this should be < 1 μs
let status = loop {
let status = self
.status
.compare_and_swap(STATE_READY, STATE_READY, Ordering::AcqRel);
if status != STATE_READY {
break status;
}
yield_now().await;
};
match status {
// a row was found
SQLITE_ROW => Ok(Either::Right(())),
// reached the end of the query results,
// emit the # of changes
SQLITE_DONE => Ok(Either::Left(statement.changes())),
_ => Err(statement.last_error().into()),
}
}
pub(crate) fn close(&mut self) {
self.status.store(STATE_CLOSE, Ordering::Release);
if let Some(handle) = self.handle.take() {
handle.thread().unpark();
handle.join().unwrap();
}
}
}
#[cfg(feature = "runtime-tokio")]
impl StatementWorker {
pub(crate) fn new() -> Self {
StatementWorker
}
pub(crate) fn execute(&self, _statement: &StatementHandle) {}
pub(crate) fn wake(&self) {}
pub(crate) async fn step(&self, statement: &StatementHandle) -> Result<Either<u64, ()>, Error> {
let statement = *statement;
let status = sqlx_rt::blocking!({ unsafe { sqlite3_step(statement.0.as_ptr()) } });
match status {
// a row was found
SQLITE_ROW => Ok(Either::Right(())),
// reached the end of the query results,
// emit the # of changes
SQLITE_DONE => Ok(Either::Left(statement.changes())),
_ => Err(statement.last_error().into()),
}
}
pub(crate) fn close(&mut self) {}
}
impl Drop for StatementWorker {
fn drop(&mut self) {
self.close();
}
}

View File

@ -1,69 +1,127 @@
use std::fmt::{self, Display};
use std::fmt::{self, Display, Formatter};
use std::os::raw::c_int;
use std::str::FromStr;
use crate::types::TypeInfo;
use libsqlite3_sys::{SQLITE_BLOB, SQLITE_FLOAT, SQLITE_INTEGER, SQLITE_NULL, SQLITE_TEXT};
// https://www.sqlite.org/c3ref/c_blob.html
#[derive(Debug, PartialEq, Clone, Copy)]
use crate::error::BoxDynError;
use crate::type_info::TypeInfo;
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "offline", derive(serde::Serialize, serde::Deserialize))]
pub(crate) enum SqliteType {
Integer = 1,
Float = 2,
Text = 3,
Blob = 4,
// Non-standard extensions
Boolean,
}
// https://www.sqlite.org/datatype3.html#type_affinity
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "offline", derive(serde::Serialize, serde::Deserialize))]
pub(crate) enum SqliteTypeAffinity {
pub(crate) enum DataType {
Int,
Float,
Text,
Numeric,
Integer,
Real,
Blob,
// TODO: Support NUMERIC
#[allow(dead_code)]
Numeric,
// non-standard extensions
Bool,
Int64,
}
#[derive(Debug, Clone)]
/// Type information for a SQLite type.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "offline", derive(serde::Serialize, serde::Deserialize))]
pub struct SqliteTypeInfo {
pub(crate) r#type: SqliteType,
pub(crate) affinity: Option<SqliteTypeAffinity>,
}
pub struct SqliteTypeInfo(pub(crate) DataType);
impl SqliteTypeInfo {
pub(crate) fn new(r#type: SqliteType, affinity: SqliteTypeAffinity) -> Self {
Self {
r#type,
affinity: Some(affinity),
}
impl Display for SqliteTypeInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl Display for SqliteTypeInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self.r#type {
SqliteType::Text => "TEXT",
SqliteType::Boolean => "BOOLEAN",
SqliteType::Integer => "INTEGER",
SqliteType::Float => "DOUBLE",
SqliteType::Blob => "BLOB",
impl TypeInfo for SqliteTypeInfo {}
impl Display for DataType {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str(match self {
DataType::Text => "TEXT",
DataType::Float => "FLOAT",
DataType::Blob => "BLOB",
DataType::Int => "INTEGER",
DataType::Numeric => "NUMERIC",
// non-standard extensions
DataType::Bool => "BOOLEAN",
DataType::Int64 => "BIGINT",
})
}
}
impl PartialEq<SqliteTypeInfo> for SqliteTypeInfo {
fn eq(&self, other: &SqliteTypeInfo) -> bool {
self.r#type == other.r#type || self.affinity == other.affinity
impl DataType {
pub(crate) fn from_code(code: c_int) -> Option<Self> {
match code {
SQLITE_INTEGER => Some(DataType::Int),
SQLITE_FLOAT => Some(DataType::Float),
SQLITE_BLOB => Some(DataType::Blob),
SQLITE_NULL => None,
SQLITE_TEXT => Some(DataType::Text),
_ => None,
}
}
}
impl TypeInfo for SqliteTypeInfo {
#[inline]
fn compatible(&self, _other: &Self) -> bool {
// All types are compatible with all other types in SQLite
true
// note: this implementation is particularly important as this is how the macros determine
// what Rust type maps to what *declared* SQL type
// <https://www.sqlite.org/datatype3.html#affname>
impl FromStr for DataType {
type Err = BoxDynError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.to_ascii_lowercase();
Ok(match &*s {
"int8" => DataType::Int64,
"boolean" | "bool" => DataType::Bool,
_ if s.contains("int") && s.contains("big") && s.find("int") > s.find("big") => {
DataType::Int64
}
_ if s.contains("int") => DataType::Int,
_ if s.contains("char") || s.contains("clob") || s.contains("text") => DataType::Text,
_ if s.contains("blob") => DataType::Blob,
_ if s.contains("real") || s.contains("floa") || s.contains("doub") => DataType::Float,
_ => {
return Err(format!("unknown type: `{}`", s).into());
}
})
}
}
#[test]
fn test_data_type_from_str() -> Result<(), BoxDynError> {
assert_eq!(DataType::Int, "INT".parse()?);
assert_eq!(DataType::Int, "INTEGER".parse()?);
assert_eq!(DataType::Int, "INTBIG".parse()?);
assert_eq!(DataType::Int, "MEDIUMINT".parse()?);
assert_eq!(DataType::Int64, "BIGINT".parse()?);
assert_eq!(DataType::Int64, "UNSIGNED BIG INT".parse()?);
assert_eq!(DataType::Int64, "INT8".parse()?);
assert_eq!(DataType::Text, "CHARACTER(20)".parse()?);
assert_eq!(DataType::Text, "NCHAR(55)".parse()?);
assert_eq!(DataType::Text, "TEXT".parse()?);
assert_eq!(DataType::Text, "CLOB".parse()?);
assert_eq!(DataType::Blob, "BLOB".parse()?);
assert_eq!(DataType::Float, "REAL".parse()?);
assert_eq!(DataType::Float, "FLOAT".parse()?);
assert_eq!(DataType::Float, "DOUBLE PRECISION".parse()?);
assert_eq!(DataType::Bool, "BOOLEAN".parse()?);
assert_eq!(DataType::Bool, "BOOL".parse()?);
Ok(())
}

View File

@ -1,23 +1,30 @@
use crate::decode::Decode;
use crate::encode::Encode;
use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity};
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue};
use crate::encode::{Encode, IsNull};
use crate::error::BoxDynError;
use crate::sqlite::type_info::DataType;
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef};
use crate::types::Type;
impl Type<Sqlite> for bool {
fn type_info() -> SqliteTypeInfo {
SqliteTypeInfo::new(SqliteType::Boolean, SqliteTypeAffinity::Numeric)
SqliteTypeInfo(DataType::Bool)
}
}
impl Encode<Sqlite> for bool {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
values.push(SqliteArgumentValue::Int((*self).into()));
impl<'q> Encode<'q, Sqlite> for bool {
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Int((*self).into()));
IsNull::No
}
}
impl<'a> Decode<'a, Sqlite> for bool {
fn decode(value: SqliteValue<'a>) -> crate::Result<bool> {
impl<'r> Decode<'r, Sqlite> for bool {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
fn decode(value: SqliteValueRef<'r>) -> Result<bool, BoxDynError> {
Ok(value.int() != 0)
}
}

View File

@ -1,42 +1,62 @@
use std::borrow::Cow;
use crate::decode::Decode;
use crate::encode::Encode;
use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity};
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue};
use crate::encode::{Encode, IsNull};
use crate::error::BoxDynError;
use crate::sqlite::type_info::DataType;
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef};
use crate::types::Type;
impl Type<Sqlite> for [u8] {
fn type_info() -> SqliteTypeInfo {
SqliteTypeInfo::new(SqliteType::Blob, SqliteTypeAffinity::Blob)
SqliteTypeInfo(DataType::Blob)
}
}
impl<'q> Encode<'q, Sqlite> for &'q [u8] {
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Blob(Cow::Borrowed(self)));
IsNull::No
}
}
impl<'r> Decode<'r, Sqlite> for &'r [u8] {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
fn decode(value: SqliteValueRef<'r>) -> Result<Self, BoxDynError> {
Ok(value.blob())
}
}
impl Type<Sqlite> for Vec<u8> {
fn type_info() -> SqliteTypeInfo {
<[u8] as Type<Sqlite>>::type_info()
<&[u8] as Type<Sqlite>>::type_info()
}
}
impl Encode<Sqlite> for [u8] {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
// TODO: look into a way to remove this allocation
values.push(SqliteArgumentValue::Blob(self.to_owned()));
impl<'q> Encode<'q, Sqlite> for Vec<u8> {
fn encode(self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Blob(Cow::Owned(self)));
IsNull::No
}
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Blob(Cow::Owned(self.clone())));
IsNull::No
}
}
impl Encode<Sqlite> for Vec<u8> {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
<[u8] as Encode<Sqlite>>::encode(self, values)
impl<'r> Decode<'r, Sqlite> for Vec<u8> {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
}
impl<'de> Decode<'de, Sqlite> for &'de [u8] {
fn decode(value: SqliteValue<'de>) -> crate::Result<&'de [u8]> {
Ok(value.blob())
}
}
impl<'de> Decode<'de, Sqlite> for Vec<u8> {
fn decode(value: SqliteValue<'de>) -> crate::Result<Vec<u8>> {
<&[u8] as Decode<Sqlite>>::decode(value).map(ToOwned::to_owned)
fn decode(value: SqliteValueRef<'r>) -> Result<Self, BoxDynError> {
Ok(value.blob().to_owned())
}
}

View File

@ -1,41 +1,54 @@
use crate::decode::Decode;
use crate::encode::Encode;
use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity};
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue};
use crate::encode::{Encode, IsNull};
use crate::error::BoxDynError;
use crate::sqlite::type_info::DataType;
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef};
use crate::types::Type;
impl Type<Sqlite> for f32 {
fn type_info() -> SqliteTypeInfo {
SqliteTypeInfo::new(SqliteType::Float, SqliteTypeAffinity::Real)
SqliteTypeInfo(DataType::Float)
}
}
impl Encode<Sqlite> for f32 {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
values.push(SqliteArgumentValue::Double((*self).into()));
impl<'q> Encode<'q, Sqlite> for f32 {
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Double((*self).into()));
IsNull::No
}
}
impl<'a> Decode<'a, Sqlite> for f32 {
fn decode(value: SqliteValue<'a>) -> crate::Result<f32> {
impl<'r> Decode<'r, Sqlite> for f32 {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
fn decode(value: SqliteValueRef<'r>) -> Result<f32, BoxDynError> {
Ok(value.double() as f32)
}
}
impl Type<Sqlite> for f64 {
fn type_info() -> SqliteTypeInfo {
SqliteTypeInfo::new(SqliteType::Float, SqliteTypeAffinity::Real)
SqliteTypeInfo(DataType::Float)
}
}
impl Encode<Sqlite> for f64 {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
values.push(SqliteArgumentValue::Double((*self).into()));
impl<'q> Encode<'q, Sqlite> for f64 {
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Double(*self));
IsNull::No
}
}
impl<'a> Decode<'a, Sqlite> for f64 {
fn decode(value: SqliteValue<'a>) -> crate::Result<f64> {
impl<'r> Decode<'r, Sqlite> for f64 {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
fn decode(value: SqliteValueRef<'r>) -> Result<f64, BoxDynError> {
Ok(value.double())
}
}

View File

@ -1,41 +1,54 @@
use crate::decode::Decode;
use crate::encode::Encode;
use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity};
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue};
use crate::encode::{Encode, IsNull};
use crate::error::BoxDynError;
use crate::sqlite::type_info::DataType;
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef};
use crate::types::Type;
impl Type<Sqlite> for i32 {
fn type_info() -> SqliteTypeInfo {
SqliteTypeInfo::new(SqliteType::Integer, SqliteTypeAffinity::Integer)
SqliteTypeInfo(DataType::Int)
}
}
impl Encode<Sqlite> for i32 {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
values.push(SqliteArgumentValue::Int((*self).into()));
impl<'q> Encode<'q, Sqlite> for i32 {
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Int(*self));
IsNull::No
}
}
impl<'a> Decode<'a, Sqlite> for i32 {
fn decode(value: SqliteValue<'a>) -> crate::Result<i32> {
impl<'r> Decode<'r, Sqlite> for i32 {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
fn decode(value: SqliteValueRef<'r>) -> Result<Self, BoxDynError> {
Ok(value.int())
}
}
impl Type<Sqlite> for i64 {
fn type_info() -> SqliteTypeInfo {
SqliteTypeInfo::new(SqliteType::Integer, SqliteTypeAffinity::Integer)
SqliteTypeInfo(DataType::Int64)
}
}
impl Encode<Sqlite> for i64 {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
values.push(SqliteArgumentValue::Int64((*self).into()));
impl<'q> Encode<'q, Sqlite> for i64 {
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Int64(*self));
IsNull::No
}
}
impl<'a> Decode<'a, Sqlite> for i64 {
fn decode(value: SqliteValue<'a>) -> crate::Result<i64> {
impl<'r> Decode<'r, Sqlite> for i64 {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
fn decode(value: SqliteValueRef<'r>) -> Result<Self, BoxDynError> {
Ok(value.int64())
}
}

View File

@ -7,7 +7,7 @@
//! | `bool` | BOOLEAN |
//! | `i16` | INTEGER |
//! | `i32` | INTEGER |
//! | `i64` | INTEGER |
//! | `i64` | BIGINT, INT8 |
//! | `f32` | REAL |
//! | `f64` | REAL |
//! | `&str`, `String` | TEXT |
@ -19,25 +19,12 @@
//! a potentially `NULL` value from SQLite.
//!
use crate::decode::Decode;
use crate::sqlite::value::SqliteValue;
use crate::sqlite::Sqlite;
// NOTE: all types are compatible with all other types in SQLite
// so we explicitly opt-out of runtime type assertions by returning [true] for
// all implementations of [Decode::accepts]
mod bool;
mod bytes;
mod float;
mod int;
mod str;
impl<'de, T> Decode<'de, Sqlite> for Option<T>
where
T: Decode<'de, Sqlite>,
{
fn decode(value: SqliteValue<'de>) -> crate::Result<Self> {
if value.is_null() {
Ok(None)
} else {
<T as Decode<Sqlite>>::decode(value).map(Some)
}
}
}

View File

@ -1,45 +1,62 @@
use std::borrow::Cow;
use crate::decode::Decode;
use crate::encode::Encode;
use crate::error::UnexpectedNullError;
use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity};
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue};
use crate::encode::{Encode, IsNull};
use crate::error::BoxDynError;
use crate::sqlite::type_info::DataType;
use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef};
use crate::types::Type;
impl Type<Sqlite> for str {
fn type_info() -> SqliteTypeInfo {
SqliteTypeInfo::new(SqliteType::Text, SqliteTypeAffinity::Text)
SqliteTypeInfo(DataType::Text)
}
}
impl<'q> Encode<'q, Sqlite> for &'q str {
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Text(Cow::Borrowed(*self)));
IsNull::No
}
}
impl<'r> Decode<'r, Sqlite> for &'r str {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
fn decode(value: SqliteValueRef<'r>) -> Result<Self, BoxDynError> {
value.text()
}
}
impl Type<Sqlite> for String {
fn type_info() -> SqliteTypeInfo {
<str as Type<Sqlite>>::type_info()
<&str as Type<Sqlite>>::type_info()
}
}
impl Encode<Sqlite> for str {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
// TODO: look into a way to remove this allocation
values.push(SqliteArgumentValue::Text(self.to_owned()));
impl<'q> Encode<'q, Sqlite> for String {
fn encode(self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Text(Cow::Owned(self)));
IsNull::No
}
fn encode_by_ref(&self, args: &mut Vec<SqliteArgumentValue<'q>>) -> IsNull {
args.push(SqliteArgumentValue::Text(Cow::Owned(self.clone())));
IsNull::No
}
}
impl Encode<Sqlite> for String {
fn encode(&self, values: &mut Vec<SqliteArgumentValue>) {
<str as Encode<Sqlite>>::encode(self, values)
impl<'r> Decode<'r, Sqlite> for String {
fn accepts(_ty: &SqliteTypeInfo) -> bool {
true
}
}
impl<'de> Decode<'de, Sqlite> for &'de str {
fn decode(value: SqliteValue<'de>) -> crate::Result<&'de str> {
value
.text()
.ok_or_else(|| crate::Error::decode(UnexpectedNullError))
}
}
impl<'de> Decode<'de, Sqlite> for String {
fn decode(value: SqliteValue<'de>) -> crate::Result<String> {
<&str as Decode<Sqlite>>::decode(value).map(ToOwned::to_owned)
fn decode(value: SqliteValueRef<'r>) -> Result<Self, BoxDynError> {
value.text().map(ToOwned::to_owned)
}
}

View File

@ -1,135 +1,169 @@
use core::slice;
use std::ffi::CStr;
use std::str::from_utf8_unchecked;
use std::borrow::Cow;
use std::ptr::NonNull;
use std::slice::from_raw_parts;
use std::str::from_utf8;
use std::sync::Arc;
use libsqlite3_sys::{
sqlite3_column_blob, sqlite3_column_bytes, sqlite3_column_double, sqlite3_column_int,
sqlite3_column_int64, sqlite3_column_text, sqlite3_column_type, SQLITE_BLOB, SQLITE_FLOAT,
SQLITE_INTEGER, SQLITE_NULL, SQLITE_TEXT,
sqlite3_value, sqlite3_value_blob, sqlite3_value_bytes, sqlite3_value_double,
sqlite3_value_dup, sqlite3_value_int, sqlite3_value_int64, sqlite3_value_type, SQLITE_NULL,
};
use crate::sqlite::statement::Statement;
use crate::sqlite::type_info::SqliteType;
use crate::error::BoxDynError;
use crate::sqlite::statement::StatementHandle;
use crate::sqlite::type_info::DataType;
use crate::sqlite::{Sqlite, SqliteTypeInfo};
use crate::value::RawValue;
use crate::value::{Value, ValueRef};
pub struct SqliteValue<'c> {
pub(super) index: i32,
pub(super) statement: &'c Statement,
enum SqliteValueData<'r> {
Statement {
statement: &'r StatementHandle,
index: usize,
},
Value(&'r SqliteValue),
}
// https://www.sqlite.org/c3ref/column_blob.html
// https://www.sqlite.org/capi3ref.html#sqlite3_column_blob
pub struct SqliteValueRef<'r>(SqliteValueData<'r>);
// These routines return information about a single column of the current result row of a query.
impl<'c> SqliteValue<'c> {
/// Returns true if the value should be intrepreted as NULL.
pub(super) fn is_null(&self) -> bool {
self.r#type().is_none()
impl<'r> SqliteValueRef<'r> {
pub(crate) fn value(value: &'r SqliteValue) -> Self {
Self(SqliteValueData::Value(value))
}
fn r#type(&self) -> Option<SqliteType> {
let type_code = unsafe {
if let Some(handle) = self.statement.handle() {
sqlite3_column_type(handle, self.index)
} else {
// unreachable: null statements do not have any values to type
return None;
}
};
// SQLITE_INTEGER, SQLITE_FLOAT, SQLITE_TEXT, SQLITE_BLOB, or SQLITE_NULL
match type_code {
SQLITE_INTEGER => Some(SqliteType::Integer),
SQLITE_FLOAT => Some(SqliteType::Float),
SQLITE_TEXT => Some(SqliteType::Text),
SQLITE_BLOB => Some(SqliteType::Blob),
SQLITE_NULL => None,
_ => unreachable!("received unexpected column type: {}", type_code),
}
pub(crate) fn statement(statement: &'r StatementHandle, index: usize) -> Self {
Self(SqliteValueData::Statement { statement, index })
}
/// Returns the 32-bit INTEGER result.
pub(super) fn int(&self) -> i32 {
unsafe {
self.statement
.handle()
.map_or(0, |handle| sqlite3_column_int(handle, self.index))
match self.0 {
SqliteValueData::Statement { statement, index } => statement.column_int(index),
SqliteValueData::Value(v) => v.int(),
}
}
/// Returns the 64-bit INTEGER result.
pub(super) fn int64(&self) -> i64 {
unsafe {
self.statement
.handle()
.map_or(0, |handle| sqlite3_column_int64(handle, self.index))
match self.0 {
SqliteValueData::Statement { statement, index } => statement.column_int64(index),
SqliteValueData::Value(v) => v.int64(),
}
}
/// Returns the 64-bit, REAL result.
pub(super) fn double(&self) -> f64 {
unsafe {
self.statement
.handle()
.map_or(0.0, |handle| sqlite3_column_double(handle, self.index))
match self.0 {
SqliteValueData::Statement { statement, index } => statement.column_double(index),
SqliteValueData::Value(v) => v.double(),
}
}
/// Returns the UTF-8 TEXT result.
pub(super) fn text(&self) -> Option<&'c str> {
unsafe {
self.statement.handle().and_then(|handle| {
let ptr = sqlite3_column_text(handle, self.index);
if ptr.is_null() {
None
} else {
Some(from_utf8_unchecked(CStr::from_ptr(ptr as _).to_bytes()))
}
})
pub(super) fn blob(&self) -> &'r [u8] {
match self.0 {
SqliteValueData::Statement { statement, index } => statement.column_blob(index),
SqliteValueData::Value(v) => v.blob(),
}
}
fn bytes(&self) -> usize {
// Returns the size of the result in bytes.
unsafe {
self.statement
.handle()
.map_or(0, |handle| sqlite3_column_bytes(handle, self.index)) as usize
pub(super) fn text(&self) -> Result<&'r str, BoxDynError> {
match self.0 {
SqliteValueData::Statement { statement, index } => statement.column_text(index),
SqliteValueData::Value(v) => v.text(),
}
}
}
impl<'r> ValueRef<'r> for SqliteValueRef<'r> {
type Database = Sqlite;
fn to_owned(&self) -> SqliteValue {
match self.0 {
SqliteValueData::Statement { statement, index } => statement.column_value(index),
SqliteValueData::Value(v) => v.clone(),
}
}
/// Returns the BLOB result.
pub(super) fn blob(&self) -> &'c [u8] {
let ptr = unsafe {
if let Some(handle) = self.statement.handle() {
sqlite3_column_blob(handle, self.index)
} else {
// Null statements do not exist
return &[];
fn type_info(&self) -> Option<Cow<'_, SqliteTypeInfo>> {
match self.0 {
SqliteValueData::Statement { statement, index } => {
DataType::from_code(statement.column_type(index))
.map(SqliteTypeInfo)
.map(Cow::Owned)
}
};
if ptr.is_null() {
// Empty BLOBs are received as null pointers
SqliteValueData::Value(v) => v.type_info(),
}
}
fn is_null(&self) -> bool {
match self.0 {
SqliteValueData::Statement { statement, index } => {
statement.column_type(index) == SQLITE_NULL
}
SqliteValueData::Value(v) => v.is_null(),
}
}
}
#[derive(Clone)]
pub struct SqliteValue(pub(crate) Arc<NonNull<sqlite3_value>>);
// SAFE: only protected value objects are stored in SqliteValue
unsafe impl Send for SqliteValue {}
unsafe impl Sync for SqliteValue {}
impl SqliteValue {
pub(crate) unsafe fn new(value: *mut sqlite3_value) -> Self {
debug_assert!(!value.is_null());
Self(Arc::new(NonNull::new_unchecked(sqlite3_value_dup(value))))
}
fn r#type(&self) -> Option<DataType> {
DataType::from_code(unsafe { sqlite3_value_type(self.0.as_ptr()) })
}
fn int(&self) -> i32 {
unsafe { sqlite3_value_int(self.0.as_ptr()) }
}
fn int64(&self) -> i64 {
unsafe { sqlite3_value_int64(self.0.as_ptr()) }
}
fn double(&self) -> f64 {
unsafe { sqlite3_value_double(self.0.as_ptr()) }
}
fn blob(&self) -> &[u8] {
let len = unsafe { sqlite3_value_bytes(self.0.as_ptr()) } as usize;
if len == 0 {
// empty blobs are NULL so just return an empty slice
return &[];
}
unsafe { slice::from_raw_parts(ptr as *const u8, self.bytes()) }
let ptr = unsafe { sqlite3_value_blob(self.0.as_ptr()) } as *const u8;
debug_assert!(!ptr.is_null());
unsafe { from_raw_parts(ptr, len) }
}
fn text(&self) -> Result<&str, BoxDynError> {
Ok(from_utf8(self.blob())?)
}
}
impl<'c> RawValue<'c> for SqliteValue<'c> {
impl Value for SqliteValue {
type Database = Sqlite;
fn type_info(&self) -> Option<SqliteTypeInfo> {
Some(SqliteTypeInfo {
r#type: self.r#type()?,
affinity: None,
})
fn as_ref(&self) -> SqliteValueRef<'_> {
SqliteValueRef::value(self)
}
fn type_info(&self) -> Option<Cow<'_, SqliteTypeInfo>> {
self.r#type().map(SqliteTypeInfo).map(Cow::Owned)
}
fn is_null(&self) -> bool {
unsafe { sqlite3_value_type(self.0.as_ptr()) == SQLITE_NULL }
}
}

View File

@ -1,67 +0,0 @@
use crossbeam_queue::ArrayQueue;
use futures_channel::oneshot;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{park, spawn, JoinHandle};
// After tinkering with this, I believe the safest solution is to spin up a discrete thread per
// SQLite connection and perform all I/O operations for SQLite on _that_ thread. To this effect
// we have a worker struct that is a thin message passing API to run messages on the worker thread.
#[derive(Clone)]
pub(crate) struct Worker {
running: Arc<AtomicBool>,
queue: Arc<ArrayQueue<Box<dyn FnOnce() + Send>>>,
handle: Arc<JoinHandle<()>>,
}
impl Worker {
pub(crate) fn new() -> Self {
let queue: Arc<ArrayQueue<Box<dyn FnOnce() + Send>>> = Arc::new(ArrayQueue::new(1));
let running = Arc::new(AtomicBool::new(true));
Self {
handle: Arc::new(spawn({
let queue = queue.clone();
let running = running.clone();
move || {
while running.load(Ordering::SeqCst) {
if let Ok(message) = queue.pop() {
(message)();
}
park();
}
}
})),
queue,
running,
}
}
pub(crate) async fn run<F, R>(&mut self, f: F) -> R
where
F: Send + 'static,
R: Send + 'static,
F: FnOnce() -> R,
{
let (sender, receiver) = oneshot::channel::<R>();
let _ = self.queue.push(Box::new(move || {
let _ = sender.send(f());
}));
self.handle.thread().unpark();
receiver.await.unwrap()
}
}
impl Drop for Worker {
fn drop(&mut self) {
if Arc::strong_count(&self.handle) == 1 {
self.running.store(false, Ordering::SeqCst);
}
}
}

View File

@ -42,7 +42,7 @@ where
for<'a> Json<&'a Self>: Encode<'q, DB>,
DB: Database,
{
fn encode_by_ref(&self, buf: &mut <DB as HasArguments<'q>>::Arguments) -> IsNull {
fn encode_by_ref(&self, buf: &mut <DB as HasArguments<'q>>::ArgumentBuffer) -> IsNull {
<Json<&Self> as Encode<'q, DB>>::encode(Json(self), buf)
}
}