From dbf13844d947495f400d8a4a14d173588356e971 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Sat, 20 Mar 2021 00:16:35 -0700 Subject: [PATCH] feat(sqlx): add sqlx-postgres to sqlx --- sqlx/Cargo.toml | 10 +- sqlx/src/lib.rs | 8 +- sqlx/src/postgres.rs | 20 +++ sqlx/src/postgres/connection.rs | 207 ++++++++++++++++++++++++++++++++ sqlx/src/postgres/options.rs | 162 +++++++++++++++++++++++++ 5 files changed, 398 insertions(+), 9 deletions(-) create mode 100644 sqlx/src/postgres.rs create mode 100644 sqlx/src/postgres/connection.rs create mode 100644 sqlx/src/postgres/options.rs diff --git a/sqlx/Cargo.toml b/sqlx/Cargo.toml index 6e62147f..ef2da2ce 100644 --- a/sqlx/Cargo.toml +++ b/sqlx/Cargo.toml @@ -33,13 +33,13 @@ mysql = ["sqlx-mysql"] mysql-async = ["async", "mysql", "sqlx-mysql/async"] mysql-blocking = ["blocking", "mysql", "sqlx-mysql/blocking"] -## Postgres -#postgres = ["sqlx-postgres"] -#postgres-async = ["async", "postgres", "sqlx-postgres/async"] -#postgres-blocking = ["blocking", "postgres", "sqlx-postgres/blocking"] +# Postgres +postgres = ["sqlx-postgres"] +postgres-async = ["async", "postgres", "sqlx-postgres/async"] +postgres-blocking = ["blocking", "postgres", "sqlx-postgres/blocking"] [dependencies] sqlx-core = { version = "0.6.0-pre", path = "../sqlx-core" } sqlx-mysql = { version = "0.6.0-pre", path = "../sqlx-mysql", optional = true } -#sqlx-postgres = { version = "0.6.0-pre", path = "../sqlx-postgres", optional = true } +sqlx-postgres = { version = "0.6.0-pre", path = "../sqlx-postgres", optional = true } futures-util = { version = "0.3", optional = true, features = ["io"] } diff --git a/sqlx/src/lib.rs b/sqlx/src/lib.rs index 5114a566..011e95e2 100644 --- a/sqlx/src/lib.rs +++ b/sqlx/src/lib.rs @@ -53,8 +53,8 @@ mod runtime; #[cfg(feature = "mysql")] pub mod mysql; -// #[cfg(feature = "postgres")] -// pub mod postgres; +#[cfg(feature = "postgres")] +pub mod postgres; #[cfg(feature = "blocking")] pub use blocking::Blocking; @@ -70,6 +70,6 @@ pub use sqlx_core::AsyncStd; #[cfg(feature = "tokio")] pub use sqlx_core::Tokio; pub use sqlx_core::{ - Acquire, Arguments, Close, Connect, ConnectOptions, Connection, Database, Decode, Encode, - Error, Execute, Executor, FromRow, Result, Row, Runtime, Type, Describe, + Acquire, Arguments, Close, Connect, ConnectOptions, Connection, Database, Decode, Describe, + Encode, Error, Execute, Executor, FromRow, Result, Row, Runtime, Type, }; diff --git a/sqlx/src/postgres.rs b/sqlx/src/postgres.rs new file mode 100644 index 00000000..ba802573 --- /dev/null +++ b/sqlx/src/postgres.rs @@ -0,0 +1,20 @@ +//! [PostgreSQL] database driver. +//! + +mod connection; +mod options; + +// #[cfg(feature = "blocking")] +// mod blocking; + +// +// these types are wrapped instead of re-exported +// this is to provide runtime-specialized inherent methods by taking advantage +// of through crate-local negative reasoning +pub use connection::PgConnection; +pub use options::PgConnectOptions; +// +// re-export the remaining types from the driver +pub use sqlx_postgres::{ + types, PgColumn, PgQueryResult, PgRawValue, PgRawValueFormat, PgRow, PgTypeId, Postgres, +}; diff --git a/sqlx/src/postgres/connection.rs b/sqlx/src/postgres/connection.rs new file mode 100644 index 00000000..a4b0b1aa --- /dev/null +++ b/sqlx/src/postgres/connection.rs @@ -0,0 +1,207 @@ +use std::fmt::{self, Debug, Formatter}; +use std::ops::{Deref, DerefMut}; + +#[cfg(feature = "async")] +use futures_util::future::{BoxFuture, FutureExt}; +use sqlx_core::{Execute, Executor}; + +use super::{PgConnectOptions, PgQueryResult, PgRow, Postgres}; +#[cfg(feature = "blocking")] +use crate::blocking; +use crate::{Arguments, Close, Connect, Connection, DefaultRuntime, Describe, Runtime}; +#[cfg(feature = "async")] +use crate::{Async, Result}; + +/// A single connection (also known as a session) to a MySQL database server. +#[allow(clippy::module_name_repetitions)] +pub struct PgConnection(pub(super) sqlx_postgres::PgConnection); + +#[cfg(feature = "async")] +impl PgConnection { + /// Open a new database connection. + /// + /// A value of [`PgConnectOptions`] is parsed from the provided + /// connection `url`. + /// + /// ```text + /// mysql://[[user[:password]@]host][/database][?properties] + /// ``` + /// + /// Implemented with [`Connect::connect`][crate::Connect::connect]. + pub async fn connect(url: &str) -> Result { + sqlx_postgres::PgConnection::::connect(url).await.map(Self) + } + + /// Open a new database connection with the configured options. + /// + /// Implemented with [`Connect::connect_with`][crate::Connect::connect_with]. + pub async fn connect_with(options: &PgConnectOptions) -> Result { + sqlx_postgres::PgConnection::::connect_with(&**options).await.map(Self) + } + + /// Checks if a connection to the database is still valid. + /// + /// Implemented with [`Connection::ping`][crate::Connection::ping]. + pub async fn ping(&mut self) -> Result<()> { + self.0.ping().await + } + + // pub async fn execute<'q, 'a, E>(&mut self, query: E) -> Result + // where + // E: Execute<'q, 'a, Postgres>, + // { + // self.0.execute(query).await + // } + // + // pub async fn fetch_all<'q, 'a, E>(&mut self, query: E) -> Result> + // where + // E: Execute<'q, 'a, Postgres>, + // { + // self.0.fetch_all(query).await + // } + // + // pub async fn fetch_one<'q, 'a, E>(&mut self, query: E) -> Result + // where + // E: Execute<'q, 'a, Postgres>, + // { + // self.0.fetch_one(query).await + // } + // + // pub async fn fetch_optional<'q, 'a, E>(&mut self, query: E) -> Result> + // where + // E: Execute<'q, 'a, Postgres>, + // { + // self.0.fetch_optional(query).await + // } + + /// Explicitly close this database connection. + /// + /// This method is **not required** for safe and consistent operation. However, it is + /// recommended to call it instead of letting a connection `drop` as MySQL + /// will be faster at cleaning up resources. + /// + /// Implemented with [`Close::close`][crate::Close::close]. + pub async fn close(self) -> Result<()> { + self.0.close().await + } +} + +impl Debug for PgConnection { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.0) + } +} + +impl Close for PgConnection { + #[cfg(feature = "async")] + #[inline] + fn close(self) -> BoxFuture<'static, Result<()>> + where + Rt: Async, + { + self.close().boxed() + } +} + +impl Connect for PgConnection { + type Options = PgConnectOptions; + + #[cfg(feature = "async")] + #[inline] + fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result> + where + Rt: Async, + { + Self::connect_with(options).boxed() + } +} + +impl Connection for PgConnection { + type Database = Postgres; + + #[cfg(feature = "async")] + #[inline] + fn ping(&mut self) -> BoxFuture<'_, Result<()>> + where + Rt: Async, + { + self.0.ping() + } + + #[cfg(feature = "async")] + #[inline] + fn describe<'x, 'e, 'q>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'x, Result>> + where + Rt: Async, + 'e: 'x, + 'q: 'x, + { + self.0.describe(query) + } +} + +// impl Executor for PgConnection { +// type Database = Postgres; +// +// #[cfg(feature = "async")] +// fn execute<'x, 'e, 'q, 'a, E>(&'e mut self, query: E) -> BoxFuture<'x, Result> +// where +// Rt: Async, +// E: 'x + Execute<'q, 'a, Postgres>, +// 'e: 'x, +// 'q: 'x, +// 'a: 'x, +// { +// self.0.execute(query) +// } +// +// #[cfg(feature = "async")] +// fn fetch_all<'x, 'e, 'q, 'a, E>(&'e mut self, query: E) -> BoxFuture<'x, Result>> +// where +// Rt: Async, +// E: 'x + Execute<'q, 'a, Postgres>, +// 'e: 'x, +// 'q: 'x, +// 'a: 'x, +// { +// self.0.fetch_all(query) +// } +// +// #[cfg(feature = "async")] +// fn fetch_optional<'x, 'e, 'q, 'a, E>( +// &'e mut self, +// query: E, +// ) -> BoxFuture<'x, Result>> +// where +// Rt: Async, +// E: 'x + Execute<'q, 'a, Postgres>, +// 'e: 'x, +// 'q: 'x, +// 'a: 'x, +// { +// self.0.fetch_optional(query) +// } +// } + +impl From> for PgConnection { + fn from(connection: sqlx_postgres::PgConnection) -> Self { + Self(connection) + } +} + +impl Deref for PgConnection { + type Target = sqlx_postgres::PgConnection; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for PgConnection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/sqlx/src/postgres/options.rs b/sqlx/src/postgres/options.rs new file mode 100644 index 00000000..7b6f774a --- /dev/null +++ b/sqlx/src/postgres/options.rs @@ -0,0 +1,162 @@ +use std::fmt::{self, Debug, Formatter}; +use std::marker::PhantomData; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::path::{Path, PathBuf}; +use std::str::FromStr; + +#[cfg(feature = "async")] +use futures_util::future::{BoxFuture, FutureExt}; + +use super::PgConnection; +#[cfg(feature = "async")] +use crate::Async; +use crate::{ConnectOptions, DefaultRuntime, Error, Result, Runtime}; + +/// Options which can be used to configure how a MySQL connection is opened. +#[allow(clippy::module_name_repetitions)] +pub struct PgConnectOptions { + runtime: PhantomData, + options: sqlx_postgres::PgConnectOptions, +} + +impl PgConnectOptions { + /// Creates a default set of connection options. + /// + /// Implemented with [`Default`](#impl-Default). + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Parses connection options from a connection URL. + /// + /// ```text + /// mysql://[[user[:password]@]host][/database][?properties] + /// ``` + /// + /// Implemented with [`FromStr`](#impl-FromStr). + /// + #[inline] + pub fn parse(url: &str) -> Result { + Ok(url.parse::()?.into()) + } +} + +#[cfg(feature = "async")] +impl PgConnectOptions { + /// Open a new database connection with the configured connection options. + /// + /// Implemented with [`ConnectOptions::connect`]. + #[inline] + pub async fn connect(&self) -> Result> { + ::connect::, Rt>(self).await + } +} + +// explicitly forwards builder methods +// in order to return Self as sqlx::mysql::PgConnectOptions instead of +// sqlx_postgres::PgConnectOptions +impl PgConnectOptions { + /// Sets the hostname of the database server. + /// + /// If the hostname begins with a slash (`/`), it is interpreted as the absolute path + /// to a Unix domain socket file instead of a hostname of a server. + /// + /// Defaults to `localhost`. + /// + pub fn host(&mut self, host: impl AsRef) -> &mut Self { + self.options.host(host); + self + } + + /// Sets the path of the Unix domain socket to connect to. + /// + /// Overrides [`host()`](#method.host) and [`port()`](#method.port). + /// + pub fn socket(&mut self, socket: impl AsRef) -> &mut Self { + self.options.socket(socket); + self + } + + /// Sets the TCP port number of the database server. + /// + /// Defaults to `3306`. + /// + pub fn port(&mut self, port: u16) -> &mut Self { + self.options.port(port); + self + } + + /// Sets the username to be used for authentication. + // FIXME: Specify what happens when you do NOT set this + pub fn username(&mut self, username: impl AsRef) -> &mut Self { + self.options.username(username); + self + } + + /// Sets the password to be used for authentication. + pub fn password(&mut self, password: impl AsRef) -> &mut Self { + self.options.password(password); + self + } + + /// Sets the default database for the connection. + pub fn database(&mut self, database: impl AsRef) -> &mut Self { + self.options.database(database); + self + } +} + +// allow trivial conversion from [sqlx_postgres::PgConnectOptions] to +// our runtime-wrapped [sqlx::mysql::PgConnectOptions] +impl From for PgConnectOptions { + #[inline] + fn from(options: sqlx_postgres::PgConnectOptions) -> Self { + Self { runtime: PhantomData, options } + } +} + +// default implement [ConnectOptions] +// ensures that the required traits for [PgConnectOptions] are implemented +impl ConnectOptions for PgConnectOptions {} + +// forward Debug to [sqlx_postgres::PgConnectOptions] +impl Debug for PgConnectOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.options) + } +} + +// forward Default to [sqlx_postgres::PgConnectOptions] +impl Default for PgConnectOptions { + fn default() -> Self { + sqlx_postgres::PgConnectOptions::default().into() + } +} + +// forward Clone to [sqlx_postgres::PgConnectOptions] +impl Clone for PgConnectOptions { + fn clone(&self) -> Self { + Self { runtime: PhantomData, options: self.options.clone() } + } +} + +// forward FromStr to [sqlx_postgres::PgConnectOptions] +impl FromStr for PgConnectOptions { + type Err = Error; + + fn from_str(url: &str) -> Result { + Self::parse(url) + } +} + +// allow dereferencing into [sqlx_postgres::PgConnectOptions] +// note that we do not allow mutable dereferencing as those methods have the wrong return type +impl Deref for PgConnectOptions { + type Target = sqlx_postgres::PgConnectOptions; + + fn deref(&self) -> &Self::Target { + &self.options + } +}