fix(postgres): use Sync over Flush for terminating extended non-query commands; in addition, keep better track of the number of expected readyForQueries

This commit is contained in:
Ryan Leckey 2020-07-17 04:53:42 -07:00
parent fa40e9e55f
commit cc736df4d6
2 changed files with 38 additions and 11 deletions

View File

@ -8,8 +8,8 @@ use std::sync::Arc;
use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::postgres::message::{
self, Bind, Close, CommandComplete, DataRow, Flush, MessageFormat, ParameterDescription, Parse,
Query, RowDescription,
self, Bind, Close, CommandComplete, DataRow, MessageFormat, ParameterDescription, Parse, Query,
RowDescription,
};
use crate::postgres::type_info::PgType;
use crate::postgres::{PgArguments, PgConnection, PgDone, PgRow, PgValueFormat, Postgres};
@ -52,7 +52,8 @@ async fn prepare(
});
// we ask for the server to immediately send us the result of the PARSE command by using FLUSH
conn.stream.write(Flush);
conn.write_sync();
conn.stream.flush().await?;
// indicates that the SQL query string is now successfully parsed and has semantic validity
@ -61,6 +62,8 @@ async fn prepare(
.recv_expect(MessageFormat::ParseComplete)
.await?;
conn.recv_ready_for_query().await?;
Ok(id)
}
@ -118,6 +121,13 @@ impl PgConnection {
Ok(())
}
pub(crate) fn write_sync(&mut self) {
self.stream.write(message::Sync);
// all SYNC messages will return a ReadyForQuery
self.pending_ready_for_query_count += 1;
}
async fn prepare(&mut self, query: &str, arguments: &PgArguments) -> Result<u32, Error> {
if let Some(statement) = self.cache_statement.get_mut(query) {
return Ok(*statement);
@ -127,11 +137,12 @@ impl PgConnection {
if let Some(statement) = self.cache_statement.insert(query, statement) {
self.stream.write(Close::Statement(statement));
self.stream.write(Flush);
self.write_sync();
self.stream.flush().await?;
self.wait_for_close_complete(1).await?;
self.recv_ready_for_query().await?;
}
Ok(statement)
@ -157,7 +168,8 @@ impl PgConnection {
// describe the statement and, again, ask the server to immediately respond
// we need to fully realize the types
self.stream.write(message::Describe::Statement(statement));
self.stream.write(message::Flush);
self.write_sync();
self.stream.flush().await?;
let _ = recv_desc_params(self).await?;
@ -188,19 +200,19 @@ impl PgConnection {
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
// termed batching might suit this.
self.stream.write(message::Sync);
self.write_sync();
// prepared statements are binary
PgValueFormat::Binary
} else {
// Query will trigger a ReadyForQuery
self.stream.write(Query(query));
self.pending_ready_for_query_count += 1;
// and unprepared statements are text
PgValueFormat::Text
};
// [Query] or [Sync] will trigger a [ReadyForQuery]
self.pending_ready_for_query_count += 1;
self.stream.flush().await?;
Ok(try_stream! {
@ -334,13 +346,15 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
let id = prepare(self, s, &Default::default()).await?;
self.stream.write(message::Describe::Statement(id));
self.stream.write(Flush);
self.write_sync();
self.stream.flush().await?;
let params = recv_desc_params(self).await?;
let rows = recv_desc_rows(self).await?;
self.recv_ready_for_query().await?;
let params = self.handle_parameter_description(params).await?;
self.handle_row_description(rows, true).await?;

View File

@ -13,7 +13,7 @@ use crate::ext::ustr::UStr;
use crate::io::Decode;
use crate::postgres::connection::stream::PgStream;
use crate::postgres::message::{
Close, Flush, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
Close, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
};
use crate::postgres::{PgColumn, PgConnectOptions, PgTypeInfo, Postgres};
use crate::transaction::Transaction;
@ -84,6 +84,18 @@ impl PgConnection {
Ok(())
}
async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
let r: ReadyForQuery = self
.stream
.recv_expect(MessageFormat::ReadyForQuery)
.await?;
self.pending_ready_for_query_count -= 1;
self.transaction_status = r.transaction_status;
Ok(())
}
fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> {
self.pending_ready_for_query_count -= 1;
self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status;
@ -146,10 +158,11 @@ impl Connection for PgConnection {
}
if cleared > 0 {
self.stream.write(Flush);
self.write_sync();
self.stream.flush().await?;
self.wait_for_close_complete(cleared).await?;
self.recv_ready_for_query().await?;
}
Ok(())