refactor(postgres): review connection executor and tweak raw_query

This commit is contained in:
Ryan Leckey
2021-04-09 16:51:20 -07:00
parent fd95c68898
commit 0267fe0482
6 changed files with 48 additions and 42 deletions

View File

@@ -1,5 +1,5 @@
#[cfg(feature = "async")]
use futures_util::future::BoxFuture;
use futures_util::future::{BoxFuture, FutureExt};
use sqlx_core::{Execute, Executor, Result, Runtime};
use crate::protocol::backend::ReadyForQuery;
@@ -37,7 +37,7 @@ impl<Rt: Runtime> Executor<Rt> for PgConnection<Rt> {
'q: 'x,
'v: 'x,
{
Box::pin(self.execute_async(query))
self.execute_async(query).boxed()
}
#[cfg(feature = "async")]
@@ -50,7 +50,7 @@ impl<Rt: Runtime> Executor<Rt> for PgConnection<Rt> {
'q: 'x,
'v: 'x,
{
Box::pin(self.fetch_all_async(query))
self.fetch_all_async(query).boxed()
}
#[cfg(feature = "async")]
@@ -66,7 +66,7 @@ impl<Rt: Runtime> Executor<Rt> for PgConnection<Rt> {
'q: 'x,
'v: 'x,
{
Box::pin(self.fetch_optional_async(query))
self.fetch_optional_async(query).boxed()
}
}

View File

@@ -16,16 +16,15 @@ impl<Rt: Runtime> PgConnection<Rt> {
BackendMessageType::BindComplete => {}
BackendMessageType::DataRow => {
rows.push(PgRow::new(message.deserialize()?, &columns));
rows.push(PgRow::new(message.deserialize()?, columns));
}
BackendMessageType::RowDescription => {
*columns = Some(message.deserialize::<RowDescription>()?.columns.into());
}
BackendMessageType::CommandComplete => {
// one statement has finished
}
// one statement has finished
BackendMessageType::CommandComplete => {}
BackendMessageType::ReadyForQuery => {
self.handle_ready_for_query(message.deserialize()?);

View File

@@ -28,9 +28,8 @@ impl<Rt: Runtime> PgConnection<Rt> {
*columns = Some(message.deserialize::<RowDescription>()?.columns.into());
}
BackendMessageType::CommandComplete => {
// one statement has finished
}
// one statement has finished
BackendMessageType::CommandComplete => {}
BackendMessageType::ReadyForQuery => {
self.handle_ready_for_query(message.deserialize()?);

View File

@@ -41,9 +41,8 @@ impl<Rt: Runtime> PgConnection<Rt> {
statement: &mut RawStatement,
) -> Result<bool> {
match message.ty {
BackendMessageType::ParseComplete => {
// next message should be <ReadyForQuery>
}
// next message should be <ReadyForQuery>
BackendMessageType::ParseComplete => {}
BackendMessageType::ReadyForQuery => {
self.handle_ready_for_query(message.deserialize()?);

View File

@@ -1,38 +1,47 @@
use sqlx_core::{Execute, Result, Runtime};
use crate::protocol::frontend::{self, Bind, PortalRef, Query, StatementRef, Sync};
use crate::{PgConnection, Postgres};
use crate::raw_statement::RawStatement;
use crate::{PgArguments, PgConnection, Postgres};
impl<Rt: Runtime> PgConnection<Rt> {
fn write_raw_query_statement(
&mut self,
statement: &RawStatement,
arguments: &PgArguments<'_>,
) -> Result<()> {
// bind values to the prepared statement
self.stream.write_message(&Bind {
portal: PortalRef::Unnamed,
statement: StatementRef::Named(statement.id),
arguments,
parameters: &statement.parameters,
})?;
// describe the bound prepared statement (portal)
self.stream.write_message(&frontend::Describe {
target: frontend::Target::Portal(PortalRef::Unnamed),
})?;
// execute the bound prepared statement (portal)
self.stream
.write_message(&frontend::Execute { portal: PortalRef::Unnamed, max_rows: 0 })?;
// <Sync> is what closes the extended query invocation and
// issues a <ReadyForQuery>
self.stream.write_message(&Sync)?;
Ok(())
}
}
macro_rules! impl_raw_query {
($(@$blocking:ident)? $self:ident, $query:ident) => {{
if let Some(arguments) = $query.arguments() {
// prepare the statement for execution
let statement = raw_prepare!($(@$blocking)? $self, $query.sql(), arguments);
// bind values to the prepared statement
$self.stream.write_message(&Bind {
portal: PortalRef::Unnamed,
statement: StatementRef::Named(statement.id),
arguments,
parameters: &statement.parameters,
})?;
// describe the bound prepared statement (portal)
$self.stream.write_message(&frontend::Describe {
target: frontend::Target::Portal(PortalRef::Unnamed),
})?;
// execute the bound prepared statement (portal)
$self.stream.write_message(&frontend::Execute {
portal: PortalRef::Unnamed,
max_rows: 0,
})?;
// <Sync> is what closes the extended query invocation and
// issues a <ReadyForQuery>
$self.stream.write_message(&Sync)?;
$self.write_raw_query_statement(&statement, arguments)?;
} else {
// directly execute the query as an unprepared, simple query
$self.stream.write_message(&Query { sql: $query.sql() })?;
};

View File

@@ -20,7 +20,7 @@ pub enum PgClientError {
impl Display for PgClientError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::NotUtf8(source) => write!(f, "{}", source),
Self::NotUtf8(source) => write!(f, "unexpected invalid utf-8: {}", source),
Self::UnknownAuthenticationMethod(method) => {
write!(f, "unknown authentication method: {}", method)
@@ -50,7 +50,7 @@ impl StdError for PgClientError {}
impl ClientError for PgClientError {}
impl From<PgClientError> for Error {
fn from(err: PgClientError) -> Error {
Error::client(err)
fn from(err: PgClientError) -> Self {
Self::client(err)
}
}