Change get_results to produce a Stream

This commit is contained in:
Zachery Gyurkovitz
2019-07-25 11:54:36 -07:00
parent 2d9442d439
commit a49191fd46
3 changed files with 52 additions and 25 deletions

View File

@@ -1,4 +1,5 @@
use super::Connection;
use futures::{stream, Stream};
use sqlx_postgres_protocol::{self as proto, DataRow, Execute, Message, Parse, Sync};
use std::io;
@@ -128,7 +129,7 @@ impl<'a> Prepare<'a> {
}
#[inline]
pub async fn get_results(self) -> io::Result<Vec<DataRow>> {
pub fn get_results(self) -> impl Stream<Item = Result<DataRow, io::Error>> + 'a + Unpin {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
@@ -138,34 +139,50 @@ impl<'a> Prepare<'a> {
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
self.connection.flush().await?;
let mut rows = vec![];
stream::unfold(self.connection, Self::unfold_func)
}
while let Some(message) = self.connection.receive().await? {
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(row) => {
rows.push(row);
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
return Ok(rows);
}
message => {
unimplemented!("received {:?} unimplemented message", message);
fn unfold_func(
conn: &mut Connection,
) -> impl std::future::Future<Output = Option<(Result<DataRow, io::Error>, &mut Connection)>>
{
Box::pin(async {
if !conn.wbuf.is_empty() {
if let Err(e) = conn.flush().await {
return Some((Err(e), conn));
}
}
}
// FIXME: This is an end-of-file error. How we should bubble this up here?
unreachable!()
loop {
let message = match conn.receive().await {
Ok(Some(message)) => message,
// FIXME: This is an end-of-file error. How we should bubble this up here?
Ok(None) => unreachable!(),
Err(e) => return Some((Err(e), conn)),
};
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(row) => {
break Some((Ok(row), conn));
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
break None;
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
})
}
}