Split Executor into Executor and RefExecutor

* Allow `conn.fetch(" ... ")` to be called where `conn` is an owned Connection
 * Executor::fetch -> RefExecutor::fetch_by_ref
 * Executor::fetch_by_ref -> Executor::fetch
 * Move `Connection::describe` to `Executor::describe`
 * `Transaction` is no longer a `Connection`
 * `Connection` has `Executor` as a super-trait again which greatly simplifies bounds
This commit is contained in:
Ryan Leckey 2020-02-29 17:08:25 -08:00
parent 0afcf33395
commit f462343787
13 changed files with 382 additions and 431 deletions

View File

@ -17,20 +17,13 @@ use crate::url::Url;
pub trait Connection
where
Self: Send + 'static,
Self: Executor,
{
type Database: Database;
/// Close this database connection.
fn close(self) -> BoxFuture<'static, crate::Result<()>>;
/// Verifies a connection to the database is still alive.
fn ping(&mut self) -> BoxFuture<crate::Result<()>>;
#[doc(hidden)]
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>;
}
/// Represents a type that can directly establish a new connection.
@ -75,10 +68,9 @@ where
}
}
impl<'c, C, DB> ConnectionSource<'c, C>
impl<'c, C> ConnectionSource<'c, C>
where
C: Connect<Database = DB>,
DB: Database<Connection = C>,
C: Connect,
{
pub(crate) async fn resolve_by_ref(&mut self) -> crate::Result<MaybeOwnedConnection<'_, C>> {
if let ConnectionSource::Pool(pool) = self {

View File

@ -19,8 +19,6 @@ use crate::{Connect, Pool, Row};
pub trait Cursor<'c, 'q>
where
Self: Send,
// `.await`-ing a cursor will return the affected rows from the query
Self: Future<Output = crate::Result<u64>>,
{
type Database: Database;

View File

@ -12,7 +12,7 @@ use crate::types::TypeInfo;
/// database (e.g., MySQL, Postgres).
pub trait Database
where
Self: Sized + 'static,
Self: Sized + Send + 'static,
Self: for<'a> HasRow<'a, Database = Self>,
Self: for<'a> HasRawValue<'a>,
Self: for<'c, 'q> HasCursor<'c, 'q, Database = Self>,

View File

@ -14,27 +14,54 @@ use futures_util::TryStreamExt;
/// Implementations are provided for [`&Pool`](struct.Pool.html),
/// [`&mut PoolConnection`](struct.PoolConnection.html),
/// and [`&mut Connection`](trait.Connection.html).
pub trait Executor<'c>
pub trait Executor
where
Self: Send,
{
/// The specific database that this type is implemented for.
type Database: Database;
/// Executes a query that may or may not return a result set.
fn fetch<'q, E>(self, query: E) -> <Self::Database as HasCursor<'c, 'q>>::Cursor
/// Executes the query for its side-effects and
/// discarding any potential result rows.
///
/// Returns the number of rows affected, or 0 if not applicable.
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>;
#[doc(hidden)]
fn fetch_by_ref<'b, E>(&mut self, query: E) -> <Self::Database as HasCursor<'_, 'b>>::Cursor
fn fetch<'e, 'q, E>(&'e mut self, query: E) -> <Self::Database as HasCursor<'e, 'q>>::Cursor
where
E: Execute<'b, Self::Database>;
E: Execute<'q, Self::Database>;
/// Prepare the SQL query and return type information about its parameters
/// and results.
///
/// This is used by the query macros ( [`query!`] ) during compilation to
/// power their type inference.
fn describe<'e, 'q, E: 'e>(
&'e mut self,
query: E,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>
where
E: Execute<'q, Self::Database>;
}
pub trait RefExecutor<'c> {
type Database: Database;
/// Executes a query for its result.
///
/// Returns a [`Cursor`] that can be used to iterate through the [`Row`]s
/// of the result.
fn fetch_by_ref<'q, E>(self, query: E) -> <Self::Database as HasCursor<'c, 'q>>::Cursor
where
E: Execute<'q, Self::Database>;
}
/// A type that may be executed against a database connection.
pub trait Execute<'q, DB>
where
Self: Send,
DB: Database,
{
/// Returns the query to be executed and the arguments to bind against the query, if any.
@ -70,3 +97,34 @@ macro_rules! impl_execute_for_query {
}
};
}
impl<T> Executor for &'_ mut T
where
T: Executor,
{
type Database = T::Database;
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{
(**self).execute(query)
}
fn fetch<'e, 'q, E>(&'e mut self, query: E) -> <Self::Database as HasCursor<'_, 'q>>::Cursor
where
E: Execute<'q, Self::Database>,
{
(**self).fetch(query)
}
fn describe<'e, 'q, E: 'e>(
&'e mut self,
query: E,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>
where
E: Execute<'q, Self::Database>,
{
(**self).describe(query)
}
}

View File

@ -60,8 +60,6 @@ impl<C> Connection for PoolConnection<C>
where
C: Connect,
{
type Database = C::Database;
/// Detach the connection from the pool and close it nicely.
fn close(mut self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(async move {
@ -74,15 +72,6 @@ where
fn ping(&mut self) -> BoxFuture<crate::Result<()>> {
Box::pin(self.deref_mut().ping())
}
#[doc(hidden)]
#[inline]
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
Box::pin(self.deref_mut().describe(query))
}
}
/// Returns the connection to the [`Pool`][crate::Pool] it was checked-out from.

View File

@ -6,7 +6,7 @@ use futures_util::StreamExt;
use crate::{
connection::{Connect, Connection},
describe::Describe,
executor::Executor,
executor::{Executor, RefExecutor},
pool::Pool,
Cursor, Database,
};
@ -15,85 +15,100 @@ use super::PoolConnection;
use crate::database::HasCursor;
use crate::executor::Execute;
impl<'p, C, DB> Executor<'p> for &'p Pool<C>
impl<'p, C, DB> Executor for &'p Pool<C>
where
C: Connect<Database = DB>,
DB: Database<Connection = C>,
DB: for<'c, 'q> HasCursor<'c, 'q, Database = DB>,
for<'con> &'con mut C: Executor<'con>,
{
type Database = DB;
fn fetch<'q, E>(self, query: E) -> <Self::Database as HasCursor<'p, 'q>>::Cursor
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{
Box::pin(async move { self.acquire().await?.execute(query).await })
}
fn fetch<'e, 'q, E>(&'e mut self, query: E) -> <Self::Database as HasCursor<'_, 'q>>::Cursor
where
E: Execute<'q, DB>,
{
DB::Cursor::from_pool(self, query)
}
#[doc(hidden)]
#[inline]
fn fetch_by_ref<'q, 'e, E>(
fn describe<'e, 'q, E: 'e>(
&'e mut self,
query: E,
) -> <Self::Database as HasCursor<'_, 'q>>::Cursor
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>
where
E: Execute<'q, DB>,
E: Execute<'q, Self::Database>,
{
self.fetch(query)
Box::pin(async move { self.acquire().await?.describe(query).await })
}
}
impl<'c, C, DB> Executor<'c> for &'c mut PoolConnection<C>
impl<'p, C, DB> RefExecutor<'p> for &'p Pool<C>
where
C: Connect<Database = DB>,
DB: Database<Connection = C>,
DB: for<'c, 'q> HasCursor<'c, 'q>,
for<'c> &'c mut C: RefExecutor<'c>,
{
type Database = DB;
fn fetch_by_ref<'q, E>(self, query: E) -> <Self::Database as HasCursor<'p, 'q>>::Cursor
where
E: Execute<'q, DB>,
{
DB::Cursor::from_pool(self, query)
}
}
impl<C> Executor for PoolConnection<C>
where
C: Connect,
{
type Database = C::Database;
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{
(**self).execute(query)
}
fn fetch<'e, 'q, E>(&'e mut self, query: E) -> <C::Database as HasCursor<'_, 'q>>::Cursor
where
E: Execute<'q, Self::Database>,
{
(**self).fetch(query)
}
fn describe<'e, 'q, E: 'e>(
&'e mut self,
query: E,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>
where
E: Execute<'q, Self::Database>,
{
(**self).describe(query)
}
}
impl<'c, C, DB> RefExecutor<'c> for &'c mut PoolConnection<C>
where
C: Connect<Database = DB>,
DB: Database<Connection = C>,
DB: for<'c2, 'q> HasCursor<'c2, 'q, Database = DB>,
for<'con> &'con mut C: Executor<'con>,
{
type Database = C::Database;
fn fetch<'q, E>(self, query: E) -> <Self::Database as HasCursor<'c, 'q>>::Cursor
where
E: Execute<'q, Self::Database>,
{
DB::Cursor::from_connection(&mut **self, query)
}
#[doc(hidden)]
#[inline]
fn fetch_by_ref<'q, 'e, E>(
&'e mut self,
query: E,
) -> <Self::Database as HasCursor<'_, 'q>>::Cursor
where
E: Execute<'q, Self::Database>,
{
self.fetch(query)
}
}
impl<C, DB> Executor<'static> for PoolConnection<C>
where
C: Connect<Database = DB>,
DB: Database<Connection = C>,
DB: for<'c, 'q> HasCursor<'c, 'q, Database = DB>,
&'c mut C: RefExecutor<'c, Database = DB>,
{
type Database = DB;
fn fetch<'q, E>(self, query: E) -> <DB as HasCursor<'static, 'q>>::Cursor
fn fetch_by_ref<'q, E>(self, query: E) -> <Self::Database as HasCursor<'c, 'q>>::Cursor
where
E: Execute<'q, Self::Database>,
{
DB::Cursor::from_connection(self, query)
}
#[doc(hidden)]
#[inline]
fn fetch_by_ref<'q, 'e, E>(&'e mut self, query: E) -> <DB as HasCursor<'_, 'q>>::Cursor
where
E: Execute<'q, Self::Database>,
{
DB::Cursor::from_connection(&mut **self, query)
(**self).fetch(query)
}
}

View File

@ -25,10 +25,9 @@ use crate::Database;
/// A pool of database connections.
pub struct Pool<C>(Arc<SharedPool<C>>);
impl<C, DB> Pool<C>
impl<C> Pool<C>
where
C: Connect<Database = DB>,
DB: Database<Connection = C>,
C: Connect,
{
/// Creates a connection pool with the default configuration.
///

View File

@ -10,10 +10,9 @@ pub struct Builder<C> {
options: Options,
}
impl<C, DB> Builder<C>
impl<C> Builder<C>
where
C: Connect<Database = DB>,
DB: Database<Connection = C>,
C: Connect,
{
/// Get a new builder with default options.
///

View File

@ -240,95 +240,6 @@ impl PgConnection {
is_ready: true,
})
}
pub(super) async fn wait_until_ready(&mut self) -> crate::Result<()> {
// depending on how the previous query finished we may need to continue
// pulling messages from the stream until we receive a [ReadyForQuery] message
// postgres sends the [ReadyForQuery] message when it's fully complete with processing
// the previous query
if !self.is_ready {
loop {
if let Message::ReadyForQuery = self.stream.read().await? {
// we are now ready to go
self.is_ready = true;
break;
}
}
}
Ok(())
}
async fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> crate::Result<Describe<Postgres>> {
let statement = self.write_prepare(query, &Default::default());
self.write_describe(protocol::Describe::Statement(statement));
self.write_sync();
self.stream.flush().await?;
self.wait_until_ready().await?;
let params = loop {
match self.stream.read().await? {
Message::ParseComplete => {
// ignore complete messsage
// continue
}
Message::ParameterDescription => {
break ParameterDescription::read(self.stream.buffer())?;
}
message => {
return Err(protocol_err!(
"expected ParameterDescription; received {:?}",
message
)
.into());
}
};
};
let result = match self.stream.read().await? {
Message::NoData => None,
Message::RowDescription => Some(RowDescription::read(self.stream.buffer())?),
message => {
return Err(protocol_err!(
"expected RowDescription or NoData; received {:?}",
message
)
.into());
}
};
Ok(Describe {
param_types: params
.ids
.iter()
.map(|id| PgTypeInfo::new(*id))
.collect::<Vec<_>>()
.into_boxed_slice(),
result_columns: result
.map(|r| r.fields)
.unwrap_or_default()
.into_vec()
.into_iter()
// TODO: Should [Column] just wrap [protocol::Field] ?
.map(|field| Column {
name: field.name,
table_id: field.table_id,
type_info: PgTypeInfo::new(field.type_id),
})
.collect::<Vec<_>>()
.into_boxed_slice(),
})
}
}
impl Connect for PgConnection {
@ -342,21 +253,11 @@ impl Connect for PgConnection {
}
impl Connection for PgConnection {
type Database = Postgres;
fn close(self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(terminate(self.stream))
}
fn ping(&mut self) -> BoxFuture<crate::Result<()>> {
Box::pin(self.fetch("SELECT 1").map_ok(|_| ()))
}
#[doc(hidden)]
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
Box::pin(self.describe(query))
Box::pin(Executor::execute(self, "SELECT 1").map_ok(|_| ()))
}
}

View File

@ -18,18 +18,9 @@ use crate::postgres::{PgArguments, PgConnection, PgRow};
use crate::{Database, Postgres};
use futures_core::Stream;
enum State<'c, 'q> {
Query(&'q str, Option<PgArguments>),
NextRow,
// Used for `impl Future`
Resolve(BoxFuture<'c, crate::Result<MaybeOwnedConnection<'c, PgConnection>>>),
AffectedRows(BoxFuture<'c, crate::Result<u64>>),
}
pub struct PgCursor<'c, 'q> {
source: ConnectionSource<'c, PgConnection>,
state: State<'c, 'q>,
query: Option<(&'q str, Option<PgArguments>)>,
}
impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
@ -41,12 +32,9 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
Self: Sized,
E: Execute<'q, Postgres>,
{
let (query, arguments) = query.into_parts();
Self {
// note: pool is internally reference counted
source: ConnectionSource::Pool(pool.clone()),
state: State::Query(query, arguments),
query: Some(query.into_parts()),
}
}
@ -57,12 +45,9 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
C: Into<MaybeOwnedConnection<'c, PgConnection>>,
E: Execute<'q, Postgres>,
{
let (query, arguments) = query.into_parts();
Self {
// note: pool is internally reference counted
source: ConnectionSource::Connection(conn.into()),
state: State::Query(query, arguments),
query: Some(query.into_parts()),
}
}
@ -71,164 +56,16 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
}
}
impl<'s, 'q> Future for PgCursor<'s, 'q> {
type Output = crate::Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match &mut self.state {
State::Query(q, arguments) => {
// todo: existential types can remove both the boxed futures
// and this allocation
let query = q.to_owned();
let arguments = mem::take(arguments);
self.state = State::Resolve(Box::pin(resolve(
mem::take(&mut self.source),
query,
arguments,
)));
}
State::Resolve(fut) => {
match fut.as_mut().poll(cx) {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(conn) => {
let conn = conn?;
self.state = State::AffectedRows(Box::pin(affected_rows(conn)));
// continue
}
}
}
State::NextRow => {
panic!("PgCursor must not be polled after being used");
}
State::AffectedRows(fut) => {
return fut.as_mut().poll(cx);
}
}
}
}
}
// write out query to the connection stream
async fn write(
conn: &mut PgConnection,
query: &str,
arguments: Option<PgArguments>,
) -> crate::Result<()> {
if let Some(arguments) = arguments {
// Check the statement cache for a statement ID that matches the given query
// If it doesn't exist, we generate a new statement ID and write out [Parse] to the
// connection command buffer
let statement = conn.write_prepare(query, &arguments);
// Next, [Bind] attaches the arguments to the statement and creates a named portal
conn.write_bind("", statement, &arguments);
// Next, [Describe] will return the expected result columns and types
// Conditionally run [Describe] only if the results have not been cached
// if !self.statement_cache.has_columns(statement) {
// self.write_describe(protocol::Describe::Portal(""));
// }
// Next, [Execute] then executes the named portal
conn.write_execute("", 0);
// Finally, [Sync] asks postgres to process the messages that we sent and respond with
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
// termed batching might suit this.
conn.write_sync();
} else {
// https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.4
conn.write_simple_query(query);
}
conn.wait_until_ready().await?;
conn.stream.flush().await?;
conn.is_ready = false;
Ok(())
}
async fn resolve(
mut source: ConnectionSource<'_, PgConnection>,
query: String,
arguments: Option<PgArguments>,
) -> crate::Result<MaybeOwnedConnection<'_, PgConnection>> {
let mut conn = source.resolve_by_ref().await?;
write(&mut *conn, &query, arguments).await?;
Ok(source.into_connection())
}
async fn affected_rows(mut conn: MaybeOwnedConnection<'_, PgConnection>) -> crate::Result<u64> {
let mut rows = 0;
loop {
match conn.stream.read().await? {
Message::ParseComplete | Message::BindComplete => {
// ignore x_complete messages
}
Message::DataRow => {
// ignore rows
// TODO: should we log or something?
}
Message::CommandComplete => {
rows += CommandComplete::read(conn.stream.buffer())?.affected_rows;
}
Message::ReadyForQuery => {
// done
conn.is_ready = true;
break;
}
message => {
return Err(
protocol_err!("affected_rows: unexpected message: {:?}", message).into(),
);
}
}
}
Ok(rows)
}
async fn next<'a, 'c: 'a, 'q: 'a>(
cursor: &'a mut PgCursor<'c, 'q>,
) -> crate::Result<Option<PgRow<'a>>> {
let mut conn = cursor.source.resolve_by_ref().await?;
match cursor.state {
State::Query(q, ref mut arguments) => {
// write out the query to the connection
write(&mut *conn, q, arguments.take()).await?;
// next time we come through here, skip this block
cursor.state = State::NextRow;
}
State::Resolve(_) | State::AffectedRows(_) => {
panic!("`PgCursor` must not be used after being polled");
}
State::NextRow => {
// grab the next row
}
// The first time [next] is called we need to actually execute our
// contained query. We guard against this happening on _all_ next calls
// by using [Option::take] which replaces the potential value in the Option with `None
if let Some((query, arguments)) = cursor.query.take() {
conn.execute(query, arguments).await?;
}
loop {

View File

@ -2,9 +2,15 @@ use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use futures_core::future::BoxFuture;
use crate::cursor::Cursor;
use crate::describe::{Column, Describe};
use crate::executor::{Execute, Executor};
use crate::postgres::protocol::{self, Encode, StatementId, TypeFormat};
use crate::postgres::protocol::{
self, CommandComplete, Encode, Message, ParameterDescription, RowDescription, StatementId,
TypeFormat,
};
use crate::postgres::{PgArguments, PgConnection, PgCursor, PgRow, PgTypeInfo, Postgres};
impl PgConnection {
@ -53,25 +59,211 @@ impl PgConnection {
pub(crate) fn write_sync(&mut self) {
self.stream.write(protocol::Sync);
}
async fn wait_until_ready(&mut self) -> crate::Result<()> {
// depending on how the previous query finished we may need to continue
// pulling messages from the stream until we receive a [ReadyForQuery] message
// postgres sends the [ReadyForQuery] message when it's fully complete with processing
// the previous query
if !self.is_ready {
loop {
if let Message::ReadyForQuery = self.stream.read().await? {
// we are now ready to go
self.is_ready = true;
break;
}
}
}
Ok(())
}
// Write out the query to the connection stream, ensure that we are synchronized at the
// most recent [ReadyForQuery] and flush our buffer to postgres.
//
// It is safe to call this method repeatedly (but all data from postgres would be lost) but
// it is assumed that a call to [PgConnection::affected_rows] or [PgCursor::next] would
// immediately follow.
pub(crate) async fn execute(
&mut self,
query: &str,
arguments: Option<PgArguments>,
) -> crate::Result<()> {
if let Some(arguments) = arguments {
// Check the statement cache for a statement ID that matches the given query
// If it doesn't exist, we generate a new statement ID and write out [Parse] to the
// connection command buffer
let statement = self.write_prepare(query, &arguments);
// Next, [Bind] attaches the arguments to the statement and creates a named portal
self.write_bind("", statement, &arguments);
// Next, [Describe] will return the expected result columns and types
// Conditionally run [Describe] only if the results have not been cached
// if !self.statement_cache.has_columns(statement) {
// self.write_describe(protocol::Describe::Portal(""));
// }
// Next, [Execute] then executes the named portal
self.write_execute("", 0);
// Finally, [Sync] asks postgres to process the messages that we sent and respond with
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
// dozens of queries before a [Sync] and postgres can handle that. Execution on the server
// is still serial but it would reduce round-trips. Some kind of builder pattern that is
// termed batching might suit this.
self.write_sync();
} else {
// https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.4
self.write_simple_query(query);
}
self.wait_until_ready().await?;
self.stream.flush().await?;
self.is_ready = false;
Ok(())
}
async fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> crate::Result<Describe<Postgres>> {
let statement = self.write_prepare(query, &Default::default());
self.write_describe(protocol::Describe::Statement(statement));
self.write_sync();
self.stream.flush().await?;
self.wait_until_ready().await?;
let params = loop {
match self.stream.read().await? {
Message::ParseComplete => {
// ignore complete messsage
// continue
}
Message::ParameterDescription => {
break ParameterDescription::read(self.stream.buffer())?;
}
message => {
return Err(protocol_err!(
"expected ParameterDescription; received {:?}",
message
)
.into());
}
};
};
let result = match self.stream.read().await? {
Message::NoData => None,
Message::RowDescription => Some(RowDescription::read(self.stream.buffer())?),
message => {
return Err(protocol_err!(
"expected RowDescription or NoData; received {:?}",
message
)
.into());
}
};
Ok(Describe {
param_types: params
.ids
.iter()
.map(|id| PgTypeInfo::new(*id))
.collect::<Vec<_>>()
.into_boxed_slice(),
result_columns: result
.map(|r| r.fields)
.unwrap_or_default()
.into_vec()
.into_iter()
// TODO: Should [Column] just wrap [protocol::Field] ?
.map(|field| Column {
name: field.name,
table_id: field.table_id,
type_info: PgTypeInfo::new(field.type_id),
})
.collect::<Vec<_>>()
.into_boxed_slice(),
})
}
// Poll messages from Postgres, counting the rows affected, until we finish the query
// This must be called directly after a call to [PgConnection::execute]
async fn affected_rows(&mut self) -> crate::Result<u64> {
let mut rows = 0;
loop {
match self.stream.read().await? {
Message::ParseComplete | Message::BindComplete => {
// ignore x_complete messages
}
Message::DataRow => {
// ignore rows
// TODO: should we log a warning? this is almost definitely a programmer error
}
Message::CommandComplete => {
rows += CommandComplete::read(self.stream.buffer())?.affected_rows;
}
Message::ReadyForQuery => {
self.is_ready = true;
break;
}
message => {
return Err(
protocol_err!("affected_rows: unexpected message: {:?}", message).into(),
);
}
}
}
Ok(rows)
}
}
impl<'e> Executor<'e> for &'e mut super::PgConnection {
impl Executor for super::PgConnection {
type Database = Postgres;
fn fetch<'q, E>(self, query: E) -> PgCursor<'e, 'q>
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{
Box::pin(async move {
let (query, arguments) = query.into_parts();
self.execute(query, arguments).await?;
self.affected_rows().await
})
}
fn fetch<'q, E>(&mut self, query: E) -> PgCursor<'_, 'q>
where
E: Execute<'q, Self::Database>,
{
PgCursor::from_connection(self, query)
}
#[doc(hidden)]
#[inline]
fn fetch_by_ref<'q, E>(&mut self, query: E) -> PgCursor<'_, 'q>
fn describe<'e, 'q, E: 'e>(
&'e mut self,
query: E,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>
where
E: Execute<'q, Self::Database>,
{
self.fetch(query)
Box::pin(async move { self.describe(query.into_parts().0).await })
}
}

View File

@ -17,7 +17,7 @@ use crate::arguments::Arguments;
use crate::cursor::Cursor;
use crate::database::{Database, HasCursor, HasRow};
use crate::encode::Encode;
use crate::executor::{Execute, Executor};
use crate::executor::{Execute, Executor, RefExecutor};
use crate::types::Type;
use crate::{Error, FromRow};
use futures_core::future::BoxFuture;
@ -114,18 +114,18 @@ where
DB: Database,
Self: Execute<'q, DB>,
{
pub async fn execute<'e, E>(self, executor: E) -> crate::Result<u64>
pub async fn execute<E>(self, mut executor: E) -> crate::Result<u64>
where
E: Executor<'e, Database = DB>,
E: Executor<Database = DB>,
{
executor.fetch(self).await
executor.execute(self).await
}
pub fn fetch<'e, E>(self, executor: E) -> <DB as HasCursor<'e, 'q>>::Cursor
where
E: Executor<'e, Database = DB>,
E: RefExecutor<'e, Database = DB>,
{
executor.fetch(self)
executor.fetch_by_ref(self)
}
}
@ -165,13 +165,13 @@ where
) -> impl Stream<Item = crate::Result<F::Mapped>> + 'e
where
'q: 'e,
E: Executor<'e, Database = DB> + 'e,
E: RefExecutor<'e, Database = DB> + 'e,
F: 'e,
F::Mapped: 'e,
A: 'e,
{
try_stream! {
let mut cursor = executor.fetch(self.query);
let mut cursor = executor.fetch_by_ref(self.query);
while let Some(next) = cursor.next().await? {
let mapped = self.mapper.map_row(next)?;
yield mapped;
@ -182,11 +182,11 @@ where
/// Get the first row in the result
pub async fn fetch_optional<'e, E>(mut self, executor: E) -> crate::Result<Option<F::Mapped>>
where
E: Executor<'e, Database = DB>,
E: RefExecutor<'e, Database = DB>,
'q: 'e,
{
// could be implemented in terms of `fetch()` but this avoids overhead from `try_stream!`
let mut cursor = executor.fetch(self.query);
let mut cursor = executor.fetch_by_ref(self.query);
let mut mapper = self.mapper;
let val = cursor.next().await?;
val.map(|row| mapper.map_row(row)).transpose()
@ -194,7 +194,7 @@ where
pub async fn fetch_one<'e, E>(self, executor: E) -> crate::Result<F::Mapped>
where
E: Executor<'e, Database = DB>,
E: RefExecutor<'e, Database = DB>,
'q: 'e,
{
self.fetch_optional(executor)
@ -207,10 +207,10 @@ where
pub async fn fetch_all<'e, E>(mut self, executor: E) -> crate::Result<Vec<F::Mapped>>
where
E: Executor<'e, Database = DB>,
E: RefExecutor<'e, Database = DB>,
'q: 'e,
{
let mut cursor = executor.fetch(self.query);
let mut cursor = executor.fetch_by_ref(self.query);
let mut out = vec![];
while let Some(row) = cursor.next().await? {

View File

@ -14,25 +14,22 @@ use crate::Database;
pub struct Transaction<T>
where
T: Connection,
T: Executor<'static>,
{
inner: Option<T>,
depth: u32,
}
impl<DB, T> Transaction<T>
impl<T> Transaction<T>
where
T: Connection<Database = DB>,
DB: Database,
T: Executor<'static, Database = DB>,
T: Connection,
{
pub(crate) async fn new(depth: u32, mut inner: T) -> crate::Result<Self> {
if depth == 0 {
inner.fetch_by_ref("BEGIN").await?;
inner.execute("BEGIN").await?;
} else {
let stmt = format!("SAVEPOINT _sqlx_savepoint_{}", depth);
inner.fetch_by_ref(&*stmt).await?;
inner.execute(&*stmt).await?;
}
Ok(Self {
@ -50,11 +47,11 @@ where
let depth = self.depth;
if depth == 1 {
inner.fetch_by_ref("COMMIT").await?;
inner.execute("COMMIT").await?;
} else {
let stmt = format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1);
inner.fetch_by_ref(&*stmt).await?;
inner.execute(&*stmt).await?;
}
Ok(inner)
@ -65,11 +62,11 @@ where
let depth = self.depth;
if depth == 1 {
inner.fetch_by_ref("ROLLBACK").await?;
inner.execute("ROLLBACK").await?;
} else {
let stmt = format!("ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}", depth - 1);
inner.fetch_by_ref(&*stmt).await?;
inner.execute(&*stmt).await?;
}
Ok(inner)
@ -81,7 +78,6 @@ const ERR_FINALIZED: &str = "(bug) transaction already finalized";
impl<T> Deref for Transaction<T>
where
T: Connection,
T: Executor<'static>,
{
type Target = T;
@ -93,78 +89,53 @@ where
impl<T> DerefMut for Transaction<T>
where
T: Connection,
T: Executor<'static>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut().expect(ERR_FINALIZED)
}
}
impl<T, DB> Connection for Transaction<T>
where
T: Connection<Database = DB>,
DB: Database,
T: Executor<'static, Database = DB>,
{
type Database = <T as Connection>::Database;
// Close is equivalent to ROLLBACK followed by CLOSE
fn close(self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(async move { self.rollback().await?.close().await })
}
#[inline]
fn ping(&mut self) -> BoxFuture<crate::Result<()>> {
Box::pin(self.deref_mut().ping())
}
#[doc(hidden)]
#[inline]
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
Box::pin(self.deref_mut().describe(query))
}
}
impl<'c, DB, T> Executor<'c> for &'c mut Transaction<T>
impl<'c, DB, T> Executor for &'c mut Transaction<T>
where
DB: Database,
T: Connection<Database = DB>,
T: Executor<'static, Database = DB>,
{
type Database = <T as Connection>::Database;
type Database = T::Database;
fn fetch<'q, E>(self, query: E) -> <<T as Connection>::Database as HasCursor<'c, 'q>>::Cursor
fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result<u64>>
where
E: Execute<'q, Self::Database>,
{
(**self).fetch_by_ref(query)
(**self).execute(query)
}
#[doc(hidden)]
fn fetch_by_ref<'q, 'e, E>(
fn fetch<'q, 'e, E>(&'e mut self, query: E) -> <Self::Database as HasCursor<'e, 'q>>::Cursor
where
E: Execute<'q, Self::Database>,
{
(**self).fetch(query)
}
fn describe<'e, 'q, E: 'e>(
&'e mut self,
query: E,
) -> <Self::Database as HasCursor<'e, 'q>>::Cursor
) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>
where
E: Execute<'q, Self::Database>,
{
(**self).fetch_by_ref(query)
(**self).describe(query)
}
}
impl<T> Drop for Transaction<T>
where
T: Connection,
T: Executor<'static>,
{
fn drop(&mut self) {
if self.depth > 0 {
if let Some(mut inner) = self.inner.take() {
spawn(async move {
let res = inner.fetch_by_ref("ROLLBACK").await;
let res = inner.execute("ROLLBACK").await;
// If the rollback failed we need to close the inner connection
if res.is_err() {