diff --git a/benches/postgres-protocol.rs b/benches/postgres-protocol.rs index 2b1f0e6c..552372d7 100644 --- a/benches/postgres-protocol.rs +++ b/benches/postgres-protocol.rs @@ -1,6 +1,5 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use sqlx::postgres::protocol::{Bind, DataRow, RowDescription}; -use sqlx::postgres::protocol::{Decode, Encode}; +use sqlx::postgres::protocol::{Bind, DataRow, Decode, Encode, RowDescription}; fn bench(c: &mut Criterion) { c.bench_function("decode_data_row", |b| { diff --git a/examples/realworld/src/main.rs b/examples/realworld/src/main.rs index 9d0bdac8..22a67ebc 100644 --- a/examples/realworld/src/main.rs +++ b/examples/realworld/src/main.rs @@ -1,9 +1,6 @@ use sqlx::{FromRow, Pool, Postgres}; use std::env; -use tide::http::StatusCode; -use tide::Request; -use tide::Response; -use tide::ResultExt; +use tide::{http::StatusCode, Request, Response, ResultExt}; #[async_std::main] async fn main() -> anyhow::Result<()> { diff --git a/rust-toolchain b/rust-toolchain deleted file mode 100644 index 2bf5ad04..00000000 --- a/rust-toolchain +++ /dev/null @@ -1 +0,0 @@ -stable diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 00000000..4afdfbd8 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,2 @@ +unstable_features = true +merge_imports = true diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 9f695b6d..fd82dea9 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -18,7 +18,6 @@ mariadb = [] [dependencies] async-std = { version = "1.1.0", features = ["attributes", "unstable"] } async-stream = "0.2.0" -async-trait = "0.1.18" bitflags = "1.2.1" byteorder = { version = "1.3.2", default-features = false } bytes = "0.4.12" diff --git a/sqlx-core/src/backend.rs b/sqlx-core/src/backend.rs index 62e15947..d656f122 100644 --- a/sqlx-core/src/backend.rs +++ b/sqlx-core/src/backend.rs @@ -1,7 +1,8 @@ -use crate::describe::Describe; -use crate::{params::QueryParameters, row::RawRow, types::HasTypeMetadata}; -use async_trait::async_trait; -use futures_core::stream::BoxStream; +use crate::{ + describe::Describe, executor::Executor, params::QueryParameters, row::Row, + types::HasTypeMetadata, +}; +use futures_core::future::BoxFuture; /// A database backend. /// @@ -9,22 +10,22 @@ use futures_core::stream::BoxStream; /// important related traits as associated types. /// /// This trait is not intended to be used directly. -/// Instead [sqlx::Connection] or [sqlx::Pool] should be used instead, -/// which provide concurrent access and typed retrieval of results. -#[async_trait] -pub trait Backend: HasTypeMetadata + Send + Sync + Sized { +/// Instead [sqlx::Connection] or [sqlx::Pool] should be used instead. +pub trait Backend: + Executor + HasTypeMetadata + Send + Sync + Sized + 'static +{ /// The concrete `QueryParameters` implementation for this backend. type QueryParameters: QueryParameters; /// The concrete `Row` implementation for this backend. - type Row: RawRow; + type Row: Row; /// The identifier for tables; in Postgres this is an `oid` while /// in MariaDB/MySQL this is the qualified name of the table. type TableIdent; /// Establish a new connection to the database server. - async fn open(url: &str) -> crate::Result + fn open(url: &str) -> BoxFuture<'static, crate::Result> where Self: Sized; @@ -32,30 +33,5 @@ pub trait Backend: HasTypeMetadata + Send + Sync + Sized { /// /// This method is not required to be called. A database server will /// eventually notice and clean up not fully closed connections. - async fn close(mut self) -> crate::Result<()>; - - async fn ping(&mut self) -> crate::Result<()> { - // TODO: Does this need to be specialized for any database backends? - let _ = self - .execute("SELECT 1", Self::QueryParameters::new()) - .await?; - - Ok(()) - } - - async fn describe(&mut self, query: &str) -> crate::Result>; - - async fn execute(&mut self, query: &str, params: Self::QueryParameters) -> crate::Result; - - fn fetch( - &mut self, - query: &str, - params: Self::QueryParameters, - ) -> BoxStream<'_, crate::Result>; - - async fn fetch_optional( - &mut self, - query: &str, - params: Self::QueryParameters, - ) -> crate::Result>; + fn close(self) -> BoxFuture<'static, crate::Result<()>>; } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 76afb10c..1caab08f 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -5,12 +5,15 @@ use crate::{ executor::Executor, params::IntoQueryParameters, pool::{Live, SharedPool}, - row::FromRow, - row::Row, + row::{FromRow, Row}, }; use futures_core::{future::BoxFuture, stream::BoxStream}; use futures_util::stream::StreamExt; -use std::{sync::Arc, time::Instant}; +use std::{ + ops::{Deref, DerefMut}, + sync::Arc, + time::Instant, +}; pub struct Connection where @@ -29,21 +32,7 @@ where } pub async fn open(url: &str) -> crate::Result { - let raw = DB::open(url).await?; - Ok(Self::new(Live::unpooled(raw), None)) - } - - /// Verifies a connection to the database is still alive. - pub async fn ping(&mut self) -> crate::Result<()> { - self.live.ping().await - } - - /// Analyze the SQL statement and report the inferred bind parameter types and returned - /// columns. - /// - /// Mainly intended for use by sqlx-macros. - pub async fn describe(&mut self, statement: &str) -> crate::Result> { - self.live.describe(statement).await + Ok(Self::new(Live::unpooled(DB::open(url).await?), None)) } } @@ -53,51 +42,45 @@ where { type Backend = DB; - fn execute<'c, 'q: 'c, I: 'c>( - &'c mut self, + fn execute<'e, 'q: 'e, I: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result> + ) -> BoxFuture<'e, crate::Result> where I: IntoQueryParameters + Send, { - Box::pin(async move { self.live.execute(query, params.into_params()).await }) + self.live.execute(query, params) } - fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxStream<'c, Result> + ) -> BoxStream<'e, crate::Result> where I: IntoQueryParameters + Send, T: FromRow + Send + Unpin, { - Box::pin(async_stream::try_stream! { - let mut s = self.live.fetch(query, params.into_params()); - - while let Some(row) = s.next().await.transpose()? { - yield T::from_row(Row(row)); - } - }) + self.live.fetch(query, params) } - fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result, Error>> + ) -> BoxFuture<'e, crate::Result>> where I: IntoQueryParameters + Send, - T: FromRow, + T: FromRow + Send, { - Box::pin(async move { - let row = self - .live - .fetch_optional(query, params.into_params()) - .await?; + self.live.fetch_optional(query, params) + } - Ok(row.map(Row).map(T::from_row)) - }) + fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'e, crate::Result>> { + self.live.describe(query) } } diff --git a/sqlx-core/src/executor.rs b/sqlx-core/src/executor.rs index b6d940c0..82a486f4 100644 --- a/sqlx-core/src/executor.rs +++ b/sqlx-core/src/executor.rs @@ -1,32 +1,49 @@ -use crate::{backend::Backend, error::Error, params::IntoQueryParameters, row::FromRow}; +use crate::{ + backend::Backend, + describe::Describe, + error::Error, + params::{IntoQueryParameters, QueryParameters}, + row::FromRow, +}; use futures_core::{future::BoxFuture, stream::BoxStream}; -use futures_util::TryStreamExt; +use futures_util::{TryFutureExt, TryStreamExt}; pub trait Executor: Send { type Backend: Backend; - fn execute<'c, 'q: 'c, I: 'c>( - &'c mut self, + /// Verifies a connection to the database is still alive. + fn ping<'e>(&'e mut self) -> BoxFuture<'e, crate::Result<()>> { + Box::pin( + self.execute( + "SELECT 1", + ::QueryParameters::new(), + ) + .map_ok(|_| ()), + ) + } + + fn execute<'e, 'q: 'e, I: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result> + ) -> BoxFuture<'e, crate::Result> where I: IntoQueryParameters + Send; - fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxStream<'c, Result> + ) -> BoxStream<'e, crate::Result> where I: IntoQueryParameters + Send, T: FromRow + Send + Unpin; - fn fetch_all<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch_all<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result, Error>> + ) -> BoxFuture<'e, crate::Result>> where I: IntoQueryParameters + Send, T: FromRow + Send + Unpin, @@ -34,20 +51,20 @@ pub trait Executor: Send { Box::pin(self.fetch(query, params).try_collect()) } - fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result, Error>> + ) -> BoxFuture<'e, crate::Result>> where I: IntoQueryParameters + Send, T: FromRow + Send; - fn fetch_one<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch_one<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result> + ) -> BoxFuture<'e, crate::Result> where I: IntoQueryParameters + Send, T: FromRow + Send, @@ -55,4 +72,11 @@ pub trait Executor: Send { let fut = self.fetch_optional(query, params); Box::pin(async move { fut.await?.ok_or(Error::NotFound) }) } + + /// Analyze the SQL statement and report the inferred bind parameter types and returned + /// columns. + fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'e, crate::Result>>; } diff --git a/sqlx-core/src/mariadb/backend.rs b/sqlx-core/src/mariadb/backend.rs index 9c2a3739..aad549d7 100644 --- a/sqlx-core/src/mariadb/backend.rs +++ b/sqlx-core/src/mariadb/backend.rs @@ -1,11 +1,14 @@ use super::{MariaDb, MariaDbQueryParameters, MariaDbRow}; -use crate::backend::Backend; -use crate::describe::{Describe, ResultField}; -use crate::mariadb::protocol::{StmtExecFlag, ComStmtExecute, ResultRow, Capabilities, OkPacket, EofPacket, ErrPacket, ColumnDefinitionPacket, ColumnCountPacket}; -use async_trait::async_trait; +use crate::{ + backend::Backend, + describe::{Describe, ResultField}, + mariadb::protocol::{ + Capabilities, ColumnCountPacket, ColumnDefinitionPacket, ComStmtExecute, EofPacket, + ErrPacket, OkPacket, ResultRow, StmtExecFlag, + }, +}; use futures_core::stream::BoxStream; -#[async_trait] impl Backend for MariaDb { type QueryParameters = MariaDbQueryParameters; type Row = MariaDbRow; diff --git a/sqlx-core/src/mariadb/connection.rs b/sqlx-core/src/mariadb/connection.rs index 76d32858..4e35c735 100644 --- a/sqlx-core/src/mariadb/connection.rs +++ b/sqlx-core/src/mariadb/connection.rs @@ -17,8 +17,7 @@ use std::{ io, net::{IpAddr, SocketAddr}, }; -use url::quirks::protocol; -use url::Url; +use url::{quirks::protocol, Url}; pub struct MariaDb { pub(crate) stream: BufStream, @@ -192,13 +191,20 @@ impl MariaDb { ComStmtPrepareOk::decode(packet).map_err(Into::into) } - pub(super) async fn step(&mut self, columns: &Vec, packet: &[u8]) -> Result> { + pub(super) async fn step( + &mut self, + columns: &Vec, + packet: &[u8], + ) -> Result> { // For each row in the result set we will receive a ResultRow packet. // We may receive an [OkPacket], [EofPacket], or [ErrPacket] (depending on if EOFs are enabled) to finalize the iteration. if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF { // NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows) // but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF. - if !self.capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) { + if !self + .capabilities + .contains(Capabilities::CLIENT_DEPRECATE_EOF) + { let _eof = EofPacket::decode(packet)?; Ok(None) } else { @@ -214,7 +220,10 @@ impl MariaDb { } } - pub(super) async fn column_definitions(&mut self, packet: &[u8]) -> Result> { + pub(super) async fn column_definitions( + &mut self, + packet: &[u8], + ) -> Result> { // A Resultset starts with a [ColumnCountPacket] which is a single field that encodes // how many columns we can expect when fetching rows from this statement let column_count: u64 = ColumnCountPacket::decode(packet)?.columns; @@ -229,7 +238,10 @@ impl MariaDb { // When (legacy) EOFs are enabled, the fixed number column definitions are further terminated by // an EOF packet - if !self.capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) { + if !self + .capabilities + .contains(Capabilities::CLIENT_DEPRECATE_EOF) + { let _eof = EofPacket::decode(self.receive().await?)?; } diff --git a/sqlx-core/src/mariadb/row.rs b/sqlx-core/src/mariadb/row.rs index 721dff3a..03a5449f 100644 --- a/sqlx-core/src/mariadb/row.rs +++ b/sqlx-core/src/mariadb/row.rs @@ -1,5 +1,7 @@ -use crate::mariadb::{protocol::ResultRow, MariaDb}; -use crate::row::RawRow; +use crate::{ + mariadb::{protocol::ResultRow, MariaDb}, + row::RawRow, +}; pub struct MariaDbRow(pub(super) ResultRow); diff --git a/sqlx-core/src/pool.rs b/sqlx-core/src/pool.rs index 97d5440c..b7a49283 100644 --- a/sqlx-core/src/pool.rs +++ b/sqlx-core/src/pool.rs @@ -1,11 +1,16 @@ use crate::{ backend::Backend, connection::Connection, + describe::Describe, error::Error, executor::Executor, params::IntoQueryParameters, row::{FromRow, Row}, }; +use async_std::{ + sync::{channel, Receiver, Sender}, + task, +}; use futures_channel::oneshot; use futures_core::{future::BoxFuture, stream::BoxStream}; use futures_util::{future::FutureExt, stream::StreamExt}; @@ -20,9 +25,6 @@ use std::{ time::{Duration, Instant}, }; -use async_std::sync::{channel, Receiver, Sender}; -use async_std::task; - /// A pool of database connections. pub struct Pool(Arc>) where @@ -244,22 +246,22 @@ where { type Backend = DB; - fn execute<'c, 'q: 'c, I: 'c>( - &'c mut self, + fn execute<'e, 'q: 'e, I: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result> + ) -> BoxFuture<'e, crate::Result> where I: IntoQueryParameters + Send, { Box::pin(async move { <&Pool as Executor>::execute(&mut &*self, query, params).await }) } - fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxStream<'c, Result> + ) -> BoxStream<'e, crate::Result> where I: IntoQueryParameters + Send, T: FromRow + Send + Unpin, @@ -271,16 +273,14 @@ where while let Some(row) = s.next().await.transpose()? { yield row; } - - drop(s); }) } - fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result, Error>> + ) -> BoxFuture<'e, crate::Result>> where I: IntoQueryParameters + Send, T: FromRow + Send, @@ -289,6 +289,13 @@ where <&Pool as Executor>::fetch_optional(&mut &*self, query, params).await }) } + + fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'e, crate::Result>> { + Box::pin(async move { <&Pool as Executor>::describe(&mut &*self, query).await }) + } } impl Executor for &'_ Pool @@ -297,59 +304,53 @@ where { type Backend = DB; - fn execute<'c, 'q: 'c, I: 'c>( - &'c mut self, + fn execute<'e, 'q: 'e, I: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result> + ) -> BoxFuture<'e, crate::Result> where I: IntoQueryParameters + Send, { - Box::pin(async move { - let mut live = self.0.acquire().await?; - let result = live.execute(query, params.into_params()).await; - result - }) + Box::pin(async move { self.0.acquire().await?.execute(query, params).await }) } - fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxStream<'c, Result> + ) -> BoxStream<'e, crate::Result> where I: IntoQueryParameters + Send, T: FromRow + Send + Unpin, { Box::pin(async_stream::try_stream! { let mut live = self.0.acquire().await?; - let mut s = live.fetch(query, params.into_params()); + let mut s = live.fetch(query, params); while let Some(row) = s.next().await.transpose()? { - yield T::from_row(Row(row)); + yield row; } }) } - fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( - &'c mut self, + fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, query: &'q str, params: I, - ) -> BoxFuture<'c, Result, Error>> + ) -> BoxFuture<'e, crate::Result>> where I: IntoQueryParameters + Send, T: FromRow + Send, { - Box::pin(async move { - Ok(self - .0 - .acquire() - .await? - .fetch_optional(query, params.into_params()) - .await? - .map(Row) - .map(T::from_row)) - }) + Box::pin(async move { self.0.acquire().await?.fetch_optional(query, params).await }) + } + + fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'e, crate::Result>> { + Box::pin(async move { self.0.acquire().await?.describe(query).await }) } } diff --git a/sqlx-core/src/postgres/backend.rs b/sqlx-core/src/postgres/backend.rs index 45262bf1..a92359ba 100644 --- a/sqlx-core/src/postgres/backend.rs +++ b/sqlx-core/src/postgres/backend.rs @@ -1,15 +1,12 @@ -use super::connection::Step; -use super::Postgres; -use super::PostgresQueryParameters; -use super::PostgresRow; -use crate::backend::Backend; -use crate::describe::{Describe, ResultField}; -use crate::params::QueryParameters; -use crate::url::Url; -use async_trait::async_trait; -use futures_core::stream::BoxStream; +use super::{connection::Step, Postgres, PostgresQueryParameters, PostgresRow}; +use crate::{ + backend::Backend, + describe::{Describe, ResultField}, + params::QueryParameters, + url::Url, +}; +use futures_core::{future::BoxFuture, stream::BoxStream}; -#[async_trait] impl Backend for Postgres { type QueryParameters = PostgresQueryParameters; @@ -17,132 +14,27 @@ impl Backend for Postgres { type TableIdent = u32; - async fn open(url: &str) -> crate::Result { - let url = Url::parse(url)?; - let address = url.resolve(5432); - let mut conn = Self::new(address).await?; + fn open(url: &str) -> BoxFuture<'static, crate::Result> { + let url = Url::parse(url); - conn.startup( - url.username(), - url.password().unwrap_or_default(), - url.database(), - ) - .await?; + Box::pin(async move { + let url = url?; + let address = url.resolve(5432); + let mut conn = Self::new(address).await?; - Ok(conn) - } + conn.startup( + url.username(), + url.password().unwrap_or_default(), + url.database(), + ) + .await?; - #[inline] - async fn close(mut self) -> crate::Result<()> { - self.terminate().await - } - - async fn execute( - &mut self, - query: &str, - params: PostgresQueryParameters, - ) -> crate::Result { - self.parse("", query, ¶ms); - self.bind("", "", ¶ms); - self.execute("", 1); - self.sync().await?; - - let mut affected = 0; - - while let Some(step) = self.step().await? { - if let Step::Command(cnt) = step { - affected = cnt; - } - } - - Ok(affected) - } - - fn fetch( - &mut self, - query: &str, - params: PostgresQueryParameters, - ) -> BoxStream<'_, crate::Result> { - self.parse("", query, ¶ms); - self.bind("", "", ¶ms); - self.execute("", 0); - - Box::pin(async_stream::try_stream! { - self.sync().await?; - - while let Some(step) = self.step().await? { - if let Step::Row(row) = step { - yield row; - } - } + Ok(conn) }) } - async fn fetch_optional( - &mut self, - query: &str, - params: PostgresQueryParameters, - ) -> crate::Result> { - self.parse("", query, ¶ms); - self.bind("", "", ¶ms); - self.execute("", 2); - self.sync().await?; - - let mut row: Option = None; - - while let Some(step) = self.step().await? { - if let Step::Row(r) = step { - if row.is_some() { - return Err(crate::Error::FoundMoreThanOne); - } - - row = Some(r); - } - } - - Ok(row) - } - - async fn describe(&mut self, body: &str) -> crate::Result> { - self.parse("", body, &PostgresQueryParameters::new()); - self.describe(""); - self.sync().await?; - - let param_desc = loop { - let step = self - .step() - .await? - .ok_or(protocol_err!("did not receive ParameterDescription")); - - if let Step::ParamDesc(desc) = step? { - break desc; - } - }; - - let row_desc = loop { - let step = self - .step() - .await? - .ok_or(protocol_err!("did not receive RowDescription")); - - if let Step::RowDesc(desc) = step? { - break desc; - } - }; - - Ok(Describe { - param_types: param_desc.ids.into_vec(), - result_fields: row_desc - .fields - .into_vec() - .into_iter() - .map(|field| ResultField { - name: Some(field.name), - table_id: Some(field.table_id), - type_id: field.type_id, - }) - .collect(), - }) + fn close(mut self) -> BoxFuture<'static, crate::Result<()>> { + Box::pin(self.terminate()) } } diff --git a/sqlx-core/src/postgres/connection.rs b/sqlx-core/src/postgres/connection.rs index 96938a75..7ad9793e 100644 --- a/sqlx-core/src/postgres/connection.rs +++ b/sqlx-core/src/postgres/connection.rs @@ -7,8 +7,10 @@ use crate::{ }; use async_std::net::TcpStream; use byteorder::NetworkEndian; -use std::net::Shutdown; -use std::{io, net::SocketAddr}; +use std::{ + io, + net::{Shutdown, SocketAddr}, +}; pub struct Postgres { stream: BufStream, diff --git a/sqlx-core/src/postgres/executor.rs b/sqlx-core/src/postgres/executor.rs new file mode 100644 index 00000000..18eb6c51 --- /dev/null +++ b/sqlx-core/src/postgres/executor.rs @@ -0,0 +1,148 @@ +use super::{connection::Step, Postgres, PostgresQueryParameters, PostgresRow}; +use crate::{ + backend::Backend, + describe::{Describe, ResultField}, + executor::Executor, + params::{IntoQueryParameters, QueryParameters}, + row::FromRow, + url::Url, +}; +use futures_core::{future::BoxFuture, stream::BoxStream}; + +impl Executor for Postgres { + type Backend = Self; + + fn execute<'e, 'q: 'e, I: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxFuture<'e, crate::Result> + where + I: IntoQueryParameters + Send, + { + Box::pin(async move { + let params = params.into_params(); + + self.parse("", query, ¶ms); + self.bind("", "", ¶ms); + self.execute("", 1); + self.sync().await?; + + let mut affected = 0; + + while let Some(step) = self.step().await? { + if let Step::Command(cnt) = step { + affected = cnt; + } + } + + Ok(affected) + }) + } + + fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxStream<'e, crate::Result> + where + I: IntoQueryParameters + Send, + T: FromRow + Send + Unpin, + { + let params = params.into_params(); + + self.parse("", query, ¶ms); + self.bind("", "", ¶ms); + self.execute("", 0); + + Box::pin(async_stream::try_stream! { + self.sync().await?; + + while let Some(step) = self.step().await? { + if let Step::Row(row) = step { + yield FromRow::from_row(row); + } + } + }) + } + + fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxFuture<'e, crate::Result>> + where + I: IntoQueryParameters + Send, + T: FromRow + Send, + { + Box::pin(async move { + let params = params.into_params(); + + self.parse("", query, ¶ms); + self.bind("", "", ¶ms); + self.execute("", 2); + self.sync().await?; + + let mut row: Option<_> = None; + + while let Some(step) = self.step().await? { + if let Step::Row(r) = step { + if row.is_some() { + return Err(crate::Error::FoundMoreThanOne); + } + + row = Some(FromRow::from_row(r)); + } + } + + Ok(row) + }) + } + + fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'e, crate::Result>> { + Box::pin(async move { + self.parse("", query, &PostgresQueryParameters::new()); + self.describe(""); + self.sync().await?; + + let param_desc = loop { + let step = self + .step() + .await? + .ok_or(protocol_err!("did not receive ParameterDescription")); + + if let Step::ParamDesc(desc) = step? { + break desc; + } + }; + + let row_desc = loop { + let step = self + .step() + .await? + .ok_or(protocol_err!("did not receive RowDescription")); + + if let Step::RowDesc(desc) = step? { + break desc; + } + }; + + Ok(Describe { + param_types: param_desc.ids.into_vec(), + result_fields: row_desc + .fields + .into_vec() + .into_iter() + .map(|field| ResultField { + name: Some(field.name), + table_id: Some(field.table_id), + type_id: field.type_id, + }) + .collect(), + }) + }) + } +} diff --git a/sqlx-core/src/postgres/mod.rs b/sqlx-core/src/postgres/mod.rs index b90bf405..e6c77936 100644 --- a/sqlx-core/src/postgres/mod.rs +++ b/sqlx-core/src/postgres/mod.rs @@ -1,6 +1,7 @@ mod backend; mod connection; mod error; +mod executor; mod query; mod row; diff --git a/sqlx-core/src/postgres/row.rs b/sqlx-core/src/postgres/row.rs index a0139537..f612f422 100644 --- a/sqlx-core/src/postgres/row.rs +++ b/sqlx-core/src/postgres/row.rs @@ -1,10 +1,10 @@ use super::{protocol::DataRow, Postgres}; -use crate::row::RawRow; +use crate::row::Row; #[derive(Debug)] pub struct PostgresRow(pub(crate) DataRow); -impl RawRow for PostgresRow { +impl Row for PostgresRow { type Backend = Postgres; #[inline] diff --git a/sqlx-core/src/query.rs b/sqlx-core/src/query.rs index e0429cd3..eeb83aee 100644 --- a/sqlx-core/src/query.rs +++ b/sqlx-core/src/query.rs @@ -1,12 +1,17 @@ -use crate::params::IntoQueryParameters; use crate::{ - backend::Backend, encode::Encode, error::Error, executor::Executor, params::QueryParameters, - row::FromRow, types::HasSqlType, Row, + backend::Backend, + encode::Encode, + error::Error, + executor::Executor, + params::{IntoQueryParameters, QueryParameters}, + row::FromRow, + types::HasSqlType, + Row, }; use bitflags::_core::marker::PhantomData; use futures_core::{future::BoxFuture, stream::BoxStream}; -pub struct Query<'q, DB, I = ::QueryParameters, O = Row, T = O> +pub struct Query<'q, DB, I = ::QueryParameters, O = ::Row> where DB: Backend, { @@ -19,9 +24,6 @@ where #[doc(hidden)] pub output: PhantomData, - #[doc(hidden)] - pub target: PhantomData, - #[doc(hidden)] pub backend: PhantomData, } @@ -31,6 +33,7 @@ where DB: Backend, DB::QueryParameters: 'q, I: IntoQueryParameters + Send, + O: FromRow + Send + Unpin, { #[inline] pub fn execute(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result> @@ -39,37 +42,29 @@ where { executor.execute(self.query, self.input) } -} -impl<'q, DB, I: 'q, O: 'q, T: 'q> Query<'q, DB, I, O, T> -where - DB: Backend, - DB::QueryParameters: 'q, - I: IntoQueryParameters + Send, - T: FromRow + Send + Unpin, -{ - pub fn fetch(self, executor: &'q mut E) -> BoxStream<'q, crate::Result> + pub fn fetch(self, executor: &'q mut E) -> BoxStream<'q, crate::Result> where E: Executor, { executor.fetch(self.query, self.input) } - pub fn fetch_all(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result>> + pub fn fetch_all(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result>> where E: Executor, { executor.fetch_all(self.query, self.input) } - pub fn fetch_optional(self, executor: &'q mut E) -> BoxFuture<'q, Result, Error>> + pub fn fetch_optional(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result>> where E: Executor, { executor.fetch_optional(self.query, self.input) } - pub fn fetch_one(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result> + pub fn fetch_one(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result> where E: Executor, { @@ -77,7 +72,7 @@ where } } -impl Query<'_, DB, ::QueryParameters, Row, Target> +impl Query<'_, DB, ::QueryParameters> where DB: Backend, { @@ -100,7 +95,7 @@ where /// Construct a full SQL query using raw SQL. #[inline] -pub fn query(query: &str) -> Query<'_, DB, DB::QueryParameters, Row, T> +pub fn query(query: &str) -> Query<'_, DB> where DB: Backend, { @@ -109,6 +104,5 @@ where input: DB::QueryParameters::new(), output: PhantomData, backend: PhantomData, - target: PhantomData, } } diff --git a/sqlx-core/src/row.rs b/sqlx-core/src/row.rs index a6226ff2..f9b12914 100644 --- a/sqlx-core/src/row.rs +++ b/sqlx-core/src/row.rs @@ -1,6 +1,6 @@ use crate::{backend::Backend, decode::Decode, types::HasSqlType}; -pub trait RawRow: Send { +pub trait Row: Send { type Backend: Backend; fn len(&self) -> usize; @@ -16,25 +16,8 @@ pub trait RawRow: Send { } } -pub struct Row(pub(crate) DB::Row) -where - DB: Backend; - -impl Row -where - DB: Backend, -{ - pub fn get(&self, index: usize) -> T - where - DB: HasSqlType, - T: Decode, - { - self.0.get(index) - } -} - -pub trait FromRow> { - fn from_row(row: Row) -> Self; +pub trait FromRow::Row> { + fn from_row(row: ::Row) -> Self; } #[allow(unused)] @@ -47,7 +30,9 @@ macro_rules! impl_from_row { $($T: crate::decode::Decode<$B>,)+ { #[inline] - fn from_row(row: crate::row::Row<$B>) -> Self { + fn from_row(row: <$B as crate::backend::Backend>::Row) -> Self { + use crate::row::Row; + ($(row.get($idx),)+) } } @@ -59,7 +44,9 @@ macro_rules! impl_from_row { $($T: crate::decode::Decode<$B>,)+ { #[inline] - fn from_row(row: crate::row::Row<$B>) -> Self { + fn from_row(row: <$B as crate::backend::Backend>::Row) -> Self { + use crate::row::Row; + ($(row.get($idx),)+) } } @@ -69,9 +56,9 @@ macro_rules! impl_from_row { #[allow(unused)] macro_rules! impl_from_row_for_backend { ($B:ident) => { - impl crate::row::FromRow<$B> for crate::row::Row<$B> where $B: crate::Backend { + impl crate::row::FromRow<$B> for <$B as crate::backend::Backend>::Row where $B: crate::Backend { #[inline] - fn from_row(row: crate::row::Row<$B>) -> Self { + fn from_row(row: <$B as crate::backend::Backend>::Row) -> Self { row } } diff --git a/sqlx-macros/src/lib.rs b/sqlx-macros/src/lib.rs index 986a9123..3cd77036 100644 --- a/sqlx-macros/src/lib.rs +++ b/sqlx-macros/src/lib.rs @@ -16,7 +16,7 @@ use syn::{ Expr, ExprLit, Lit, Token, }; -use sqlx::HasTypeMetadata; +use sqlx::{Executor, HasTypeMetadata}; use async_std::task; diff --git a/tests/postgres-types.rs b/tests/postgres-types.rs index 95e53b71..48fc3dbe 100644 --- a/tests/postgres-types.rs +++ b/tests/postgres-types.rs @@ -1,4 +1,4 @@ -use sqlx::{Connection, Postgres}; +use sqlx::{Connection, Postgres, Row}; use std::env; macro_rules! test {