From a49191fd46b7c9353da1f5490e671025f53d5272 Mon Sep 17 00:00:00 2001 From: Zachery Gyurkovitz Date: Thu, 25 Jul 2019 11:54:36 -0700 Subject: [PATCH] Change `get_results` to produce a Stream --- Cargo.toml | 1 + sqlx-postgres/src/connection/prepare.rs | 67 ++++++++++++++++--------- src/main.rs | 9 ++++ 3 files changed, 52 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 834462400..4ba462a71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ sqlx-core = { path = "sqlx-core" } sqlx-postgres = { path = "sqlx-postgres" } env_logger = "0.6.2" bytes = "0.4.12" +futures-preview = "=0.3.0-alpha.17" [profile.bench] lto = true diff --git a/sqlx-postgres/src/connection/prepare.rs b/sqlx-postgres/src/connection/prepare.rs index 2f7fec873..3c36ccd4d 100644 --- a/sqlx-postgres/src/connection/prepare.rs +++ b/sqlx-postgres/src/connection/prepare.rs @@ -1,4 +1,5 @@ use super::Connection; +use futures::{stream, Stream}; use sqlx_postgres_protocol::{self as proto, DataRow, Execute, Message, Parse, Sync}; use std::io; @@ -128,7 +129,7 @@ impl<'a> Prepare<'a> { } #[inline] - pub async fn get_results(self) -> io::Result> { + pub fn get_results(self) -> impl Stream> + 'a + Unpin { proto::bind::trailer( &mut self.connection.wbuf, self.bind_state, @@ -138,34 +139,50 @@ impl<'a> Prepare<'a> { self.connection.send(Execute::new("", 0)); self.connection.send(Sync); - self.connection.flush().await?; - let mut rows = vec![]; + stream::unfold(self.connection, Self::unfold_func) + } - while let Some(message) = self.connection.receive().await? { - match message { - Message::BindComplete | Message::ParseComplete => { - // Indicates successful completion of a phase - } - - Message::DataRow(row) => { - rows.push(row); - } - - Message::CommandComplete(_) => {} - - Message::ReadyForQuery(_) => { - // Successful completion of the whole cycle - return Ok(rows); - } - - message => { - unimplemented!("received {:?} unimplemented message", message); + fn unfold_func( + conn: &mut Connection, + ) -> impl std::future::Future, &mut Connection)>> + { + Box::pin(async { + if !conn.wbuf.is_empty() { + if let Err(e) = conn.flush().await { + return Some((Err(e), conn)); } } - } - // FIXME: This is an end-of-file error. How we should bubble this up here? - unreachable!() + loop { + let message = match conn.receive().await { + Ok(Some(message)) => message, + // FIXME: This is an end-of-file error. How we should bubble this up here? + Ok(None) => unreachable!(), + Err(e) => return Some((Err(e), conn)), + }; + + match message { + Message::BindComplete | Message::ParseComplete => { + // Indicates successful completion of a phase + } + + Message::DataRow(row) => { + break Some((Ok(row), conn)); + } + + Message::CommandComplete(_) => {} + + Message::ReadyForQuery(_) => { + // Successful completion of the whole cycle + break None; + } + + message => { + unimplemented!("received {:?} unimplemented message", message); + } + } + } + }) } } diff --git a/src/main.rs b/src/main.rs index a828905ef..95ffa41aa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![feature(async_await)] +use futures::TryStreamExt; use sqlx::{pg::Connection, ConnectOptions}; use std::io; @@ -39,6 +40,14 @@ CREATE TABLE IF NOT EXISTS users ( println!("row_id: {:?}", row_id); + let mut row_ids = conn.prepare("SELECT id FROM users").get_results(); + + while let Some(row_id) = row_ids.try_next().await? { + println!("row_ids: {:?}", row_id); + } + + std::mem::drop(row_ids); + let count = conn.prepare("SELECT name FROM users").execute().await?; println!("users: {}", count);