Clean up connection methods a bit

This commit is contained in:
Ryan Leckey
2019-07-25 23:34:39 -07:00
parent 6a92b36074
commit 0703a1b91a
6 changed files with 174 additions and 167 deletions

View File

@@ -0,0 +1,48 @@
use super::prepare::Prepare;
use sqlx_postgres_protocol::{self as proto, Execute, Message, Sync};
use std::io;
impl<'a> Prepare<'a> {
pub async fn execute(self) -> io::Result<u64> {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
self.connection.flush().await?;
let mut rows = 0;
while let Some(message) = self.connection.receive().await? {
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(_) => {
// This is EXECUTE so we are ignoring any potential results
}
Message::CommandComplete(body) => {
rows = body.rows();
}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
return Ok(rows);
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
// FIXME: This is an end-of-file error. How we should bubble this up here?
unreachable!()
}
}

View File

@@ -0,0 +1,49 @@
use super::prepare::Prepare;
use sqlx_postgres_protocol::{self as proto, DataRow, Execute, Message, Sync};
use std::io;
impl<'a> Prepare<'a> {
pub async fn get(self) -> io::Result<Option<DataRow>> {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
self.connection.flush().await?;
let mut row: Option<DataRow> = None;
while let Some(message) = self.connection.receive().await? {
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(data_row) => {
// we only care about the first result.
if row.is_none() {
row = Some(data_row);
}
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
return Ok(row);
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
// FIXME: This is an end-of-file error. How we should bubble this up here?
unreachable!()
}
}

View File

@@ -11,7 +11,10 @@ use sqlx_postgres_protocol::{Encode, Message, Terminate};
use std::{fmt::Debug, io, pin::Pin};
mod establish;
mod execute;
mod get;
mod prepare;
mod select;
pub struct Connection {
pub(super) stream: TcpStream,

View File

@@ -1,12 +1,10 @@
use super::Connection;
use futures::{stream, Stream};
use sqlx_postgres_protocol::{self as proto, DataRow, Execute, Message, Parse, Sync};
use std::io;
use sqlx_postgres_protocol::{self as proto, Parse};
pub struct Prepare<'a> {
connection: &'a mut Connection,
bind_state: (usize, usize),
bind_values: usize,
pub(super) connection: &'a mut Connection,
pub(super) bind_state: (usize, usize),
pub(super) bind_values: usize,
}
#[inline]
@@ -38,151 +36,4 @@ impl<'a> Prepare<'a> {
self.bind_values += 1;
self
}
#[inline]
pub async fn execute(self) -> io::Result<u64> {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
self.connection.flush().await?;
let mut rows = 0;
while let Some(message) = self.connection.receive().await? {
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(_) => {
// This is EXECUTE so we are ignoring any potential results
}
Message::CommandComplete(body) => {
rows = body.rows();
}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
return Ok(rows);
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
// FIXME: This is an end-of-file error. How we should bubble this up here?
unreachable!()
}
#[inline]
pub async fn get_result(self) -> io::Result<Option<DataRow>> {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
self.connection.flush().await?;
let mut row: Option<DataRow> = None;
while let Some(message) = self.connection.receive().await? {
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(data_row) => {
// we only care about the first result.
if row.is_none() {
row = Some(data_row);
}
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
return Ok(row);
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
// FIXME: This is an end-of-file error. How we should bubble this up here?
unreachable!()
}
#[inline]
pub fn get_results(self) -> impl Stream<Item = Result<DataRow, io::Error>> + 'a + Unpin {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
stream::unfold(self.connection, Self::unfold_func)
}
fn unfold_func(
conn: &mut Connection,
) -> impl std::future::Future<Output = Option<(Result<DataRow, io::Error>, &mut Connection)>>
{
Box::pin(async {
if !conn.wbuf.is_empty() {
if let Err(e) = conn.flush().await {
return Some((Err(e), conn));
}
}
loop {
let message = match conn.receive().await {
Ok(Some(message)) => message,
// FIXME: This is an end-of-file error. How we should bubble this up here?
Ok(None) => unreachable!(),
Err(e) => return Some((Err(e), conn)),
};
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(row) => {
break Some((Ok(row), conn));
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
break None;
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
})
}
}

View File

@@ -0,0 +1,59 @@
use super::prepare::Prepare;
use futures::{stream, Stream};
use sqlx_postgres_protocol::{self as proto, DataRow, Execute, Message, Sync};
use std::io;
impl<'a> Prepare<'a> {
pub fn select(self) -> impl Stream<Item = Result<DataRow, io::Error>> + 'a + Unpin {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
// FIXME: Manually implement Stream on a new type to avoid the unfold adapter
stream::unfold(self.connection, |conn| {
Box::pin(async {
if !conn.wbuf.is_empty() {
if let Err(e) = conn.flush().await {
return Some((Err(e), conn));
}
}
loop {
let message = match conn.receive().await {
Ok(Some(message)) => message,
// FIXME: This is an end-of-file error. How we should bubble this up here?
Ok(None) => unreachable!(),
Err(e) => return Some((Err(e), conn)),
};
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(row) => {
break Some((Ok(row), conn));
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
break None;
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
})
})
}
}