diff --git a/src/postgres/connection/execute.rs b/src/postgres/connection/execute.rs index d7fc1b5a..6f0eac39 100644 --- a/src/postgres/connection/execute.rs +++ b/src/postgres/connection/execute.rs @@ -1,5 +1,5 @@ use super::prepare::Prepare; -use crate::postgres::protocol::{self, Execute, Message, Sync}; +use crate::postgres::protocol::{self, Message}; use std::io; impl<'a> Prepare<'a> { @@ -11,8 +11,9 @@ impl<'a> Prepare<'a> { &[], ); - self.connection.send(Execute::new("", 0)); - self.connection.send(Sync); + protocol::execute(&mut self.connection.wbuf, "", 0); + protocol::sync(&mut self.connection.wbuf); + self.connection.flush().await?; let mut rows = 0; diff --git a/src/postgres/connection/get.rs b/src/postgres/connection/get.rs index d11df6bf..486b9d37 100644 --- a/src/postgres/connection/get.rs +++ b/src/postgres/connection/get.rs @@ -1,5 +1,5 @@ use super::prepare::Prepare; -use crate::postgres::protocol::{self, DataRow, Execute, Message, Sync}; +use crate::postgres::protocol::{self, DataRow, Message}; use std::io; impl<'a> Prepare<'a> { @@ -11,8 +11,9 @@ impl<'a> Prepare<'a> { &[], ); - self.connection.send(Execute::new("", 0)); - self.connection.send(Sync); + protocol::execute(&mut self.connection.wbuf, "", 0); + protocol::sync(&mut self.connection.wbuf); + self.connection.flush().await?; let mut row: Option = None; diff --git a/src/postgres/connection/select.rs b/src/postgres/connection/select.rs index d0c08ae5..9e41d2a9 100644 --- a/src/postgres/connection/select.rs +++ b/src/postgres/connection/select.rs @@ -1,5 +1,5 @@ use super::prepare::Prepare; -use crate::postgres::protocol::{self, DataRow, Execute, Message, Sync}; +use crate::postgres::protocol::{self, DataRow, Message}; use futures::{stream, Stream}; use std::io; @@ -12,8 +12,8 @@ impl<'a> Prepare<'a> { &[], ); - self.connection.send(Execute::new("", 0)); - self.connection.send(Sync); + protocol::execute(&mut self.connection.wbuf, "", 0); + protocol::sync(&mut self.connection.wbuf); // FIXME: Manually implement Stream on a new type to avoid the unfold adapter stream::unfold(self.connection, |conn| { diff --git a/src/postgres/protocol/describe.rs b/src/postgres/protocol/describe.rs index 9d8509d6..69cd7516 100644 --- a/src/postgres/protocol/describe.rs +++ b/src/postgres/protocol/describe.rs @@ -1,4 +1,3 @@ - /// The Describe message (portal variant) specifies the name of an existing portal /// (or an empty string for the unnamed portal). The response is a RowDescription message /// describing the rows that will be returned by executing the portal; or a NoData message diff --git a/src/postgres/protocol/execute.rs b/src/postgres/protocol/execute.rs index d950a2d8..e75d4f99 100644 --- a/src/postgres/protocol/execute.rs +++ b/src/postgres/protocol/execute.rs @@ -1,32 +1,17 @@ -use super::Encode; -use std::io; +/// Specifies the portal name (empty string denotes the unnamed portal) and a maximum +/// result-row count (zero meaning “fetch all rows”). The result-row count is only meaningful +/// for portals containing commands that return row sets; in other cases the command is +/// always executed to completion, and the row count is ignored. +pub fn execute(buf: &mut Vec, portal: &str, limit: i32) { + buf.push(b'E'); -#[derive(Debug)] -pub struct Execute<'a> { - portal: &'a str, - limit: i32, -} - -impl<'a> Execute<'a> { - pub fn new(portal: &'a str, limit: i32) -> Self { - Self { portal, limit } - } -} - -impl<'a> Encode for Execute<'a> { - fn encode(&self, buf: &mut Vec) -> io::Result<()> { - buf.push(b'E'); - - let len = 4 + self.portal.len() + 1 + 4; - buf.extend_from_slice(&(len as i32).to_be_bytes()); - - // portal - buf.extend_from_slice(self.portal.as_bytes()); - buf.push(b'\0'); - - // limit - buf.extend_from_slice(&self.limit.to_be_bytes()); - - Ok(()) - } + let len = 4 + portal.len() + 1 + 4; + buf.extend_from_slice(&(len as i32).to_be_bytes()); + + // portal + buf.extend_from_slice(portal.as_bytes()); + buf.push(b'\0'); + + // limit + buf.extend_from_slice(&limit.to_be_bytes()); } diff --git a/src/postgres/protocol/mod.rs b/src/postgres/protocol/mod.rs index dd6d5b34..a1af23b4 100644 --- a/src/postgres/protocol/mod.rs +++ b/src/postgres/protocol/mod.rs @@ -1,11 +1,8 @@ -// Unsorted - mod backend_key_data; mod command_complete; mod data_row; mod decode; mod encode; -mod execute; mod message; mod notification_response; mod parameter_description; @@ -17,15 +14,15 @@ mod ready_for_query; mod response; mod row_description; mod startup_message; -mod sync; mod terminate; -// Front-end - pub mod bind; pub mod describe; -// Back-end +mod execute; +mod sync; + +pub use self::{execute::execute, sync::sync}; mod authentication; @@ -36,7 +33,6 @@ pub use self::{ data_row::DataRow, decode::Decode, encode::Encode, - execute::Execute, message::Message, notification_response::NotificationResponse, parameter_description::ParameterDescription, @@ -48,6 +44,5 @@ pub use self::{ response::{Response, Severity}, row_description::{FieldDescription, FieldDescriptions, RowDescription}, startup_message::StartupMessage, - sync::Sync, terminate::Terminate, }; diff --git a/src/postgres/protocol/sync.rs b/src/postgres/protocol/sync.rs index 1986c1e2..f3d06e8a 100644 --- a/src/postgres/protocol/sync.rs +++ b/src/postgres/protocol/sync.rs @@ -1,14 +1,7 @@ -use super::Encode; -use std::io; - -#[derive(Debug)] -pub struct Sync; - -impl Encode for Sync { - fn encode(&self, buf: &mut Vec) -> io::Result<()> { - buf.push(b'S'); - buf.extend_from_slice(&4_i32.to_be_bytes()); - - Ok(()) - } +/// This parameterless message causes the backend to close the current transaction if it's not inside +/// a BEGIN/COMMIT transaction block (“close” meaning to commit if no error, or roll back if error). +/// Then a ReadyForQuery response is issued. +pub fn sync(buf: &mut Vec) { + buf.push(b'S'); + buf.extend_from_slice(&4_i32.to_be_bytes()); }