feat: Pool is now generic over Database, as opposed to Connection

this fixes an unfortunate interaction with HRTBs where the compiler would
produce infinitely nested PoolConnection<PoolConnection<....
This commit is contained in:
Ryan Leckey 2020-05-30 05:04:19 -07:00
parent cf7606be1b
commit 0a04abdb3e
No known key found for this signature in database
GPG Key ID: BBDFC5595030E7D3
10 changed files with 86 additions and 161 deletions

View File

@ -24,7 +24,7 @@ pub use type_info::MySqlTypeInfo;
pub use value::{MySqlValue, MySqlValueFormat, MySqlValueRef};
/// An alias for [`Pool`][crate::pool::Pool], specialized for MySQL.
pub type MySqlPool = crate::pool::Pool<MySqlConnection>;
pub type MySqlPool = crate::pool::Pool<MySql>;
// NOTE: required due to the lack of lazy normalization
impl_into_arguments_for_arguments!(MySqlArguments);

View File

@ -1,34 +1,31 @@
use std::borrow::{Borrow, BorrowMut};
use std::fmt::{self, Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Instant;
use futures_core::future::BoxFuture;
use sqlx_rt::spawn;
use super::inner::{DecrementSizeGuard, SharedPool};
use crate::connection::{Connect, Connection};
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
/// A connection checked out from [`Pool`][crate::pool::Pool].
///
/// Will be returned to the pool on-drop.
pub struct PoolConnection<C>
where
C: 'static + Connect,
{
live: Option<Live<C>>,
pub(crate) pool: Arc<SharedPool<C>>,
pub struct PoolConnection<DB: Database> {
live: Option<Live<DB>>,
pub(crate) pool: Arc<SharedPool<DB>>,
}
pub(super) struct Live<C> {
raw: C,
pub(super) struct Live<DB: Database> {
raw: DB::Connection,
pub(super) created: Instant,
}
pub(super) struct Idle<C> {
live: Live<C>,
pub(super) struct Idle<DB: Database> {
live: Live<DB>,
pub(super) since: Instant,
}
@ -40,56 +37,29 @@ pub(super) struct Floating<'p, C> {
const DEREF_ERR: &str = "(bug) connection already released to pool";
impl<C: Connect> Debug for PoolConnection<C> {
impl<DB: Database> Debug for PoolConnection<DB> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
// TODO: Show the type name of the connection ?
f.debug_struct("PoolConnection").finish()
}
}
impl<C> Borrow<C> for PoolConnection<C>
where
C: Connect,
{
fn borrow(&self) -> &C {
&*self
}
}
impl<C> BorrowMut<C> for PoolConnection<C>
where
C: Connect,
{
fn borrow_mut(&mut self) -> &mut C {
&mut *self
}
}
impl<C> Deref for PoolConnection<C>
where
C: Connect,
{
type Target = C;
impl<DB: Database> Deref for PoolConnection<DB> {
type Target = DB::Connection;
fn deref(&self) -> &Self::Target {
&self.live.as_ref().expect(DEREF_ERR).raw
}
}
impl<C> DerefMut for PoolConnection<C>
where
C: Connect,
{
impl<DB: Database> DerefMut for PoolConnection<DB> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live.as_mut().expect(DEREF_ERR).raw
}
}
impl<C> Connection for PoolConnection<C>
where
C: 'static + Connect,
{
type Database = C::Database;
impl<DB: Database> Connection for PoolConnection<DB> {
type Database = DB;
fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
@ -104,30 +74,27 @@ where
}
#[doc(hidden)]
fn get_ref(&self) -> &<Self::Database as Database>::Connection {
fn flush(&mut self) -> BoxFuture<Result<(), Error>> {
self.get_mut().flush()
}
#[doc(hidden)]
fn get_ref(&self) -> &DB::Connection {
self.deref().get_ref()
}
#[doc(hidden)]
fn get_mut(&mut self) -> &mut <Self::Database as Database>::Connection {
fn get_mut(&mut self) -> &mut DB::Connection {
self.deref_mut().get_mut()
}
#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<Result<(), Error>> {
self.get_mut().flush()
}
}
/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
impl<C> Drop for PoolConnection<C>
where
C: 'static + Connect,
{
impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(mut live) = self.live.take() {
let pool = self.pool.clone();
sqlx_rt::spawn(async move {
spawn(async move {
// flush the connection (will immediately return if not needed) before
// we fully release to the pool
if let Err(e) = live.raw.flush().await {
@ -144,15 +111,15 @@ where
}
}
impl<C> Live<C> {
pub fn float(self, pool: &SharedPool<C>) -> Floating<Self> {
impl<DB: Database> Live<DB> {
pub fn float(self, pool: &SharedPool<DB>) -> Floating<Self> {
Floating {
inner: self,
guard: DecrementSizeGuard::new(pool),
}
}
pub fn into_idle(self) -> Idle<C> {
pub fn into_idle(self) -> Idle<DB> {
Idle {
live: self,
since: Instant::now(),
@ -160,15 +127,15 @@ impl<C> Live<C> {
}
}
impl<C> Deref for Idle<C> {
type Target = Live<C>;
impl<DB: Database> Deref for Idle<DB> {
type Target = Live<DB>;
fn deref(&self) -> &Self::Target {
&self.live
}
}
impl<C> DerefMut for Idle<C> {
impl<DB: Database> DerefMut for Idle<DB> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live
}
@ -181,8 +148,8 @@ impl<'s, C> Floating<'s, C> {
}
}
impl<'s, C> Floating<'s, Live<C>> {
pub fn new_live(conn: C, guard: DecrementSizeGuard<'s>) -> Self {
impl<'s, DB: Database> Floating<'s, Live<DB>> {
pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<'s>) -> Self {
Self {
inner: Live {
raw: conn,
@ -192,10 +159,7 @@ impl<'s, C> Floating<'s, Live<C>> {
}
}
pub fn attach(self, pool: &Arc<SharedPool<C>>) -> PoolConnection<C>
where
C: Connect,
{
pub fn attach(self, pool: &Arc<SharedPool<DB>>) -> PoolConnection<DB> {
let Floating { inner, guard } = self;
debug_assert!(
@ -210,7 +174,7 @@ impl<'s, C> Floating<'s, Live<C>> {
}
}
pub fn into_idle(self) -> Floating<'s, Idle<C>> {
pub fn into_idle(self) -> Floating<'s, Idle<DB>> {
Floating {
inner: self.inner.into_idle(),
guard: self.guard,
@ -218,32 +182,26 @@ impl<'s, C> Floating<'s, Live<C>> {
}
}
impl<'s, C> Floating<'s, Idle<C>> {
pub fn from_idle(idle: Idle<C>, pool: &'s SharedPool<C>) -> Self {
impl<'s, DB: Database> Floating<'s, Idle<DB>> {
pub fn from_idle(idle: Idle<DB>, pool: &'s SharedPool<DB>) -> Self {
Self {
inner: idle,
guard: DecrementSizeGuard::new(pool),
}
}
pub async fn ping(&mut self) -> Result<(), Error>
where
C: Connection,
{
pub async fn ping(&mut self) -> Result<(), Error> {
self.live.raw.ping().await
}
pub fn into_live(self) -> Floating<'s, Live<C>> {
pub fn into_live(self) -> Floating<'s, Live<DB>> {
Floating {
inner: self.inner.live,
guard: self.guard,
}
}
pub async fn close(self) -> Result<(), Error>
where
C: Connection,
{
pub async fn close(self) -> Result<(), Error> {
// `guard` is dropped as intended
self.inner.live.raw.close().await
}

View File

@ -4,19 +4,17 @@ use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::TryStreamExt;
use crate::connection::Connect;
use crate::database::Database;
use crate::describe::Describe;
use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::pool::Pool;
impl<'p, C> Executor<'p> for &'_ Pool<C>
impl<'p, DB: Database> Executor<'p> for &'_ Pool<DB>
where
C: 'static + Connect,
for<'c> &'c mut C: Executor<'c, Database = C::Database>,
for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
{
type Database = C::Database;
type Database = DB;
fn fetch_many<'e, 'q: 'e, E: 'q>(
self,
@ -67,7 +65,7 @@ where
#[allow(unused_macros)]
macro_rules! impl_executor_for_pool_connection {
($DB:ident, $C:ident, $R:ident) => {
impl<'c> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<$C> {
impl<'c> crate::executor::Executor<'c> for &'c mut crate::pool::PoolConnection<$DB> {
type Database = $DB;
#[inline]

View File

@ -10,26 +10,24 @@ use futures_core::task::{Poll, Waker};
use futures_util::future;
use sqlx_rt::{sleep, spawn, timeout};
use crate::connection::{Connect, Connection};
use crate::connection::Connect;
use crate::database::Database;
use crate::error::Error;
use crate::pool::deadline_as_timeout;
use super::connection::{Floating, Idle, Live};
use super::Options;
pub(crate) struct SharedPool<C> {
pub(crate) struct SharedPool<DB: Database> {
url: String,
idle_conns: ArrayQueue<Idle<C>>,
idle_conns: ArrayQueue<Idle<DB>>,
waiters: SegQueue<Waker>,
pub(super) size: AtomicU32,
is_closed: AtomicBool,
options: Options,
}
impl<C> SharedPool<C>
where
C: Connection,
{
impl<DB: Database> SharedPool<DB> {
pub fn options(&self) -> &Options {
&self.options
}
@ -60,11 +58,11 @@ where
}
#[inline]
pub(super) fn try_acquire(&self) -> Option<Floating<Live<C>>> {
pub(super) fn try_acquire(&self) -> Option<Floating<Live<DB>>> {
Some(self.pop_idle()?.into_live())
}
fn pop_idle(&self) -> Option<Floating<Idle<C>>> {
fn pop_idle(&self) -> Option<Floating<Idle<DB>>> {
if self.is_closed.load(Ordering::Acquire) {
return None;
}
@ -72,7 +70,7 @@ where
Some(Floating::from_idle(self.idle_conns.pop().ok()?, self))
}
pub(super) fn release(&self, floating: Floating<Live<C>>) {
pub(super) fn release(&self, floating: Floating<Live<DB>>) {
self.idle_conns
.push(floating.into_idle().into_leakable())
.expect("BUG: connection queue overflow in release()");
@ -109,7 +107,7 @@ where
let mut waker_pushed = false;
timeout(
deadline_as_timeout::<C::Database>(deadline)?,
deadline_as_timeout::<DB>(deadline)?,
// `poll_fn` gets us easy access to a `Waker` that we can push to our queue
future::poll_fn(|ctx| -> Poll<()> {
if !waker_pushed {
@ -125,12 +123,7 @@ where
.await
.map_err(|_| Error::PoolTimedOut)
}
}
impl<C> SharedPool<C>
where
C: 'static + Connect,
{
pub(super) async fn new_arc(url: &str, options: Options) -> Result<Arc<Self>, Error> {
let mut pool = Self {
url: url.to_owned(),
@ -151,7 +144,7 @@ where
}
#[allow(clippy::needless_lifetimes)]
pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<C>>, Error> {
pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<DB>>, Error> {
let start = Instant::now();
let deadline = start + self.options.connect_timeout;
@ -208,15 +201,15 @@ where
&'s self,
deadline: Instant,
guard: DecrementSizeGuard<'s>,
) -> Result<Option<Floating<'s, Live<C>>>, Error> {
) -> Result<Option<Floating<'s, Live<DB>>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
let timeout = super::deadline_as_timeout::<C::Database>(deadline)?;
let timeout = super::deadline_as_timeout::<DB>(deadline)?;
// result here is `Result<Result<C, Error>, TimeoutError>`
match sqlx_rt::timeout(timeout, C::connect(&self.url)).await {
match sqlx_rt::timeout(timeout, DB::Connection::connect(&self.url)).await {
// successfully established connection
Ok(Ok(raw)) => Ok(Some(Floating::new_live(raw, guard))),
@ -241,27 +234,24 @@ where
// NOTE: Function names here are bizzare. Helpful help would be appreciated.
fn is_beyond_lifetime<C>(live: &Live<C>, options: &Options) -> bool {
fn is_beyond_lifetime<DB: Database>(live: &Live<DB>, options: &Options) -> bool {
// check if connection was within max lifetime (or not set)
options
.max_lifetime
.map_or(false, |max| live.created.elapsed() > max)
}
fn is_beyond_idle<C>(idle: &Idle<C>, options: &Options) -> bool {
fn is_beyond_idle<DB: Database>(idle: &Idle<DB>, options: &Options) -> bool {
// if connection wasn't idle too long (or not set)
options
.idle_timeout
.map_or(false, |timeout| idle.since.elapsed() > timeout)
}
async fn check_conn<'s: 'p, 'p, C>(
mut conn: Floating<'s, Idle<C>>,
async fn check_conn<'s: 'p, 'p, DB: Database>(
mut conn: Floating<'s, Idle<DB>>,
options: &'p Options,
) -> Option<Floating<'s, Live<C>>>
where
C: Connection,
{
) -> Option<Floating<'s, Live<DB>>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_lifetime(&conn, options) {
@ -287,10 +277,7 @@ where
}
/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections
fn spawn_reaper<C>(pool: &Arc<SharedPool<C>>)
where
C: 'static + Connection,
{
fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
(Some(it), None) | (None, Some(it)) => it,
@ -341,7 +328,7 @@ pub(in crate::pool) struct DecrementSizeGuard<'a> {
}
impl<'a> DecrementSizeGuard<'a> {
pub fn new<C>(pool: &'a SharedPool<C>) -> Self {
pub fn new<DB: Database>(pool: &'a SharedPool<DB>) -> Self {
Self {
size: &pool.size,
waiters: &pool.waiters,
@ -350,7 +337,7 @@ impl<'a> DecrementSizeGuard<'a> {
}
/// Return `true` if the internal references point to the same fields in `SharedPool`.
pub fn same_pool<C>(&self, pool: &'a SharedPool<C>) -> bool {
pub fn same_pool<DB: Database>(&self, pool: &'a SharedPool<DB>) -> bool {
ptr::eq(self.size, &pool.size) && ptr::eq(self.waiters, &pool.waiters)
}

View File

@ -6,7 +6,6 @@ use std::{
time::{Duration, Instant},
};
use crate::connection::Connect;
use crate::database::Database;
use crate::error::Error;
@ -24,12 +23,9 @@ pub use self::connection::PoolConnection;
pub use self::options::Builder;
/// A pool of database connections.
pub struct Pool<C>(pub(crate) Arc<SharedPool<C>>);
pub struct Pool<DB: Database>(pub(crate) Arc<SharedPool<DB>>);
impl<C> Pool<C>
where
C: 'static + Connect,
{
impl<DB: Database> Pool<DB> {
/// Creates a connection pool with the default configuration.
///
/// The connection URL syntax is documented on the connection type for the respective
@ -42,25 +38,25 @@ where
}
async fn new_with(url: &str, options: Options) -> Result<Self, Error> {
Ok(Pool(SharedPool::<C>::new_arc(url, options).await?))
Ok(Pool(SharedPool::<DB>::new_arc(url, options).await?))
}
/// Returns a [`Builder`] to configure a new connection pool.
pub fn builder() -> Builder<C> {
pub fn builder() -> Builder<DB> {
Builder::new()
}
/// Retrieves a connection from the pool.
///
/// Waits for at most the configured connection timeout before returning an error.
pub async fn acquire(&self) -> Result<PoolConnection<C>, Error> {
pub async fn acquire(&self) -> Result<PoolConnection<DB>, Error> {
self.0.acquire().await.map(|conn| conn.attach(&self.0))
}
/// Attempts to retrieve a connection from the pool if there is one available.
///
/// Returns `None` immediately if there are no idle connections available in the pool.
pub fn try_acquire(&self) -> Option<PoolConnection<C>> {
pub fn try_acquire(&self) -> Option<PoolConnection<DB>> {
self.0.try_acquire().map(|conn| conn.attach(&self.0))
}
@ -114,16 +110,13 @@ where
}
/// Returns a new [Pool] tied to the same shared connection pool.
impl<C> Clone for Pool<C> {
impl<DB: Database> Clone for Pool<DB> {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
impl<C> fmt::Debug for Pool<C>
where
C: Connect,
{
impl<DB: Database> fmt::Debug for Pool<DB> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Pool")
.field("url", &self.0.url())

View File

@ -1,20 +1,16 @@
use std::{marker::PhantomData, time::Duration};
use super::Pool;
use crate::connection::Connect;
use crate::database::Database;
use crate::error::Error;
/// Builder for [Pool].
pub struct Builder<C> {
phantom: PhantomData<C>,
pub struct Builder<DB: Database> {
phantom: PhantomData<DB>,
options: Options,
}
impl<C> Builder<C>
where
C: 'static + Connect,
{
impl<DB: Database> Builder<DB> {
/// Get a new builder with default options.
///
/// See the source of this method for current defaults.
@ -114,19 +110,12 @@ where
/// opened and placed into the pool.
///
/// [`min_size`]: #method.min_size
pub async fn build(self, url: &str) -> Result<Pool<C>, Error>
where
C: Connect,
{
Pool::<C>::new_with(url, self.options).await
pub async fn build(self, url: &str) -> Result<Pool<DB>, Error> {
Pool::<DB>::new_with(url, self.options).await
}
}
impl<C, DB> Default for Builder<C>
where
C: 'static + Connect<Database = DB>,
DB: Database<Connection = C>,
{
impl<DB: Database> Default for Builder<DB> {
fn default() -> Self {
Self::new()
}

View File

@ -12,7 +12,7 @@ use crate::ext::ustr::UStr;
use crate::io::Decode;
use crate::postgres::connection::stream::PgStream;
use crate::postgres::message::{
CommandComplete, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
};
use crate::postgres::row::PgColumn;
use crate::postgres::{PgConnectOptions, PgTypeInfo, Postgres};

View File

@ -22,8 +22,8 @@ use either::Either;
/// new connection, will re-subscribe to all of the originally specified channels, and will resume
/// operations as normal.
pub struct PgListener {
pool: Pool<PgConnection>,
connection: Option<PoolConnection<PgConnection>>,
pool: Pool<Postgres>,
connection: Option<PoolConnection<Postgres>>,
buffer_rx: mpsc::UnboundedReceiver<Notification>,
buffer_tx: Option<mpsc::UnboundedSender<Notification>>,
channels: Vec<String>,
@ -36,7 +36,7 @@ impl PgListener {
pub async fn new(url: &str) -> Result<Self, Error> {
// Create a pool of 1 without timeouts (as they don't apply here)
// We only use the pool to handle re-connections
let pool = Pool::<PgConnection>::builder()
let pool = Pool::<Postgres>::builder()
.max_size(1)
.max_lifetime(None)
.idle_timeout(None)
@ -46,7 +46,7 @@ impl PgListener {
Self::from_pool(&pool).await
}
pub async fn from_pool(pool: &Pool<PgConnection>) -> Result<Self, Error> {
pub async fn from_pool(pool: &Pool<Postgres>) -> Result<Self, Error> {
// Pull out an initial connection
let mut connection = pool.acquire().await?;

View File

@ -27,7 +27,7 @@ pub use type_info::{PgTypeInfo, PgTypeKind};
pub use value::{PgValue, PgValueFormat, PgValueRef};
/// An alias for [`Pool`][crate::pool::Pool], specialized for Postgres.
pub type PgPool = crate::pool::Pool<PgConnection>;
pub type PgPool = crate::pool::Pool<Postgres>;
// NOTE: required due to the lack of lazy normalization
impl_into_arguments_for_arguments!(PgArguments);

View File

@ -28,7 +28,7 @@ pub use type_info::SqliteTypeInfo;
pub use value::{SqliteValue, SqliteValueRef};
/// An alias for [`Pool`][crate::pool::Pool], specialized for SQLite.
pub type SqlitePool = crate::pool::Pool<SqliteConnection>;
pub type SqlitePool = crate::pool::Pool<Sqlite>;
// NOTE: required due to the lack of lazy normalization
impl_into_arguments_for_arguments!(SqliteArguments<'q>);