mirror of
https://github.com/launchbadge/sqlx.git
synced 2026-04-28 13:14:42 +00:00
Close pg statements correctly
This commit is contained in:
@@ -49,7 +49,7 @@ impl<T> StatementCache<T> {
|
||||
}
|
||||
|
||||
/// Clear all cached statements from the cache.
|
||||
#[cfg(any(feature = "postgres", feature = "sqlite"))]
|
||||
#[cfg(any(feature = "sqlite"))]
|
||||
pub fn clear(&mut self) {
|
||||
self.inner.clear();
|
||||
}
|
||||
|
||||
@@ -9,8 +9,8 @@ use crate::describe::Describe;
|
||||
use crate::error::Error;
|
||||
use crate::executor::{Execute, Executor};
|
||||
use crate::postgres::message::{
|
||||
self, Bind, CommandComplete, DataRow, Flush, MessageFormat, ParameterDescription, Parse, Query,
|
||||
RowDescription,
|
||||
self, Bind, Close, CommandComplete, DataRow, Flush, MessageFormat, ParameterDescription, Parse,
|
||||
Query, RowDescription,
|
||||
};
|
||||
use crate::postgres::type_info::PgType;
|
||||
use crate::postgres::{PgArguments, PgConnection, PgRow, PgValueFormat, Postgres};
|
||||
@@ -97,7 +97,11 @@ impl PgConnection {
|
||||
|
||||
let statement = prepare(self, query, arguments).await?;
|
||||
|
||||
self.cache_statement.insert(query, statement);
|
||||
if let Some(statement) = self.cache_statement.insert(query, statement) {
|
||||
self.stream.write(Close::Statement(statement));
|
||||
self.stream.write(Flush);
|
||||
self.stream.flush().await?;
|
||||
}
|
||||
|
||||
Ok(statement)
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::ext::ustr::UStr;
|
||||
use crate::io::Decode;
|
||||
use crate::postgres::connection::stream::PgStream;
|
||||
use crate::postgres::message::{
|
||||
Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
|
||||
Close, Flush, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
|
||||
};
|
||||
use crate::postgres::row::PgColumn;
|
||||
use crate::postgres::{PgConnectOptions, PgTypeInfo, Postgres};
|
||||
@@ -126,7 +126,18 @@ impl Connection for PgConnection {
|
||||
|
||||
fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
|
||||
Box::pin(async move {
|
||||
self.cache_statement.clear();
|
||||
let mut needs_flush = false;
|
||||
|
||||
while let Some(statement) = self.cache_statement.remove_lru() {
|
||||
self.stream.write(Close::Statement(statement));
|
||||
needs_flush = true;
|
||||
}
|
||||
|
||||
if needs_flush {
|
||||
self.stream.write(Flush);
|
||||
self.stream.flush().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
32
sqlx-core/src/postgres/message/close.rs
Normal file
32
sqlx-core/src/postgres/message/close.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
use crate::io::Encode;
|
||||
use crate::postgres::io::PgBufMutExt;
|
||||
|
||||
const CLOSE_PORTAL: u8 = b'P';
|
||||
const CLOSE_STATEMENT: u8 = b'S';
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub enum Close {
|
||||
Statement(u32),
|
||||
Portal(u32),
|
||||
}
|
||||
|
||||
impl Encode<'_> for Close {
|
||||
fn encode_with(&self, buf: &mut Vec<u8>, _: ()) {
|
||||
// 15 bytes for 1-digit statement/portal IDs
|
||||
buf.reserve(20);
|
||||
buf.push(b'C');
|
||||
|
||||
buf.put_length_prefixed(|buf| match self {
|
||||
Close::Statement(id) => {
|
||||
buf.push(CLOSE_STATEMENT);
|
||||
buf.put_statement_name(*id);
|
||||
}
|
||||
|
||||
Close::Portal(id) => {
|
||||
buf.push(CLOSE_PORTAL);
|
||||
buf.put_portal_name(Some(*id));
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ use crate::io::Decode;
|
||||
mod authentication;
|
||||
mod backend_key_data;
|
||||
mod bind;
|
||||
mod close;
|
||||
mod command_complete;
|
||||
mod data_row;
|
||||
mod describe;
|
||||
@@ -28,6 +29,7 @@ mod terminate;
|
||||
pub use authentication::{Authentication, AuthenticationSasl};
|
||||
pub use backend_key_data::BackendKeyData;
|
||||
pub use bind::Bind;
|
||||
pub use close::Close;
|
||||
pub use command_complete::CommandComplete;
|
||||
pub use data_row::DataRow;
|
||||
pub use describe::Describe;
|
||||
|
||||
Reference in New Issue
Block a user