Caching describe

This commit is contained in:
Julius de Bruijn 2020-07-07 16:56:13 +02:00 committed by Ryan Leckey
parent eba6f3973d
commit 590f97df4a
19 changed files with 323 additions and 50 deletions

View File

@ -119,6 +119,41 @@ impl<'c> Executor<'c> for &'c mut AnyConnection {
})
})
}
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Self::Database>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
let query = query.query();
Box::pin(async move {
Ok(match &mut self.0 {
#[cfg(feature = "postgres")]
AnyConnectionKind::Postgres(conn) => {
conn.describe_full(query).await.map(map_describe)?
}
#[cfg(feature = "mysql")]
AnyConnectionKind::MySql(conn) => {
conn.describe_full(query).await.map(map_describe)?
}
#[cfg(feature = "sqlite")]
AnyConnectionKind::Sqlite(conn) => {
conn.describe_full(query).await.map(map_describe)?
}
#[cfg(feature = "mssql")]
AnyConnectionKind::Mssql(conn) => {
conn.describe_full(query).await.map(map_describe)?
}
})
})
}
}
fn map_describe<DB: Database>(info: StatementInfo<DB>) -> StatementInfo<Any>

View File

@ -129,13 +129,23 @@ pub trait Executor<'c>: Send + Debug + Sized {
'c: 'e,
E: Execute<'q, Self::Database>;
/// Prepare the SQL query and return type information about its parameters
/// and results.
fn describe<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Self::Database>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>;
/// Prepare the SQL query and return type information about its parameters
/// and results.
///
/// This is used by compile-time verification in the query macros to
/// power their type inference.
#[doc(hidden)]
fn describe<'e, 'q: 'e, E: 'q>(
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Self::Database>, Error>>
@ -161,6 +171,9 @@ pub trait Execute<'q, DB: Database>: Send + Sized {
/// prepare the query. Returning `Some(Default::default())` is an empty arguments object that
/// will be prepared (and cached) before execution.
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments>;
/// Returns true if query has any parameters.
fn persistent(&self) -> bool;
}
// NOTE: `Execute` is explicitly not implemented for String and &String to make it slightly more
@ -175,6 +188,11 @@ impl<'q, DB: Database> Execute<'q, DB> for &'q str {
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
None
}
#[inline]
fn persistent(&self) -> bool {
false
}
}
impl<'q, DB: Database> Execute<'q, DB> for (&'q str, Option<<DB as HasArguments<'q>>::Arguments>) {
@ -187,4 +205,9 @@ impl<'q, DB: Database> Execute<'q, DB> for (&'q str, Option<<DB as HasArguments<
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
self.1.take()
}
#[inline]
fn persistent(&self) -> bool {
self.1.is_some()
}
}

View File

@ -151,4 +151,15 @@ impl<'c> Executor<'c> for &'c mut MssqlConnection {
{
describe(self, query.query()).boxed()
}
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Self::Database>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
describe(self, query.query()).boxed()
}
}

View File

@ -259,6 +259,17 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<MySql>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
self.describe_full(query)
}
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Self::Database>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,

View File

@ -60,6 +60,19 @@ where
Box::pin(async move { pool.acquire().await?.describe(query).await })
}
#[doc(hidden)]
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Self::Database>, Error>>
where
E: Execute<'q, Self::Database>,
{
let pool = self.clone();
Box::pin(async move { pool.acquire().await?.describe_full(query).await })
}
}
// NOTE: required due to lack of lazy normalization
@ -114,6 +127,22 @@ macro_rules! impl_executor_for_pool_connection {
{
(**self).describe(query)
}
#[doc(hidden)]
#[inline]
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> futures_core::future::BoxFuture<
'e,
Result<crate::statement::StatementInfo<$DB>, crate::error::Error>,
>
where
'c: 'e,
E: crate::executor::Execute<'q, $DB>,
{
(**self).describe_full(query)
}
}
};
}

View File

@ -12,14 +12,20 @@ use crate::postgres::message::{
RowDescription,
};
use crate::postgres::type_info::PgType;
use crate::postgres::{PgArguments, PgConnection, PgDone, PgRow, PgValueFormat, Postgres};
use crate::postgres::{
statement::PgStatement, PgArguments, PgConnection, PgDone, PgRow, PgValueFormat, Postgres,
};
use crate::statement::StatementInfo;
use message::Flush;
async fn prepare(
conn: &mut PgConnection,
query: &str,
arguments: &PgArguments,
) -> Result<u32, Error> {
) -> Result<PgStatement, Error> {
// before we continue, wait until we are "ready" to accept more queries
conn.wait_until_ready().await?;
let id = conn.next_statement_id;
conn.next_statement_id = conn.next_statement_id.wrapping_add(1);
@ -64,7 +70,28 @@ async fn prepare(
conn.recv_ready_for_query().await?;
Ok(id)
// get the statement columns and parameters
conn.stream.write(message::Describe::Statement(id));
conn.write_sync();
conn.stream.flush().await?;
let parameters = recv_desc_params(conn).await?;
let rows = recv_desc_rows(conn).await?;
conn.recv_ready_for_query().await?;
let parameters = conn.handle_parameter_description(parameters).await?;
conn.handle_row_description(rows, true).await?;
let columns = (&*conn.scratch_row_columns).clone();
Ok(PgStatement {
id,
parameters,
columns,
})
}
async fn recv_desc_params(conn: &mut PgConnection) -> Result<ParameterDescription, Error> {
@ -128,16 +155,22 @@ impl PgConnection {
self.pending_ready_for_query_count += 1;
}
async fn prepare(&mut self, query: &str, arguments: &PgArguments) -> Result<u32, Error> {
if let Some(statement) = self.cache_statement.get_mut(query) {
return Ok(*statement);
async fn prepare(
&mut self,
query: &str,
arguments: &PgArguments,
) -> Result<&mut PgStatement, Error> {
let contains = self.cache_statement.contains_key(query);
if contains {
return Ok(self.cache_statement.get_mut(query).unwrap());
}
let statement = prepare(self, query, arguments).await?;
if let Some(statement) = self.cache_statement.insert(query, statement) {
self.stream.write(Close::Statement(statement));
self.write_sync();
self.stream.write(Close::Statement(statement.id));
self.stream.write(Flush);
self.stream.flush().await?;
@ -145,7 +178,7 @@ impl PgConnection {
self.recv_ready_for_query().await?;
}
Ok(statement)
Ok(self.cache_statement.get_mut(query).unwrap())
}
async fn run(
@ -160,7 +193,7 @@ impl PgConnection {
let format = if let Some(mut arguments) = arguments {
// prepare the statement if this our first time executing it
// always return the statement ID here
let statement = self.prepare(query, &arguments).await?;
let statement = self.prepare(query, &arguments).await?.id;
// patch holes created during encoding
arguments.buffer.patch_type_holes(self).await?;
@ -343,23 +376,33 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
let s = query.query();
Box::pin(async move {
let id = prepare(self, s, &Default::default()).await?;
let statement = self.prepare(s, &Default::default()).await?;
let columns = statement.columns.clone();
let params = statement.parameters.clone();
let nullable = Vec::new();
self.stream.write(message::Describe::Statement(id));
self.write_sync();
Ok(StatementInfo {
columns,
nullable,
parameters: Some(Either::Left(params)),
})
})
}
self.stream.flush().await?;
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Self::Database>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
let s = query.query();
let params = recv_desc_params(self).await?;
let rows = recv_desc_rows(self).await?;
self.recv_ready_for_query().await?;
let params = self.handle_parameter_description(params).await?;
self.handle_row_description(rows, true).await?;
let columns = (&*self.scratch_row_columns).clone();
Box::pin(async move {
let statement = self.prepare(s, &Default::default()).await?;
let columns = statement.columns.clone();
let params = statement.parameters.clone();
let nullable = self.get_nullable_for_columns(&columns).await?;
Ok(StatementInfo {

View File

@ -5,6 +5,7 @@ use futures_core::future::BoxFuture;
use futures_util::{FutureExt, TryFutureExt};
use hashbrown::HashMap;
use super::statement::PgStatement;
use crate::common::StatementCache;
use crate::connection::Connection;
use crate::error::Error;
@ -47,7 +48,7 @@ pub struct PgConnection {
next_statement_id: u32,
// cache statement by query string to the id and columns
cache_statement: StatementCache<u32>,
cache_statement: StatementCache<PgStatement>,
// cache user-defined types by id <-> info
cache_type_info: HashMap<u32, PgTypeInfo>,
@ -153,7 +154,7 @@ impl Connection for PgConnection {
self.wait_until_ready().await?;
while let Some(statement) = self.cache_statement.remove_lru() {
self.stream.write(Close::Statement(statement));
self.stream.write(Close::Statement(statement.id));
cleared += 1;
}

View File

@ -283,7 +283,6 @@ impl<'c> Executor<'c> for &'c mut PgListener {
self.connection().fetch_optional(query)
}
#[doc(hidden)]
fn describe<'e, 'q: 'e, E: 'q>(
self,
query: E,
@ -294,6 +293,18 @@ impl<'c> Executor<'c> for &'c mut PgListener {
{
self.connection().describe(query)
}
#[doc(hidden)]
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Self::Database>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
self.connection().describe_full(query)
}
}
impl PgNotification {

View File

@ -11,6 +11,7 @@ mod listener;
mod message;
mod options;
mod row;
mod statement;
mod transaction;
mod type_info;
pub mod types;

View File

@ -0,0 +1,7 @@
use super::{PgColumn, PgTypeInfo};
pub struct PgStatement {
pub(crate) id: u32,
pub(crate) columns: Vec<PgColumn>,
pub(crate) parameters: Vec<PgTypeInfo>,
}

View File

@ -47,6 +47,11 @@ where
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
self.arguments.take().map(IntoArguments::into_arguments)
}
#[inline]
fn persistent(&self) -> bool {
self.arguments.is_some()
}
}
impl<'q, DB: Database> Query<'q, DB, <DB as HasArguments<'q>>::Arguments> {
@ -201,6 +206,11 @@ where
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
self.inner.take_arguments()
}
#[inline]
fn persistent(&self) -> bool {
self.inner.arguments.is_some()
}
}
impl<'q, DB, F, O, A> Map<'q, DB, F, A>

View File

@ -35,6 +35,11 @@ where
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
self.inner.take_arguments()
}
#[inline]
fn persistent(&self) -> bool {
self.inner.arguments.is_some()
}
}
impl<'q, DB: Database, O> QueryAs<'q, DB, O, <DB as HasArguments<'q>>::Arguments> {

View File

@ -31,6 +31,11 @@ where
fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
self.inner.take_arguments()
}
#[inline]
fn persistent(&self) -> bool {
self.inner.persistent()
}
}
impl<'q, DB: Database, O> QueryScalar<'q, DB, O, <DB as HasArguments<'q>>::Arguments> {

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::{FutureExt, TryStreamExt};
use futures_util::TryStreamExt;
use hashbrown::HashMap;
use libsqlite3_sys::sqlite3_last_insert_rowid;
@ -15,7 +15,8 @@ use crate::sqlite::connection::describe::describe;
use crate::sqlite::connection::ConnectionHandle;
use crate::sqlite::statement::{SqliteStatement, StatementHandle};
use crate::sqlite::{
Sqlite, SqliteArguments, SqliteColumn, SqliteConnection, SqliteDone, SqliteRow,
type_info::DataType, Sqlite, SqliteArguments, SqliteColumn, SqliteConnection, SqliteDone,
SqliteRow, SqliteTypeInfo,
};
use crate::statement::StatementInfo;
@ -211,7 +212,6 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
})
}
#[doc(hidden)]
fn describe<'e, 'q: 'e, E: 'q>(
self,
query: E,
@ -220,6 +220,67 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
'c: 'e,
E: Execute<'q, Self::Database>,
{
describe(self, query.query()).boxed()
Box::pin(async move {
let persistent = query.persistent();
let query = query.query();
let SqliteConnection {
handle: ref mut conn,
ref mut statements,
ref mut statement,
..
} = self;
// prepare statement object (or checkout from cache)
let statement = prepare(conn, statements, statement, query, persistent)?;
let mut params = 0;
let mut columns = Vec::new();
if let Some(statement) = statement.handles.get(0) {
// NOTE: we can infer *nothing* about parameters apart from the count
params = statement.bind_parameter_count();
let num_columns = statement.column_count();
columns.reserve(num_columns);
for i in 0..num_columns {
let name = statement.column_name(i).to_owned().into();
let type_info = statement
.column_decltype(i)
.unwrap_or(SqliteTypeInfo(DataType::Null));
let ordinal = i;
columns.push(SqliteColumn {
name,
type_info,
ordinal,
})
}
}
let parameters = Some(Either::Right(params));
let nullable = Vec::new();
Ok(StatementInfo {
parameters,
columns,
nullable,
})
})
}
#[doc(hidden)]
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<StatementInfo<Sqlite>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
describe(self, query.query())
}
}

View File

@ -145,6 +145,21 @@ macro_rules! impl_executor_for_transaction {
{
(&mut **self).describe(query)
}
#[doc(hidden)]
fn describe_full<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> futures_core::future::BoxFuture<
'e,
Result<crate::statement::StatementInfo<Self::Database>, crate::error::Error>,
>
where
't: 'e,
E: crate::executor::Execute<'q, Self::Database>,
{
(&mut **self).describe(query)
}
}
};
}

View File

@ -26,7 +26,7 @@ impl<DB: Database> QueryData<DB> {
) -> crate::Result<Self> {
Ok(QueryData {
query: query.into(),
describe: conn.describe(query).await?,
describe: conn.describe_full(query).await?,
#[cfg(feature = "offline")]
hash: offline::hash_string(query),
})

View File

@ -6,7 +6,7 @@ use sqlx_test::new;
async fn it_describes_simple() -> anyhow::Result<()> {
let mut conn = new::<MySql>().await?;
let d = conn.describe("SELECT * FROM tweet").await?;
let d = conn.describe_full("SELECT * FROM tweet").await?;
assert_eq!(d.column(0).name(), "id");
assert_eq!(d.column(1).name(), "created_at");
@ -31,7 +31,7 @@ async fn uses_alias_name() -> anyhow::Result<()> {
let mut conn = new::<MySql>().await?;
let d = conn
.describe("SELECT text AS tweet_text FROM tweet")
.describe_full("SELECT text AS tweet_text FROM tweet")
.await?;
assert_eq!(d.column(0).name(), "tweet_text");

View File

@ -5,7 +5,7 @@ use sqlx_test::new;
async fn it_describes_simple() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;
let d = conn.describe("SELECT * FROM tweet").await?;
let d = conn.describe_full("SELECT * FROM tweet").await?;
assert_eq!(d.column(0).name(), "id");
assert_eq!(d.column(1).name(), "created_at");
@ -29,7 +29,7 @@ async fn it_describes_simple() -> anyhow::Result<()> {
async fn it_describes_expression() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;
let d = conn.describe("SELECT 1::int8 + 10").await?;
let d = conn.describe_full("SELECT 1::int8 + 10").await?;
// ?column? will cause the macro to emit an error ad ask the user to explicitly name the type
assert_eq!(d.column(0).name(), "?column?");
@ -46,7 +46,7 @@ async fn it_describes_expression() -> anyhow::Result<()> {
async fn it_describes_enum() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;
let d = conn.describe("SELECT 'open'::status as _1").await?;
let d = conn.describe_full("SELECT 'open'::status as _1").await?;
assert_eq!(d.column(0).name(), "_1");
@ -66,7 +66,7 @@ async fn it_describes_enum() -> anyhow::Result<()> {
async fn it_describes_record() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;
let d = conn.describe("SELECT (true, 10::int2)").await?;
let d = conn.describe_full("SELECT (true, 10::int2)").await?;
let ty = d.column(0).type_info();
assert_eq!(ty.name(), "RECORD");
@ -79,7 +79,7 @@ async fn it_describes_composite() -> anyhow::Result<()> {
let mut conn = new::<Postgres>().await?;
let d = conn
.describe("SELECT ROW('name',10,500)::inventory_item")
.describe_full("SELECT ROW('name',10,500)::inventory_item")
.await?;
let ty = d.column(0).type_info();

View File

@ -10,7 +10,7 @@ use std::env;
async fn it_describes_simple() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let info = conn.describe("SELECT * FROM tweet").await?;
let info = conn.describe_full("SELECT * FROM tweet").await?;
let columns = info.columns();
assert_eq!(columns[0].name(), "id");
@ -41,13 +41,13 @@ async fn it_describes_variables() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
// without any context, we resolve to NULL
let info = conn.describe("SELECT ?1").await?;
let info = conn.describe_full("SELECT ?1").await?;
assert_eq!(info.column(0).type_info().name(), "NULL");
assert_eq!(info.nullable(0), None); // unknown
// context can be provided by using CAST(_ as _)
let info = conn.describe("SELECT CAST(?1 AS REAL)").await?;
let info = conn.describe_full("SELECT CAST(?1 AS REAL)").await?;
assert_eq!(info.column(0).type_info().name(), "REAL");
assert_eq!(info.nullable(0), None); // unknown
@ -60,7 +60,7 @@ async fn it_describes_expression() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let d = conn
.describe("SELECT 1 + 10, 5.12 * 2, 'Hello', x'deadbeef'")
.describe_full("SELECT 1 + 10, 5.12 * 2, 'Hello', x'deadbeef'")
.await?;
let columns = d.columns();
@ -92,7 +92,7 @@ async fn it_describes_expression_from_empty_table() -> anyhow::Result<()> {
.await?;
let d = conn
.describe("SELECT COUNT(*), a + 1, name, 5.12, 'Hello' FROM _temp_empty")
.describe_full("SELECT COUNT(*), a + 1, name, 5.12, 'Hello' FROM _temp_empty")
.await?;
assert_eq!(d.column(0).type_info().name(), "INTEGER");
@ -121,7 +121,7 @@ async fn it_describes_expression_from_empty_table_with_star() -> anyhow::Result<
.await?;
let d = conn
.describe("SELECT *, 5, 'Hello' FROM _temp_empty")
.describe_full("SELECT *, 5, 'Hello' FROM _temp_empty")
.await?;
assert_eq!(d.column(0).type_info().name(), "TEXT");
@ -137,13 +137,15 @@ async fn it_describes_insert() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let d = conn
.describe("INSERT INTO tweet (id, text) VALUES (2, 'Hello')")
.describe_full("INSERT INTO tweet (id, text) VALUES (2, 'Hello')")
.await?;
assert_eq!(d.columns().len(), 0);
let d = conn
.describe("INSERT INTO tweet (id, text) VALUES (2, 'Hello'); SELECT last_insert_rowid();")
.describe_full(
"INSERT INTO tweet (id, text) VALUES (2, 'Hello'); SELECT last_insert_rowid();",
)
.await?;
assert_eq!(d.columns().len(), 1);
@ -163,7 +165,7 @@ async fn it_describes_insert_with_read_only() -> anyhow::Result<()> {
let mut conn = options.connect().await?;
let d = conn
.describe("INSERT INTO tweet (id, text) VALUES (2, 'Hello')")
.describe_full("INSERT INTO tweet (id, text) VALUES (2, 'Hello')")
.await?;
assert_eq!(d.columns().len(), 0);
@ -175,7 +177,10 @@ async fn it_describes_insert_with_read_only() -> anyhow::Result<()> {
async fn it_describes_bad_statement() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let err = conn.describe("SELECT 1 FROM not_found").await.unwrap_err();
let err = conn
.describe_full("SELECT 1 FROM not_found")
.await
.unwrap_err();
let err = err
.as_database_error()
.unwrap()