diff --git a/sqlx-core/src/mysql/mod.rs b/sqlx-core/src/mysql/mod.rs index ff972e83..114a0004 100644 --- a/sqlx-core/src/mysql/mod.rs +++ b/sqlx-core/src/mysql/mod.rs @@ -24,7 +24,7 @@ pub use type_info::MySqlTypeInfo; pub use value::{MySqlValue, MySqlValueFormat, MySqlValueRef}; /// An alias for [`Pool`][crate::pool::Pool], specialized for MySQL. -pub type MySqlPool = crate::pool::Pool; +pub type MySqlPool = crate::pool::Pool; // NOTE: required due to the lack of lazy normalization impl_into_arguments_for_arguments!(MySqlArguments); diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index 0a7c94ed..1d953ebf 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -1,34 +1,31 @@ -use std::borrow::{Borrow, BorrowMut}; use std::fmt::{self, Debug, Formatter}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::time::Instant; use futures_core::future::BoxFuture; +use sqlx_rt::spawn; use super::inner::{DecrementSizeGuard, SharedPool}; -use crate::connection::{Connect, Connection}; +use crate::connection::Connection; use crate::database::Database; use crate::error::Error; /// A connection checked out from [`Pool`][crate::pool::Pool]. /// /// Will be returned to the pool on-drop. -pub struct PoolConnection -where - C: 'static + Connect, -{ - live: Option>, - pub(crate) pool: Arc>, +pub struct PoolConnection { + live: Option>, + pub(crate) pool: Arc>, } -pub(super) struct Live { - raw: C, +pub(super) struct Live { + raw: DB::Connection, pub(super) created: Instant, } -pub(super) struct Idle { - live: Live, +pub(super) struct Idle { + live: Live, pub(super) since: Instant, } @@ -40,56 +37,29 @@ pub(super) struct Floating<'p, C> { const DEREF_ERR: &str = "(bug) connection already released to pool"; -impl Debug for PoolConnection { +impl Debug for PoolConnection { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { // TODO: Show the type name of the connection ? f.debug_struct("PoolConnection").finish() } } -impl Borrow for PoolConnection -where - C: Connect, -{ - fn borrow(&self) -> &C { - &*self - } -} - -impl BorrowMut for PoolConnection -where - C: Connect, -{ - fn borrow_mut(&mut self) -> &mut C { - &mut *self - } -} - -impl Deref for PoolConnection -where - C: Connect, -{ - type Target = C; +impl Deref for PoolConnection { + type Target = DB::Connection; fn deref(&self) -> &Self::Target { &self.live.as_ref().expect(DEREF_ERR).raw } } -impl DerefMut for PoolConnection -where - C: Connect, -{ +impl DerefMut for PoolConnection { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.live.as_mut().expect(DEREF_ERR).raw } } -impl Connection for PoolConnection -where - C: 'static + Connect, -{ - type Database = C::Database; +impl Connection for PoolConnection { + type Database = DB; fn close(mut self) -> BoxFuture<'static, Result<(), Error>> { Box::pin(async move { @@ -104,30 +74,27 @@ where } #[doc(hidden)] - fn get_ref(&self) -> &::Connection { + fn flush(&mut self) -> BoxFuture> { + self.get_mut().flush() + } + + #[doc(hidden)] + fn get_ref(&self) -> &DB::Connection { self.deref().get_ref() } #[doc(hidden)] - fn get_mut(&mut self) -> &mut ::Connection { + fn get_mut(&mut self) -> &mut DB::Connection { self.deref_mut().get_mut() } - - #[doc(hidden)] - fn flush(&mut self) -> BoxFuture> { - self.get_mut().flush() - } } /// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from. -impl Drop for PoolConnection -where - C: 'static + Connect, -{ +impl Drop for PoolConnection { fn drop(&mut self) { if let Some(mut live) = self.live.take() { let pool = self.pool.clone(); - sqlx_rt::spawn(async move { + spawn(async move { // flush the connection (will immediately return if not needed) before // we fully release to the pool if let Err(e) = live.raw.flush().await { @@ -144,15 +111,15 @@ where } } -impl Live { - pub fn float(self, pool: &SharedPool) -> Floating { +impl Live { + pub fn float(self, pool: &SharedPool) -> Floating { Floating { inner: self, guard: DecrementSizeGuard::new(pool), } } - pub fn into_idle(self) -> Idle { + pub fn into_idle(self) -> Idle { Idle { live: self, since: Instant::now(), @@ -160,15 +127,15 @@ impl Live { } } -impl Deref for Idle { - type Target = Live; +impl Deref for Idle { + type Target = Live; fn deref(&self) -> &Self::Target { &self.live } } -impl DerefMut for Idle { +impl DerefMut for Idle { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.live } @@ -181,8 +148,8 @@ impl<'s, C> Floating<'s, C> { } } -impl<'s, C> Floating<'s, Live> { - pub fn new_live(conn: C, guard: DecrementSizeGuard<'s>) -> Self { +impl<'s, DB: Database> Floating<'s, Live> { + pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<'s>) -> Self { Self { inner: Live { raw: conn, @@ -192,10 +159,7 @@ impl<'s, C> Floating<'s, Live> { } } - pub fn attach(self, pool: &Arc>) -> PoolConnection - where - C: Connect, - { + pub fn attach(self, pool: &Arc>) -> PoolConnection { let Floating { inner, guard } = self; debug_assert!( @@ -210,7 +174,7 @@ impl<'s, C> Floating<'s, Live> { } } - pub fn into_idle(self) -> Floating<'s, Idle> { + pub fn into_idle(self) -> Floating<'s, Idle> { Floating { inner: self.inner.into_idle(), guard: self.guard, @@ -218,32 +182,26 @@ impl<'s, C> Floating<'s, Live> { } } -impl<'s, C> Floating<'s, Idle> { - pub fn from_idle(idle: Idle, pool: &'s SharedPool) -> Self { +impl<'s, DB: Database> Floating<'s, Idle> { + pub fn from_idle(idle: Idle, pool: &'s SharedPool) -> Self { Self { inner: idle, guard: DecrementSizeGuard::new(pool), } } - pub async fn ping(&mut self) -> Result<(), Error> - where - C: Connection, - { + pub async fn ping(&mut self) -> Result<(), Error> { self.live.raw.ping().await } - pub fn into_live(self) -> Floating<'s, Live> { + pub fn into_live(self) -> Floating<'s, Live> { Floating { inner: self.inner.live, guard: self.guard, } } - pub async fn close(self) -> Result<(), Error> - where - C: Connection, - { + pub async fn close(self) -> Result<(), Error> { // `guard` is dropped as intended self.inner.live.raw.close().await } diff --git a/sqlx-core/src/pool/executor.rs b/sqlx-core/src/pool/executor.rs index f4911560..d993cc9b 100644 --- a/sqlx-core/src/pool/executor.rs +++ b/sqlx-core/src/pool/executor.rs @@ -4,19 +4,17 @@ use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use futures_util::TryStreamExt; -use crate::connection::Connect; use crate::database::Database; use crate::describe::Describe; use crate::error::Error; use crate::executor::{Execute, Executor}; use crate::pool::Pool; -impl<'p, C> Executor<'p> for &'_ Pool +impl<'p, DB: Database> Executor<'p> for &'_ Pool where - C: 'static + Connect, - for<'c> &'c mut C: Executor<'c, Database = C::Database>, + for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>, { - type Database = C::Database; + type Database = DB; fn fetch_many<'e, 'q: 'e, E: 'q>( self, @@ -67,7 +65,7 @@ where #[allow(unused_macros)] macro_rules! impl_executor_for_pool_connection { ($DB:ident, $C:ident, $R:ident) => { - impl<'c> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<$C> { + impl<'c> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<$DB> { type Database = $DB; #[inline] diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 031bad26..1de7998a 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -10,26 +10,24 @@ use futures_core::task::{Poll, Waker}; use futures_util::future; use sqlx_rt::{sleep, spawn, timeout}; -use crate::connection::{Connect, Connection}; +use crate::connection::Connect; +use crate::database::Database; use crate::error::Error; use crate::pool::deadline_as_timeout; use super::connection::{Floating, Idle, Live}; use super::Options; -pub(crate) struct SharedPool { +pub(crate) struct SharedPool { url: String, - idle_conns: ArrayQueue>, + idle_conns: ArrayQueue>, waiters: SegQueue, pub(super) size: AtomicU32, is_closed: AtomicBool, options: Options, } -impl SharedPool -where - C: Connection, -{ +impl SharedPool { pub fn options(&self) -> &Options { &self.options } @@ -60,11 +58,11 @@ where } #[inline] - pub(super) fn try_acquire(&self) -> Option>> { + pub(super) fn try_acquire(&self) -> Option>> { Some(self.pop_idle()?.into_live()) } - fn pop_idle(&self) -> Option>> { + fn pop_idle(&self) -> Option>> { if self.is_closed.load(Ordering::Acquire) { return None; } @@ -72,7 +70,7 @@ where Some(Floating::from_idle(self.idle_conns.pop().ok()?, self)) } - pub(super) fn release(&self, floating: Floating>) { + pub(super) fn release(&self, floating: Floating>) { self.idle_conns .push(floating.into_idle().into_leakable()) .expect("BUG: connection queue overflow in release()"); @@ -109,7 +107,7 @@ where let mut waker_pushed = false; timeout( - deadline_as_timeout::(deadline)?, + deadline_as_timeout::(deadline)?, // `poll_fn` gets us easy access to a `Waker` that we can push to our queue future::poll_fn(|ctx| -> Poll<()> { if !waker_pushed { @@ -125,12 +123,7 @@ where .await .map_err(|_| Error::PoolTimedOut) } -} -impl SharedPool -where - C: 'static + Connect, -{ pub(super) async fn new_arc(url: &str, options: Options) -> Result, Error> { let mut pool = Self { url: url.to_owned(), @@ -151,7 +144,7 @@ where } #[allow(clippy::needless_lifetimes)] - pub(super) async fn acquire<'s>(&'s self) -> Result>, Error> { + pub(super) async fn acquire<'s>(&'s self) -> Result>, Error> { let start = Instant::now(); let deadline = start + self.options.connect_timeout; @@ -208,15 +201,15 @@ where &'s self, deadline: Instant, guard: DecrementSizeGuard<'s>, - ) -> Result>>, Error> { + ) -> Result>>, Error> { if self.is_closed() { return Err(Error::PoolClosed); } - let timeout = super::deadline_as_timeout::(deadline)?; + let timeout = super::deadline_as_timeout::(deadline)?; // result here is `Result, TimeoutError>` - match sqlx_rt::timeout(timeout, C::connect(&self.url)).await { + match sqlx_rt::timeout(timeout, DB::Connection::connect(&self.url)).await { // successfully established connection Ok(Ok(raw)) => Ok(Some(Floating::new_live(raw, guard))), @@ -241,27 +234,24 @@ where // NOTE: Function names here are bizzare. Helpful help would be appreciated. -fn is_beyond_lifetime(live: &Live, options: &Options) -> bool { +fn is_beyond_lifetime(live: &Live, options: &Options) -> bool { // check if connection was within max lifetime (or not set) options .max_lifetime .map_or(false, |max| live.created.elapsed() > max) } -fn is_beyond_idle(idle: &Idle, options: &Options) -> bool { +fn is_beyond_idle(idle: &Idle, options: &Options) -> bool { // if connection wasn't idle too long (or not set) options .idle_timeout .map_or(false, |timeout| idle.since.elapsed() > timeout) } -async fn check_conn<'s: 'p, 'p, C>( - mut conn: Floating<'s, Idle>, +async fn check_conn<'s: 'p, 'p, DB: Database>( + mut conn: Floating<'s, Idle>, options: &'p Options, -) -> Option>> -where - C: Connection, -{ +) -> Option>> { // If the connection we pulled has expired, close the connection and // immediately create a new connection if is_beyond_lifetime(&conn, options) { @@ -287,10 +277,7 @@ where } /// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections -fn spawn_reaper(pool: &Arc>) -where - C: 'static + Connection, -{ +fn spawn_reaper(pool: &Arc>) { let period = match (pool.options.max_lifetime, pool.options.idle_timeout) { (Some(it), None) | (None, Some(it)) => it, @@ -341,7 +328,7 @@ pub(in crate::pool) struct DecrementSizeGuard<'a> { } impl<'a> DecrementSizeGuard<'a> { - pub fn new(pool: &'a SharedPool) -> Self { + pub fn new(pool: &'a SharedPool) -> Self { Self { size: &pool.size, waiters: &pool.waiters, @@ -350,7 +337,7 @@ impl<'a> DecrementSizeGuard<'a> { } /// Return `true` if the internal references point to the same fields in `SharedPool`. - pub fn same_pool(&self, pool: &'a SharedPool) -> bool { + pub fn same_pool(&self, pool: &'a SharedPool) -> bool { ptr::eq(self.size, &pool.size) && ptr::eq(self.waiters, &pool.waiters) } diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index 0bed26bc..2514c9d1 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -6,7 +6,6 @@ use std::{ time::{Duration, Instant}, }; -use crate::connection::Connect; use crate::database::Database; use crate::error::Error; @@ -24,12 +23,9 @@ pub use self::connection::PoolConnection; pub use self::options::Builder; /// A pool of database connections. -pub struct Pool(pub(crate) Arc>); +pub struct Pool(pub(crate) Arc>); -impl Pool -where - C: 'static + Connect, -{ +impl Pool { /// Creates a connection pool with the default configuration. /// /// The connection URL syntax is documented on the connection type for the respective @@ -42,25 +38,25 @@ where } async fn new_with(url: &str, options: Options) -> Result { - Ok(Pool(SharedPool::::new_arc(url, options).await?)) + Ok(Pool(SharedPool::::new_arc(url, options).await?)) } /// Returns a [`Builder`] to configure a new connection pool. - pub fn builder() -> Builder { + pub fn builder() -> Builder { Builder::new() } /// Retrieves a connection from the pool. /// /// Waits for at most the configured connection timeout before returning an error. - pub async fn acquire(&self) -> Result, Error> { + pub async fn acquire(&self) -> Result, Error> { self.0.acquire().await.map(|conn| conn.attach(&self.0)) } /// Attempts to retrieve a connection from the pool if there is one available. /// /// Returns `None` immediately if there are no idle connections available in the pool. - pub fn try_acquire(&self) -> Option> { + pub fn try_acquire(&self) -> Option> { self.0.try_acquire().map(|conn| conn.attach(&self.0)) } @@ -114,16 +110,13 @@ where } /// Returns a new [Pool] tied to the same shared connection pool. -impl Clone for Pool { +impl Clone for Pool { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } } -impl fmt::Debug for Pool -where - C: Connect, -{ +impl fmt::Debug for Pool { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Pool") .field("url", &self.0.url()) diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 6b3dda1b..119e1627 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -1,20 +1,16 @@ use std::{marker::PhantomData, time::Duration}; use super::Pool; -use crate::connection::Connect; use crate::database::Database; use crate::error::Error; /// Builder for [Pool]. -pub struct Builder { - phantom: PhantomData, +pub struct Builder { + phantom: PhantomData, options: Options, } -impl Builder -where - C: 'static + Connect, -{ +impl Builder { /// Get a new builder with default options. /// /// See the source of this method for current defaults. @@ -114,19 +110,12 @@ where /// opened and placed into the pool. /// /// [`min_size`]: #method.min_size - pub async fn build(self, url: &str) -> Result, Error> - where - C: Connect, - { - Pool::::new_with(url, self.options).await + pub async fn build(self, url: &str) -> Result, Error> { + Pool::::new_with(url, self.options).await } } -impl Default for Builder -where - C: 'static + Connect, - DB: Database, -{ +impl Default for Builder { fn default() -> Self { Self::new() } diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index 1c13fe53..f8c891a2 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -12,7 +12,7 @@ use crate::ext::ustr::UStr; use crate::io::Decode; use crate::postgres::connection::stream::PgStream; use crate::postgres::message::{ - CommandComplete, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus, + Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus, }; use crate::postgres::row::PgColumn; use crate::postgres::{PgConnectOptions, PgTypeInfo, Postgres}; diff --git a/sqlx-core/src/postgres/listener.rs b/sqlx-core/src/postgres/listener.rs index 66868856..554068f2 100644 --- a/sqlx-core/src/postgres/listener.rs +++ b/sqlx-core/src/postgres/listener.rs @@ -22,8 +22,8 @@ use either::Either; /// new connection, will re-subscribe to all of the originally specified channels, and will resume /// operations as normal. pub struct PgListener { - pool: Pool, - connection: Option>, + pool: Pool, + connection: Option>, buffer_rx: mpsc::UnboundedReceiver, buffer_tx: Option>, channels: Vec, @@ -36,7 +36,7 @@ impl PgListener { pub async fn new(url: &str) -> Result { // Create a pool of 1 without timeouts (as they don't apply here) // We only use the pool to handle re-connections - let pool = Pool::::builder() + let pool = Pool::::builder() .max_size(1) .max_lifetime(None) .idle_timeout(None) @@ -46,7 +46,7 @@ impl PgListener { Self::from_pool(&pool).await } - pub async fn from_pool(pool: &Pool) -> Result { + pub async fn from_pool(pool: &Pool) -> Result { // Pull out an initial connection let mut connection = pool.acquire().await?; diff --git a/sqlx-core/src/postgres/mod.rs b/sqlx-core/src/postgres/mod.rs index 77aa846c..77b5595a 100644 --- a/sqlx-core/src/postgres/mod.rs +++ b/sqlx-core/src/postgres/mod.rs @@ -27,7 +27,7 @@ pub use type_info::{PgTypeInfo, PgTypeKind}; pub use value::{PgValue, PgValueFormat, PgValueRef}; /// An alias for [`Pool`][crate::pool::Pool], specialized for Postgres. -pub type PgPool = crate::pool::Pool; +pub type PgPool = crate::pool::Pool; // NOTE: required due to the lack of lazy normalization impl_into_arguments_for_arguments!(PgArguments); diff --git a/sqlx-core/src/sqlite/mod.rs b/sqlx-core/src/sqlite/mod.rs index ab6337a4..e56c97fb 100644 --- a/sqlx-core/src/sqlite/mod.rs +++ b/sqlx-core/src/sqlite/mod.rs @@ -28,7 +28,7 @@ pub use type_info::SqliteTypeInfo; pub use value::{SqliteValue, SqliteValueRef}; /// An alias for [`Pool`][crate::pool::Pool], specialized for SQLite. -pub type SqlitePool = crate::pool::Pool; +pub type SqlitePool = crate::pool::Pool; // NOTE: required due to the lack of lazy normalization impl_into_arguments_for_arguments!(SqliteArguments<'q>);