diff --git a/sqlx-core/src/sqlite/arguments.rs b/sqlx-core/src/sqlite/arguments.rs index b2b6bbba..e773d244 100644 --- a/sqlx-core/src/sqlite/arguments.rs +++ b/sqlx-core/src/sqlite/arguments.rs @@ -1,58 +1,30 @@ -use core::ffi::c_void; -use core::mem; +use std::borrow::Cow; -use std::os::raw::{c_char, c_int}; - -use libsqlite3_sys::{ - sqlite3_bind_blob, sqlite3_bind_double, sqlite3_bind_int, sqlite3_bind_int64, - sqlite3_bind_null, sqlite3_bind_text, SQLITE_OK, SQLITE_TRANSIENT, -}; +use atoi::atoi; +use libsqlite3_sys::SQLITE_OK; use crate::arguments::Arguments; use crate::encode::{Encode, IsNull}; -use crate::sqlite::statement::Statement; +use crate::error::Error; +use crate::sqlite::statement::{SqliteStatement, StatementHandle}; use crate::sqlite::Sqlite; -use crate::sqlite::SqliteError; -use crate::types::Type; #[derive(Debug, Clone)] -pub enum SqliteArgumentValue { +pub enum SqliteArgumentValue<'q> { Null, - - // TODO: Take by reference to remove the allocation - Text(String), - - // TODO: Take by reference to remove the allocation - Blob(Vec), - + Text(Cow<'q, str>), + Blob(Cow<'q, [u8]>), Double(f64), - Int(i32), - Int64(i64), } #[derive(Default)] -pub struct SqliteArguments { - index: usize, - values: Vec, +pub struct SqliteArguments<'q> { + pub(crate) values: Vec>, } -impl SqliteArguments { - pub(crate) fn next(&mut self) -> Option { - if self.index >= self.values.len() { - return None; - } - - let mut value = SqliteArgumentValue::Null; - mem::swap(&mut value, &mut self.values[self.index]); - - self.index += 1; - Some(value) - } -} - -impl Arguments for SqliteArguments { +impl<'q> Arguments<'q> for SqliteArguments<'q> { type Database = Sqlite; fn reserve(&mut self, len: usize, _size_hint: usize) { @@ -61,68 +33,64 @@ impl Arguments for SqliteArguments { fn add(&mut self, value: T) where - T: Encode + Type, + T: 'q + Encode<'q, Self::Database>, { - if let IsNull::Yes = value.encode_nullable(&mut self.values) { + if let IsNull::Yes = value.encode(&mut self.values) { self.values.push(SqliteArgumentValue::Null); } } } -impl SqliteArgumentValue { - pub(super) fn bind(&self, statement: &mut Statement, index: usize) -> crate::Result<()> { - let handle = unsafe { - if let Some(handle) = statement.handle() { - handle - } else { - // drop all requested bindings for a null/empty statement - // note that this _should_ not happen as argument size for a null statement should be zero - return Ok(()); - } - }; +impl SqliteArguments<'_> { + pub(super) fn bind(&self, statement: &SqliteStatement) -> Result<(), Error> { + let mut arg_i = 0; + for handle in &statement.handles { + let cnt = handle.bind_parameter_count(); + for param_i in 0..cnt { + // figure out the index of this bind parameter into our argument tuple + let n: usize = if let Some(name) = handle.bind_parameter_name(param_i) { + if name.starts_with('?') { + // parameter should have the form ?NNN + atoi(name[1..].as_bytes()).expect("parameter of the form ?NNN") + } else { + return Err(err_protocol!("unsupported SQL parameter format: {}", name)); + } + } else { + arg_i += 1; + arg_i + }; - // TODO: Handle error of trying to bind too many parameters here - let index = index as c_int; - - // https://sqlite.org/c3ref/bind_blob.html - let status: c_int = match self { - SqliteArgumentValue::Blob(value) => { - // TODO: Handle bytes that are too large - let bytes = value.as_slice(); - let bytes_ptr = bytes.as_ptr() as *const c_void; - let bytes_len = bytes.len() as i32; - - unsafe { - sqlite3_bind_blob(handle, index, bytes_ptr, bytes_len, SQLITE_TRANSIENT()) + if n > self.values.len() { + return Err(err_protocol!( + "wrong number of parameters, parameter ?{} requested but have only {}", + n, + self.values.len() + )); } + + self.values[n - 1].bind(handle, param_i + 1)?; } - - SqliteArgumentValue::Text(value) => { - // TODO: Handle text that is too large - let bytes = value.as_bytes(); - let bytes_ptr = bytes.as_ptr() as *const c_char; - let bytes_len = bytes.len() as i32; - - unsafe { - sqlite3_bind_text(handle, index, bytes_ptr, bytes_len, SQLITE_TRANSIENT()) - } - } - - SqliteArgumentValue::Double(value) => unsafe { - sqlite3_bind_double(handle, index, *value) - }, - - SqliteArgumentValue::Int(value) => unsafe { sqlite3_bind_int(handle, index, *value) }, - - SqliteArgumentValue::Int64(value) => unsafe { - sqlite3_bind_int64(handle, index, *value) - }, - - SqliteArgumentValue::Null => unsafe { sqlite3_bind_null(handle, index) }, - }; - - if status != SQLITE_OK { - return Err(SqliteError::from_connection(statement.connection.0.as_ptr()).into()); + } + + Ok(()) + } +} + +impl SqliteArgumentValue<'_> { + fn bind(&self, handle: &StatementHandle, i: usize) -> Result<(), Error> { + use SqliteArgumentValue::*; + + let status = match self { + Text(v) => handle.bind_text(i, v), + Blob(v) => handle.bind_blob(i, v), + Int(v) => handle.bind_int(i, *v), + Int64(v) => handle.bind_int64(i, *v), + Double(v) => handle.bind_double(i, *v), + Null => handle.bind_null(i), + }; + + if status != SQLITE_OK { + return Err(handle.last_error().into()); } Ok(()) diff --git a/sqlx-core/src/sqlite/connection.rs b/sqlx-core/src/sqlite/connection.rs deleted file mode 100644 index 9e654046..00000000 --- a/sqlx-core/src/sqlite/connection.rs +++ /dev/null @@ -1,161 +0,0 @@ -use core::ptr::{null, null_mut, NonNull}; - -use std::collections::HashMap; -use std::convert::TryInto; -use std::ffi::CString; - -use futures_core::future::BoxFuture; -use futures_util::future; -use libsqlite3_sys::{ - sqlite3, sqlite3_close, sqlite3_extended_result_codes, sqlite3_open_v2, SQLITE_OK, - SQLITE_OPEN_CREATE, SQLITE_OPEN_NOMUTEX, SQLITE_OPEN_READWRITE, SQLITE_OPEN_SHAREDCACHE, -}; - -use crate::connection::{Connect, Connection}; -use crate::executor::Executor; -use crate::sqlite::statement::Statement; -use crate::sqlite::worker::Worker; - -use crate::sqlite::SqliteError; -use crate::url::Url; - -/// Thin wrapper around [sqlite3] to impl `Send`. -#[derive(Clone, Copy)] -pub(super) struct SqliteConnectionHandle(pub(super) NonNull); - -/// A connection to a [Sqlite](struct.Sqlite.html) database. -pub struct SqliteConnection { - pub(super) handle: SqliteConnectionHandle, - pub(super) worker: Worker, - // Storage of the most recently prepared, non-persistent statement - pub(super) statement: Option, - // Storage of persistent statements - pub(super) statements: Vec, - pub(super) statement_by_query: HashMap, -} - -// A SQLite3 handle is safe to send between threads, provided not more than -// one is accessing it at the same time. This is upheld as long as [SQLITE_CONFIG_MULTITHREAD] is -// enabled and [SQLITE_THREADSAFE] was enabled when sqlite was compiled. We refuse to work -// if these conditions are not upheld. - -// - -// - -unsafe impl Send for SqliteConnectionHandle {} - -async fn establish(url: Result) -> crate::Result { - let mut worker = Worker::new(); - - // By default, we connect to an in-memory database. - // TODO: Handle the error when there are internal NULs in the database URL - let filename = CString::new(url?.path_decoded().to_string()).unwrap(); - - let handle = worker - .run(move || -> crate::Result { - let mut handle = null_mut(); - - // [SQLITE_OPEN_NOMUTEX] will instruct [sqlite3_open_v2] to return an error if it - // cannot satisfy our wish for a thread-safe, lock-free connection object - let flags = SQLITE_OPEN_READWRITE - | SQLITE_OPEN_CREATE - | SQLITE_OPEN_NOMUTEX - | SQLITE_OPEN_SHAREDCACHE; - - // - let status = unsafe { sqlite3_open_v2(filename.as_ptr(), &mut handle, flags, null()) }; - - if handle.is_null() { - // Failed to allocate memory - panic!("SQLite is unable to allocate memory to hold the sqlite3 object"); - } - - if status != SQLITE_OK { - // Close the handle if there was an error here - // https://sqlite.org/c3ref/close.html - unsafe { - let _ = sqlite3_close(handle); - } - - return Err(SqliteError::from_connection(handle).into()); - } - - // Enable extended result codes - // https://www.sqlite.org/c3ref/extended_result_codes.html - unsafe { - sqlite3_extended_result_codes(handle, 1); - } - - Ok(SqliteConnectionHandle(NonNull::new(handle).unwrap())) - }) - .await?; - - Ok(SqliteConnection { - worker, - handle, - statement: None, - statements: Vec::with_capacity(10), - statement_by_query: HashMap::with_capacity(10), - }) -} - -impl SqliteConnection { - #[inline] - pub(super) fn handle(&mut self) -> *mut sqlite3 { - self.handle.0.as_ptr() - } -} - -impl Connect for SqliteConnection { - fn connect(url: T) -> BoxFuture<'static, crate::Result> - where - T: TryInto, - Self: Sized, - { - let url = url.try_into(); - - Box::pin(async move { - let mut conn = establish(url).await?; - - // https://www.sqlite.org/wal.html - - // language=SQLite - conn.execute( - r#" -PRAGMA journal_mode = WAL; -PRAGMA synchronous = NORMAL; - "#, - ) - .await?; - - Ok(conn) - }) - } -} - -impl Connection for SqliteConnection { - fn close(self) -> BoxFuture<'static, crate::Result<()>> { - // All necessary behavior is handled on drop - Box::pin(future::ok(())) - } - - fn ping(&mut self) -> BoxFuture> { - // For SQLite connections, PING does effectively nothing - Box::pin(future::ok(())) - } -} - -impl Drop for SqliteConnection { - fn drop(&mut self) { - // Drop all statements first - self.statements.clear(); - drop(self.statement.take()); - - // Next close the statement - // https://sqlite.org/c3ref/close.html - unsafe { - let _ = sqlite3_close(self.handle()); - } - } -} diff --git a/sqlx-core/src/sqlite/connection/establish.rs b/sqlx-core/src/sqlite/connection/establish.rs new file mode 100644 index 00000000..457dfc4c --- /dev/null +++ b/sqlx-core/src/sqlite/connection/establish.rs @@ -0,0 +1,81 @@ +use std::io; +use std::ptr::{null, null_mut}; + +use hashbrown::HashMap; +use libsqlite3_sys::{ + sqlite3_extended_result_codes, sqlite3_open_v2, SQLITE_OK, SQLITE_OPEN_CREATE, + SQLITE_OPEN_MEMORY, SQLITE_OPEN_NOMUTEX, SQLITE_OPEN_PRIVATECACHE, SQLITE_OPEN_READWRITE, +}; +use sqlx_rt::blocking; + +use crate::error::Error; +use crate::sqlite::connection::handle::ConnectionHandle; +use crate::sqlite::statement::StatementWorker; +use crate::sqlite::{SqliteConnectOptions, SqliteConnection, SqliteError}; + +pub(super) async fn establish(options: &SqliteConnectOptions) -> Result { + let mut filename = options + .filename + .to_str() + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + "filename passed to SQLite must be valid UTF-8", + ) + })? + .to_owned(); + + filename.push('\0'); + + // By default, we connect to an in-memory database. + // [SQLITE_OPEN_NOMUTEX] will instruct [sqlite3_open_v2] to return an error if it + // cannot satisfy our wish for a thread-safe, lock-free connection object + let mut flags = + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX | SQLITE_OPEN_PRIVATECACHE; + + if options.in_memory { + flags |= SQLITE_OPEN_MEMORY; + } + + let handle = blocking!({ + let mut handle = null_mut(); + + // + let status = unsafe { + sqlite3_open_v2( + filename.as_bytes().as_ptr() as *const _, + &mut handle, + flags, + null(), + ) + }; + + if handle.is_null() { + // Failed to allocate memory + panic!("SQLite is unable to allocate memory to hold the sqlite3 object"); + } + + // SAFE: tested for NULL just above + let handle = unsafe { ConnectionHandle::new(handle) }; + + if status != SQLITE_OK { + return Err(Error::Database(Box::new(SqliteError::new(handle.as_ptr())))); + } + + // Enable extended result codes + // https://www.sqlite.org/c3ref/extended_result_codes.html + unsafe { + sqlite3_extended_result_codes(handle.0.as_ptr(), 1); + } + + Ok(handle) + })?; + + Ok(SqliteConnection { + handle, + worker: StatementWorker::new(), + statements: HashMap::new(), + statement: None, + scratch_row_column_names: Default::default(), + }) +} diff --git a/sqlx-core/src/sqlite/connection/executor.rs b/sqlx-core/src/sqlite/connection/executor.rs new file mode 100644 index 00000000..1b19aceb --- /dev/null +++ b/sqlx-core/src/sqlite/connection/executor.rs @@ -0,0 +1,203 @@ +use std::sync::Arc; + +use async_stream::try_stream; +use either::Either; +use futures_core::future::BoxFuture; +use futures_core::stream::BoxStream; +use futures_util::TryStreamExt; +use hashbrown::HashMap; + +use crate::describe::{Column, Describe}; +use crate::error::Error; +use crate::executor::{Execute, Executor}; +use crate::ext::ustr::UStr; +use crate::sqlite::connection::ConnectionHandle; +use crate::sqlite::statement::{SqliteStatement, StatementHandle}; +use crate::sqlite::{Sqlite, SqliteArguments, SqliteConnection, SqliteRow}; + +fn prepare<'a>( + conn: &mut ConnectionHandle, + statements: &'a mut HashMap, + statement: &'a mut Option, + query: &str, + persistent: bool, +) -> Result<&'a mut SqliteStatement, Error> { + if !persistent { + *statement = Some(SqliteStatement::prepare(conn, query, false)?); + return Ok(statement.as_mut().unwrap()); + } + + if !statements.contains_key(query) { + let statement = SqliteStatement::prepare(conn, query, false)?; + statements.insert(query.to_owned(), statement); + } + + let statement = statements.get_mut(query).unwrap(); + + // as this statement has been executed before, we reset before continuing + // this also causes any rows that are from the statement to be inflated + statement.reset(); + + Ok(statement) +} + +fn bind( + statement: &mut SqliteStatement, + arguments: Option>, +) -> Result<(), Error> { + if let Some(arguments) = arguments { + arguments.bind(&*statement)?; + } + + Ok(()) +} + +fn emplace_row_metadata( + statement: &StatementHandle, + column_names: &mut HashMap, +) -> Result<(), Error> { + column_names.clear(); + + let num = statement.column_count(); + + column_names.reserve(num); + + for i in 0..num { + let name: UStr = statement.column_name(i).to_owned().into(); + + column_names.insert(name, i); + } + + Ok(()) +} + +impl<'c> Executor<'c> for &'c mut SqliteConnection { + type Database = Sqlite; + + fn fetch_many<'q: 'c, E>( + self, + mut query: E, + ) -> BoxStream<'c, Result, Error>> + where + E: Execute<'q, Self::Database>, + { + let s = query.query(); + let arguments = query.take_arguments(); + + Box::pin(try_stream! { + let SqliteConnection { + handle: ref mut conn, + ref mut statements, + ref mut statement, + ref worker, + ref mut scratch_row_column_names, + .. + } = self; + + // prepare statement object (or checkout from cache) + let mut stmt = prepare(conn, statements, statement, s, arguments.is_some())?; + + // bind arguments, if any, to the statement + bind(&mut stmt, arguments)?; + + { + // let SqliteStatement { ref handles, ref mut last_row_values, .. } = *stmt; + // let mut handle_i = 0; + + while let Some((handle, last_row_values)) = stmt.execute()? { + // tell the worker about the new statement + worker.execute(handle); + + // wake up the worker if needed + // the worker parks its thread on async-std when not in use + worker.wake(); + + emplace_row_metadata( + handle, + Arc::make_mut(scratch_row_column_names), + )?; + + loop { + // save the rows from the _current_ position on the statement + // and send them to the still-live row object + SqliteRow::inflate_if_needed(handle, last_row_values.take()); + + match worker.step(handle).await? { + Either::Left(changes) => { + let v = Either::Left(changes); + yield v; + + break; + } + + Either::Right(()) => { + let (row, weak_values_ref) = SqliteRow::current( + *handle, + scratch_row_column_names + ); + + let v = Either::Right(row); + *last_row_values = Some(weak_values_ref); + + yield v; + } + } + } + } + } + }) + } + + fn fetch_optional<'q: 'c, E>(self, query: E) -> BoxFuture<'c, Result, Error>> + where + E: Execute<'q, Self::Database>, + { + let mut s = self.fetch_many(query); + + Box::pin(async move { + while let Some(v) = s.try_next().await? { + if let Either::Right(r) = v { + return Ok(Some(r)); + } + } + + Ok(None) + }) + } + + #[doc(hidden)] + fn describe<'q: 'c, E>(self, query: E) -> BoxFuture<'c, Result, Error>> + where + E: Execute<'q, Self::Database>, + { + let query = query.query(); + let statement = SqliteStatement::prepare(&mut self.handle, query, false); + + Box::pin(async move { + let mut params = Vec::new(); + let mut columns = Vec::new(); + + if let Some(statement) = statement?.handles.get(0) { + // NOTE: we can infer *nothing* about parameters apart from the count + params.resize(statement.bind_parameter_count(), None); + + let num_columns = statement.column_count(); + columns.reserve(num_columns); + + for i in 0..num_columns { + let name = statement.column_name(i).to_owned().into(); + let type_info = statement.column_decltype(i); + let not_null = statement.column_not_null(i)?; + + columns.push(Column { + name, + type_info, + not_null, + }) + } + } + + Ok(Describe { params, columns }) + }) + } +} diff --git a/sqlx-core/src/sqlite/connection/handle.rs b/sqlx-core/src/sqlite/connection/handle.rs new file mode 100644 index 00000000..6aa8f376 --- /dev/null +++ b/sqlx-core/src/sqlite/connection/handle.rs @@ -0,0 +1,47 @@ +use std::ptr::NonNull; + +use libsqlite3_sys::{sqlite3, sqlite3_close, SQLITE_OK}; + +use crate::sqlite::SqliteError; + +/// Managed handle to the raw SQLite3 database handle. +/// The database handle will be closed when this is dropped. +#[derive(Debug)] +pub(crate) struct ConnectionHandle(pub(super) NonNull); + +// A SQLite3 handle is safe to send between threads, provided not more than +// one is accessing it at the same time. This is upheld as long as [SQLITE_CONFIG_MULTITHREAD] is +// enabled and [SQLITE_THREADSAFE] was enabled when sqlite was compiled. We refuse to work +// if these conditions are not upheld. + +// + +// + +unsafe impl Send for ConnectionHandle {} + +impl ConnectionHandle { + #[inline] + pub(super) unsafe fn new(ptr: *mut sqlite3) -> Self { + Self(NonNull::new_unchecked(ptr)) + } + + #[inline] + pub(crate) fn as_ptr(&self) -> *mut sqlite3 { + self.0.as_ptr() + } +} + +impl Drop for ConnectionHandle { + fn drop(&mut self) { + unsafe { + // https://sqlite.org/c3ref/close.html + let status = sqlite3_close(self.0.as_ptr()); + if status != SQLITE_OK { + // this should *only* happen due to an internal bug in SQLite where we left + // SQLite handles open + panic!("{}", SqliteError::new(self.0.as_ptr())); + } + } + } +} diff --git a/sqlx-core/src/sqlite/connection/mod.rs b/sqlx-core/src/sqlite/connection/mod.rs new file mode 100644 index 00000000..49df218d --- /dev/null +++ b/sqlx-core/src/sqlite/connection/mod.rs @@ -0,0 +1,81 @@ +use std::fmt::{self, Debug, Formatter}; +use std::sync::Arc; + +use futures_core::future::BoxFuture; +use futures_util::future; +use hashbrown::HashMap; + +use crate::connection::{Connect, Connection}; +use crate::error::Error; +use crate::ext::ustr::UStr; +use crate::sqlite::connection::establish::establish; +use crate::sqlite::statement::{SqliteStatement, StatementWorker}; +use crate::sqlite::{Sqlite, SqliteConnectOptions}; + +mod establish; +mod executor; +mod handle; + +pub(crate) use handle::ConnectionHandle; + +/// A connection to a [Sqlite] database. +pub struct SqliteConnection { + pub(crate) handle: ConnectionHandle, + pub(crate) worker: StatementWorker, + + // cache of semi-persistent statements + pub(crate) statements: HashMap, + + // most recent non-persistent statement + pub(crate) statement: Option, + + // working memory for the active row's column information + scratch_row_column_names: Arc>, +} + +impl Debug for SqliteConnection { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("SqliteConnection").finish() + } +} + +impl Connection for SqliteConnection { + type Database = Sqlite; + + fn close(self) -> BoxFuture<'static, Result<(), Error>> { + // nothing explicit to do; connection will close in drop + Box::pin(future::ok(())) + } + + fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { + // For SQLite connections, PING does effectively nothing + Box::pin(future::ok(())) + } +} + +impl Connect for SqliteConnection { + type Options = SqliteConnectOptions; + + #[inline] + fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result> { + Box::pin(async move { + let conn = establish(options).await?; + + // TODO: Apply any connection options once we have them defined + + Ok(conn) + }) + } +} + +impl Drop for SqliteConnection { + fn drop(&mut self) { + // before the connection handle is dropped, + // we must explicitly drop the statements as the drop-order in a struct is undefined + self.statements.clear(); + self.statement.take(); + + // we then explicitly close the worker + self.worker.close(); + } +} diff --git a/sqlx-core/src/sqlite/cursor.rs b/sqlx-core/src/sqlite/cursor.rs deleted file mode 100644 index ac1f33b3..00000000 --- a/sqlx-core/src/sqlite/cursor.rs +++ /dev/null @@ -1,99 +0,0 @@ -use futures_core::future::BoxFuture; - -use crate::connection::ConnectionSource; -use crate::cursor::Cursor; -use crate::executor::Execute; -use crate::pool::Pool; -use crate::sqlite::statement::Step; -use crate::sqlite::{Sqlite, SqliteArguments, SqliteConnection, SqliteRow}; - -pub struct SqliteCursor<'c, 'q> { - pub(super) source: ConnectionSource<'c, SqliteConnection>, - query: &'q str, - arguments: Option, - pub(super) statement: Option>, -} - -impl crate::cursor::private::Sealed for SqliteCursor<'_, '_> {} - -impl<'c, 'q> Cursor<'c, 'q> for SqliteCursor<'c, 'q> { - type Database = Sqlite; - - #[doc(hidden)] - fn from_pool(pool: &Pool, query: E) -> Self - where - Self: Sized, - E: Execute<'q, Sqlite>, - { - let (query, arguments) = query.into_parts(); - - Self { - source: ConnectionSource::Pool(pool.clone()), - statement: None, - query, - arguments, - } - } - - #[doc(hidden)] - fn from_connection(conn: &'c mut SqliteConnection, query: E) -> Self - where - Self: Sized, - E: Execute<'q, Sqlite>, - { - let (query, arguments) = query.into_parts(); - - Self { - source: ConnectionSource::ConnectionRef(conn), - statement: None, - query, - arguments, - } - } - - fn next(&mut self) -> BoxFuture>>> { - Box::pin(next(self)) - } -} - -async fn next<'a, 'c: 'a, 'q: 'a>( - cursor: &'a mut SqliteCursor<'c, 'q>, -) -> crate::Result>> { - let conn = cursor.source.resolve().await?; - - loop { - if cursor.statement.is_none() { - let key = conn.prepare(&mut cursor.query, cursor.arguments.is_some())?; - - if let Some(arguments) = &mut cursor.arguments { - conn.statement_mut(key).bind(arguments)?; - } - - cursor.statement = Some(key); - } - - let key = cursor.statement.unwrap(); - let statement = conn.statement_mut(key); - - let step = statement.step().await?; - - match step { - Step::Row => { - return Ok(Some(SqliteRow { - values: statement.data_count(), - statement: key, - connection: conn, - })); - } - - Step::Done if cursor.query.is_empty() => { - return Ok(None); - } - - Step::Done => { - cursor.statement = None; - // continue - } - } - } -} diff --git a/sqlx-core/src/sqlite/database.rs b/sqlx-core/src/sqlite/database.rs index a9eb6e5e..7dda8707 100644 --- a/sqlx-core/src/sqlite/database.rs +++ b/sqlx-core/src/sqlite/database.rs @@ -1,45 +1,33 @@ -use crate::cursor::HasCursor; -use crate::database::Database; -use crate::row::HasRow; -use crate::sqlite::error::SqliteError; +use crate::database::{Database, HasArguments, HasValueRef}; use crate::sqlite::{ - SqliteArgumentValue, SqliteArguments, SqliteConnection, SqliteCursor, SqliteRow, - SqliteTypeInfo, SqliteValue, + SqliteArgumentValue, SqliteArguments, SqliteConnection, SqliteRow, SqliteTypeInfo, SqliteValue, + SqliteValueRef, }; -use crate::value::HasRawValue; -/// **Sqlite** database driver. +/// Sqlite database driver. #[derive(Debug)] pub struct Sqlite; impl Database for Sqlite { type Connection = SqliteConnection; - type Arguments = SqliteArguments; + type Row = SqliteRow; type TypeInfo = SqliteTypeInfo; - type TableId = String; - - type RawBuffer = Vec; - - type Error = SqliteError; + type Value = SqliteValue; } -impl<'c> HasRow<'c> for Sqlite { +impl<'r> HasValueRef<'r> for Sqlite { type Database = Sqlite; - type Row = SqliteRow<'c>; + type ValueRef = SqliteValueRef<'r>; } -impl<'c, 'q> HasCursor<'c, 'q> for Sqlite { +impl<'q> HasArguments<'q> for Sqlite { type Database = Sqlite; - type Cursor = SqliteCursor<'c, 'q>; -} + type Arguments = SqliteArguments<'q>; -impl<'c> HasRawValue<'c> for Sqlite { - type Database = Sqlite; - - type RawValue = SqliteValue<'c>; + type ArgumentBuffer = Vec>; } diff --git a/sqlx-core/src/sqlite/error.rs b/sqlx-core/src/sqlite/error.rs index eca7fec1..f5b9b7e9 100644 --- a/sqlx-core/src/sqlite/error.rs +++ b/sqlx-core/src/sqlite/error.rs @@ -1,95 +1,53 @@ -use crate::error::DatabaseError; - -use bitflags::_core::str::from_utf8_unchecked; -use libsqlite3_sys::{sqlite3, sqlite3_errmsg, sqlite3_extended_errcode}; use std::error::Error as StdError; use std::ffi::CStr; -use std::fmt::{self, Display}; +use std::fmt::{self, Display, Formatter}; use std::os::raw::c_int; +use std::str::from_utf8_unchecked; -#[derive(Debug)] -pub struct SqliteError { - code: String, - message: String, -} +use libsqlite3_sys::{sqlite3, sqlite3_errmsg, sqlite3_extended_errcode}; + +use crate::error::DatabaseError; // Error Codes And Messages // https://www.sqlite.org/c3ref/errcode.html +#[derive(Debug)] +pub struct SqliteError { + code: c_int, + message: String, +} + impl SqliteError { - pub(super) fn from_connection(conn: *mut sqlite3) -> Self { - let code: c_int = unsafe { sqlite3_extended_errcode(conn) }; + pub(crate) fn new(handle: *mut sqlite3) -> Self { + // returns the extended result code even when extended result codes are disabled + let code: c_int = unsafe { sqlite3_extended_errcode(handle) }; + // return English-language text that describes the error let message = unsafe { - let err = sqlite3_errmsg(conn); - debug_assert!(!err.is_null()); + let msg = sqlite3_errmsg(handle); + debug_assert!(!msg.is_null()); - from_utf8_unchecked(CStr::from_ptr(err).to_bytes()) + from_utf8_unchecked(CStr::from_ptr(msg).to_bytes()) }; Self { - code: code.to_string(), + code, message: message.to_owned(), } } } impl Display for SqliteError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad(self.message()) - } -} - -impl DatabaseError for SqliteError { - fn message(&self) -> &str { - &self.message - } - - fn code(&self) -> Option<&str> { - Some(&self.code) - } - - fn as_ref_err(&self) -> &(dyn StdError + Send + Sync + 'static) { - self - } - - fn as_mut_err(&mut self) -> &mut (dyn StdError + Send + Sync + 'static) { - self - } - - fn into_box_err(self: Box) -> Box { - self + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.pad(&self.message) } } impl StdError for SqliteError {} -impl From for crate::Error { - fn from(err: SqliteError) -> Self { - crate::Error::Database(Box::new(err)) +impl DatabaseError for SqliteError { + #[inline] + fn message(&self) -> &str { + &self.message } } - -#[test] -fn test_error_downcasting() { - let error = SqliteError { - code: "SQLITE_ERR_SOMETHING".into(), - message: "Some hypothetical error message".into(), - }; - - let error = crate::Error::from(error); - - let db_err = match error { - crate::Error::Database(db_err) => db_err, - e => panic!("expected Error::Database, got {:?}", e), - }; - - assert_eq!( - &db_err.downcast_ref::().code, - "SQLITE_ERR_SOMETHING" - ); - assert_eq!( - db_err.downcast::().code, - "SQLITE_ERR_SOMETHING" - ); -} diff --git a/sqlx-core/src/sqlite/executor.rs b/sqlx-core/src/sqlite/executor.rs deleted file mode 100644 index 3ba7b5f4..00000000 --- a/sqlx-core/src/sqlite/executor.rs +++ /dev/null @@ -1,193 +0,0 @@ -use futures_core::future::BoxFuture; - -use libsqlite3_sys::sqlite3_changes; - -use crate::cursor::Cursor; -use crate::describe::{Column, Describe}; -use crate::executor::{Execute, Executor, RefExecutor}; -use crate::sqlite::cursor::SqliteCursor; -use crate::sqlite::statement::{Statement, Step}; -use crate::sqlite::type_info::SqliteType; -use crate::sqlite::{Sqlite, SqliteConnection, SqliteTypeInfo}; - -impl SqliteConnection { - pub(super) fn prepare( - &mut self, - query: &mut &str, - persistent: bool, - ) -> crate::Result> { - // TODO: Revisit statement caching and allow cache expiration by using a - // generational index - - if !persistent { - // A non-persistent query will be immediately prepared and returned, - // regardless of the current state of the cache - self.statement = Some(Statement::new(self, query, false)?); - return Ok(None); - } - - if let Some(key) = self.statement_by_query.get(&**query) { - let statement = &mut self.statements[*key]; - - // Adjust the passed in query string as if [string3_prepare] - // did the tail parsing - *query = &query[statement.tail..]; - - // As this statement has very likely been used before, we reset - // it to clear the bindings and its program state - statement.reset(); - - return Ok(Some(*key)); - } - - // Prepare a new statement object; ensuring to tell SQLite that this will be stored - // for a "long" time and re-used multiple times - - let query_key = query.to_owned(); - let statement = Statement::new(self, query, true)?; - - let key = self.statements.len(); - - self.statement_by_query.insert(query_key, key); - self.statements.push(statement); - - Ok(Some(key)) - } - - // This is used for [affected_rows] in the public API. - fn changes(&mut self) -> u64 { - // Returns the number of rows modified, inserted or deleted by the most recently - // completed INSERT, UPDATE or DELETE statement. - - // https://www.sqlite.org/c3ref/changes.html - let changes = unsafe { sqlite3_changes(self.handle()) }; - changes as u64 - } - - #[inline] - pub(super) fn statement(&self, key: Option) -> &Statement { - match key { - Some(key) => &self.statements[key], - None => self.statement.as_ref().unwrap(), - } - } - - #[inline] - pub(super) fn statement_mut(&mut self, key: Option) -> &mut Statement { - match key { - Some(key) => &mut self.statements[key], - None => self.statement.as_mut().unwrap(), - } - } -} - -impl Executor for SqliteConnection { - type Database = Sqlite; - - fn execute<'e, 'q: 'e, 'c: 'e, E: 'e>( - &'c mut self, - query: E, - ) -> BoxFuture<'e, crate::Result> - where - E: Execute<'q, Self::Database>, - { - log_execution!(query, { - let (mut query, mut arguments) = query.into_parts(); - - Box::pin(async move { - loop { - let key = self.prepare(&mut query, arguments.is_some())?; - let statement = self.statement_mut(key); - - if let Some(arguments) = &mut arguments { - statement.bind(arguments)?; - } - - while let Step::Row = statement.step().await? { - // We only care about the rows modified; ignore - } - - if query.is_empty() { - break; - } - } - - Ok(self.changes()) - }) - }) - } - - fn fetch<'q, E>(&mut self, query: E) -> SqliteCursor<'_, 'q> - where - E: Execute<'q, Self::Database>, - { - log_execution!(query, { SqliteCursor::from_connection(self, query) }) - } - - #[doc(hidden)] - fn describe<'e, 'q, E: 'e>( - &'e mut self, - query: E, - ) -> BoxFuture<'e, crate::Result>> - where - E: Execute<'q, Self::Database>, - { - Box::pin(async move { - let (mut query, _) = query.into_parts(); - let key = self.prepare(&mut query, false)?; - let statement = self.statement_mut(key); - - // First let's attempt to describe what we can about parameter types - // Which happens to just be the count, heh - let num_params = statement.params(); - let params = vec![None; num_params].into_boxed_slice(); - - // Next, collect (return) column types and names - let num_columns = statement.column_count(); - let mut columns = Vec::with_capacity(num_columns); - for i in 0..num_columns { - let name = statement.column_name(i).to_owned(); - let decl = statement.column_decltype(i); - - let r#type = match decl { - None => None, - Some(decl) => match &*decl.to_ascii_lowercase() { - "bool" | "boolean" => Some(SqliteType::Boolean), - "clob" | "text" => Some(SqliteType::Text), - "blob" => Some(SqliteType::Blob), - "real" | "double" | "double precision" | "float" => Some(SqliteType::Float), - decl @ _ if decl.contains("int") => Some(SqliteType::Integer), - decl @ _ if decl.contains("char") => Some(SqliteType::Text), - _ => None, - }, - }; - - columns.push(Column { - name: Some(name.into()), - non_null: statement.column_not_null(i)?, - table_id: None, - type_info: r#type.map(|r#type| SqliteTypeInfo { - r#type, - affinity: None, - }), - }) - } - - Ok(Describe { - param_types: params, - result_columns: columns.into_boxed_slice(), - }) - }) - } -} - -impl<'e> RefExecutor<'e> for &'e mut SqliteConnection { - type Database = Sqlite; - - fn fetch_by_ref<'q, E>(self, query: E) -> SqliteCursor<'e, 'q> - where - E: Execute<'q, Self::Database>, - { - log_execution!(query, { SqliteCursor::from_connection(self, query) }) - } -} diff --git a/sqlx-core/src/sqlite/mod.rs b/sqlx-core/src/sqlite/mod.rs index 8b6793f0..d91e3750 100644 --- a/sqlx-core/src/sqlite/mod.rs +++ b/sqlx-core/src/sqlite/mod.rs @@ -1,4 +1,4 @@ -//! **SQLite** database and connection types. +//! **SQLite** database driver. // SQLite is a C library. All interactions require FFI which is unsafe. // All unsafe blocks should have comments pointing to SQLite docs and ensuring that we maintain @@ -7,30 +7,23 @@ mod arguments; mod connection; -mod cursor; mod database; mod error; -mod executor; +mod options; mod row; mod statement; mod type_info; pub mod types; mod value; -mod worker; pub use arguments::{SqliteArgumentValue, SqliteArguments}; pub use connection::SqliteConnection; -pub use cursor::SqliteCursor; pub use database::Sqlite; pub use error::SqliteError; +pub use options::SqliteConnectOptions; pub use row::SqliteRow; pub use type_info::SqliteTypeInfo; -pub use value::SqliteValue; +pub use value::{SqliteValue, SqliteValueRef}; -/// An alias for [`Pool`][crate::pool::Pool], specialized for **Sqlite**. -#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))] +/// An alias for [`Pool`][crate::pool::Pool], specialized for SQLite. pub type SqlitePool = crate::pool::Pool; - -make_query_as!(SqliteQueryAs, Sqlite, SqliteRow); -impl_map_row_for_row!(Sqlite, SqliteRow); -impl_from_row_for_tuples!(Sqlite, SqliteRow); diff --git a/sqlx-core/src/sqlite/options.rs b/sqlx-core/src/sqlite/options.rs new file mode 100644 index 00000000..c0623c23 --- /dev/null +++ b/sqlx-core/src/sqlite/options.rs @@ -0,0 +1,46 @@ +use std::path::PathBuf; +use std::str::FromStr; + +use crate::error::BoxDynError; + +// TODO: Look at go-sqlite for option ideas +// TODO: journal_mode + +/// Options and flags which can be used to configure a SQLite connection. +pub struct SqliteConnectOptions { + pub(crate) filename: PathBuf, + pub(crate) in_memory: bool, +} + +impl SqliteConnectOptions { + pub fn new() -> Self { + Self { + filename: PathBuf::from(":memory:"), + in_memory: false, + } + } +} + +impl FromStr for SqliteConnectOptions { + type Err = BoxDynError; + + fn from_str(mut s: &str) -> Result { + let mut options = Self { + filename: PathBuf::new(), + in_memory: false, + }; + + // remove scheme + s = s + .trim_start_matches("sqlite://") + .trim_start_matches("sqlite:"); + + if s == ":memory:" { + options.in_memory = true; + } else { + options.filename = s.parse()?; + } + + Ok(options) + } +} diff --git a/sqlx-core/src/sqlite/row.rs b/sqlx-core/src/sqlite/row.rs index a761dcf6..5954d912 100644 --- a/sqlx-core/src/sqlite/row.rs +++ b/sqlx-core/src/sqlite/row.rs @@ -1,68 +1,137 @@ -use crate::row::{ColumnIndex, Row}; -use crate::sqlite::statement::Statement; -use crate::sqlite::value::SqliteValue; -use crate::sqlite::{Sqlite, SqliteConnection}; +use std::ptr::null_mut; +use std::slice; +use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::{Arc, Weak}; -pub struct SqliteRow<'c> { - pub(super) values: usize, - pub(super) statement: Option, - pub(super) connection: &'c SqliteConnection, +use hashbrown::HashMap; + +use crate::error::Error; +use crate::ext::ustr::UStr; +use crate::row::{ColumnIndex, Row}; +use crate::sqlite::statement::StatementHandle; +use crate::sqlite::{Sqlite, SqliteValue, SqliteValueRef}; + +/// Implementation of [`Row`] for SQLite. +pub struct SqliteRow { + // Raw handle of the SQLite statement + // This is valid to access IFF the atomic [values] is null + // The way this works is that the executor retains a weak reference to + // [values] after the Row is created and yielded downstream. + // IF the user drops the Row before iterating the stream (so + // nearly all of our internal stream iterators), the executor moves on; otherwise, + // it actually inflates this row with a list of owned sqlite3 values. + pub(crate) statement: StatementHandle, + + pub(crate) values: Arc>, + pub(crate) num_values: usize, + + pub(crate) column_names: Arc>, } -impl crate::row::private_row::Sealed for SqliteRow<'_> {} +impl crate::row::private_row::Sealed for SqliteRow {} // Accessing values from the statement object is // safe across threads as long as we don't call [sqlite3_step] -// That should not be possible as long as an immutable borrow is held on the connection -unsafe impl Send for SqliteRow<'_> {} -unsafe impl Sync for SqliteRow<'_> {} +// we block ourselves from doing that by only exposing +// a set interface on [StatementHandle] -impl<'c> SqliteRow<'c> { - #[inline] - fn statement(&self) -> &'c Statement { - self.connection.statement(self.statement) - } -} +unsafe impl Send for SqliteRow {} +unsafe impl Sync for SqliteRow {} -impl<'c> Row<'c> for SqliteRow<'c> { - type Database = Sqlite; +impl SqliteRow { + // creates a new row that is internally referencing the **current** state of the statement + // returns a weak reference to an atomic list where the executor should inflate if its going + // to increment the statement with [step] + pub(crate) fn current( + statement: StatementHandle, + column_names: &Arc>, + ) -> (Self, Weak>) { + let values = Arc::new(AtomicPtr::new(null_mut())); + let weak_values = Arc::downgrade(&values); + let size = statement.column_count(); - #[inline] - fn len(&self) -> usize { - self.values + let row = Self { + statement, + values, + num_values: size, + column_names: Arc::clone(column_names), + }; + + (row, weak_values) } - #[doc(hidden)] - fn try_get_raw(&self, index: I) -> crate::Result> - where - I: ColumnIndex<'c, Self>, - { - Ok(SqliteValue { - statement: self.statement(), - index: index.index(self)? as i32, - }) - } -} + // inflates this Row into memory as a list of owned, protected SQLite value objects + // this is called by the + pub(crate) fn inflate(statement: &StatementHandle, values_ref: &AtomicPtr) { + let size = statement.column_count(); + let mut values = Vec::with_capacity(size); -impl<'c> ColumnIndex<'c, SqliteRow<'c>> for usize { - fn index(&self, row: &SqliteRow<'c>) -> crate::Result { - let len = Row::len(row); - - if *self >= len { - return Err(crate::Error::ColumnIndexOutOfBounds { len, index: *self }); + for i in 0..size { + values.push(statement.column_value(i)); } - Ok(*self) + // decay the array signifier and become just a normal, leaked array + let values_ptr = Box::into_raw(values.into_boxed_slice()) as *mut SqliteValue; + + // store in the atomic ptr storage + values_ref.store(values_ptr, Ordering::Release); + } + + pub(crate) fn inflate_if_needed( + statement: &StatementHandle, + weak_values_ref: Option>>, + ) { + if let Some(v) = weak_values_ref.and_then(|v| v.upgrade()) { + SqliteRow::inflate(statement, &v); + } } } -impl<'c> ColumnIndex<'c, SqliteRow<'c>> for str { - fn index(&self, row: &SqliteRow<'c>) -> crate::Result { - row.statement() - .columns - .get(self) - .ok_or_else(|| crate::Error::ColumnNotFound((*self).into())) - .map(|&index| index as usize) +impl Row for SqliteRow { + type Database = Sqlite; + + fn len(&self) -> usize { + self.num_values + } + + fn try_get_raw(&self, index: I) -> Result, Error> + where + I: ColumnIndex, + { + let index = index.index(self)?; + + let values_ptr = self.values.load(Ordering::Acquire); + if !values_ptr.is_null() { + // we have raw value data, we should use that + let values: &[SqliteValue] = + unsafe { slice::from_raw_parts(values_ptr, self.num_values) }; + + Ok(SqliteValueRef::value(&values[index])) + } else { + Ok(SqliteValueRef::statement(&self.statement, index)) + } + } +} + +impl Drop for SqliteRow { + fn drop(&mut self) { + // if there is a non-null pointer stored here, we need to re-load and drop it + let values_ptr = self.values.load(Ordering::Acquire); + if !values_ptr.is_null() { + let values: &mut [SqliteValue] = + unsafe { slice::from_raw_parts_mut(values_ptr, self.num_values) }; + + let _ = unsafe { Box::from_raw(values) }; + } + } +} + +impl ColumnIndex for &'_ str { + fn index(&self, row: &SqliteRow) -> Result { + row.column_names + .get(*self) + .ok_or_else(|| Error::ColumnNotFound((*self).into())) + .map(|v| *v) } } diff --git a/sqlx-core/src/sqlite/statement.rs b/sqlx-core/src/sqlite/statement.rs deleted file mode 100644 index 37b1b69a..00000000 --- a/sqlx-core/src/sqlite/statement.rs +++ /dev/null @@ -1,305 +0,0 @@ -#![allow(unsafe_code)] - -use core::ptr::{null, null_mut, NonNull}; -use std::collections::HashMap; -use std::ffi::CStr; -use std::os::raw::{c_char, c_int}; -use std::ptr; - -use libsqlite3_sys::{ - sqlite3_bind_parameter_count, sqlite3_clear_bindings, sqlite3_column_count, - sqlite3_column_database_name, sqlite3_column_decltype, sqlite3_column_name, - sqlite3_column_origin_name, sqlite3_column_table_name, sqlite3_data_count, sqlite3_finalize, - sqlite3_prepare_v3, sqlite3_reset, sqlite3_step, sqlite3_stmt, sqlite3_table_column_metadata, - SQLITE_DONE, SQLITE_OK, SQLITE_PREPARE_NO_VTAB, SQLITE_PREPARE_PERSISTENT, SQLITE_ROW, -}; - -use crate::sqlite::connection::SqliteConnectionHandle; -use crate::sqlite::worker::Worker; -use crate::sqlite::SqliteError; -use crate::sqlite::{SqliteArguments, SqliteConnection}; - -/// Return values from [SqliteStatement::step]. -pub(super) enum Step { - /// The statement has finished executing successfully. - Done, - - /// Another row of output is available. - Row, -} - -/// Thin wrapper around [sqlite3_stmt] to impl `Send`. -#[derive(Clone, Copy)] -pub(super) struct SqliteStatementHandle(NonNull); - -/// Represents a _single_ SQL statement that has been compiled into binary -/// form and is ready to be evaluated. -/// -/// The statement is finalized ( `sqlite3_finalize` ) on drop. -pub(super) struct Statement { - handle: Option, - pub(super) connection: SqliteConnectionHandle, - pub(super) worker: Worker, - pub(super) tail: usize, - pub(super) columns: HashMap, -} - -// SQLite3 statement objects are safe to send between threads, but *not* safe -// for general-purpose concurrent access between threads. See more notes -// on [SqliteConnectionHandle]. -unsafe impl Send for SqliteStatementHandle {} - -impl Statement { - pub(super) fn new( - conn: &mut SqliteConnection, - query: &mut &str, - persistent: bool, - ) -> crate::Result { - // TODO: Error on queries that are too large - let query_ptr = query.as_bytes().as_ptr() as *const c_char; - let query_len = query.len() as i32; - let mut statement_handle: *mut sqlite3_stmt = null_mut(); - let mut flags = SQLITE_PREPARE_NO_VTAB; - let mut tail: *const c_char = null(); - - if persistent { - // SQLITE_PREPARE_PERSISTENT - // The SQLITE_PREPARE_PERSISTENT flag is a hint to the query - // planner that the prepared statement will be retained for a long time - // and probably reused many times. - flags |= SQLITE_PREPARE_PERSISTENT; - } - - // - let status = unsafe { - sqlite3_prepare_v3( - conn.handle(), - query_ptr, - query_len, - flags as u32, - &mut statement_handle, - &mut tail, - ) - }; - - if status != SQLITE_OK { - return Err(SqliteError::from_connection(conn.handle()).into()); - } - - // If pzTail is not NULL then *pzTail is made to point to the first byte - // past the end of the first SQL statement in zSql. - let tail = (tail as usize) - (query_ptr as usize); - *query = &query[tail..].trim(); - - let mut self_ = Self { - worker: conn.worker.clone(), - connection: conn.handle, - handle: NonNull::new(statement_handle).map(SqliteStatementHandle), - columns: HashMap::new(), - tail, - }; - - // Prepare a column hash map for use in pulling values from a column by name - let count = self_.column_count(); - self_.columns.reserve(count); - - for i in 0..count { - let name = self_.column_name(i).to_owned(); - self_.columns.insert(name, i); - } - - Ok(self_) - } - - /// Returns a pointer to the raw C pointer backing this statement. - #[inline] - pub(super) unsafe fn handle(&self) -> Option<*mut sqlite3_stmt> { - self.handle.map(|handle| handle.0.as_ptr()) - } - - pub(super) fn data_count(&mut self) -> usize { - // https://sqlite.org/c3ref/data_count.html - - // The sqlite3_data_count(P) interface returns the number of columns - // in the current row of the result set. - - // The value is correct only if there was a recent call to - // sqlite3_step that returned SQLITE_ROW. - - unsafe { self.handle().map_or(0, |handle| sqlite3_data_count(handle)) as usize } - } - - pub(super) fn column_count(&mut self) -> usize { - // https://sqlite.org/c3ref/column_count.html - unsafe { - self.handle() - .map_or(0, |handle| sqlite3_column_count(handle)) as usize - } - } - - pub(super) fn column_name(&mut self, index: usize) -> &str { - unsafe { - self.handle() - .map(|handle| { - // https://sqlite.org/c3ref/column_name.html - let ptr = sqlite3_column_name(handle, index as c_int); - debug_assert!(!ptr.is_null()); - - CStr::from_ptr(ptr) - }) - .map_or(Ok(""), |name| name.to_str()) - .unwrap() - } - } - - pub(super) fn column_decltype(&mut self, index: usize) -> Option<&str> { - unsafe { - self.handle() - .and_then(|handle| { - let ptr = sqlite3_column_decltype(handle, index as c_int); - - if ptr.is_null() { - None - } else { - Some(CStr::from_ptr(ptr)) - } - }) - .map(|name| name.to_str().unwrap()) - } - } - - pub(super) fn column_not_null(&mut self, index: usize) -> crate::Result> { - let handle = unsafe { - if let Some(handle) = self.handle() { - handle - } else { - // we do not know the nullablility of a column that doesn't exist on a statement - // that doesn't exist - return Ok(None); - } - }; - - unsafe { - // https://sqlite.org/c3ref/column_database_name.html - // - // ### Note - // The returned string is valid until the prepared statement is destroyed using - // sqlite3_finalize() or until the statement is automatically reprepared by the - // first call to sqlite3_step() for a particular run or until the same information - // is requested again in a different encoding. - let db_name = sqlite3_column_database_name(handle, index as c_int); - let table_name = sqlite3_column_table_name(handle, index as c_int); - let origin_name = sqlite3_column_origin_name(handle, index as c_int); - - if db_name.is_null() || table_name.is_null() || origin_name.is_null() { - return Ok(None); - } - - let mut not_null: c_int = 0; - - // https://sqlite.org/c3ref/table_column_metadata.html - let status = sqlite3_table_column_metadata( - self.connection.0.as_ptr(), - db_name, - table_name, - origin_name, - // function docs state to provide NULL for return values you don't care about - ptr::null_mut(), - ptr::null_mut(), - &mut not_null, - ptr::null_mut(), - ptr::null_mut(), - ); - - if status != SQLITE_OK { - // implementation note: the docs for sqlite3_table_column_metadata() specify - // that an error can be returned if the column came from a view; however, - // experimentally we found that the above functions give us the true origin - // for columns in views that came from real tables and so we should never hit this - // error; for view columns that are expressions we are given NULL for their origins - // so we don't need special handling for that case either. - // - // this is confirmed in the `tests/sqlite-macros.rs` integration test - return Err(SqliteError::from_connection(self.connection.0.as_ptr()).into()); - } - - Ok(Some(not_null != 0)) - } - } - - pub(super) fn params(&mut self) -> usize { - // https://www.hwaci.com/sw/sqlite/c3ref/bind_parameter_count.html - unsafe { - self.handle() - .map_or(0, |handle| sqlite3_bind_parameter_count(handle)) as usize - } - } - - pub(super) fn bind(&mut self, arguments: &mut SqliteArguments) -> crate::Result<()> { - for index in 0..self.params() { - if let Some(value) = arguments.next() { - value.bind(self, index + 1)?; - } else { - break; - } - } - - Ok(()) - } - - pub(super) fn reset(&mut self) { - let handle = unsafe { - if let Some(handle) = self.handle() { - handle - } else { - // nothing to reset if its null - return; - } - }; - - // https://sqlite.org/c3ref/reset.html - // https://sqlite.org/c3ref/clear_bindings.html - - // the status value of reset is ignored because it merely propagates - // the status of the most recently invoked step function - - let _ = unsafe { sqlite3_reset(handle) }; - let _ = unsafe { sqlite3_clear_bindings(handle) }; - } - - pub(super) async fn step(&mut self) -> crate::Result { - // https://sqlite.org/c3ref/step.html - - if let Some(handle) = self.handle { - let status = unsafe { - self.worker - .run(move || sqlite3_step(handle.0.as_ptr())) - .await - }; - - match status { - SQLITE_DONE => Ok(Step::Done), - - SQLITE_ROW => Ok(Step::Row), - - _ => { - return Err(SqliteError::from_connection(self.connection.0.as_ptr()).into()); - } - } - } else { - // An empty (null) query will always emit `Step::Done` - Ok(Step::Done) - } - } -} - -impl Drop for Statement { - fn drop(&mut self) { - // https://sqlite.org/c3ref/finalize.html - unsafe { - if let Some(handle) = self.handle() { - let _ = sqlite3_finalize(handle); - } - } - } -} diff --git a/sqlx-core/src/sqlite/statement/handle.rs b/sqlx-core/src/sqlite/statement/handle.rs new file mode 100644 index 00000000..99edc352 --- /dev/null +++ b/sqlx-core/src/sqlite/statement/handle.rs @@ -0,0 +1,254 @@ +use std::ffi::c_void; +use std::ffi::CStr; +use std::os::raw::{c_char, c_int}; +use std::ptr::NonNull; +use std::str::{from_utf8, from_utf8_unchecked}; + +use libsqlite3_sys::{ + sqlite3, sqlite3_bind_blob64, sqlite3_bind_double, sqlite3_bind_int, sqlite3_bind_int64, + sqlite3_bind_null, sqlite3_bind_parameter_count, sqlite3_bind_parameter_name, + sqlite3_bind_text64, sqlite3_changes, sqlite3_column_blob, sqlite3_column_bytes, + sqlite3_column_count, sqlite3_column_database_name, sqlite3_column_decltype, + sqlite3_column_double, sqlite3_column_int, sqlite3_column_int64, sqlite3_column_name, + sqlite3_column_origin_name, sqlite3_column_table_name, sqlite3_column_type, + sqlite3_column_value, sqlite3_db_handle, sqlite3_stmt, sqlite3_table_column_metadata, + SQLITE_OK, SQLITE_TRANSIENT, SQLITE_UTF8, +}; + +use crate::error::{BoxDynError, Error}; +use crate::sqlite::type_info::DataType; +use crate::sqlite::{SqliteError, SqliteTypeInfo, SqliteValue}; +use std::ptr; +use std::slice::from_raw_parts; + +#[derive(Debug, Copy, Clone)] +pub(crate) struct StatementHandle(pub(super) NonNull); + +// access to SQLite3 statement handles are safe to send and share between threads +// as long as the `sqlite3_step` call is serialized. + +unsafe impl Send for StatementHandle {} +unsafe impl Sync for StatementHandle {} + +impl StatementHandle { + #[inline] + pub(super) unsafe fn db_handle(&self) -> *mut sqlite3 { + // O(c) access to the connection handle for this statement handle + // https://sqlite.org/c3ref/db_handle.html + sqlite3_db_handle(self.0.as_ptr()) + } + + #[inline] + pub(crate) fn last_error(&self) -> SqliteError { + SqliteError::new(unsafe { self.db_handle() }) + } + + #[inline] + pub(crate) fn column_count(&self) -> usize { + // https://sqlite.org/c3ref/column_count.html + unsafe { sqlite3_column_count(self.0.as_ptr()) as usize } + } + + #[inline] + pub(crate) fn changes(&self) -> u64 { + // returns the number of changes of the *last* statement; not + // necessarily this statement. + // https://sqlite.org/c3ref/changes.html + unsafe { sqlite3_changes(self.db_handle()) as u64 } + } + + #[inline] + pub(crate) fn column_name(&self, index: usize) -> &str { + // https://sqlite.org/c3ref/column_name.html + unsafe { + let name = sqlite3_column_name(self.0.as_ptr(), index as c_int); + debug_assert!(!name.is_null()); + + from_utf8_unchecked(CStr::from_ptr(name).to_bytes()) + } + } + + #[inline] + pub(crate) fn column_decltype(&self, index: usize) -> Option { + unsafe { + let decl = sqlite3_column_decltype(self.0.as_ptr(), index as c_int); + if decl.is_null() { + // If the Nth column of the result set is an expression or subquery, + // then a NULL pointer is returned. + return None; + } + + let decl = from_utf8_unchecked(CStr::from_ptr(decl).to_bytes()); + let ty: DataType = decl.parse().ok()?; + + Some(SqliteTypeInfo(ty)) + } + } + + pub(crate) fn column_not_null(&self, index: usize) -> Result, Error> { + unsafe { + // https://sqlite.org/c3ref/column_database_name.html + // + // ### Note + // The returned string is valid until the prepared statement is destroyed using + // sqlite3_finalize() or until the statement is automatically reprepared by the + // first call to sqlite3_step() for a particular run or until the same information + // is requested again in a different encoding. + let db_name = sqlite3_column_database_name(self.0.as_ptr(), index as c_int); + let table_name = sqlite3_column_table_name(self.0.as_ptr(), index as c_int); + let origin_name = sqlite3_column_origin_name(self.0.as_ptr(), index as c_int); + + if db_name.is_null() || table_name.is_null() || origin_name.is_null() { + return Ok(None); + } + + let mut not_null: c_int = 0; + + // https://sqlite.org/c3ref/table_column_metadata.html + let status = sqlite3_table_column_metadata( + self.db_handle(), + db_name, + table_name, + origin_name, + // function docs state to provide NULL for return values you don't care about + ptr::null_mut(), + ptr::null_mut(), + &mut not_null, + ptr::null_mut(), + ptr::null_mut(), + ); + + if status != SQLITE_OK { + // implementation note: the docs for sqlite3_table_column_metadata() specify + // that an error can be returned if the column came from a view; however, + // experimentally we found that the above functions give us the true origin + // for columns in views that came from real tables and so we should never hit this + // error; for view columns that are expressions we are given NULL for their origins + // so we don't need special handling for that case either. + // + // this is confirmed in the `tests/sqlite-macros.rs` integration test + return Err(SqliteError::new(self.db_handle()).into()); + } + + Ok(Some(not_null != 0)) + } + } + + // Number Of SQL Parameters + #[inline] + pub(crate) fn bind_parameter_count(&self) -> usize { + // https://www.sqlite.org/c3ref/bind_parameter_count.html + unsafe { sqlite3_bind_parameter_count(self.0.as_ptr()) as usize } + } + + // Name Of A Host Parameter + #[inline] + pub(crate) fn bind_parameter_name(&self, index: usize) -> Option<&str> { + unsafe { + // https://www.sqlite.org/c3ref/bind_parameter_name.html + let name = sqlite3_bind_parameter_name(self.0.as_ptr(), index as c_int); + if name.is_null() { + return None; + } + + Some(from_utf8_unchecked(CStr::from_ptr(name).to_bytes())) + } + } + + // Binding Values To Prepared Statements + // https://www.sqlite.org/c3ref/bind_blob.html + + #[inline] + pub(crate) fn bind_blob(&self, index: usize, v: &[u8]) -> c_int { + unsafe { + sqlite3_bind_blob64( + self.0.as_ptr(), + index as c_int, + v.as_ptr() as *const c_void, + v.len() as u64, + SQLITE_TRANSIENT(), + ) + } + } + + #[inline] + pub(crate) fn bind_text(&self, index: usize, v: &str) -> c_int { + unsafe { + sqlite3_bind_text64( + self.0.as_ptr(), + index as c_int, + v.as_ptr() as *const c_char, + v.len() as u64, + SQLITE_TRANSIENT(), + SQLITE_UTF8 as u8, + ) + } + } + + #[inline] + pub(crate) fn bind_int(&self, index: usize, v: i32) -> c_int { + unsafe { sqlite3_bind_int(self.0.as_ptr(), index as c_int, v as c_int) } + } + + #[inline] + pub(crate) fn bind_int64(&self, index: usize, v: i64) -> c_int { + unsafe { sqlite3_bind_int64(self.0.as_ptr(), index as c_int, v) } + } + + #[inline] + pub(crate) fn bind_double(&self, index: usize, v: f64) -> c_int { + unsafe { sqlite3_bind_double(self.0.as_ptr(), index as c_int, v) } + } + + #[inline] + pub(crate) fn bind_null(&self, index: usize) -> c_int { + unsafe { sqlite3_bind_null(self.0.as_ptr(), index as c_int) } + } + + // result values from the query + // https://www.sqlite.org/c3ref/column_blob.html + + #[inline] + pub(crate) fn column_type(&self, index: usize) -> c_int { + unsafe { sqlite3_column_type(self.0.as_ptr(), index as c_int) } + } + + #[inline] + pub(crate) fn column_int(&self, index: usize) -> i32 { + unsafe { sqlite3_column_int(self.0.as_ptr(), index as c_int) as i32 } + } + + #[inline] + pub(crate) fn column_int64(&self, index: usize) -> i64 { + unsafe { sqlite3_column_int64(self.0.as_ptr(), index as c_int) as i64 } + } + + #[inline] + pub(crate) fn column_double(&self, index: usize) -> f64 { + unsafe { sqlite3_column_double(self.0.as_ptr(), index as c_int) } + } + + #[inline] + pub(crate) fn column_value(&self, index: usize) -> SqliteValue { + unsafe { SqliteValue::new(sqlite3_column_value(self.0.as_ptr(), index as c_int)) } + } + + pub(crate) fn column_blob(&self, index: usize) -> &[u8] { + let index = index as c_int; + let len = unsafe { sqlite3_column_bytes(self.0.as_ptr(), index) } as usize; + + if len == 0 { + // empty blobs are NULL so just return an empty slice + return &[]; + } + + let ptr = unsafe { sqlite3_column_blob(self.0.as_ptr(), index) } as *const u8; + debug_assert!(!ptr.is_null()); + + unsafe { from_raw_parts(ptr, len) } + } + + pub(crate) fn column_text(&self, index: usize) -> Result<&str, BoxDynError> { + Ok(from_utf8(self.column_blob(index))?) + } +} diff --git a/sqlx-core/src/sqlite/statement/mod.rs b/sqlx-core/src/sqlite/statement/mod.rs new file mode 100644 index 00000000..cee36b2e --- /dev/null +++ b/sqlx-core/src/sqlite/statement/mod.rs @@ -0,0 +1,187 @@ +use std::i32; +use std::os::raw::c_char; +use std::ptr::{null, null_mut, NonNull}; +use std::sync::{atomic::AtomicPtr, Weak}; + +use bytes::{Buf, Bytes}; +use libsqlite3_sys::{ + sqlite3, sqlite3_clear_bindings, sqlite3_finalize, sqlite3_prepare_v3, sqlite3_reset, + sqlite3_stmt, SQLITE_OK, SQLITE_PREPARE_NO_VTAB, SQLITE_PREPARE_PERSISTENT, +}; +use smallvec::SmallVec; + +use crate::error::Error; +use crate::sqlite::connection::ConnectionHandle; +use crate::sqlite::{SqliteError, SqliteRow, SqliteValue}; + +mod handle; +mod worker; + +pub(crate) use handle::StatementHandle; +pub(crate) use worker::StatementWorker; + +// NOTE: Keep query in statement and slowly chop it up + +#[derive(Debug)] +pub(crate) struct SqliteStatement { + persistent: bool, + index: usize, + + // tail of the most recently prepared SQL statement within this container + tail: Bytes, + + // underlying sqlite handles for each inner statement + // a SQL query string in SQLite is broken up into N statements + // we use a [`SmallVec`] to optimize for the most likely case of a single statement + pub(crate) handles: SmallVec<[StatementHandle; 1]>, + + // weak reference to the previous row from this connection + // we use the notice of a successful upgrade of this reference as an indicator that the + // row is still around, in which we then inflate the row such that we can let SQLite + // clobber the memory allocation for the row + pub(crate) last_row_values: SmallVec<[Option>>; 1]>, +} + +fn prepare( + conn: *mut sqlite3, + query: &mut Bytes, + persistent: bool, +) -> Result, Error> { + let mut flags = SQLITE_PREPARE_NO_VTAB; + + if persistent { + // SQLITE_PREPARE_PERSISTENT + // The SQLITE_PREPARE_PERSISTENT flag is a hint to the query + // planner that the prepared statement will be retained for a long time + // and probably reused many times. + flags |= SQLITE_PREPARE_PERSISTENT; + } + + while !query.is_empty() { + let mut statement_handle: *mut sqlite3_stmt = null_mut(); + let mut tail: *const c_char = null(); + + let query_ptr = query.as_ptr() as *const c_char; + let query_len = query.len() as i32; + + // + let status = unsafe { + sqlite3_prepare_v3( + conn, + query_ptr, + query_len, + flags as u32, + &mut statement_handle, + &mut tail, + ) + }; + + if status != SQLITE_OK { + return Err(SqliteError::new(conn).into()); + } + + // tail should point to the first byte past the end of the first SQL + // statement in zSql. these routines only compile the first statement, + // so tail is left pointing to what remains un-compiled. + + let n = (tail as i32) - (query_ptr as i32); + query.advance(n as usize); + + if let Some(handle) = NonNull::new(statement_handle) { + return Ok(Some(StatementHandle(handle))); + } + } + + Ok(None) +} + +impl SqliteStatement { + pub(crate) fn prepare( + conn: &mut ConnectionHandle, + mut query: &str, + persistent: bool, + ) -> Result { + query = query.trim(); + + if query.len() > i32::MAX as usize { + return Err(err_protocol!( + "query string must be smaller than {} bytes", + i32::MAX + )); + } + + let mut handles: SmallVec<[StatementHandle; 1]> = SmallVec::with_capacity(1); + let mut query = Bytes::from(String::from(query)); + + if let Some(handle) = prepare(conn.as_ptr(), &mut query, persistent)? { + handles.push(handle); + } + + Ok(Self { + persistent, + tail: query, + handles, + index: 0, + last_row_values: SmallVec::from([None; 1]), + }) + } + + // unsafe: caller must ensure that there is at least one handle + unsafe fn connection(&self) -> *mut sqlite3 { + self.handles[0].db_handle() + } + + pub(crate) fn execute( + &mut self, + ) -> Result>>)>, Error> { + while self.handles.len() == self.index { + if self.tail.is_empty() { + return Ok(None); + } + + if let Some(handle) = + unsafe { prepare(self.connection(), &mut self.tail, self.persistent)? } + { + self.handles.push(handle); + self.last_row_values.push(None); + } + } + + let index = self.index; + self.index += 1; + + Ok(Some(( + &self.handles[index], + &mut self.last_row_values[index], + ))) + } + + pub(crate) fn reset(&mut self) { + self.index = 0; + + for (i, handle) in self.handles.iter().enumerate() { + SqliteRow::inflate_if_needed(&handle, self.last_row_values[i].take()); + + unsafe { + // Reset A Prepared Statement Object + // https://www.sqlite.org/c3ref/reset.html + // https://www.sqlite.org/c3ref/clear_bindings.html + sqlite3_reset(handle.0.as_ptr()); + sqlite3_clear_bindings(handle.0.as_ptr()); + } + } + } +} + +impl Drop for SqliteStatement { + fn drop(&mut self) { + for (i, handle) in self.handles.drain(..).enumerate() { + SqliteRow::inflate_if_needed(&handle, self.last_row_values[i].take()); + + unsafe { + // https://sqlite.org/c3ref/finalize.html + let _ = sqlite3_finalize(handle.0.as_ptr()); + } + } + } +} diff --git a/sqlx-core/src/sqlite/statement/worker.rs b/sqlx-core/src/sqlite/statement/worker.rs new file mode 100644 index 00000000..f1eac278 --- /dev/null +++ b/sqlx-core/src/sqlite/statement/worker.rs @@ -0,0 +1,180 @@ +use std::ptr::null_mut; +use std::sync::atomic::{spin_loop_hint, AtomicI32, AtomicPtr, Ordering}; +use std::sync::Arc; +use std::thread::{self, park, spawn, JoinHandle}; + +use either::Either; +use libsqlite3_sys::{sqlite3_step, sqlite3_stmt, SQLITE_DONE, SQLITE_ROW}; +use sqlx_rt::yield_now; + +use crate::error::Error; +use crate::sqlite::statement::StatementHandle; + +// For async-std and actix, the worker maintains a dedicated thread for each SQLite connection +// All invocations of [sqlite3_step] are run on this thread + +// For tokio, the worker is a thin wrapper around an invocation to [block_in_place] + +const STATE_CLOSE: i32 = -1; +const STATE_READY: i32 = 0; +const STATE_INITIAL: i32 = 1; + +#[cfg(not(feature = "runtime-tokio"))] +pub(crate) struct StatementWorker { + statement: Arc>, + status: Arc, + handle: Option>, +} + +#[cfg(feature = "runtime-tokio")] +pub(crate) struct StatementWorker; + +#[cfg(not(feature = "runtime-tokio"))] +impl StatementWorker { + pub(crate) fn new() -> Self { + let statement = Arc::new(AtomicPtr::new(null_mut::())); + let status = Arc::new(AtomicI32::new(STATE_INITIAL)); + + let handle = spawn({ + let statement = Arc::clone(&statement); + let status = Arc::clone(&status); + + move || { + // wait for the first command + park(); + + 'run: while status.load(Ordering::Acquire) >= 0 { + 'statement: loop { + match status.load(Ordering::Acquire) { + STATE_CLOSE => { + // worker has been dropped; get out + break 'run; + } + + STATE_READY => { + let statement = statement.load(Ordering::Acquire); + if statement.is_null() { + // we do not have the statement handle yet + thread::yield_now(); + continue; + } + + let v = unsafe { sqlite3_step(statement) }; + + status.store(v, Ordering::Release); + + if v == SQLITE_DONE { + // when a statement is _done_, we park the thread until + // we need it again + park(); + break 'statement; + } + } + + _ => { + // waits for the receiving end to be ready to receive the rows + // this should take less than 1 microsecond under most conditions + spin_loop_hint(); + } + } + } + } + } + }); + + Self { + handle: Some(handle), + statement, + status, + } + } + + pub(crate) fn wake(&self) { + if let Some(handle) = &self.handle { + handle.thread().unpark(); + } + } + + pub(crate) fn execute(&self, statement: &StatementHandle) { + // readies the worker to execute the statement + // for async-std, this unparks our dedicated thread + + self.statement + .store(statement.0.as_ptr(), Ordering::Release); + } + + pub(crate) async fn step(&self, statement: &StatementHandle) -> Result, Error> { + // storing <0> as a terminal in status releases the worker + // to proceed to the next [sqlite3_step] invocation + self.status.store(STATE_READY, Ordering::Release); + + // we then use a spin loop to wait for this to finish + // 99% of the time this should be < 1 μs + let status = loop { + let status = self + .status + .compare_and_swap(STATE_READY, STATE_READY, Ordering::AcqRel); + + if status != STATE_READY { + break status; + } + + yield_now().await; + }; + + match status { + // a row was found + SQLITE_ROW => Ok(Either::Right(())), + + // reached the end of the query results, + // emit the # of changes + SQLITE_DONE => Ok(Either::Left(statement.changes())), + + _ => Err(statement.last_error().into()), + } + } + + pub(crate) fn close(&mut self) { + self.status.store(STATE_CLOSE, Ordering::Release); + + if let Some(handle) = self.handle.take() { + handle.thread().unpark(); + handle.join().unwrap(); + } + } +} + +#[cfg(feature = "runtime-tokio")] +impl StatementWorker { + pub(crate) fn new() -> Self { + StatementWorker + } + + pub(crate) fn execute(&self, _statement: &StatementHandle) {} + + pub(crate) fn wake(&self) {} + + pub(crate) async fn step(&self, statement: &StatementHandle) -> Result, Error> { + let statement = *statement; + let status = sqlx_rt::blocking!({ unsafe { sqlite3_step(statement.0.as_ptr()) } }); + + match status { + // a row was found + SQLITE_ROW => Ok(Either::Right(())), + + // reached the end of the query results, + // emit the # of changes + SQLITE_DONE => Ok(Either::Left(statement.changes())), + + _ => Err(statement.last_error().into()), + } + } + + pub(crate) fn close(&mut self) {} +} + +impl Drop for StatementWorker { + fn drop(&mut self) { + self.close(); + } +} diff --git a/sqlx-core/src/sqlite/type_info.rs b/sqlx-core/src/sqlite/type_info.rs index 43088ab7..26d8ab9c 100644 --- a/sqlx-core/src/sqlite/type_info.rs +++ b/sqlx-core/src/sqlite/type_info.rs @@ -1,69 +1,127 @@ -use std::fmt::{self, Display}; +use std::fmt::{self, Display, Formatter}; +use std::os::raw::c_int; +use std::str::FromStr; -use crate::types::TypeInfo; +use libsqlite3_sys::{SQLITE_BLOB, SQLITE_FLOAT, SQLITE_INTEGER, SQLITE_NULL, SQLITE_TEXT}; -// https://www.sqlite.org/c3ref/c_blob.html -#[derive(Debug, PartialEq, Clone, Copy)] +use crate::error::BoxDynError; +use crate::type_info::TypeInfo; + +#[derive(Debug, Clone, Eq, PartialEq)] #[cfg_attr(feature = "offline", derive(serde::Serialize, serde::Deserialize))] -pub(crate) enum SqliteType { - Integer = 1, - Float = 2, - Text = 3, - Blob = 4, - - // Non-standard extensions - Boolean, -} - -// https://www.sqlite.org/datatype3.html#type_affinity -#[derive(Debug, PartialEq, Clone, Copy)] -#[cfg_attr(feature = "offline", derive(serde::Serialize, serde::Deserialize))] -pub(crate) enum SqliteTypeAffinity { +pub(crate) enum DataType { + Int, + Float, Text, - Numeric, - Integer, - Real, Blob, + + // TODO: Support NUMERIC + #[allow(dead_code)] + Numeric, + + // non-standard extensions + Bool, + Int64, } -#[derive(Debug, Clone)] +/// Type information for a SQLite type. +#[derive(Debug, Clone, Eq, PartialEq)] #[cfg_attr(feature = "offline", derive(serde::Serialize, serde::Deserialize))] -pub struct SqliteTypeInfo { - pub(crate) r#type: SqliteType, - pub(crate) affinity: Option, -} +pub struct SqliteTypeInfo(pub(crate) DataType); -impl SqliteTypeInfo { - pub(crate) fn new(r#type: SqliteType, affinity: SqliteTypeAffinity) -> Self { - Self { - r#type, - affinity: Some(affinity), - } +impl Display for SqliteTypeInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.0.fmt(f) } } -impl Display for SqliteTypeInfo { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(match self.r#type { - SqliteType::Text => "TEXT", - SqliteType::Boolean => "BOOLEAN", - SqliteType::Integer => "INTEGER", - SqliteType::Float => "DOUBLE", - SqliteType::Blob => "BLOB", +impl TypeInfo for SqliteTypeInfo {} + +impl Display for DataType { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.write_str(match self { + DataType::Text => "TEXT", + DataType::Float => "FLOAT", + DataType::Blob => "BLOB", + DataType::Int => "INTEGER", + DataType::Numeric => "NUMERIC", + + // non-standard extensions + DataType::Bool => "BOOLEAN", + DataType::Int64 => "BIGINT", }) } } -impl PartialEq for SqliteTypeInfo { - fn eq(&self, other: &SqliteTypeInfo) -> bool { - self.r#type == other.r#type || self.affinity == other.affinity +impl DataType { + pub(crate) fn from_code(code: c_int) -> Option { + match code { + SQLITE_INTEGER => Some(DataType::Int), + SQLITE_FLOAT => Some(DataType::Float), + SQLITE_BLOB => Some(DataType::Blob), + SQLITE_NULL => None, + SQLITE_TEXT => Some(DataType::Text), + + _ => None, + } } } -impl TypeInfo for SqliteTypeInfo { - #[inline] - fn compatible(&self, _other: &Self) -> bool { - // All types are compatible with all other types in SQLite - true +// note: this implementation is particularly important as this is how the macros determine +// what Rust type maps to what *declared* SQL type +// +impl FromStr for DataType { + type Err = BoxDynError; + + fn from_str(s: &str) -> Result { + let s = s.to_ascii_lowercase(); + Ok(match &*s { + "int8" => DataType::Int64, + "boolean" | "bool" => DataType::Bool, + + _ if s.contains("int") && s.contains("big") && s.find("int") > s.find("big") => { + DataType::Int64 + } + + _ if s.contains("int") => DataType::Int, + + _ if s.contains("char") || s.contains("clob") || s.contains("text") => DataType::Text, + + _ if s.contains("blob") => DataType::Blob, + + _ if s.contains("real") || s.contains("floa") || s.contains("doub") => DataType::Float, + + _ => { + return Err(format!("unknown type: `{}`", s).into()); + } + }) } } + +#[test] +fn test_data_type_from_str() -> Result<(), BoxDynError> { + assert_eq!(DataType::Int, "INT".parse()?); + assert_eq!(DataType::Int, "INTEGER".parse()?); + assert_eq!(DataType::Int, "INTBIG".parse()?); + assert_eq!(DataType::Int, "MEDIUMINT".parse()?); + + assert_eq!(DataType::Int64, "BIGINT".parse()?); + assert_eq!(DataType::Int64, "UNSIGNED BIG INT".parse()?); + assert_eq!(DataType::Int64, "INT8".parse()?); + + assert_eq!(DataType::Text, "CHARACTER(20)".parse()?); + assert_eq!(DataType::Text, "NCHAR(55)".parse()?); + assert_eq!(DataType::Text, "TEXT".parse()?); + assert_eq!(DataType::Text, "CLOB".parse()?); + + assert_eq!(DataType::Blob, "BLOB".parse()?); + + assert_eq!(DataType::Float, "REAL".parse()?); + assert_eq!(DataType::Float, "FLOAT".parse()?); + assert_eq!(DataType::Float, "DOUBLE PRECISION".parse()?); + + assert_eq!(DataType::Bool, "BOOLEAN".parse()?); + assert_eq!(DataType::Bool, "BOOL".parse()?); + + Ok(()) +} diff --git a/sqlx-core/src/sqlite/types/bool.rs b/sqlx-core/src/sqlite/types/bool.rs index a03170c0..9a64162c 100644 --- a/sqlx-core/src/sqlite/types/bool.rs +++ b/sqlx-core/src/sqlite/types/bool.rs @@ -1,23 +1,30 @@ use crate::decode::Decode; -use crate::encode::Encode; -use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity}; -use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue}; +use crate::encode::{Encode, IsNull}; +use crate::error::BoxDynError; +use crate::sqlite::type_info::DataType; +use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef}; use crate::types::Type; impl Type for bool { fn type_info() -> SqliteTypeInfo { - SqliteTypeInfo::new(SqliteType::Boolean, SqliteTypeAffinity::Numeric) + SqliteTypeInfo(DataType::Bool) } } -impl Encode for bool { - fn encode(&self, values: &mut Vec) { - values.push(SqliteArgumentValue::Int((*self).into())); +impl<'q> Encode<'q, Sqlite> for bool { + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Int((*self).into())); + + IsNull::No } } -impl<'a> Decode<'a, Sqlite> for bool { - fn decode(value: SqliteValue<'a>) -> crate::Result { +impl<'r> Decode<'r, Sqlite> for bool { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true + } + + fn decode(value: SqliteValueRef<'r>) -> Result { Ok(value.int() != 0) } } diff --git a/sqlx-core/src/sqlite/types/bytes.rs b/sqlx-core/src/sqlite/types/bytes.rs index 04a432b3..a79fbba1 100644 --- a/sqlx-core/src/sqlite/types/bytes.rs +++ b/sqlx-core/src/sqlite/types/bytes.rs @@ -1,42 +1,62 @@ +use std::borrow::Cow; + use crate::decode::Decode; -use crate::encode::Encode; -use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity}; -use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue}; +use crate::encode::{Encode, IsNull}; +use crate::error::BoxDynError; +use crate::sqlite::type_info::DataType; +use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef}; use crate::types::Type; impl Type for [u8] { fn type_info() -> SqliteTypeInfo { - SqliteTypeInfo::new(SqliteType::Blob, SqliteTypeAffinity::Blob) + SqliteTypeInfo(DataType::Blob) + } +} + +impl<'q> Encode<'q, Sqlite> for &'q [u8] { + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Blob(Cow::Borrowed(self))); + + IsNull::No + } +} + +impl<'r> Decode<'r, Sqlite> for &'r [u8] { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true + } + + fn decode(value: SqliteValueRef<'r>) -> Result { + Ok(value.blob()) } } impl Type for Vec { fn type_info() -> SqliteTypeInfo { - <[u8] as Type>::type_info() + <&[u8] as Type>::type_info() } } -impl Encode for [u8] { - fn encode(&self, values: &mut Vec) { - // TODO: look into a way to remove this allocation - values.push(SqliteArgumentValue::Blob(self.to_owned())); +impl<'q> Encode<'q, Sqlite> for Vec { + fn encode(self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Blob(Cow::Owned(self))); + + IsNull::No + } + + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Blob(Cow::Owned(self.clone()))); + + IsNull::No } } -impl Encode for Vec { - fn encode(&self, values: &mut Vec) { - <[u8] as Encode>::encode(self, values) +impl<'r> Decode<'r, Sqlite> for Vec { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true } -} -impl<'de> Decode<'de, Sqlite> for &'de [u8] { - fn decode(value: SqliteValue<'de>) -> crate::Result<&'de [u8]> { - Ok(value.blob()) - } -} - -impl<'de> Decode<'de, Sqlite> for Vec { - fn decode(value: SqliteValue<'de>) -> crate::Result> { - <&[u8] as Decode>::decode(value).map(ToOwned::to_owned) + fn decode(value: SqliteValueRef<'r>) -> Result { + Ok(value.blob().to_owned()) } } diff --git a/sqlx-core/src/sqlite/types/float.rs b/sqlx-core/src/sqlite/types/float.rs index 7b65cc31..c25bf0ce 100644 --- a/sqlx-core/src/sqlite/types/float.rs +++ b/sqlx-core/src/sqlite/types/float.rs @@ -1,41 +1,54 @@ use crate::decode::Decode; -use crate::encode::Encode; -use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity}; -use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue}; +use crate::encode::{Encode, IsNull}; +use crate::error::BoxDynError; +use crate::sqlite::type_info::DataType; +use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef}; use crate::types::Type; impl Type for f32 { fn type_info() -> SqliteTypeInfo { - SqliteTypeInfo::new(SqliteType::Float, SqliteTypeAffinity::Real) + SqliteTypeInfo(DataType::Float) } } -impl Encode for f32 { - fn encode(&self, values: &mut Vec) { - values.push(SqliteArgumentValue::Double((*self).into())); +impl<'q> Encode<'q, Sqlite> for f32 { + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Double((*self).into())); + + IsNull::No } } -impl<'a> Decode<'a, Sqlite> for f32 { - fn decode(value: SqliteValue<'a>) -> crate::Result { +impl<'r> Decode<'r, Sqlite> for f32 { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true + } + + fn decode(value: SqliteValueRef<'r>) -> Result { Ok(value.double() as f32) } } impl Type for f64 { fn type_info() -> SqliteTypeInfo { - SqliteTypeInfo::new(SqliteType::Float, SqliteTypeAffinity::Real) + SqliteTypeInfo(DataType::Float) } } -impl Encode for f64 { - fn encode(&self, values: &mut Vec) { - values.push(SqliteArgumentValue::Double((*self).into())); +impl<'q> Encode<'q, Sqlite> for f64 { + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Double(*self)); + + IsNull::No } } -impl<'a> Decode<'a, Sqlite> for f64 { - fn decode(value: SqliteValue<'a>) -> crate::Result { +impl<'r> Decode<'r, Sqlite> for f64 { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true + } + + fn decode(value: SqliteValueRef<'r>) -> Result { Ok(value.double()) } } diff --git a/sqlx-core/src/sqlite/types/int.rs b/sqlx-core/src/sqlite/types/int.rs index 6ff8c395..c4307637 100644 --- a/sqlx-core/src/sqlite/types/int.rs +++ b/sqlx-core/src/sqlite/types/int.rs @@ -1,41 +1,54 @@ use crate::decode::Decode; -use crate::encode::Encode; -use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity}; -use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue}; +use crate::encode::{Encode, IsNull}; +use crate::error::BoxDynError; +use crate::sqlite::type_info::DataType; +use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef}; use crate::types::Type; impl Type for i32 { fn type_info() -> SqliteTypeInfo { - SqliteTypeInfo::new(SqliteType::Integer, SqliteTypeAffinity::Integer) + SqliteTypeInfo(DataType::Int) } } -impl Encode for i32 { - fn encode(&self, values: &mut Vec) { - values.push(SqliteArgumentValue::Int((*self).into())); +impl<'q> Encode<'q, Sqlite> for i32 { + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Int(*self)); + + IsNull::No } } -impl<'a> Decode<'a, Sqlite> for i32 { - fn decode(value: SqliteValue<'a>) -> crate::Result { +impl<'r> Decode<'r, Sqlite> for i32 { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true + } + + fn decode(value: SqliteValueRef<'r>) -> Result { Ok(value.int()) } } impl Type for i64 { fn type_info() -> SqliteTypeInfo { - SqliteTypeInfo::new(SqliteType::Integer, SqliteTypeAffinity::Integer) + SqliteTypeInfo(DataType::Int64) } } -impl Encode for i64 { - fn encode(&self, values: &mut Vec) { - values.push(SqliteArgumentValue::Int64((*self).into())); +impl<'q> Encode<'q, Sqlite> for i64 { + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Int64(*self)); + + IsNull::No } } -impl<'a> Decode<'a, Sqlite> for i64 { - fn decode(value: SqliteValue<'a>) -> crate::Result { +impl<'r> Decode<'r, Sqlite> for i64 { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true + } + + fn decode(value: SqliteValueRef<'r>) -> Result { Ok(value.int64()) } } diff --git a/sqlx-core/src/sqlite/types/mod.rs b/sqlx-core/src/sqlite/types/mod.rs index dbc7f013..a7b98cd9 100644 --- a/sqlx-core/src/sqlite/types/mod.rs +++ b/sqlx-core/src/sqlite/types/mod.rs @@ -7,7 +7,7 @@ //! | `bool` | BOOLEAN | //! | `i16` | INTEGER | //! | `i32` | INTEGER | -//! | `i64` | INTEGER | +//! | `i64` | BIGINT, INT8 | //! | `f32` | REAL | //! | `f64` | REAL | //! | `&str`, `String` | TEXT | @@ -19,25 +19,12 @@ //! a potentially `NULL` value from SQLite. //! -use crate::decode::Decode; -use crate::sqlite::value::SqliteValue; -use crate::sqlite::Sqlite; +// NOTE: all types are compatible with all other types in SQLite +// so we explicitly opt-out of runtime type assertions by returning [true] for +// all implementations of [Decode::accepts] mod bool; mod bytes; mod float; mod int; mod str; - -impl<'de, T> Decode<'de, Sqlite> for Option -where - T: Decode<'de, Sqlite>, -{ - fn decode(value: SqliteValue<'de>) -> crate::Result { - if value.is_null() { - Ok(None) - } else { - >::decode(value).map(Some) - } - } -} diff --git a/sqlx-core/src/sqlite/types/str.rs b/sqlx-core/src/sqlite/types/str.rs index a30bebc8..e0db2327 100644 --- a/sqlx-core/src/sqlite/types/str.rs +++ b/sqlx-core/src/sqlite/types/str.rs @@ -1,45 +1,62 @@ +use std::borrow::Cow; + use crate::decode::Decode; -use crate::encode::Encode; -use crate::error::UnexpectedNullError; -use crate::sqlite::type_info::{SqliteType, SqliteTypeAffinity}; -use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValue}; +use crate::encode::{Encode, IsNull}; +use crate::error::BoxDynError; +use crate::sqlite::type_info::DataType; +use crate::sqlite::{Sqlite, SqliteArgumentValue, SqliteTypeInfo, SqliteValueRef}; use crate::types::Type; impl Type for str { fn type_info() -> SqliteTypeInfo { - SqliteTypeInfo::new(SqliteType::Text, SqliteTypeAffinity::Text) + SqliteTypeInfo(DataType::Text) + } +} + +impl<'q> Encode<'q, Sqlite> for &'q str { + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Text(Cow::Borrowed(*self))); + + IsNull::No + } +} + +impl<'r> Decode<'r, Sqlite> for &'r str { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true + } + + fn decode(value: SqliteValueRef<'r>) -> Result { + value.text() } } impl Type for String { fn type_info() -> SqliteTypeInfo { - >::type_info() + <&str as Type>::type_info() } } -impl Encode for str { - fn encode(&self, values: &mut Vec) { - // TODO: look into a way to remove this allocation - values.push(SqliteArgumentValue::Text(self.to_owned())); +impl<'q> Encode<'q, Sqlite> for String { + fn encode(self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Text(Cow::Owned(self))); + + IsNull::No + } + + fn encode_by_ref(&self, args: &mut Vec>) -> IsNull { + args.push(SqliteArgumentValue::Text(Cow::Owned(self.clone()))); + + IsNull::No } } -impl Encode for String { - fn encode(&self, values: &mut Vec) { - >::encode(self, values) +impl<'r> Decode<'r, Sqlite> for String { + fn accepts(_ty: &SqliteTypeInfo) -> bool { + true } -} -impl<'de> Decode<'de, Sqlite> for &'de str { - fn decode(value: SqliteValue<'de>) -> crate::Result<&'de str> { - value - .text() - .ok_or_else(|| crate::Error::decode(UnexpectedNullError)) - } -} - -impl<'de> Decode<'de, Sqlite> for String { - fn decode(value: SqliteValue<'de>) -> crate::Result { - <&str as Decode>::decode(value).map(ToOwned::to_owned) + fn decode(value: SqliteValueRef<'r>) -> Result { + value.text().map(ToOwned::to_owned) } } diff --git a/sqlx-core/src/sqlite/value.rs b/sqlx-core/src/sqlite/value.rs index 961127f1..d009132c 100644 --- a/sqlx-core/src/sqlite/value.rs +++ b/sqlx-core/src/sqlite/value.rs @@ -1,135 +1,169 @@ -use core::slice; - -use std::ffi::CStr; -use std::str::from_utf8_unchecked; +use std::borrow::Cow; +use std::ptr::NonNull; +use std::slice::from_raw_parts; +use std::str::from_utf8; +use std::sync::Arc; use libsqlite3_sys::{ - sqlite3_column_blob, sqlite3_column_bytes, sqlite3_column_double, sqlite3_column_int, - sqlite3_column_int64, sqlite3_column_text, sqlite3_column_type, SQLITE_BLOB, SQLITE_FLOAT, - SQLITE_INTEGER, SQLITE_NULL, SQLITE_TEXT, + sqlite3_value, sqlite3_value_blob, sqlite3_value_bytes, sqlite3_value_double, + sqlite3_value_dup, sqlite3_value_int, sqlite3_value_int64, sqlite3_value_type, SQLITE_NULL, }; -use crate::sqlite::statement::Statement; -use crate::sqlite::type_info::SqliteType; +use crate::error::BoxDynError; +use crate::sqlite::statement::StatementHandle; +use crate::sqlite::type_info::DataType; use crate::sqlite::{Sqlite, SqliteTypeInfo}; -use crate::value::RawValue; +use crate::value::{Value, ValueRef}; -pub struct SqliteValue<'c> { - pub(super) index: i32, - pub(super) statement: &'c Statement, +enum SqliteValueData<'r> { + Statement { + statement: &'r StatementHandle, + index: usize, + }, + + Value(&'r SqliteValue), } -// https://www.sqlite.org/c3ref/column_blob.html -// https://www.sqlite.org/capi3ref.html#sqlite3_column_blob +pub struct SqliteValueRef<'r>(SqliteValueData<'r>); -// These routines return information about a single column of the current result row of a query. - -impl<'c> SqliteValue<'c> { - /// Returns true if the value should be intrepreted as NULL. - pub(super) fn is_null(&self) -> bool { - self.r#type().is_none() +impl<'r> SqliteValueRef<'r> { + pub(crate) fn value(value: &'r SqliteValue) -> Self { + Self(SqliteValueData::Value(value)) } - fn r#type(&self) -> Option { - let type_code = unsafe { - if let Some(handle) = self.statement.handle() { - sqlite3_column_type(handle, self.index) - } else { - // unreachable: null statements do not have any values to type - return None; - } - }; - - // SQLITE_INTEGER, SQLITE_FLOAT, SQLITE_TEXT, SQLITE_BLOB, or SQLITE_NULL - match type_code { - SQLITE_INTEGER => Some(SqliteType::Integer), - SQLITE_FLOAT => Some(SqliteType::Float), - SQLITE_TEXT => Some(SqliteType::Text), - SQLITE_BLOB => Some(SqliteType::Blob), - SQLITE_NULL => None, - - _ => unreachable!("received unexpected column type: {}", type_code), - } + pub(crate) fn statement(statement: &'r StatementHandle, index: usize) -> Self { + Self(SqliteValueData::Statement { statement, index }) } - /// Returns the 32-bit INTEGER result. pub(super) fn int(&self) -> i32 { - unsafe { - self.statement - .handle() - .map_or(0, |handle| sqlite3_column_int(handle, self.index)) + match self.0 { + SqliteValueData::Statement { statement, index } => statement.column_int(index), + SqliteValueData::Value(v) => v.int(), } } - /// Returns the 64-bit INTEGER result. pub(super) fn int64(&self) -> i64 { - unsafe { - self.statement - .handle() - .map_or(0, |handle| sqlite3_column_int64(handle, self.index)) + match self.0 { + SqliteValueData::Statement { statement, index } => statement.column_int64(index), + SqliteValueData::Value(v) => v.int64(), } } - /// Returns the 64-bit, REAL result. pub(super) fn double(&self) -> f64 { - unsafe { - self.statement - .handle() - .map_or(0.0, |handle| sqlite3_column_double(handle, self.index)) + match self.0 { + SqliteValueData::Statement { statement, index } => statement.column_double(index), + SqliteValueData::Value(v) => v.double(), } } - /// Returns the UTF-8 TEXT result. - pub(super) fn text(&self) -> Option<&'c str> { - unsafe { - self.statement.handle().and_then(|handle| { - let ptr = sqlite3_column_text(handle, self.index); - - if ptr.is_null() { - None - } else { - Some(from_utf8_unchecked(CStr::from_ptr(ptr as _).to_bytes())) - } - }) + pub(super) fn blob(&self) -> &'r [u8] { + match self.0 { + SqliteValueData::Statement { statement, index } => statement.column_blob(index), + SqliteValueData::Value(v) => v.blob(), } } - fn bytes(&self) -> usize { - // Returns the size of the result in bytes. - unsafe { - self.statement - .handle() - .map_or(0, |handle| sqlite3_column_bytes(handle, self.index)) as usize + pub(super) fn text(&self) -> Result<&'r str, BoxDynError> { + match self.0 { + SqliteValueData::Statement { statement, index } => statement.column_text(index), + SqliteValueData::Value(v) => v.text(), + } + } +} + +impl<'r> ValueRef<'r> for SqliteValueRef<'r> { + type Database = Sqlite; + + fn to_owned(&self) -> SqliteValue { + match self.0 { + SqliteValueData::Statement { statement, index } => statement.column_value(index), + SqliteValueData::Value(v) => v.clone(), } } - /// Returns the BLOB result. - pub(super) fn blob(&self) -> &'c [u8] { - let ptr = unsafe { - if let Some(handle) = self.statement.handle() { - sqlite3_column_blob(handle, self.index) - } else { - // Null statements do not exist - return &[]; + fn type_info(&self) -> Option> { + match self.0 { + SqliteValueData::Statement { statement, index } => { + DataType::from_code(statement.column_type(index)) + .map(SqliteTypeInfo) + .map(Cow::Owned) } - }; - if ptr.is_null() { - // Empty BLOBs are received as null pointers + SqliteValueData::Value(v) => v.type_info(), + } + } + + fn is_null(&self) -> bool { + match self.0 { + SqliteValueData::Statement { statement, index } => { + statement.column_type(index) == SQLITE_NULL + } + + SqliteValueData::Value(v) => v.is_null(), + } + } +} + +#[derive(Clone)] +pub struct SqliteValue(pub(crate) Arc>); + +// SAFE: only protected value objects are stored in SqliteValue +unsafe impl Send for SqliteValue {} +unsafe impl Sync for SqliteValue {} + +impl SqliteValue { + pub(crate) unsafe fn new(value: *mut sqlite3_value) -> Self { + debug_assert!(!value.is_null()); + Self(Arc::new(NonNull::new_unchecked(sqlite3_value_dup(value)))) + } + + fn r#type(&self) -> Option { + DataType::from_code(unsafe { sqlite3_value_type(self.0.as_ptr()) }) + } + + fn int(&self) -> i32 { + unsafe { sqlite3_value_int(self.0.as_ptr()) } + } + + fn int64(&self) -> i64 { + unsafe { sqlite3_value_int64(self.0.as_ptr()) } + } + + fn double(&self) -> f64 { + unsafe { sqlite3_value_double(self.0.as_ptr()) } + } + + fn blob(&self) -> &[u8] { + let len = unsafe { sqlite3_value_bytes(self.0.as_ptr()) } as usize; + + if len == 0 { + // empty blobs are NULL so just return an empty slice return &[]; } - unsafe { slice::from_raw_parts(ptr as *const u8, self.bytes()) } + let ptr = unsafe { sqlite3_value_blob(self.0.as_ptr()) } as *const u8; + debug_assert!(!ptr.is_null()); + + unsafe { from_raw_parts(ptr, len) } + } + + fn text(&self) -> Result<&str, BoxDynError> { + Ok(from_utf8(self.blob())?) } } -impl<'c> RawValue<'c> for SqliteValue<'c> { +impl Value for SqliteValue { type Database = Sqlite; - fn type_info(&self) -> Option { - Some(SqliteTypeInfo { - r#type: self.r#type()?, - affinity: None, - }) + fn as_ref(&self) -> SqliteValueRef<'_> { + SqliteValueRef::value(self) + } + + fn type_info(&self) -> Option> { + self.r#type().map(SqliteTypeInfo).map(Cow::Owned) + } + + fn is_null(&self) -> bool { + unsafe { sqlite3_value_type(self.0.as_ptr()) == SQLITE_NULL } } } diff --git a/sqlx-core/src/sqlite/worker.rs b/sqlx-core/src/sqlite/worker.rs deleted file mode 100644 index 20a12a48..00000000 --- a/sqlx-core/src/sqlite/worker.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crossbeam_queue::ArrayQueue; -use futures_channel::oneshot; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::thread::{park, spawn, JoinHandle}; - -// After tinkering with this, I believe the safest solution is to spin up a discrete thread per -// SQLite connection and perform all I/O operations for SQLite on _that_ thread. To this effect -// we have a worker struct that is a thin message passing API to run messages on the worker thread. - -#[derive(Clone)] -pub(crate) struct Worker { - running: Arc, - queue: Arc>>, - handle: Arc>, -} - -impl Worker { - pub(crate) fn new() -> Self { - let queue: Arc>> = Arc::new(ArrayQueue::new(1)); - let running = Arc::new(AtomicBool::new(true)); - - Self { - handle: Arc::new(spawn({ - let queue = queue.clone(); - let running = running.clone(); - - move || { - while running.load(Ordering::SeqCst) { - if let Ok(message) = queue.pop() { - (message)(); - } - - park(); - } - } - })), - queue, - running, - } - } - - pub(crate) async fn run(&mut self, f: F) -> R - where - F: Send + 'static, - R: Send + 'static, - F: FnOnce() -> R, - { - let (sender, receiver) = oneshot::channel::(); - - let _ = self.queue.push(Box::new(move || { - let _ = sender.send(f()); - })); - - self.handle.thread().unpark(); - - receiver.await.unwrap() - } -} - -impl Drop for Worker { - fn drop(&mut self) { - if Arc::strong_count(&self.handle) == 1 { - self.running.store(false, Ordering::SeqCst); - } - } -} diff --git a/sqlx-core/src/types/json.rs b/sqlx-core/src/types/json.rs index 567bd802..01d4634f 100644 --- a/sqlx-core/src/types/json.rs +++ b/sqlx-core/src/types/json.rs @@ -42,7 +42,7 @@ where for<'a> Json<&'a Self>: Encode<'q, DB>, DB: Database, { - fn encode_by_ref(&self, buf: &mut >::Arguments) -> IsNull { + fn encode_by_ref(&self, buf: &mut >::ArgumentBuffer) -> IsNull { as Encode<'q, DB>>::encode(Json(self), buf) } }