add Cursor and rewrite Executor/Query over it

* this breaks a lot internally as-is
 * mysql needs a restructure
This commit is contained in:
Ryan Leckey
2020-02-19 02:18:44 -08:00
parent bb17ebfbbd
commit ea1a4fb042
17 changed files with 407 additions and 763 deletions

View File

@@ -16,7 +16,7 @@ use crate::postgres::protocol::{
};
use crate::postgres::PgError;
use crate::url::Url;
use crate::Result;
use crate::{Postgres, Result};
/// An asynchronous connection to a [Postgres][super::Postgres] database.
///
@@ -397,17 +397,6 @@ impl PgConnection {
}
}
impl PgConnection {
#[deprecated(note = "please use 'connect' instead")]
pub fn open<T>(url: T) -> BoxFuture<'static, Result<Self>>
where
T: TryInto<Url, Error = crate::Error>,
Self: Sized,
{
Box::pin(PgConnection::establish(url.try_into()))
}
}
impl Connect for PgConnection {
fn connect<T>(url: T) -> BoxFuture<'static, Result<PgConnection>>
where
@@ -419,6 +408,8 @@ impl Connect for PgConnection {
}
impl Connection for PgConnection {
type Database = Postgres;
fn close(self) -> BoxFuture<'static, Result<()>> {
Box::pin(self.terminate())
}

View File

@@ -0,0 +1,56 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use crate::cursor::Cursor;
use crate::database::HasRow;
use crate::postgres::protocol::StatementId;
use crate::postgres::PgConnection;
use crate::Postgres;
pub struct PgCursor<'a> {
statement: StatementId,
connection: &'a mut PgConnection,
}
impl<'a> PgCursor<'a> {
pub(super) fn from_connection(
connection: &'a mut PgConnection,
statement: StatementId,
) -> Self {
Self {
connection,
statement,
}
}
}
impl<'a> Cursor<'a> for PgCursor<'a> {
type Database = Postgres;
fn first(self) -> BoxFuture<'a, crate::Result<Option<<Self::Database as HasRow>::Row>>> {
todo!()
}
fn next(&mut self) -> BoxFuture<crate::Result<Option<<Self::Database as HasRow>::Row>>> {
todo!()
}
fn map<T, F>(self, f: F) -> BoxStream<'a, crate::Result<T>>
where
F: Fn(<Self::Database as HasRow>::Row) -> T,
{
todo!()
}
}
impl<'a> Future for PgCursor<'a> {
type Output = crate::Result<u64>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
todo!()
}
}

View File

@@ -1,4 +1,4 @@
use crate::database::Database;
use crate::database::{Database, HasCursor, HasRawValue, HasRow};
/// **Postgres** database driver.
pub struct Postgres;
@@ -8,9 +8,25 @@ impl Database for Postgres {
type Arguments = super::PgArguments;
type Row = super::PgRow;
type TypeInfo = super::PgTypeInfo;
type TableId = u32;
}
impl HasRow for Postgres {
// TODO: Can we drop the `type Database = _`
type Database = Postgres;
type Row = super::PgRow;
}
impl<'a> HasCursor<'a> for Postgres {
// TODO: Can we drop the `type Database = _`
type Database = Postgres;
type Cursor = super::PgCursor<'a>;
}
impl<'a> HasRawValue<'a> for Postgres {
type RawValue = Option<&'a [u8]>;
}

View File

@@ -6,17 +6,9 @@ use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use crate::describe::{Column, Describe};
use crate::executor::{Execute, Executor};
use crate::postgres::protocol::{self, Encode, Message, StatementId, TypeFormat};
use crate::postgres::{PgArguments, PgRow, PgTypeInfo, Postgres};
#[derive(Debug)]
enum Step {
Command(u64),
NoData,
Row(protocol::DataRow),
ParamDesc(Box<protocol::ParameterDescription>),
RowDesc(Box<protocol::RowDescription>),
}
use crate::postgres::{PgArguments, PgCursor, PgRow, PgTypeInfo, Postgres};
impl super::PgConnection {
fn write_prepare(&mut self, query: &str, args: &PgArguments) -> StatementId {
@@ -63,274 +55,51 @@ impl super::PgConnection {
fn write_sync(&mut self) {
protocol::Sync.encode(self.stream.buffer_mut());
}
async fn wait_until_ready(&mut self) -> crate::Result<()> {
if !self.ready {
while let Some(message) = self.receive().await? {
match message {
Message::ReadyForQuery(_) => {
self.ready = true;
break;
}
_ => {
// Drain the stream
}
}
}
}
Ok(())
}
async fn step(&mut self) -> crate::Result<Option<Step>> {
while let Some(message) = self.receive().await? {
match message {
Message::BindComplete
| Message::ParseComplete
| Message::PortalSuspended
| Message::CloseComplete => {}
Message::CommandComplete(body) => {
return Ok(Some(Step::Command(body.affected_rows)));
}
Message::NoData => {
return Ok(Some(Step::NoData));
}
Message::DataRow(body) => {
return Ok(Some(Step::Row(body)));
}
Message::ReadyForQuery(_) => {
self.ready = true;
return Ok(None);
}
Message::ParameterDescription(desc) => {
return Ok(Some(Step::ParamDesc(desc)));
}
Message::RowDescription(desc) => {
return Ok(Some(Step::RowDesc(desc)));
}
message => {
return Err(protocol_err!("received unexpected message: {:?}", message).into());
}
}
}
// Connection was (unexpectedly) closed
Err(io::Error::from(io::ErrorKind::ConnectionAborted).into())
}
}
impl super::PgConnection {
async fn send<'e, 'q: 'e>(&'e mut self, command: &'q str) -> crate::Result<()> {
protocol::Query(command).encode(self.stream.buffer_mut());
impl<'e> Executor<'e> for &'e mut super::PgConnection {
type Database = Postgres;
self.wait_until_ready().await?;
fn execute<'q, E>(self, query: E) -> PgCursor<'e>
where
E: Execute<'q, Self::Database>,
{
let (query, arguments) = query.into_parts();
self.stream.flush().await?;
self.ready = false;
// TODO: Handle [arguments] being None. This should be a SIMPLE query.
let arguments = arguments.unwrap();
while let Some(_step) = self.step().await? {
// Drain the stream until ReadyForQuery
}
// Check the statement cache for a statement ID that matches the given query
// If it doesn't exist, we generate a new statement ID and write out [Parse] to the
// connection command buffer
let statement = self.write_prepare(query, &arguments);
Ok(())
}
async fn execute<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
args: PgArguments,
) -> crate::Result<u64> {
let statement = self.write_prepare(query, &args);
self.write_bind("", statement, &args);
self.write_execute("", 1);
self.write_sync();
self.wait_until_ready().await?;
self.stream.flush().await?;
self.ready = false;
let mut affected = 0;
while let Some(step) = self.step().await? {
if let Step::Command(cnt) = step {
affected = cnt;
}
}
Ok(affected)
}
// Initial part of [fetch]; write message to stream
fn write_fetch(&mut self, query: &str, args: &PgArguments) -> StatementId {
let statement = self.write_prepare(query, &args);
self.write_bind("", statement, &args);
// Next, [Bind] attaches the arguments to the statement and creates a named portal
self.write_bind("", statement, &arguments);
// Next, [Describe] will return the expected result columns and types
// Conditionally run [Describe] only if the results have not been cached
if !self.statement_cache.has_columns(statement) {
self.write_describe(protocol::Describe::Portal(""));
}
// Next, [Execute] then executes the named portal
self.write_execute("", 0);
// Finally, [Sync] asks postgres to process the messages that we sent and respond with
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
// termed batching might suit this.
self.write_sync();
statement
PgCursor::from_connection(self, statement)
}
async fn get_columns(
&mut self,
statement: StatementId,
) -> crate::Result<Arc<HashMap<Box<str>, usize>>> {
if !self.statement_cache.has_columns(statement) {
let desc: Option<_> = 'outer: loop {
while let Some(step) = self.step().await? {
match step {
Step::RowDesc(desc) => break 'outer Some(desc),
Step::NoData => break 'outer None,
_ => {}
}
}
unreachable!();
};
let mut columns = HashMap::new();
if let Some(desc) = desc {
columns.reserve(desc.fields.len());
for (index, field) in desc.fields.iter().enumerate() {
if let Some(name) = &field.name {
columns.insert(name.clone(), index);
}
}
}
self.statement_cache.put_columns(statement, columns);
}
Ok(self.statement_cache.get_columns(statement))
}
fn fetch<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
args: PgArguments,
) -> BoxStream<'e, crate::Result<PgRow>> {
Box::pin(async_stream::try_stream! {
let statement = self.write_fetch(query, &args);
self.wait_until_ready().await?;
self.stream.flush().await?;
self.ready = false;
let columns = self.get_columns(statement).await?;
while let Some(step) = self.step().await? {
if let Step::Row(data) = step {
yield PgRow { data, columns: Arc::clone(&columns) };
}
}
// No more rows in the result set
})
}
async fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> crate::Result<Describe<Postgres>> {
let statement = self.write_prepare(query, &Default::default());
self.write_describe(protocol::Describe::Statement(statement));
self.write_sync();
self.stream.flush().await?;
self.wait_until_ready().await?;
let params = match self.step().await? {
Some(Step::ParamDesc(desc)) => desc,
step => {
return Err(
protocol_err!("expected ParameterDescription; received {:?}", step).into(),
);
}
};
let result = match self.step().await? {
Some(Step::RowDesc(desc)) => Some(desc),
Some(Step::NoData) => None,
step => {
return Err(protocol_err!("expected RowDescription; received {:?}", step).into());
}
};
Ok(Describe {
param_types: params
.ids
.iter()
.map(|id| PgTypeInfo::new(*id))
.collect::<Vec<_>>()
.into_boxed_slice(),
result_columns: result
.map(|r| r.fields)
.unwrap_or_default()
.into_vec()
.into_iter()
// TODO: Should [Column] just wrap [protocol::Field] ?
.map(|field| Column {
name: field.name,
table_id: field.table_id,
type_info: PgTypeInfo::new(field.type_id),
})
.collect::<Vec<_>>()
.into_boxed_slice(),
})
}
}
impl crate::Executor for super::PgConnection {
type Database = super::Postgres;
fn send<'e, 'q: 'e>(&'e mut self, query: &'q str) -> BoxFuture<'e, crate::Result<()>> {
Box::pin(self.send(query))
}
fn execute<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
args: PgArguments,
) -> BoxFuture<'e, crate::Result<u64>> {
Box::pin(self.execute(query, args))
}
fn fetch<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
args: PgArguments,
) -> BoxStream<'e, crate::Result<PgRow>> {
self.fetch(query, args)
}
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
Box::pin(self.describe(query))
fn execute_by_ref<'q, E>(&mut self, query: E) -> PgCursor<'_>
where
E: Execute<'q, Self::Database>,
{
self.execute(query)
}
}

View File

@@ -2,6 +2,7 @@
pub use arguments::PgArguments;
pub use connection::PgConnection;
pub use cursor::PgCursor;
pub use database::Postgres;
pub use error::PgError;
pub use row::PgRow;
@@ -9,6 +10,7 @@ pub use types::PgTypeInfo;
mod arguments;
mod connection;
mod cursor;
mod database;
mod error;
mod executor;