diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 92bc13da..136734b6 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -17,20 +17,13 @@ use crate::url::Url; pub trait Connection where Self: Send + 'static, + Self: Executor, { - type Database: Database; - /// Close this database connection. fn close(self) -> BoxFuture<'static, crate::Result<()>>; /// Verifies a connection to the database is still alive. fn ping(&mut self) -> BoxFuture>; - - #[doc(hidden)] - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>>; } /// Represents a type that can directly establish a new connection. @@ -75,10 +68,9 @@ where } } -impl<'c, C, DB> ConnectionSource<'c, C> +impl<'c, C> ConnectionSource<'c, C> where - C: Connect, - DB: Database, + C: Connect, { pub(crate) async fn resolve_by_ref(&mut self) -> crate::Result> { if let ConnectionSource::Pool(pool) = self { diff --git a/sqlx-core/src/cursor.rs b/sqlx-core/src/cursor.rs index bb72c424..2311f080 100644 --- a/sqlx-core/src/cursor.rs +++ b/sqlx-core/src/cursor.rs @@ -19,8 +19,6 @@ use crate::{Connect, Pool, Row}; pub trait Cursor<'c, 'q> where Self: Send, - // `.await`-ing a cursor will return the affected rows from the query - Self: Future>, { type Database: Database; diff --git a/sqlx-core/src/database.rs b/sqlx-core/src/database.rs index e8c274ee..3b9e2f3d 100644 --- a/sqlx-core/src/database.rs +++ b/sqlx-core/src/database.rs @@ -12,7 +12,7 @@ use crate::types::TypeInfo; /// database (e.g., MySQL, Postgres). pub trait Database where - Self: Sized + 'static, + Self: Sized + Send + 'static, Self: for<'a> HasRow<'a, Database = Self>, Self: for<'a> HasRawValue<'a>, Self: for<'c, 'q> HasCursor<'c, 'q, Database = Self>, diff --git a/sqlx-core/src/executor.rs b/sqlx-core/src/executor.rs index c724f967..0d91819f 100644 --- a/sqlx-core/src/executor.rs +++ b/sqlx-core/src/executor.rs @@ -14,27 +14,54 @@ use futures_util::TryStreamExt; /// Implementations are provided for [`&Pool`](struct.Pool.html), /// [`&mut PoolConnection`](struct.PoolConnection.html), /// and [`&mut Connection`](trait.Connection.html). -pub trait Executor<'c> +pub trait Executor where Self: Send, { /// The specific database that this type is implemented for. type Database: Database; - /// Executes a query that may or may not return a result set. - fn fetch<'q, E>(self, query: E) -> >::Cursor + /// Executes the query for its side-effects and + /// discarding any potential result rows. + /// + /// Returns the number of rows affected, or 0 if not applicable. + fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result> where E: Execute<'q, Self::Database>; - #[doc(hidden)] - fn fetch_by_ref<'b, E>(&mut self, query: E) -> >::Cursor + fn fetch<'e, 'q, E>(&'e mut self, query: E) -> >::Cursor where - E: Execute<'b, Self::Database>; + E: Execute<'q, Self::Database>; + + /// Prepare the SQL query and return type information about its parameters + /// and results. + /// + /// This is used by the query macros ( [`query!`] ) during compilation to + /// power their type inference. + fn describe<'e, 'q, E: 'e>( + &'e mut self, + query: E, + ) -> BoxFuture<'e, crate::Result>> + where + E: Execute<'q, Self::Database>; +} + +pub trait RefExecutor<'c> { + type Database: Database; + + /// Executes a query for its result. + /// + /// Returns a [`Cursor`] that can be used to iterate through the [`Row`]s + /// of the result. + fn fetch_by_ref<'q, E>(self, query: E) -> >::Cursor + where + E: Execute<'q, Self::Database>; } /// A type that may be executed against a database connection. pub trait Execute<'q, DB> where + Self: Send, DB: Database, { /// Returns the query to be executed and the arguments to bind against the query, if any. @@ -70,3 +97,34 @@ macro_rules! impl_execute_for_query { } }; } + +impl Executor for &'_ mut T +where + T: Executor, +{ + type Database = T::Database; + + fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result> + where + E: Execute<'q, Self::Database>, + { + (**self).execute(query) + } + + fn fetch<'e, 'q, E>(&'e mut self, query: E) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + (**self).fetch(query) + } + + fn describe<'e, 'q, E: 'e>( + &'e mut self, + query: E, + ) -> BoxFuture<'e, crate::Result>> + where + E: Execute<'q, Self::Database>, + { + (**self).describe(query) + } +} diff --git a/sqlx-core/src/pool/conn.rs b/sqlx-core/src/pool/conn.rs index fb7235b2..11697ee8 100644 --- a/sqlx-core/src/pool/conn.rs +++ b/sqlx-core/src/pool/conn.rs @@ -60,8 +60,6 @@ impl Connection for PoolConnection where C: Connect, { - type Database = C::Database; - /// Detach the connection from the pool and close it nicely. fn close(mut self) -> BoxFuture<'static, crate::Result<()>> { Box::pin(async move { @@ -74,15 +72,6 @@ where fn ping(&mut self) -> BoxFuture> { Box::pin(self.deref_mut().ping()) } - - #[doc(hidden)] - #[inline] - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - Box::pin(self.deref_mut().describe(query)) - } } /// Returns the connection to the [`Pool`][crate::Pool] it was checked-out from. diff --git a/sqlx-core/src/pool/executor.rs b/sqlx-core/src/pool/executor.rs index d3049809..89ded4da 100644 --- a/sqlx-core/src/pool/executor.rs +++ b/sqlx-core/src/pool/executor.rs @@ -6,7 +6,7 @@ use futures_util::StreamExt; use crate::{ connection::{Connect, Connection}, describe::Describe, - executor::Executor, + executor::{Executor, RefExecutor}, pool::Pool, Cursor, Database, }; @@ -15,85 +15,100 @@ use super::PoolConnection; use crate::database::HasCursor; use crate::executor::Execute; -impl<'p, C, DB> Executor<'p> for &'p Pool +impl<'p, C, DB> Executor for &'p Pool where C: Connect, DB: Database, DB: for<'c, 'q> HasCursor<'c, 'q, Database = DB>, - for<'con> &'con mut C: Executor<'con>, { type Database = DB; - fn fetch<'q, E>(self, query: E) -> >::Cursor + fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result> + where + E: Execute<'q, Self::Database>, + { + Box::pin(async move { self.acquire().await?.execute(query).await }) + } + + fn fetch<'e, 'q, E>(&'e mut self, query: E) -> >::Cursor where E: Execute<'q, DB>, { DB::Cursor::from_pool(self, query) } - #[doc(hidden)] - #[inline] - fn fetch_by_ref<'q, 'e, E>( + fn describe<'e, 'q, E: 'e>( &'e mut self, query: E, - ) -> >::Cursor + ) -> BoxFuture<'e, crate::Result>> where - E: Execute<'q, DB>, + E: Execute<'q, Self::Database>, { - self.fetch(query) + Box::pin(async move { self.acquire().await?.describe(query).await }) } } -impl<'c, C, DB> Executor<'c> for &'c mut PoolConnection +impl<'p, C, DB> RefExecutor<'p> for &'p Pool +where + C: Connect, + DB: Database, + DB: for<'c, 'q> HasCursor<'c, 'q>, + for<'c> &'c mut C: RefExecutor<'c>, +{ + type Database = DB; + + fn fetch_by_ref<'q, E>(self, query: E) -> >::Cursor + where + E: Execute<'q, DB>, + { + DB::Cursor::from_pool(self, query) + } +} + +impl Executor for PoolConnection +where + C: Connect, +{ + type Database = C::Database; + + fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result> + where + E: Execute<'q, Self::Database>, + { + (**self).execute(query) + } + + fn fetch<'e, 'q, E>(&'e mut self, query: E) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + (**self).fetch(query) + } + + fn describe<'e, 'q, E: 'e>( + &'e mut self, + query: E, + ) -> BoxFuture<'e, crate::Result>> + where + E: Execute<'q, Self::Database>, + { + (**self).describe(query) + } +} + +impl<'c, C, DB> RefExecutor<'c> for &'c mut PoolConnection where C: Connect, DB: Database, DB: for<'c2, 'q> HasCursor<'c2, 'q, Database = DB>, - for<'con> &'con mut C: Executor<'con>, -{ - type Database = C::Database; - - fn fetch<'q, E>(self, query: E) -> >::Cursor - where - E: Execute<'q, Self::Database>, - { - DB::Cursor::from_connection(&mut **self, query) - } - - #[doc(hidden)] - #[inline] - fn fetch_by_ref<'q, 'e, E>( - &'e mut self, - query: E, - ) -> >::Cursor - where - E: Execute<'q, Self::Database>, - { - self.fetch(query) - } -} - -impl Executor<'static> for PoolConnection -where - C: Connect, - DB: Database, - DB: for<'c, 'q> HasCursor<'c, 'q, Database = DB>, + &'c mut C: RefExecutor<'c, Database = DB>, { type Database = DB; - fn fetch<'q, E>(self, query: E) -> >::Cursor + fn fetch_by_ref<'q, E>(self, query: E) -> >::Cursor where E: Execute<'q, Self::Database>, { - DB::Cursor::from_connection(self, query) - } - - #[doc(hidden)] - #[inline] - fn fetch_by_ref<'q, 'e, E>(&'e mut self, query: E) -> >::Cursor - where - E: Execute<'q, Self::Database>, - { - DB::Cursor::from_connection(&mut **self, query) + (**self).fetch(query) } } diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index 9c2167d6..a0e39e3b 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -25,10 +25,9 @@ use crate::Database; /// A pool of database connections. pub struct Pool(Arc>); -impl Pool +impl Pool where - C: Connect, - DB: Database, + C: Connect, { /// Creates a connection pool with the default configuration. /// diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index b0f58493..b62cf922 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -10,10 +10,9 @@ pub struct Builder { options: Options, } -impl Builder +impl Builder where - C: Connect, - DB: Database, + C: Connect, { /// Get a new builder with default options. /// diff --git a/sqlx-core/src/postgres/connection.rs b/sqlx-core/src/postgres/connection.rs index a75630c9..2ae31d2c 100644 --- a/sqlx-core/src/postgres/connection.rs +++ b/sqlx-core/src/postgres/connection.rs @@ -240,95 +240,6 @@ impl PgConnection { is_ready: true, }) } - - pub(super) async fn wait_until_ready(&mut self) -> crate::Result<()> { - // depending on how the previous query finished we may need to continue - // pulling messages from the stream until we receive a [ReadyForQuery] message - - // postgres sends the [ReadyForQuery] message when it's fully complete with processing - // the previous query - - if !self.is_ready { - loop { - if let Message::ReadyForQuery = self.stream.read().await? { - // we are now ready to go - self.is_ready = true; - break; - } - } - } - - Ok(()) - } - - async fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> crate::Result> { - let statement = self.write_prepare(query, &Default::default()); - - self.write_describe(protocol::Describe::Statement(statement)); - self.write_sync(); - - self.stream.flush().await?; - self.wait_until_ready().await?; - - let params = loop { - match self.stream.read().await? { - Message::ParseComplete => { - // ignore complete messsage - // continue - } - - Message::ParameterDescription => { - break ParameterDescription::read(self.stream.buffer())?; - } - - message => { - return Err(protocol_err!( - "expected ParameterDescription; received {:?}", - message - ) - .into()); - } - }; - }; - - let result = match self.stream.read().await? { - Message::NoData => None, - Message::RowDescription => Some(RowDescription::read(self.stream.buffer())?), - - message => { - return Err(protocol_err!( - "expected RowDescription or NoData; received {:?}", - message - ) - .into()); - } - }; - - Ok(Describe { - param_types: params - .ids - .iter() - .map(|id| PgTypeInfo::new(*id)) - .collect::>() - .into_boxed_slice(), - result_columns: result - .map(|r| r.fields) - .unwrap_or_default() - .into_vec() - .into_iter() - // TODO: Should [Column] just wrap [protocol::Field] ? - .map(|field| Column { - name: field.name, - table_id: field.table_id, - type_info: PgTypeInfo::new(field.type_id), - }) - .collect::>() - .into_boxed_slice(), - }) - } } impl Connect for PgConnection { @@ -342,21 +253,11 @@ impl Connect for PgConnection { } impl Connection for PgConnection { - type Database = Postgres; - fn close(self) -> BoxFuture<'static, crate::Result<()>> { Box::pin(terminate(self.stream)) } fn ping(&mut self) -> BoxFuture> { - Box::pin(self.fetch("SELECT 1").map_ok(|_| ())) - } - - #[doc(hidden)] - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - Box::pin(self.describe(query)) + Box::pin(Executor::execute(self, "SELECT 1").map_ok(|_| ())) } } diff --git a/sqlx-core/src/postgres/cursor.rs b/sqlx-core/src/postgres/cursor.rs index ff5c878b..21212924 100644 --- a/sqlx-core/src/postgres/cursor.rs +++ b/sqlx-core/src/postgres/cursor.rs @@ -18,18 +18,9 @@ use crate::postgres::{PgArguments, PgConnection, PgRow}; use crate::{Database, Postgres}; use futures_core::Stream; -enum State<'c, 'q> { - Query(&'q str, Option), - NextRow, - - // Used for `impl Future` - Resolve(BoxFuture<'c, crate::Result>>), - AffectedRows(BoxFuture<'c, crate::Result>), -} - pub struct PgCursor<'c, 'q> { source: ConnectionSource<'c, PgConnection>, - state: State<'c, 'q>, + query: Option<(&'q str, Option)>, } impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> { @@ -41,12 +32,9 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> { Self: Sized, E: Execute<'q, Postgres>, { - let (query, arguments) = query.into_parts(); - Self { - // note: pool is internally reference counted source: ConnectionSource::Pool(pool.clone()), - state: State::Query(query, arguments), + query: Some(query.into_parts()), } } @@ -57,12 +45,9 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> { C: Into>, E: Execute<'q, Postgres>, { - let (query, arguments) = query.into_parts(); - Self { - // note: pool is internally reference counted source: ConnectionSource::Connection(conn.into()), - state: State::Query(query, arguments), + query: Some(query.into_parts()), } } @@ -71,164 +56,16 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> { } } -impl<'s, 'q> Future for PgCursor<'s, 'q> { - type Output = crate::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match &mut self.state { - State::Query(q, arguments) => { - // todo: existential types can remove both the boxed futures - // and this allocation - let query = q.to_owned(); - let arguments = mem::take(arguments); - - self.state = State::Resolve(Box::pin(resolve( - mem::take(&mut self.source), - query, - arguments, - ))); - } - - State::Resolve(fut) => { - match fut.as_mut().poll(cx) { - Poll::Pending => { - return Poll::Pending; - } - - Poll::Ready(conn) => { - let conn = conn?; - - self.state = State::AffectedRows(Box::pin(affected_rows(conn))); - - // continue - } - } - } - - State::NextRow => { - panic!("PgCursor must not be polled after being used"); - } - - State::AffectedRows(fut) => { - return fut.as_mut().poll(cx); - } - } - } - } -} - -// write out query to the connection stream -async fn write( - conn: &mut PgConnection, - query: &str, - arguments: Option, -) -> crate::Result<()> { - if let Some(arguments) = arguments { - // Check the statement cache for a statement ID that matches the given query - // If it doesn't exist, we generate a new statement ID and write out [Parse] to the - // connection command buffer - let statement = conn.write_prepare(query, &arguments); - - // Next, [Bind] attaches the arguments to the statement and creates a named portal - conn.write_bind("", statement, &arguments); - - // Next, [Describe] will return the expected result columns and types - // Conditionally run [Describe] only if the results have not been cached - // if !self.statement_cache.has_columns(statement) { - // self.write_describe(protocol::Describe::Portal("")); - // } - - // Next, [Execute] then executes the named portal - conn.write_execute("", 0); - - // Finally, [Sync] asks postgres to process the messages that we sent and respond with - // a [ReadyForQuery] message when it's completely done. Theoretically, we could send - // dozens of queries before a [Sync] and postgres can handle that. Execution on the server - // is still serial but it would reduce round-trips. Some kind of builder pattern that is - // termed batching might suit this. - conn.write_sync(); - } else { - // https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.4 - conn.write_simple_query(query); - } - - conn.wait_until_ready().await?; - - conn.stream.flush().await?; - conn.is_ready = false; - - Ok(()) -} - -async fn resolve( - mut source: ConnectionSource<'_, PgConnection>, - query: String, - arguments: Option, -) -> crate::Result> { - let mut conn = source.resolve_by_ref().await?; - - write(&mut *conn, &query, arguments).await?; - - Ok(source.into_connection()) -} - -async fn affected_rows(mut conn: MaybeOwnedConnection<'_, PgConnection>) -> crate::Result { - let mut rows = 0; - - loop { - match conn.stream.read().await? { - Message::ParseComplete | Message::BindComplete => { - // ignore x_complete messages - } - - Message::DataRow => { - // ignore rows - // TODO: should we log or something? - } - - Message::CommandComplete => { - rows += CommandComplete::read(conn.stream.buffer())?.affected_rows; - } - - Message::ReadyForQuery => { - // done - conn.is_ready = true; - break; - } - - message => { - return Err( - protocol_err!("affected_rows: unexpected message: {:?}", message).into(), - ); - } - } - } - - Ok(rows) -} - async fn next<'a, 'c: 'a, 'q: 'a>( cursor: &'a mut PgCursor<'c, 'q>, ) -> crate::Result>> { let mut conn = cursor.source.resolve_by_ref().await?; - match cursor.state { - State::Query(q, ref mut arguments) => { - // write out the query to the connection - write(&mut *conn, q, arguments.take()).await?; - - // next time we come through here, skip this block - cursor.state = State::NextRow; - } - - State::Resolve(_) | State::AffectedRows(_) => { - panic!("`PgCursor` must not be used after being polled"); - } - - State::NextRow => { - // grab the next row - } + // The first time [next] is called we need to actually execute our + // contained query. We guard against this happening on _all_ next calls + // by using [Option::take] which replaces the potential value in the Option with `None + if let Some((query, arguments)) = cursor.query.take() { + conn.execute(query, arguments).await?; } loop { diff --git a/sqlx-core/src/postgres/executor.rs b/sqlx-core/src/postgres/executor.rs index a7fc8ace..b660b84c 100644 --- a/sqlx-core/src/postgres/executor.rs +++ b/sqlx-core/src/postgres/executor.rs @@ -2,9 +2,15 @@ use std::collections::HashMap; use std::io; use std::sync::Arc; +use futures_core::future::BoxFuture; + use crate::cursor::Cursor; +use crate::describe::{Column, Describe}; use crate::executor::{Execute, Executor}; -use crate::postgres::protocol::{self, Encode, StatementId, TypeFormat}; +use crate::postgres::protocol::{ + self, CommandComplete, Encode, Message, ParameterDescription, RowDescription, StatementId, + TypeFormat, +}; use crate::postgres::{PgArguments, PgConnection, PgCursor, PgRow, PgTypeInfo, Postgres}; impl PgConnection { @@ -53,25 +59,211 @@ impl PgConnection { pub(crate) fn write_sync(&mut self) { self.stream.write(protocol::Sync); } + + async fn wait_until_ready(&mut self) -> crate::Result<()> { + // depending on how the previous query finished we may need to continue + // pulling messages from the stream until we receive a [ReadyForQuery] message + + // postgres sends the [ReadyForQuery] message when it's fully complete with processing + // the previous query + + if !self.is_ready { + loop { + if let Message::ReadyForQuery = self.stream.read().await? { + // we are now ready to go + self.is_ready = true; + break; + } + } + } + + Ok(()) + } + + // Write out the query to the connection stream, ensure that we are synchronized at the + // most recent [ReadyForQuery] and flush our buffer to postgres. + // + // It is safe to call this method repeatedly (but all data from postgres would be lost) but + // it is assumed that a call to [PgConnection::affected_rows] or [PgCursor::next] would + // immediately follow. + pub(crate) async fn execute( + &mut self, + query: &str, + arguments: Option, + ) -> crate::Result<()> { + if let Some(arguments) = arguments { + // Check the statement cache for a statement ID that matches the given query + // If it doesn't exist, we generate a new statement ID and write out [Parse] to the + // connection command buffer + let statement = self.write_prepare(query, &arguments); + + // Next, [Bind] attaches the arguments to the statement and creates a named portal + self.write_bind("", statement, &arguments); + + // Next, [Describe] will return the expected result columns and types + // Conditionally run [Describe] only if the results have not been cached + // if !self.statement_cache.has_columns(statement) { + // self.write_describe(protocol::Describe::Portal("")); + // } + + // Next, [Execute] then executes the named portal + self.write_execute("", 0); + + // Finally, [Sync] asks postgres to process the messages that we sent and respond with + // a [ReadyForQuery] message when it's completely done. Theoretically, we could send + // dozens of queries before a [Sync] and postgres can handle that. Execution on the server + // is still serial but it would reduce round-trips. Some kind of builder pattern that is + // termed batching might suit this. + self.write_sync(); + } else { + // https://www.postgresql.org/docs/12/protocol-flow.html#id-1.10.5.7.4 + self.write_simple_query(query); + } + + self.wait_until_ready().await?; + + self.stream.flush().await?; + self.is_ready = false; + + Ok(()) + } + + async fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> crate::Result> { + let statement = self.write_prepare(query, &Default::default()); + + self.write_describe(protocol::Describe::Statement(statement)); + self.write_sync(); + + self.stream.flush().await?; + self.wait_until_ready().await?; + + let params = loop { + match self.stream.read().await? { + Message::ParseComplete => { + // ignore complete messsage + // continue + } + + Message::ParameterDescription => { + break ParameterDescription::read(self.stream.buffer())?; + } + + message => { + return Err(protocol_err!( + "expected ParameterDescription; received {:?}", + message + ) + .into()); + } + }; + }; + + let result = match self.stream.read().await? { + Message::NoData => None, + Message::RowDescription => Some(RowDescription::read(self.stream.buffer())?), + + message => { + return Err(protocol_err!( + "expected RowDescription or NoData; received {:?}", + message + ) + .into()); + } + }; + + Ok(Describe { + param_types: params + .ids + .iter() + .map(|id| PgTypeInfo::new(*id)) + .collect::>() + .into_boxed_slice(), + result_columns: result + .map(|r| r.fields) + .unwrap_or_default() + .into_vec() + .into_iter() + // TODO: Should [Column] just wrap [protocol::Field] ? + .map(|field| Column { + name: field.name, + table_id: field.table_id, + type_info: PgTypeInfo::new(field.type_id), + }) + .collect::>() + .into_boxed_slice(), + }) + } + + // Poll messages from Postgres, counting the rows affected, until we finish the query + // This must be called directly after a call to [PgConnection::execute] + async fn affected_rows(&mut self) -> crate::Result { + let mut rows = 0; + + loop { + match self.stream.read().await? { + Message::ParseComplete | Message::BindComplete => { + // ignore x_complete messages + } + + Message::DataRow => { + // ignore rows + // TODO: should we log a warning? this is almost definitely a programmer error + } + + Message::CommandComplete => { + rows += CommandComplete::read(self.stream.buffer())?.affected_rows; + } + + Message::ReadyForQuery => { + self.is_ready = true; + break; + } + + message => { + return Err( + protocol_err!("affected_rows: unexpected message: {:?}", message).into(), + ); + } + } + } + + Ok(rows) + } } -impl<'e> Executor<'e> for &'e mut super::PgConnection { +impl Executor for super::PgConnection { type Database = Postgres; - fn fetch<'q, E>(self, query: E) -> PgCursor<'e, 'q> + fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result> + where + E: Execute<'q, Self::Database>, + { + Box::pin(async move { + let (query, arguments) = query.into_parts(); + + self.execute(query, arguments).await?; + self.affected_rows().await + }) + } + + fn fetch<'q, E>(&mut self, query: E) -> PgCursor<'_, 'q> where E: Execute<'q, Self::Database>, { PgCursor::from_connection(self, query) } - #[doc(hidden)] - #[inline] - fn fetch_by_ref<'q, E>(&mut self, query: E) -> PgCursor<'_, 'q> + fn describe<'e, 'q, E: 'e>( + &'e mut self, + query: E, + ) -> BoxFuture<'e, crate::Result>> where E: Execute<'q, Self::Database>, { - self.fetch(query) + Box::pin(async move { self.describe(query.into_parts().0).await }) } } diff --git a/sqlx-core/src/query.rs b/sqlx-core/src/query.rs index ee5d0d0b..60e5f3c4 100644 --- a/sqlx-core/src/query.rs +++ b/sqlx-core/src/query.rs @@ -17,7 +17,7 @@ use crate::arguments::Arguments; use crate::cursor::Cursor; use crate::database::{Database, HasCursor, HasRow}; use crate::encode::Encode; -use crate::executor::{Execute, Executor}; +use crate::executor::{Execute, Executor, RefExecutor}; use crate::types::Type; use crate::{Error, FromRow}; use futures_core::future::BoxFuture; @@ -114,18 +114,18 @@ where DB: Database, Self: Execute<'q, DB>, { - pub async fn execute<'e, E>(self, executor: E) -> crate::Result + pub async fn execute(self, mut executor: E) -> crate::Result where - E: Executor<'e, Database = DB>, + E: Executor, { - executor.fetch(self).await + executor.execute(self).await } pub fn fetch<'e, E>(self, executor: E) -> >::Cursor where - E: Executor<'e, Database = DB>, + E: RefExecutor<'e, Database = DB>, { - executor.fetch(self) + executor.fetch_by_ref(self) } } @@ -165,13 +165,13 @@ where ) -> impl Stream> + 'e where 'q: 'e, - E: Executor<'e, Database = DB> + 'e, + E: RefExecutor<'e, Database = DB> + 'e, F: 'e, F::Mapped: 'e, A: 'e, { try_stream! { - let mut cursor = executor.fetch(self.query); + let mut cursor = executor.fetch_by_ref(self.query); while let Some(next) = cursor.next().await? { let mapped = self.mapper.map_row(next)?; yield mapped; @@ -182,11 +182,11 @@ where /// Get the first row in the result pub async fn fetch_optional<'e, E>(mut self, executor: E) -> crate::Result> where - E: Executor<'e, Database = DB>, + E: RefExecutor<'e, Database = DB>, 'q: 'e, { // could be implemented in terms of `fetch()` but this avoids overhead from `try_stream!` - let mut cursor = executor.fetch(self.query); + let mut cursor = executor.fetch_by_ref(self.query); let mut mapper = self.mapper; let val = cursor.next().await?; val.map(|row| mapper.map_row(row)).transpose() @@ -194,7 +194,7 @@ where pub async fn fetch_one<'e, E>(self, executor: E) -> crate::Result where - E: Executor<'e, Database = DB>, + E: RefExecutor<'e, Database = DB>, 'q: 'e, { self.fetch_optional(executor) @@ -207,10 +207,10 @@ where pub async fn fetch_all<'e, E>(mut self, executor: E) -> crate::Result> where - E: Executor<'e, Database = DB>, + E: RefExecutor<'e, Database = DB>, 'q: 'e, { - let mut cursor = executor.fetch(self.query); + let mut cursor = executor.fetch_by_ref(self.query); let mut out = vec![]; while let Some(row) = cursor.next().await? { diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index dc0ff24c..8e7495ed 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -14,25 +14,22 @@ use crate::Database; pub struct Transaction where T: Connection, - T: Executor<'static>, { inner: Option, depth: u32, } -impl Transaction +impl Transaction where - T: Connection, - DB: Database, - T: Executor<'static, Database = DB>, + T: Connection, { pub(crate) async fn new(depth: u32, mut inner: T) -> crate::Result { if depth == 0 { - inner.fetch_by_ref("BEGIN").await?; + inner.execute("BEGIN").await?; } else { let stmt = format!("SAVEPOINT _sqlx_savepoint_{}", depth); - inner.fetch_by_ref(&*stmt).await?; + inner.execute(&*stmt).await?; } Ok(Self { @@ -50,11 +47,11 @@ where let depth = self.depth; if depth == 1 { - inner.fetch_by_ref("COMMIT").await?; + inner.execute("COMMIT").await?; } else { let stmt = format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1); - inner.fetch_by_ref(&*stmt).await?; + inner.execute(&*stmt).await?; } Ok(inner) @@ -65,11 +62,11 @@ where let depth = self.depth; if depth == 1 { - inner.fetch_by_ref("ROLLBACK").await?; + inner.execute("ROLLBACK").await?; } else { let stmt = format!("ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}", depth - 1); - inner.fetch_by_ref(&*stmt).await?; + inner.execute(&*stmt).await?; } Ok(inner) @@ -81,7 +78,6 @@ const ERR_FINALIZED: &str = "(bug) transaction already finalized"; impl Deref for Transaction where T: Connection, - T: Executor<'static>, { type Target = T; @@ -93,78 +89,53 @@ where impl DerefMut for Transaction where T: Connection, - T: Executor<'static>, { fn deref_mut(&mut self) -> &mut Self::Target { self.inner.as_mut().expect(ERR_FINALIZED) } } -impl Connection for Transaction -where - T: Connection, - DB: Database, - T: Executor<'static, Database = DB>, -{ - type Database = ::Database; - - // Close is equivalent to ROLLBACK followed by CLOSE - fn close(self) -> BoxFuture<'static, crate::Result<()>> { - Box::pin(async move { self.rollback().await?.close().await }) - } - - #[inline] - fn ping(&mut self) -> BoxFuture> { - Box::pin(self.deref_mut().ping()) - } - - #[doc(hidden)] - #[inline] - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - Box::pin(self.deref_mut().describe(query)) - } -} - -impl<'c, DB, T> Executor<'c> for &'c mut Transaction +impl<'c, DB, T> Executor for &'c mut Transaction where DB: Database, T: Connection, - T: Executor<'static, Database = DB>, { - type Database = ::Database; + type Database = T::Database; - fn fetch<'q, E>(self, query: E) -> <::Database as HasCursor<'c, 'q>>::Cursor + fn execute<'e, 'q, E: 'e>(&'e mut self, query: E) -> BoxFuture<'e, crate::Result> where E: Execute<'q, Self::Database>, { - (**self).fetch_by_ref(query) + (**self).execute(query) } - #[doc(hidden)] - fn fetch_by_ref<'q, 'e, E>( + fn fetch<'q, 'e, E>(&'e mut self, query: E) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + (**self).fetch(query) + } + + fn describe<'e, 'q, E: 'e>( &'e mut self, query: E, - ) -> >::Cursor + ) -> BoxFuture<'e, crate::Result>> where E: Execute<'q, Self::Database>, { - (**self).fetch_by_ref(query) + (**self).describe(query) } } impl Drop for Transaction where T: Connection, - T: Executor<'static>, { fn drop(&mut self) { if self.depth > 0 { if let Some(mut inner) = self.inner.take() { spawn(async move { - let res = inner.fetch_by_ref("ROLLBACK").await; + let res = inner.execute("ROLLBACK").await; // If the rollback failed we need to close the inner connection if res.is_err() {