From e8ecbeda36809976aaf437f0f0204acef3cffdee Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Sat, 29 Jun 2019 21:57:04 -0700 Subject: [PATCH] WIP Query --- sqlx-postgres-protocol/src/lib.rs | 2 + sqlx-postgres-protocol/src/query.rs | 46 +++++++++++++++++++++++ sqlx-postgres-protocol/src/terminate.rs | 21 +++++++++++ sqlx-postgres/src/connection/establish.rs | 1 - sqlx-postgres/src/connection/mod.rs | 11 +++--- sqlx-postgres/src/connection/query.rs | 18 ++++----- src/main.rs | 4 +- 7 files changed, 84 insertions(+), 19 deletions(-) create mode 100644 sqlx-postgres-protocol/src/query.rs diff --git a/sqlx-postgres-protocol/src/lib.rs b/sqlx-postgres-protocol/src/lib.rs index 008393e6..001c3aaf 100644 --- a/sqlx-postgres-protocol/src/lib.rs +++ b/sqlx-postgres-protocol/src/lib.rs @@ -7,6 +7,7 @@ mod encode; mod message; mod parameter_status; mod password_message; +mod query; mod ready_for_query; mod response; mod startup_message; @@ -20,6 +21,7 @@ pub use self::{ message::Message, parameter_status::ParameterStatus, password_message::PasswordMessage, + query::Query, ready_for_query::{ReadyForQuery, TransactionStatus}, response::{Response, ResponseBuilder, Severity}, startup_message::StartupMessage, diff --git a/sqlx-postgres-protocol/src/query.rs b/sqlx-postgres-protocol/src/query.rs new file mode 100644 index 00000000..fec61516 --- /dev/null +++ b/sqlx-postgres-protocol/src/query.rs @@ -0,0 +1,46 @@ +use crate::Encode; +use bytes::BufMut; +use std::io; + +#[derive(Debug)] +pub struct Query<'a>(&'a str); + +impl<'a> Query<'a> { + pub fn new(query: &'a str) -> Self { + Self(query) + } +} + +impl Encode for Query<'_> { + fn encode(&self, buf: &mut Vec) -> io::Result<()> { + let len = self.0.len() + 4 + 1; + buf.reserve(len + 1); + buf.put_u8(b'Q'); + buf.put_u32_be(len as u32); + buf.put(self.0); + buf.put_u8(0); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::Query; + use crate::Encode; + use std::io; + + const QUERY_SELECT_1: &[u8] = b"Q\0\0\0\rSELECT 1\0"; + + #[test] + fn it_encodes_query() -> io::Result<()> { + let message = Query::new("SELECT 1"); + + let mut buf = Vec::new(); + message.encode(&mut buf)?; + + assert_eq!(&*buf, QUERY_SELECT_1); + + Ok(()) + } +} diff --git a/sqlx-postgres-protocol/src/terminate.rs b/sqlx-postgres-protocol/src/terminate.rs index 62733d7d..6b624262 100644 --- a/sqlx-postgres-protocol/src/terminate.rs +++ b/sqlx-postgres-protocol/src/terminate.rs @@ -14,3 +14,24 @@ impl Encode for Terminate { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::Terminate; + use crate::Encode; + use std::io; + + const TERMINATE: &[u8] = b"X\0\0\0\x04"; + + #[test] + fn it_encodes_terminate() -> io::Result<()> { + let message = Terminate; + + let mut buf = Vec::new(); + message.encode(&mut buf)?; + + assert_eq!(&*buf, TERMINATE); + + Ok(()) + } +} diff --git a/sqlx-postgres/src/connection/establish.rs b/sqlx-postgres/src/connection/establish.rs index 187b0df4..82475f13 100644 --- a/sqlx-postgres/src/connection/establish.rs +++ b/sqlx-postgres/src/connection/establish.rs @@ -38,7 +38,6 @@ pub async fn establish<'a, 'b: 'a>( conn.send(message).await?; - // FIXME: This feels like it could be reduced (see other connection flows) while let Some(message) = conn.stream.next().await { match message? { Message::Authentication(Authentication::Ok) => { diff --git a/sqlx-postgres/src/connection/mod.rs b/sqlx-postgres/src/connection/mod.rs index 9faa6415..05c79463 100644 --- a/sqlx-postgres/src/connection/mod.rs +++ b/sqlx-postgres/src/connection/mod.rs @@ -4,14 +4,13 @@ use futures::{ task::{Context, Poll}, Stream, }; -use std::fmt::Debug; use runtime::net::TcpStream; use sqlx_core::ConnectOptions; use sqlx_postgres_protocol::{Encode, Message, Terminate}; -use std::{io, pin::Pin}; +use std::{fmt::Debug, io, pin::Pin}; mod establish; -// mod query; +mod query; pub struct Connection { stream: Framed, @@ -42,9 +41,9 @@ impl Connection { Ok(conn) } - // pub async fn execute<'a, 'b: 'a>(&'a mut self, query: &'b str) -> io::Result<()> { - // query::query(self, query).await - // } + pub async fn execute<'a: 'b, 'b>(&'a mut self, query: &'b str) -> io::Result<()> { + query::query(self, query).await + } pub async fn close(mut self) -> io::Result<()> { self.send(Terminate).await?; diff --git a/sqlx-postgres/src/connection/query.rs b/sqlx-postgres/src/connection/query.rs index b81f1785..85ccba01 100644 --- a/sqlx-postgres/src/connection/query.rs +++ b/sqlx-postgres/src/connection/query.rs @@ -1,22 +1,18 @@ use super::Connection; -use crate::protocol::{client::Query, server::Message as ServerMessage}; use futures::StreamExt; +use sqlx_postgres_protocol::{Message, Query}; use std::io; -pub async fn query<'a, 'b: 'a>(conn: &'a mut Connection, query: &'a str) -> io::Result<()> { - conn.send(Query { query }).await?; +pub async fn query<'a: 'b, 'b>(conn: &'a mut Connection, query: &'b str) -> io::Result<()> { + conn.send(Query::new(query)).await?; - // FIXME: This feels like it could be reduced (see other connection flows) - while let Some(message) = conn.incoming.next().await { - match message { - ServerMessage::ReadyForQuery(_) => { + while let Some(message) = conn.stream.next().await { + match message? { + Message::ReadyForQuery(_) => { break; } - ServerMessage::CommandComplete(body) => { - } - - _ => { + message => { unimplemented!("received {:?} unimplemented message", message); } } diff --git a/src/main.rs b/src/main.rs index 7a8cc7fe..58c021df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use std::io; async fn main() -> io::Result<()> { env_logger::init(); - let conn = Connection::establish( + let mut conn = Connection::establish( ConnectOptions::new() .host("127.0.0.1") .port(5432) @@ -16,6 +16,8 @@ async fn main() -> io::Result<()> { ) .await?; + conn.execute("SELECT 1").await?; + conn.close().await?; Ok(())