sqlite: Fix #616

This commit is contained in:
Patryk Wychowaniec
2020-08-09 11:14:11 +02:00
committed by Ryan Leckey
parent e3a71f095c
commit ca07158949
5 changed files with 55 additions and 147 deletions

View File

@@ -25,12 +25,12 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>(
let mut statement = statement?;
// we start by finding the first statement that *can* return results
while let Some((statement, ..)) = statement.prepare(&mut conn.handle)? {
num_params += statement.bind_parameter_count();
while let Some((stmt, ..)) = statement.prepare(&mut conn.handle)? {
num_params += stmt.bind_parameter_count();
let mut stepped = false;
let num = statement.column_count();
let num = stmt.column_count();
if num == 0 {
// no columns in this statement; skip
continue;
@@ -44,7 +44,7 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>(
// to [column_decltype]
// if explain.. fails, ignore the failure and we'll have no fallback
let (fallback, fallback_nullable) = match explain(conn, statement.sql()).await {
let (fallback, fallback_nullable) = match explain(conn, stmt.sql()).await {
Ok(v) => v,
Err(err) => {
log::debug!("describe: explain introspection failed: {}", err);
@@ -54,24 +54,20 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>(
};
for col in 0..num {
let name = statement.column_name(col).to_owned();
let name = stmt.column_name(col).to_owned();
let type_info = if let Some(ty) = statement.column_decltype(col) {
let type_info = if let Some(ty) = stmt.column_decltype(col) {
ty
} else {
// if that fails, we back up and attempt to step the statement
// once *if* its read-only and then use [column_type] as a
// fallback to [column_decltype]
if !stepped && statement.read_only() {
if !stepped && stmt.read_only() {
stepped = true;
conn.worker.execute(statement);
conn.worker.wake();
let _ = conn.worker.step(statement).await;
let _ = conn.worker.step(*stmt).await;
}
let mut ty = statement.column_type_info(col);
let mut ty = stmt.column_type_info(col);
if ty.0 == DataType::Null {
if let Some(fallback) = fallback.get(col).cloned() {
@@ -82,7 +78,7 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>(
ty
};
nullable.push(statement.column_nullable(col)?.or_else(|| {
nullable.push(stmt.column_nullable(col)?.or_else(|| {
// if we do not *know* if this is nullable, check the EXPLAIN fallback
fallback_nullable.get(col).copied().and_then(identity)
}));

View File

@@ -81,7 +81,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
handle: ref mut conn,
ref mut statements,
ref mut statement,
ref worker,
ref mut worker,
..
} = self;
@@ -91,25 +91,18 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
// keep track of how many arguments we have bound
let mut num_arguments = 0;
while let Some((handle, columns, column_names, last_row_values)) = stmt.prepare(conn)? {
while let Some((stmt, columns, column_names, last_row_values)) = stmt.prepare(conn)? {
// bind values to the statement
num_arguments += bind(handle, &arguments, num_arguments)?;
// tell the worker about the new statement
worker.execute(handle);
// wake up the worker if needed
// the worker parks its thread on async-std when not in use
worker.wake();
num_arguments += bind(stmt, &arguments, num_arguments)?;
loop {
// save the rows from the _current_ position on the statement
// and send them to the still-live row object
SqliteRow::inflate_if_needed(handle, &*columns, last_row_values.take());
SqliteRow::inflate_if_needed(stmt, &*columns, last_row_values.take());
// invoke [sqlite3_step] on the dedicated worker thread
// this will move us forward one row or finish the statement
let s = worker.step(handle).await?;
let s = worker.step(*stmt).await?;
match s {
Either::Left(changes) => {
@@ -129,7 +122,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
Either::Right(()) => {
let (row, weak_values_ref) = SqliteRow::current(
*handle,
*stmt,
columns,
column_names
);

View File

@@ -106,8 +106,5 @@ impl Drop for SqliteConnection {
// we must explicitly drop the statements as the drop-order in a struct is undefined
self.statements.clear();
self.statement.take();
// we then explicitly close the worker
self.worker.close();
}
}

View File

@@ -1,19 +1,10 @@
use crate::error::Error;
use crate::sqlite::statement::StatementHandle;
use crossbeam_channel::{bounded, unbounded, Sender};
use either::Either;
use libsqlite3_sys::sqlite3_stmt;
use libsqlite3_sys::{sqlite3_step, SQLITE_DONE, SQLITE_ROW};
use sqlx_rt::yield_now;
use std::ptr::null_mut;
use std::sync::atomic::{spin_loop_hint, AtomicI32, AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread::{self, park, spawn, JoinHandle};
const STATE_CLOSE: i32 = -1;
const STATE_READY: i32 = 0;
const STATE_INITIAL: i32 = 1;
use std::thread;
// Each SQLite connection has a dedicated thread.
@@ -21,131 +12,56 @@ const STATE_INITIAL: i32 = 1;
// OS resource usage. Low priority because a high concurrent load for SQLite3 is very
// unlikely.
// TODO: Reduce atomic complexity. There must be a simpler way to do this that doesn't
// compromise performance.
pub(crate) struct StatementWorker {
statement: Arc<AtomicPtr<sqlite3_stmt>>,
status: Arc<AtomicI32>,
handle: Option<JoinHandle<()>>,
tx: Sender<StatementWorkerCommand>,
}
enum StatementWorkerCommand {
Step {
statement: StatementHandle,
tx: Sender<Result<Either<u64, ()>, Error>>,
},
}
impl StatementWorker {
pub(crate) fn new() -> Self {
let statement = Arc::new(AtomicPtr::new(null_mut::<sqlite3_stmt>()));
let status = Arc::new(AtomicI32::new(STATE_INITIAL));
let (tx, rx) = unbounded();
let handle = spawn({
let statement = Arc::clone(&statement);
let status = Arc::clone(&status);
thread::spawn(move || {
for cmd in rx {
match cmd {
StatementWorkerCommand::Step { statement, tx } => {
let status = unsafe { sqlite3_step(statement.0.as_ptr()) };
move || {
// wait for the first command
park();
let resp = match status {
SQLITE_ROW => Ok(Either::Right(())),
SQLITE_DONE => Ok(Either::Left(statement.changes())),
_ => Err(statement.last_error().into()),
};
'run: while status.load(Ordering::Acquire) >= 0 {
'statement: loop {
match status.load(Ordering::Acquire) {
STATE_CLOSE => {
// worker has been dropped; get out
break 'run;
}
STATE_READY => {
let statement = statement.load(Ordering::Acquire);
if statement.is_null() {
// we do not have the statement handle yet
thread::yield_now();
continue;
}
let v = unsafe { sqlite3_step(statement) };
status.store(v, Ordering::Release);
if v == SQLITE_DONE {
// when a statement is _done_, we park the thread until
// we need it again
park();
break 'statement;
}
}
_ => {
// waits for the receiving end to be ready to receive the rows
// this should take less than 1 microsecond under most conditions
spin_loop_hint();
}
}
let _ = tx.send(resp);
}
}
}
});
Self {
handle: Some(handle),
statement,
status,
}
Self { tx }
}
pub(crate) fn wake(&self) {
if let Some(handle) = &self.handle {
handle.thread().unpark();
}
}
pub(crate) async fn step(
&mut self,
statement: StatementHandle,
) -> Result<Either<u64, ()>, Error> {
let (tx, rx) = bounded(1);
pub(crate) fn execute(&self, statement: &StatementHandle) {
// readies the worker to execute the statement
// for async-std, this unparks our dedicated thread
self.statement
.store(statement.0.as_ptr(), Ordering::Release);
}
pub(crate) async fn step(&self, statement: &StatementHandle) -> Result<Either<u64, ()>, Error> {
// storing <0> as a terminal in status releases the worker
// to proceed to the next [sqlite3_step] invocation
self.status.store(STATE_READY, Ordering::Release);
// we then use a spin loop to wait for this to finish
// 99% of the time this should be < 1 μs
let status = loop {
let status = self
.status
.compare_and_swap(STATE_READY, STATE_READY, Ordering::AcqRel);
if status != STATE_READY {
break status;
}
self.tx
.send(StatementWorkerCommand::Step { statement, tx })
.map_err(|_| Error::WorkerCrashed)?;
while rx.is_empty() {
yield_now().await;
};
match status {
// a row was found
SQLITE_ROW => Ok(Either::Right(())),
// reached the end of the query results,
// emit the # of changes
SQLITE_DONE => Ok(Either::Left(statement.changes())),
_ => Err(statement.last_error().into()),
}
}
pub(crate) fn close(&mut self) {
self.status.store(STATE_CLOSE, Ordering::Release);
if let Some(handle) = self.handle.take() {
handle.thread().unpark();
handle.join().unwrap();
}
}
}
impl Drop for StatementWorker {
fn drop(&mut self) {
self.close();
rx.recv().map_err(|_| Error::WorkerCrashed)?
}
}