diff --git a/sqlx-core/src/postgres/connection/executor.rs b/sqlx-core/src/postgres/connection/executor.rs index 411dfcf4..1d853d56 100644 --- a/sqlx-core/src/postgres/connection/executor.rs +++ b/sqlx-core/src/postgres/connection/executor.rs @@ -8,8 +8,8 @@ use std::sync::Arc; use crate::error::Error; use crate::executor::{Execute, Executor}; use crate::postgres::message::{ - self, Bind, Close, CommandComplete, DataRow, Flush, MessageFormat, ParameterDescription, Parse, - Query, RowDescription, + self, Bind, Close, CommandComplete, DataRow, MessageFormat, ParameterDescription, Parse, Query, + RowDescription, }; use crate::postgres::type_info::PgType; use crate::postgres::{PgArguments, PgConnection, PgDone, PgRow, PgValueFormat, Postgres}; @@ -52,7 +52,8 @@ async fn prepare( }); // we ask for the server to immediately send us the result of the PARSE command by using FLUSH - conn.stream.write(Flush); + + conn.write_sync(); conn.stream.flush().await?; // indicates that the SQL query string is now successfully parsed and has semantic validity @@ -61,6 +62,8 @@ async fn prepare( .recv_expect(MessageFormat::ParseComplete) .await?; + conn.recv_ready_for_query().await?; + Ok(id) } @@ -118,6 +121,13 @@ impl PgConnection { Ok(()) } + pub(crate) fn write_sync(&mut self) { + self.stream.write(message::Sync); + + // all SYNC messages will return a ReadyForQuery + self.pending_ready_for_query_count += 1; + } + async fn prepare(&mut self, query: &str, arguments: &PgArguments) -> Result { if let Some(statement) = self.cache_statement.get_mut(query) { return Ok(*statement); @@ -127,11 +137,12 @@ impl PgConnection { if let Some(statement) = self.cache_statement.insert(query, statement) { self.stream.write(Close::Statement(statement)); - self.stream.write(Flush); + self.write_sync(); self.stream.flush().await?; self.wait_for_close_complete(1).await?; + self.recv_ready_for_query().await?; } Ok(statement) @@ -157,7 +168,8 @@ impl PgConnection { // describe the statement and, again, ask the server to immediately respond // we need to fully realize the types self.stream.write(message::Describe::Statement(statement)); - self.stream.write(message::Flush); + + self.write_sync(); self.stream.flush().await?; let _ = recv_desc_params(self).await?; @@ -188,19 +200,19 @@ impl PgConnection { // dozens of queries before a [Sync] and postgres can handle that. Execution on the server // is still serial but it would reduce round-trips. Some kind of builder pattern that is // termed batching might suit this. - self.stream.write(message::Sync); + self.write_sync(); // prepared statements are binary PgValueFormat::Binary } else { + // Query will trigger a ReadyForQuery self.stream.write(Query(query)); + self.pending_ready_for_query_count += 1; // and unprepared statements are text PgValueFormat::Text }; - // [Query] or [Sync] will trigger a [ReadyForQuery] - self.pending_ready_for_query_count += 1; self.stream.flush().await?; Ok(try_stream! { @@ -334,13 +346,15 @@ impl<'c> Executor<'c> for &'c mut PgConnection { let id = prepare(self, s, &Default::default()).await?; self.stream.write(message::Describe::Statement(id)); - self.stream.write(Flush); + self.write_sync(); self.stream.flush().await?; let params = recv_desc_params(self).await?; let rows = recv_desc_rows(self).await?; + self.recv_ready_for_query().await?; + let params = self.handle_parameter_description(params).await?; self.handle_row_description(rows, true).await?; diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index f297a643..e5ed188d 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -13,7 +13,7 @@ use crate::ext::ustr::UStr; use crate::io::Decode; use crate::postgres::connection::stream::PgStream; use crate::postgres::message::{ - Close, Flush, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus, + Close, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus, }; use crate::postgres::{PgColumn, PgConnectOptions, PgTypeInfo, Postgres}; use crate::transaction::Transaction; @@ -84,6 +84,18 @@ impl PgConnection { Ok(()) } + async fn recv_ready_for_query(&mut self) -> Result<(), Error> { + let r: ReadyForQuery = self + .stream + .recv_expect(MessageFormat::ReadyForQuery) + .await?; + + self.pending_ready_for_query_count -= 1; + self.transaction_status = r.transaction_status; + + Ok(()) + } + fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> { self.pending_ready_for_query_count -= 1; self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status; @@ -146,10 +158,11 @@ impl Connection for PgConnection { } if cleared > 0 { - self.stream.write(Flush); + self.write_sync(); self.stream.flush().await?; self.wait_for_close_complete(cleared).await?; + self.recv_ready_for_query().await?; } Ok(())