adjust Cursor::map to work through black magic hacks and hope we get GATs soon

This commit is contained in:
Ryan Leckey 2020-02-21 15:54:08 -08:00
parent a374c18a18
commit 7404708bab
11 changed files with 138 additions and 137 deletions

View File

@ -6,7 +6,7 @@ use futures_core::stream::BoxStream;
use crate::connection::MaybeOwnedConnection; use crate::connection::MaybeOwnedConnection;
use crate::database::{Database, HasRow}; use crate::database::{Database, HasRow};
use crate::executor::Execute; use crate::executor::Execute;
use crate::{Connect, Pool}; use crate::{Connect, Pool, Row};
/// Represents a result set, which is generated by executing a query against the database. /// Represents a result set, which is generated by executing a query against the database.
/// ///
@ -16,67 +16,50 @@ use crate::{Connect, Pool};
/// Initially the `Cursor` is positioned before the first row. The `next` method moves the cursor /// Initially the `Cursor` is positioned before the first row. The `next` method moves the cursor
/// to the next row, and because it returns `None` when there are no more rows, it can be used /// to the next row, and because it returns `None` when there are no more rows, it can be used
/// in a `while` loop to iterate through all returned rows. /// in a `while` loop to iterate through all returned rows.
pub trait Cursor<'c, 'q> pub trait Cursor<'c, 'q, DB>
where where
Self: Send, Self: Send,
DB: Database,
// `.await`-ing a cursor will return the affected rows from the query // `.await`-ing a cursor will return the affected rows from the query
Self: Future<Output = crate::Result<u64>>, Self: Future<Output = crate::Result<u64>>,
{ {
type Database: Database;
// Construct the [Cursor] from a [Pool] // Construct the [Cursor] from a [Pool]
// Meant for internal use only // Meant for internal use only
// TODO: Anyone have any better ideas on how to instantiate cursors generically from a pool? // TODO: Anyone have any better ideas on how to instantiate cursors generically from a pool?
#[doc(hidden)] #[doc(hidden)]
fn from_pool<E>(pool: &Pool<<Self::Database as Database>::Connection>, query: E) -> Self fn from_pool<E>(pool: &Pool<DB::Connection>, query: E) -> Self
where where
Self: Sized, Self: Sized,
E: Execute<'q, Self::Database>; E: Execute<'q, DB>;
#[doc(hidden)] #[doc(hidden)]
fn from_connection<E, C>(conn: C, query: E) -> Self fn from_connection<E, C>(conn: C, query: E) -> Self
where where
Self: Sized, Self: Sized,
<Self::Database as Database>::Connection: Connect, DB::Connection: Connect,
// MaybeOwnedConnection<'c, <Self::Database as Database>::Connection>: // MaybeOwnedConnection<'c, DB::Connection>:
// Connect<Database = Self::Database>, // Connect<Database = DB>,
C: Into<MaybeOwnedConnection<'c, <Self::Database as Database>::Connection>>, C: Into<MaybeOwnedConnection<'c, DB::Connection>>,
E: Execute<'q, Self::Database>; E: Execute<'q, DB>;
#[doc(hidden)] #[doc(hidden)]
fn first(self) -> BoxFuture<'c, crate::Result<Option<<Self::Database as HasRow<'c>>::Row>>> fn first(self) -> BoxFuture<'c, crate::Result<Option<<DB as HasRow<'c>>::Row>>>
where where
'q: 'c; 'q: 'c;
/// Fetch the next row in the result. Returns `None` if there are no more rows. /// Fetch the next row in the result. Returns `None` if there are no more rows.
fn next(&mut self) -> BoxFuture<crate::Result<Option<<Self::Database as HasRow>::Row>>>; fn next(&mut self) -> BoxFuture<crate::Result<Option<<DB as HasRow>::Row>>>;
/// Map the `Row`s in this result to a different type, returning a [`Stream`] of the results. /// Map the `Row`s in this result to a different type, returning a [`Stream`] of the results.
fn map<T, F>(self, f: F) -> BoxStream<'c, crate::Result<T>> fn map<'a, T, F>(self, f: F) -> BoxStream<'a, crate::Result<T>>
where where
F: MapRowFn<Self::Database, T>, Self: Sized,
T: 'c + Send + Unpin,
'q: 'c;
}
pub trait MapRowFn<DB, T>
where
Self: Send + Sync + 'static,
DB: Database,
DB: for<'c> HasRow<'c>,
{
fn call(&self, row: <DB as HasRow>::Row) -> T;
}
impl<DB, T, F> MapRowFn<DB, T> for F
where
DB: Database,
DB: for<'c> HasRow<'c>,
F: Send + Sync + 'static, F: Send + Sync + 'static,
F: Fn(<DB as HasRow>::Row) -> T, T: Send + Unpin + 'static,
{ F: for<'r> Fn(<DB as HasRow<'r>>::Row) -> T,
#[inline(always)] {
fn call(&self, row: <DB as HasRow>::Row) -> T { unimplemented!(
self(row) "Cursor::map is currently supplied by inherent methods to work around not having GATs"
)
} }
} }

View File

@ -15,7 +15,7 @@ where
Self: Sized + 'static, Self: Sized + 'static,
Self: for<'a> HasRow<'a, Database = Self>, Self: for<'a> HasRow<'a, Database = Self>,
Self: for<'a> HasRawValue<'a>, Self: for<'a> HasRawValue<'a>,
Self: for<'c, 'q> HasCursor<'c, 'q, Database = Self>, Self: for<'c, 'q> HasCursor<'c, 'q, Self>,
{ {
/// The concrete `Connection` implementation for this database. /// The concrete `Connection` implementation for this database.
type Connection: Connection<Database = Self>; type Connection: Connection<Database = Self>;
@ -34,10 +34,11 @@ pub trait HasRawValue<'a> {
type RawValue; type RawValue;
} }
pub trait HasCursor<'c, 'q> { pub trait HasCursor<'c, 'q, DB>
type Database: Database; where
DB: Database,
type Cursor: Cursor<'c, 'q, Database = Self::Database>; {
type Cursor: Cursor<'c, 'q, DB>;
} }
pub trait HasRow<'a> { pub trait HasRow<'a> {

View File

@ -22,12 +22,18 @@ where
type Database: Database; type Database: Database;
/// Executes a query that may or may not return a result set. /// Executes a query that may or may not return a result set.
fn execute<'q, E>(self, query: E) -> <Self::Database as HasCursor<'c, 'q>>::Cursor fn execute<'q, E>(
self,
query: E,
) -> <Self::Database as HasCursor<'c, 'q, Self::Database>>::Cursor
where where
E: Execute<'q, Self::Database>; E: Execute<'q, Self::Database>;
#[doc(hidden)] #[doc(hidden)]
fn execute_by_ref<'b, E>(&mut self, query: E) -> <Self::Database as HasCursor<'_, 'b>>::Cursor fn execute_by_ref<'b, E>(
&mut self,
query: E,
) -> <Self::Database as HasCursor<'_, 'b, Self::Database>>::Cursor
where where
E: Execute<'b, Self::Database>; E: Execute<'b, Self::Database>;
} }

View File

@ -19,25 +19,26 @@ impl<'p, C, DB> Executor<'p> for &'p Pool<C>
where where
C: Connect<Database = DB>, C: Connect<Database = DB>,
DB: Database<Connection = C>, DB: Database<Connection = C>,
DB: for<'c, 'q> HasCursor<'c, 'q>, DB: for<'c, 'q> HasCursor<'c, 'q, DB>,
for<'con> &'con mut C: Executor<'con>, for<'con> &'con mut C: Executor<'con>,
{ {
type Database = DB; type Database = DB;
fn execute<'q, E>(self, query: E) -> <Self::Database as HasCursor<'p, 'q>>::Cursor fn execute<'q, E>(self, query: E) -> <Self::Database as HasCursor<'p, 'q, DB>>::Cursor
where where
E: Execute<'q, Self::Database>, E: Execute<'q, DB>,
{ {
DB::Cursor::from_pool(self, query) DB::Cursor::from_pool(self, query)
} }
#[doc(hidden)]
#[inline] #[inline]
fn execute_by_ref<'q, 'e, E>( fn execute_by_ref<'q, 'e, E>(
&'e mut self, &'e mut self,
query: E, query: E,
) -> <Self::Database as HasCursor<'_, 'q>>::Cursor ) -> <Self::Database as HasCursor<'_, 'q, DB>>::Cursor
where where
E: Execute<'q, Self::Database>, E: Execute<'q, DB>,
{ {
self.execute(query) self.execute(query)
} }
@ -47,23 +48,24 @@ impl<'c, C, DB> Executor<'c> for &'c mut PoolConnection<C>
where where
C: Connect<Database = DB>, C: Connect<Database = DB>,
DB: Database<Connection = C>, DB: Database<Connection = C>,
DB: for<'c2, 'q> HasCursor<'c2, 'q, Database = DB>, DB: for<'c2, 'q> HasCursor<'c2, 'q, DB>,
for<'con> &'con mut C: Executor<'con>, for<'con> &'con mut C: Executor<'con>,
{ {
type Database = C::Database; type Database = C::Database;
fn execute<'q, E>(self, query: E) -> <Self::Database as HasCursor<'c, 'q>>::Cursor fn execute<'q, E>(self, query: E) -> <Self::Database as HasCursor<'c, 'q, DB>>::Cursor
where where
E: Execute<'q, Self::Database>, E: Execute<'q, Self::Database>,
{ {
DB::Cursor::from_connection(&mut **self, query) DB::Cursor::from_connection(&mut **self, query)
} }
#[doc(hidden)]
#[inline] #[inline]
fn execute_by_ref<'q, 'e, E>( fn execute_by_ref<'q, 'e, E>(
&'e mut self, &'e mut self,
query: E, query: E,
) -> <Self::Database as HasCursor<'_, 'q>>::Cursor ) -> <Self::Database as HasCursor<'_, 'q, DB>>::Cursor
where where
E: Execute<'q, Self::Database>, E: Execute<'q, Self::Database>,
{ {
@ -75,19 +77,20 @@ impl<C, DB> Executor<'static> for PoolConnection<C>
where where
C: Connect<Database = DB>, C: Connect<Database = DB>,
DB: Database<Connection = C>, DB: Database<Connection = C>,
DB: for<'c, 'q> HasCursor<'c, 'q, Database = DB>, DB: for<'c, 'q> HasCursor<'c, 'q, DB>,
{ {
type Database = DB; type Database = DB;
fn execute<'q, E>(self, query: E) -> <DB as HasCursor<'static, 'q>>::Cursor fn execute<'q, E>(self, query: E) -> <DB as HasCursor<'static, 'q, DB>>::Cursor
where where
E: Execute<'q, Self::Database>, E: Execute<'q, Self::Database>,
{ {
DB::Cursor::from_connection(self, query) DB::Cursor::from_connection(self, query)
} }
#[doc(hidden)]
#[inline] #[inline]
fn execute_by_ref<'q, 'e, E>(&'e mut self, query: E) -> <DB as HasCursor<'_, 'q>>::Cursor fn execute_by_ref<'q, 'e, E>(&'e mut self, query: E) -> <DB as HasCursor<'_, 'q, DB>>::Cursor
where where
E: Execute<'q, Self::Database>, E: Execute<'q, Self::Database>,
{ {

View File

@ -9,13 +9,14 @@ use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream; use futures_core::stream::BoxStream;
use crate::connection::{ConnectionSource, MaybeOwnedConnection}; use crate::connection::{ConnectionSource, MaybeOwnedConnection};
use crate::cursor::{Cursor, MapRowFn}; use crate::cursor::Cursor;
use crate::database::HasRow; use crate::database::HasRow;
use crate::executor::Execute; use crate::executor::Execute;
use crate::pool::{Pool, PoolConnection}; use crate::pool::{Pool, PoolConnection};
use crate::postgres::protocol::{CommandComplete, DataRow, Message, StatementId}; use crate::postgres::protocol::{CommandComplete, DataRow, Message, StatementId};
use crate::postgres::{PgArguments, PgConnection, PgRow}; use crate::postgres::{PgArguments, PgConnection, PgRow};
use crate::{Database, Postgres}; use crate::{Database, Postgres};
use futures_core::Stream;
enum State<'c, 'q> { enum State<'c, 'q> {
Query(&'q str, Option<PgArguments>), Query(&'q str, Option<PgArguments>),
@ -31,14 +32,33 @@ pub struct PgCursor<'c, 'q> {
state: State<'c, 'q>, state: State<'c, 'q>,
} }
impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> { impl<'c, 'q> PgCursor<'c, 'q> {
type Database = Postgres;
#[doc(hidden)] #[doc(hidden)]
fn from_pool<E>(pool: &Pool<<Self::Database as Database>::Connection>, query: E) -> Self pub fn map<'a, T, F>(mut self, f: F) -> impl Stream<Item = crate::Result<T>> + 'a + 'c
where
F: Send + Sync + 'static,
T: Send + Unpin + 'static,
F: for<'r> Fn(PgRow<'r>) -> T,
'q: 'c,
'c: 'a,
{
try_stream! {
loop {
let row = next(&mut self).await?;
if let Some(row) = row {
yield f(row);
}
}
}
}
}
impl<'c, 'q> Cursor<'c, 'q, Postgres> for PgCursor<'c, 'q> {
#[doc(hidden)]
fn from_pool<E>(pool: &Pool<PgConnection>, query: E) -> Self
where where
Self: Sized, Self: Sized,
E: Execute<'q, Self::Database>, E: Execute<'q, Postgres>,
{ {
let (query, arguments) = query.into_parts(); let (query, arguments) = query.into_parts();
@ -53,8 +73,8 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
fn from_connection<E, C>(conn: C, query: E) -> Self fn from_connection<E, C>(conn: C, query: E) -> Self
where where
Self: Sized, Self: Sized,
C: Into<MaybeOwnedConnection<'c, <Self::Database as Database>::Connection>>, C: Into<MaybeOwnedConnection<'c, PgConnection>>,
E: Execute<'q, Self::Database>, E: Execute<'q, Postgres>,
{ {
let (query, arguments) = query.into_parts(); let (query, arguments) = query.into_parts();
@ -65,29 +85,17 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
} }
} }
fn first(self) -> BoxFuture<'c, crate::Result<Option<<Self::Database as HasRow<'c>>::Row>>> #[doc(hidden)]
fn first(self) -> BoxFuture<'c, crate::Result<Option<PgRow<'c>>>>
where where
'q: 'c, 'q: 'c,
{ {
Box::pin(first(self)) Box::pin(first(self))
} }
fn next(&mut self) -> BoxFuture<crate::Result<Option<<Self::Database as HasRow<'_>>::Row>>> { fn next(&mut self) -> BoxFuture<crate::Result<Option<PgRow<'_>>>> {
Box::pin(next(self)) Box::pin(next(self))
} }
fn map<T, F>(mut self, f: F) -> BoxStream<'c, crate::Result<T>>
where
F: MapRowFn<Self::Database, T>,
T: 'c + Send + Unpin,
'q: 'c,
{
Box::pin(try_stream! {
while let Some(row) = self.next().await? {
yield f.call(row);
}
})
}
} }
impl<'s, 'q> Future for PgCursor<'s, 'q> { impl<'s, 'q> Future for PgCursor<'s, 'q> {
@ -144,7 +152,7 @@ async fn write(
arguments: Option<PgArguments>, arguments: Option<PgArguments>,
) -> crate::Result<()> { ) -> crate::Result<()> {
// TODO: Handle [arguments] being None. This should be a SIMPLE query. // TODO: Handle [arguments] being None. This should be a SIMPLE query.
let arguments = arguments.unwrap(); let arguments = arguments.expect("simple queries not implemented yet");
// Check the statement cache for a statement ID that matches the given query // Check the statement cache for a statement ID that matches the given query
// If it doesn't exist, we generate a new statement ID and write out [Parse] to the // If it doesn't exist, we generate a new statement ID and write out [Parse] to the

View File

@ -20,10 +20,7 @@ impl<'a> HasRow<'a> for Postgres {
type Row = super::PgRow<'a>; type Row = super::PgRow<'a>;
} }
impl<'s, 'q> HasCursor<'s, 'q> for Postgres { impl<'s, 'q> HasCursor<'s, 'q, Postgres> for Postgres {
// TODO: Can we drop the `type Database = _`
type Database = Postgres;
type Cursor = super::PgCursor<'s, 'q>; type Cursor = super::PgCursor<'s, 'q>;
} }

View File

@ -61,6 +61,7 @@ impl<'e> Executor<'e> for &'e mut super::PgConnection {
PgCursor::from_connection(self, query) PgCursor::from_connection(self, query)
} }
#[doc(hidden)]
#[inline] #[inline]
fn execute_by_ref<'q, E>(&mut self, query: E) -> PgCursor<'_, 'q> fn execute_by_ref<'q, E>(&mut self, query: E) -> PgCursor<'_, 'q>
where where

View File

@ -40,19 +40,19 @@ impl<'c> RowIndex<'c, PgRow<'c>> for usize {
} }
} }
// impl<'c> RowIndex<'c, PgRow<'c>> for &'_ str { impl<'c> RowIndex<'c, PgRow<'c>> for &'_ str {
// fn try_get_raw(self, row: &'r PgRow<'c>) -> crate::Result<Option<&'c [u8]>> { fn try_get_raw(self, row: &'c PgRow<'c>) -> crate::Result<Option<&'c [u8]>> {
// let index = row let index = row
// .columns .columns
// .get(self) .get(self)
// .ok_or_else(|| crate::Error::ColumnNotFound((*self).into()))?; .ok_or_else(|| crate::Error::ColumnNotFound((*self).into()))?;
//
// Ok(row.data.get( Ok(row.data.get(
// row.connection.stream.buffer(), row.connection.stream.buffer(),
// &row.connection.data_row_values_buf, &row.connection.data_row_values_buf,
// *index, *index,
// )) ))
// } }
// } }
// TODO: impl_from_row_for_row!(PgRow); // TODO: impl_from_row_for_row!(PgRow);

View File

@ -45,7 +45,7 @@ where
executor.execute(self).await executor.execute(self).await
} }
pub fn fetch<'e, E>(self, executor: E) -> <DB as HasCursor<'e, 'q>>::Cursor pub fn fetch<'e, E>(self, executor: E) -> <DB as HasCursor<'e, 'q, DB>>::Cursor
where where
E: Executor<'e, Database = DB>, E: Executor<'e, Database = DB>,
{ {

View File

@ -136,17 +136,21 @@ where
{ {
type Database = <T as Connection>::Database; type Database = <T as Connection>::Database;
fn execute<'q, E>(self, query: E) -> <<T as Connection>::Database as HasCursor<'c, 'q>>::Cursor fn execute<'q, E>(
self,
query: E,
) -> <<T as Connection>::Database as HasCursor<'c, 'q, DB>>::Cursor
where where
E: Execute<'q, Self::Database>, E: Execute<'q, Self::Database>,
{ {
(**self).execute_by_ref(query) (**self).execute_by_ref(query)
} }
#[doc(hidden)]
fn execute_by_ref<'q, 'e, E>( fn execute_by_ref<'q, 'e, E>(
&'e mut self, &'e mut self,
query: E, query: E,
) -> <Self::Database as HasCursor<'e, 'q>>::Cursor ) -> <Self::Database as HasCursor<'e, 'q, DB>>::Cursor
where where
E: Execute<'q, Self::Database>, E: Execute<'q, Self::Database>,
{ {

View File

@ -1,6 +1,6 @@
use futures::TryStreamExt; use futures::TryStreamExt;
use sqlx::{postgres::PgConnection, Connect, Connection, Executor, Row}; use sqlx::{postgres::PgConnection, Connect, Connection, Cursor, Executor, Row};
use sqlx_core::postgres::PgPool; use sqlx_core::postgres::{PgPool, PgRow};
use std::time::Duration; use std::time::Duration;
#[cfg_attr(feature = "runtime-async-std", async_std::test)] #[cfg_attr(feature = "runtime-async-std", async_std::test)]
@ -17,40 +17,38 @@ async fn it_connects() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
// #[cfg_attr(feature = "runtime-async-std", async_std::test)] #[cfg_attr(feature = "runtime-async-std", async_std::test)]
// #[cfg_attr(feature = "runtime-tokio", tokio::test)] #[cfg_attr(feature = "runtime-tokio", tokio::test)]
// async fn it_executes() -> anyhow::Result<()> { async fn it_executes() -> anyhow::Result<()> {
// let mut conn = connect().await?; let mut conn = connect().await?;
//
// let _ = conn let _ = conn
// .send( .execute(
// r#" r#"
// CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY); CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY);
// "#, "#,
// ) )
// .await?; .await?;
//
// for index in 1..=10_i32 { for index in 1..=10_i32 {
// let cnt = sqlx::query("INSERT INTO users (id) VALUES ($1)") let cnt = sqlx::query("INSERT INTO users (id) VALUES ($1)")
// .bind(index) .bind(index)
// .execute(&mut conn) .execute(&mut conn)
// .await?; .await?;
//
// assert_eq!(cnt, 1); assert_eq!(cnt, 1);
// } }
//
// let sum: i32 = sqlx::query("SELECT id FROM users") let sum: i32 = sqlx::query("SELECT id FROM users")
// .fetch(&mut conn) .fetch(&mut conn)
// .try_fold( .map(|row| row.get::<i32, _>(0))
// 0_i32, .try_fold(0_i32, |acc, x| async move { Ok(acc + x) })
// |acc, x| async move { Ok(acc + x.get::<i32, _>("id")) }, .await?;
// )
// .await?; assert_eq!(sum, 55);
//
// assert_eq!(sum, 55); Ok(())
// }
// Ok(())
// }
// https://github.com/launchbadge/sqlx/issues/104 // https://github.com/launchbadge/sqlx/issues/104
#[cfg_attr(feature = "runtime-async-std", async_std::test)] #[cfg_attr(feature = "runtime-async-std", async_std::test)]