fix(postgres): after closing a statement, the connection should await CloseComplete

This commit is contained in:
Ryan Leckey 2020-07-02 22:37:04 -07:00
parent 24d626b883
commit 1a7480774b
5 changed files with 71 additions and 6 deletions

View File

@ -90,6 +90,34 @@ async fn recv_desc_rows(conn: &mut PgConnection) -> Result<Option<RowDescription
}
impl PgConnection {
// wait for CloseComplete to indicate a statement was closed
pub(super) async fn wait_for_close_complete(&mut self, mut count: usize) -> Result<(), Error> {
// we need to wait for the [CloseComplete] to be returned from the server
while count > 0 {
match self.stream.recv().await? {
message if message.format == MessageFormat::PortalSuspended => {
// there was an open portal
// this can happen if the last time a statement was used it was not fully executed
// such as in [fetch_one]
}
message if message.format == MessageFormat::CloseComplete => {
// successfully closed the statement (and freed up the server resources)
count -= 1;
}
message => {
return Err(err_protocol!(
"expecting PortalSuspended or CloseComplete but received {:?}",
message.format
));
}
}
}
Ok(())
}
async fn prepare(&mut self, query: &str, arguments: &PgArguments) -> Result<u32, Error> {
if let Some(statement) = self.cache_statement.get_mut(query) {
return Ok(*statement);
@ -100,7 +128,10 @@ impl PgConnection {
if let Some(statement) = self.cache_statement.insert(query, statement) {
self.stream.write(Close::Statement(statement));
self.stream.write(Flush);
self.stream.flush().await?;
self.wait_for_close_complete(1).await?;
}
Ok(statement)

View File

@ -126,16 +126,20 @@ impl Connection for PgConnection {
fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let mut needs_flush = false;
let mut cleared = 0_usize;
self.wait_until_ready().await?;
while let Some(statement) = self.cache_statement.remove_lru() {
self.stream.write(Close::Statement(statement));
needs_flush = true;
cleared += 1;
}
if needs_flush {
if cleared > 0 {
self.stream.write(Flush);
self.stream.flush().await?;
self.wait_for_close_complete(cleared).await?;
}
Ok(())

View File

@ -55,6 +55,7 @@ pub enum MessageFormat {
Authentication,
BackendKeyData,
BindComplete,
CloseComplete,
CommandComplete,
DataRow,
EmptyQueryResponse,
@ -93,6 +94,7 @@ impl MessageFormat {
Ok(match v {
b'1' => MessageFormat::ParseComplete,
b'2' => MessageFormat::BindComplete,
b'3' => MessageFormat::CloseComplete,
b'C' => MessageFormat::CommandComplete,
b'D' => MessageFormat::DataRow,
b'E' => MessageFormat::ErrorResponse,

View File

@ -1,7 +1,7 @@
use sqlx::{database::Database, Connect, Pool};
use std::env;
fn setup_if_needed() {
pub fn setup_if_needed() {
let _ = dotenv::dotenv();
let _ = env_logger::builder().is_test(true).try_init();
}

View File

@ -1,7 +1,8 @@
use futures::TryStreamExt;
use sqlx::postgres::PgRow;
use sqlx::postgres::{PgDatabaseError, PgErrorPosition, PgSeverity};
use sqlx::{postgres::Postgres, Connection, Executor, PgPool, Row};
use std::env;
use sqlx::postgres::{PgConnection, PgConnectOptions, PgDatabaseError, PgErrorPosition, PgSeverity};
use sqlx::{postgres::Postgres, Connect, Connection, Executor, PgPool, Row};
use sqlx_test::new;
use std::time::Duration;
@ -513,3 +514,30 @@ async fn it_caches_statements() -> anyhow::Result<()> {
Ok(())
}
#[sqlx_macros::test]
async fn it_closes_statement_from_cache_issue_470() -> anyhow::Result<()> {
sqlx_test::setup_if_needed();
let mut options: PgConnectOptions = env::var("DATABASE_URL")?.parse().unwrap();
// a capacity of 1 means that before each statement (after the first)
// we will close the previous statement
options = options.statement_cache_capacity(1);
let mut conn = PgConnection::connect_with(&options).await?;
for i in 0..5 {
let row = sqlx::query(&*format!("SELECT {}::int4 AS val", i))
.fetch_one(&mut conn)
.await?;
let val: i32 = row.get("val");
assert_eq!(i, val);
}
assert_eq!(1, conn.cached_statements_size());
Ok(())
}