From d4aa3dfa5e39992d6dd7e94ba8e75a3cbb1c6dd4 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Sat, 20 Mar 2021 08:28:15 -0700 Subject: [PATCH] feat(postgres): add more frontend protocol messages --- Cargo.lock | 7 ++++ sqlx-postgres/Cargo.toml | 1 + sqlx-postgres/src/protocol/frontend.rs | 22 ++++++++++ sqlx-postgres/src/protocol/frontend/bind.rs | 41 +++++++++++++++++++ sqlx-postgres/src/protocol/frontend/close.rs | 16 ++++++++ .../src/protocol/frontend/describe.rs | 16 ++++++++ .../src/protocol/frontend/execute.rs | 25 +++++++++++ sqlx-postgres/src/protocol/frontend/flush.rs | 13 ++++++ sqlx-postgres/src/protocol/frontend/parse.rs | 37 +++++++++++++++++ sqlx-postgres/src/protocol/frontend/portal.rs | 22 ++++++++++ sqlx-postgres/src/protocol/frontend/query.rs | 21 ++++++++++ .../src/protocol/frontend/statement.rs | 22 ++++++++++ sqlx-postgres/src/protocol/frontend/sync.rs | 13 ++++++ sqlx-postgres/src/protocol/frontend/target.rs | 32 +++++++++++++++ 14 files changed, 288 insertions(+) create mode 100644 sqlx-postgres/src/protocol/frontend/bind.rs create mode 100644 sqlx-postgres/src/protocol/frontend/close.rs create mode 100644 sqlx-postgres/src/protocol/frontend/describe.rs create mode 100644 sqlx-postgres/src/protocol/frontend/execute.rs create mode 100644 sqlx-postgres/src/protocol/frontend/flush.rs create mode 100644 sqlx-postgres/src/protocol/frontend/parse.rs create mode 100644 sqlx-postgres/src/protocol/frontend/portal.rs create mode 100644 sqlx-postgres/src/protocol/frontend/query.rs create mode 100644 sqlx-postgres/src/protocol/frontend/statement.rs create mode 100644 sqlx-postgres/src/protocol/frontend/sync.rs create mode 100644 sqlx-postgres/src/protocol/frontend/target.rs diff --git a/Cargo.lock b/Cargo.lock index 1395cf897..dab07ae59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" + [[package]] name = "js-sys" version = "0.3.48" @@ -1150,6 +1156,7 @@ dependencies = [ "futures-io", "futures-util", "hex", + "itoa", "log", "md-5", "memchr", diff --git a/sqlx-postgres/Cargo.toml b/sqlx-postgres/Cargo.toml index f20534df2..b6ca43c79 100644 --- a/sqlx-postgres/Cargo.toml +++ b/sqlx-postgres/Cargo.toml @@ -41,6 +41,7 @@ memchr = "2.3" bitflags = "1.2" base64 = "0.13.0" md-5 = "0.9.1" +itoa = "0.4.7" [dev-dependencies] sqlx-core = { version = "0.6.0-pre", path = "../sqlx-core", features = ["_mock"] } diff --git a/sqlx-postgres/src/protocol/frontend.rs b/sqlx-postgres/src/protocol/frontend.rs index dd0cdbf1a..3fd2e94be 100644 --- a/sqlx-postgres/src/protocol/frontend.rs +++ b/sqlx-postgres/src/protocol/frontend.rs @@ -1,7 +1,29 @@ +mod bind; +mod close; +mod describe; +mod execute; +mod flush; +mod parse; mod password; +mod portal; +mod query; mod startup; +mod statement; +mod sync; +mod target; mod terminate; +pub(crate) use bind::Bind; +pub(crate) use close::Close; +pub(crate) use describe::Describe; +pub(crate) use execute::Execute; +pub(crate) use flush::Flush; +pub(crate) use parse::Parse; pub(crate) use password::{Password, PasswordMd5}; +pub(crate) use portal::PortalRef; +pub(crate) use query::Query; pub(crate) use startup::Startup; +pub(crate) use statement::StatementRef; +pub(crate) use sync::Sync; +pub(crate) use target::Target; pub(crate) use terminate::Terminate; diff --git a/sqlx-postgres/src/protocol/frontend/bind.rs b/sqlx-postgres/src/protocol/frontend/bind.rs new file mode 100644 index 000000000..05d8c1192 --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/bind.rs @@ -0,0 +1,41 @@ +use crate::io::PgWriteExt; +use crate::protocol::frontend::{PortalRef, StatementRef}; +use crate::PgArguments; +use sqlx_core::io::{Serialize, WriteExt}; +use sqlx_core::Result; + +pub(crate) struct Bind<'a> { + pub(crate) portal: PortalRef, + pub(crate) statement: StatementRef, + pub(crate) arguments: &'a PgArguments<'a>, +} + +impl Serialize<'_> for Bind<'_> { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.push(b'B'); + buf.write_len_prefixed(|buf| { + self.portal.serialize(buf)?; + self.statement.serialize(buf)?; + + // the parameter format codes, each must presently be zero (text) or one (binary) + // can use one to indicate that all parameters use that format + write_i16_arr(buf, &[1]); + + todo!("arguments"); + + // the result format codes, each must presently be zero (text) or one (binary) + // can use one to indicate that all results use that format + write_i16_arr(buf, &[1]); + + Ok(()) + }) + } +} + +fn write_i16_arr(buf: &mut Vec, arr: &[i16]) { + buf.extend(&(arr.len() as i16).to_be_bytes()); + + for val in arr { + buf.extend(&val.to_be_bytes()); + } +} diff --git a/sqlx-postgres/src/protocol/frontend/close.rs b/sqlx-postgres/src/protocol/frontend/close.rs new file mode 100644 index 000000000..ba7144e76 --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/close.rs @@ -0,0 +1,16 @@ +use crate::io::PgWriteExt; +use crate::protocol::frontend::Target; +use sqlx_core::io::{Serialize, WriteExt}; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) struct Close { + target: Target, +} + +impl Serialize<'_> for Close { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.push(b'C'); + buf.write_len_prefixed(|buf| self.target.serialize(buf)) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/describe.rs b/sqlx-postgres/src/protocol/frontend/describe.rs new file mode 100644 index 000000000..7eb84ddf5 --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/describe.rs @@ -0,0 +1,16 @@ +use crate::io::PgWriteExt; +use crate::protocol::frontend::Target; +use sqlx_core::io::{Serialize, WriteExt}; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) struct Describe { + target: Target, +} + +impl Serialize<'_> for Describe { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.push(b'D'); + buf.write_len_prefixed(|buf| self.target.serialize(buf)) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/execute.rs b/sqlx-postgres/src/protocol/frontend/execute.rs new file mode 100644 index 000000000..8934d9efc --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/execute.rs @@ -0,0 +1,25 @@ +use crate::io::PgWriteExt; +use crate::protocol::frontend::PortalRef; +use sqlx_core::io::Serialize; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) struct Execute { + pub(crate) portal: PortalRef, + + /// Maximum number of rows to return, if portal contains a query + /// that returns rows (ignored otherwise). Zero denotes “no limit”. + pub(crate) max_rows: u32, +} + +impl Serialize<'_> for Execute { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.push(b'E'); + buf.write_len_prefixed(|buf| { + self.portal.serialize(buf)?; + buf.extend(&self.max_rows.to_be_bytes()); + + Ok(()) + }) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/flush.rs b/sqlx-postgres/src/protocol/frontend/flush.rs new file mode 100644 index 000000000..e90972af2 --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/flush.rs @@ -0,0 +1,13 @@ +use sqlx_core::io::Serialize; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) struct Flush; + +impl Serialize<'_> for Flush { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.push(b'H'); + + Ok(()) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/parse.rs b/sqlx-postgres/src/protocol/frontend/parse.rs new file mode 100644 index 000000000..e9bf43a7f --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/parse.rs @@ -0,0 +1,37 @@ +use crate::io::PgWriteExt; +use crate::protocol::frontend::{PortalRef, StatementRef}; +use sqlx_core::io::{Serialize, WriteExt}; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) struct Parse<'a> { + pub(crate) statement: StatementRef, + pub(crate) sql: &'a str, + + /// The parameter data types specified (could be zero). Note that this is not an + /// indication of the number of parameters that might appear in the query string, + /// only the number that the frontend wants to pre-specify types for. + pub(crate) parameters: &'a [u32], +} + +impl Serialize<'_> for Parse<'_> { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.push(b'P'); + buf.write_len_prefixed(|buf| { + self.statement.serialize(buf)?; + + buf.write_str_nul(self.sql); + + // TODO: return a proper error + assert!(!(self.parameters.len() >= (u16::MAX as usize))); + + buf.extend(&(self.parameters.len() as u16).to_be_bytes()); + + for &oid in self.parameters { + buf.extend(&oid.to_be_bytes()); + } + + Ok(()) + }) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/portal.rs b/sqlx-postgres/src/protocol/frontend/portal.rs new file mode 100644 index 000000000..eef174fc3 --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/portal.rs @@ -0,0 +1,22 @@ +use sqlx_core::io::Serialize; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) enum PortalRef { + Unnamed, + Named(u32), +} + +impl Serialize<'_> for PortalRef { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + if let PortalRef::Named(id) = self { + buf.extend_from_slice(b"_sqlx_p_"); + + itoa::write(&mut *buf, *id).unwrap(); + } + + buf.push(b'\0'); + + Ok(()) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/query.rs b/sqlx-postgres/src/protocol/frontend/query.rs new file mode 100644 index 000000000..0c19e22b0 --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/query.rs @@ -0,0 +1,21 @@ +use crate::io::PgWriteExt; +use sqlx_core::io::Serialize; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) struct Query<'a>(pub(crate) &'a str); + +impl Serialize<'_> for Query<'_> { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.reserve(1 + self.0.len() + 1 + 4); + + buf.push(b'Q'); + + buf.write_len_prefixed(|buf| { + buf.extend_from_slice(self.0.as_bytes()); + buf.push(0); + + Ok(()) + }) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/statement.rs b/sqlx-postgres/src/protocol/frontend/statement.rs new file mode 100644 index 000000000..9c134027a --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/statement.rs @@ -0,0 +1,22 @@ +use sqlx_core::io::Serialize; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) enum StatementRef { + Unnamed, + Named(u32), +} + +impl Serialize<'_> for StatementRef { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + if let StatementRef::Named(id) = self { + buf.extend_from_slice(b"_sqlx_s_"); + + itoa::write(&mut *buf, *id).unwrap(); + } + + buf.push(b'\0'); + + Ok(()) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/sync.rs b/sqlx-postgres/src/protocol/frontend/sync.rs new file mode 100644 index 000000000..57724c75e --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/sync.rs @@ -0,0 +1,13 @@ +use sqlx_core::io::Serialize; +use sqlx_core::Result; + +#[derive(Debug)] +pub(crate) struct Sync; + +impl Serialize<'_> for Sync { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.push(b'S'); + + Ok(()) + } +} diff --git a/sqlx-postgres/src/protocol/frontend/target.rs b/sqlx-postgres/src/protocol/frontend/target.rs new file mode 100644 index 000000000..226e91b95 --- /dev/null +++ b/sqlx-postgres/src/protocol/frontend/target.rs @@ -0,0 +1,32 @@ +use crate::io::PgWriteExt; +use crate::protocol::frontend::{PortalRef, StatementRef}; +use sqlx_core::io::Serialize; +use sqlx_core::Result; + +/// Target a command at a portal *or* statement. +/// Used by [`Describe`] and [`Close`]. +#[derive(Debug)] +pub(crate) enum Target { + Portal(PortalRef), + Statement(StatementRef), +} + +impl Serialize<'_> for Target { + fn serialize_with(&self, buf: &mut Vec, _: ()) -> Result<()> { + buf.write_len_prefixed(|buf| { + match self { + Self::Portal(portal) => { + buf.push(b'P'); + portal.serialize(buf); + } + + Self::Statement(statement) => { + buf.push(b'S'); + statement.serialize(buf); + } + } + + Ok(()) + }) + } +}