From 55ffd989e11c19046cd69040ba1667df3f86bc5a Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Fri, 21 Feb 2020 17:24:59 -0800 Subject: [PATCH] postgres: add support for simple queries (that do not return results) --- sqlx-core/src/postgres/cursor.rs | 67 ++++++++++++++---------------- sqlx-core/src/postgres/executor.rs | 4 ++ 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/sqlx-core/src/postgres/cursor.rs b/sqlx-core/src/postgres/cursor.rs index 66f2557c..bc8b0c6c 100644 --- a/sqlx-core/src/postgres/cursor.rs +++ b/sqlx-core/src/postgres/cursor.rs @@ -43,11 +43,8 @@ impl<'c, 'q> PgCursor<'c, 'q> { 'c: 'a, { try_stream! { - loop { - let row = next(&mut self).await?; - if let Some(row) = row { - yield f(row); - } + while let Some(row) = next(&mut self).await? { + yield f(row); } } } @@ -151,32 +148,34 @@ async fn write( query: &str, arguments: Option, ) -> crate::Result<()> { - // TODO: Handle [arguments] being None. This should be a SIMPLE query. - let arguments = arguments.expect("simple queries not implemented yet"); + if let Some(arguments) = arguments { + // Check the statement cache for a statement ID that matches the given query + // If it doesn't exist, we generate a new statement ID and write out [Parse] to the + // connection command buffer + let statement = conn.write_prepare(query, &arguments); - // Check the statement cache for a statement ID that matches the given query - // If it doesn't exist, we generate a new statement ID and write out [Parse] to the - // connection command buffer - let statement = conn.write_prepare(query, &arguments); + // Next, [Bind] attaches the arguments to the statement and creates a named portal + conn.write_bind("", statement, &arguments); - // Next, [Bind] attaches the arguments to the statement and creates a named portal - conn.write_bind("", statement, &arguments); + // Next, [Describe] will return the expected result columns and types + // Conditionally run [Describe] only if the results have not been cached + // if !self.statement_cache.has_columns(statement) { + // self.write_describe(protocol::Describe::Portal("")); + // } - // Next, [Describe] will return the expected result columns and types - // Conditionally run [Describe] only if the results have not been cached - // if !self.statement_cache.has_columns(statement) { - // self.write_describe(protocol::Describe::Portal("")); - // } + // Next, [Execute] then executes the named portal + conn.write_execute("", 0); - // Next, [Execute] then executes the named portal - conn.write_execute("", 0); - - // Finally, [Sync] asks postgres to process the messages that we sent and respond with - // a [ReadyForQuery] message when it's completely done. Theoretically, we could send - // dozens of queries before a [Sync] and postgres can handle that. Execution on the server - // is still serial but it would reduce round-trips. Some kind of builder pattern that is - // termed batching might suit this. - conn.write_sync(); + // Finally, [Sync] asks postgres to process the messages that we sent and respond with + // a [ReadyForQuery] message when it's completely done. Theoretically, we could send + // dozens of queries before a [Sync] and postgres can handle that. Execution on the server + // is still serial but it would reduce round-trips. Some kind of builder pattern that is + // termed batching might suit this. + conn.write_sync(); + } else { + // https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.4 + conn.write_simple_query(query); + } conn.wait_until_ready().await?; @@ -199,11 +198,6 @@ async fn resolve( } async fn affected_rows(mut conn: MaybeOwnedConnection<'_, PgConnection>) -> crate::Result { - conn.wait_until_ready().await?; - - conn.stream.flush().await?; - conn.is_ready = false; - let mut rows = 0; loop { @@ -223,11 +217,14 @@ async fn affected_rows(mut conn: MaybeOwnedConnection<'_, PgConnection>) -> crat Message::ReadyForQuery => { // done + conn.is_ready = true; break; } message => { - return Err(protocol_err!("unexpected message: {:?}", message).into()); + return Err( + protocol_err!("affected_rows: unexpected message: {:?}", message).into(), + ); } } } @@ -280,7 +277,7 @@ async fn next<'a, 'c: 'a, 'q: 'a>( } message => { - return Err(protocol_err!("unexpected message: {:?}", message).into()); + return Err(protocol_err!("next: unexpected message: {:?}", message).into()); } } } @@ -328,7 +325,7 @@ async fn first<'c, 'q>(mut cursor: PgCursor<'c, 'q>) -> crate::Result { - return Err(protocol_err!("unexpected message: {:?}", message).into()); + return Err(protocol_err!("first: unexpected message: {:?}", message).into()); } } } diff --git a/sqlx-core/src/postgres/executor.rs b/sqlx-core/src/postgres/executor.rs index 9eabd485..1e2614f0 100644 --- a/sqlx-core/src/postgres/executor.rs +++ b/sqlx-core/src/postgres/executor.rs @@ -8,6 +8,10 @@ use crate::postgres::protocol::{self, Encode, StatementId, TypeFormat}; use crate::postgres::{PgArguments, PgConnection, PgCursor, PgRow, PgTypeInfo, Postgres}; impl PgConnection { + pub(crate) fn write_simple_query(&mut self, query: &str) { + self.stream.write(protocol::Query(query)); + } + pub(crate) fn write_prepare(&mut self, query: &str, args: &PgArguments) -> StatementId { // TODO: check query cache