postgres: add support for simple queries (that do not return results)

This commit is contained in:
Ryan Leckey 2020-02-21 17:24:59 -08:00
parent 7404708bab
commit 55ffd989e1
2 changed files with 36 additions and 35 deletions

View File

@ -43,11 +43,8 @@ impl<'c, 'q> PgCursor<'c, 'q> {
'c: 'a,
{
try_stream! {
loop {
let row = next(&mut self).await?;
if let Some(row) = row {
yield f(row);
}
while let Some(row) = next(&mut self).await? {
yield f(row);
}
}
}
@ -151,32 +148,34 @@ async fn write(
query: &str,
arguments: Option<PgArguments>,
) -> crate::Result<()> {
// TODO: Handle [arguments] being None. This should be a SIMPLE query.
let arguments = arguments.expect("simple queries not implemented yet");
if let Some(arguments) = arguments {
// Check the statement cache for a statement ID that matches the given query
// If it doesn't exist, we generate a new statement ID and write out [Parse] to the
// connection command buffer
let statement = conn.write_prepare(query, &arguments);
// Check the statement cache for a statement ID that matches the given query
// If it doesn't exist, we generate a new statement ID and write out [Parse] to the
// connection command buffer
let statement = conn.write_prepare(query, &arguments);
// Next, [Bind] attaches the arguments to the statement and creates a named portal
conn.write_bind("", statement, &arguments);
// Next, [Bind] attaches the arguments to the statement and creates a named portal
conn.write_bind("", statement, &arguments);
// Next, [Describe] will return the expected result columns and types
// Conditionally run [Describe] only if the results have not been cached
// if !self.statement_cache.has_columns(statement) {
// self.write_describe(protocol::Describe::Portal(""));
// }
// Next, [Describe] will return the expected result columns and types
// Conditionally run [Describe] only if the results have not been cached
// if !self.statement_cache.has_columns(statement) {
// self.write_describe(protocol::Describe::Portal(""));
// }
// Next, [Execute] then executes the named portal
conn.write_execute("", 0);
// Next, [Execute] then executes the named portal
conn.write_execute("", 0);
// Finally, [Sync] asks postgres to process the messages that we sent and respond with
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
// termed batching might suit this.
conn.write_sync();
// Finally, [Sync] asks postgres to process the messages that we sent and respond with
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
// termed batching might suit this.
conn.write_sync();
} else {
// https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.4
conn.write_simple_query(query);
}
conn.wait_until_ready().await?;
@ -199,11 +198,6 @@ async fn resolve(
}
async fn affected_rows(mut conn: MaybeOwnedConnection<'_, PgConnection>) -> crate::Result<u64> {
conn.wait_until_ready().await?;
conn.stream.flush().await?;
conn.is_ready = false;
let mut rows = 0;
loop {
@ -223,11 +217,14 @@ async fn affected_rows(mut conn: MaybeOwnedConnection<'_, PgConnection>) -> crat
Message::ReadyForQuery => {
// done
conn.is_ready = true;
break;
}
message => {
return Err(protocol_err!("unexpected message: {:?}", message).into());
return Err(
protocol_err!("affected_rows: unexpected message: {:?}", message).into(),
);
}
}
}
@ -280,7 +277,7 @@ async fn next<'a, 'c: 'a, 'q: 'a>(
}
message => {
return Err(protocol_err!("unexpected message: {:?}", message).into());
return Err(protocol_err!("next: unexpected message: {:?}", message).into());
}
}
}
@ -328,7 +325,7 @@ async fn first<'c, 'q>(mut cursor: PgCursor<'c, 'q>) -> crate::Result<Option<PgR
}
message => {
return Err(protocol_err!("unexpected message: {:?}", message).into());
return Err(protocol_err!("first: unexpected message: {:?}", message).into());
}
}
}

View File

@ -8,6 +8,10 @@ use crate::postgres::protocol::{self, Encode, StatementId, TypeFormat};
use crate::postgres::{PgArguments, PgConnection, PgCursor, PgRow, PgTypeInfo, Postgres};
impl PgConnection {
pub(crate) fn write_simple_query(&mut self, query: &str) {
self.stream.write(protocol::Query(query));
}
pub(crate) fn write_prepare(&mut self, query: &str, args: &PgArguments) -> StatementId {
// TODO: check query cache