refactor: PoolOptions::new() takes no parameters and the final .connect method takes the URI

This commit is contained in:
Ryan Leckey 2020-07-14 06:07:29 -07:00
parent 8de2419b14
commit 793f247604
7 changed files with 113 additions and 49 deletions

View File

@ -102,6 +102,11 @@ name = "any"
path = "tests/any/any.rs"
required-features = [ "any" ]
[[test]]
name = "any-pool"
path = "tests/any/pool.rs"
required-features = [ "any" ]
#
# SQLite
#

View File

@ -1,4 +1,5 @@
use super::connection::{Floating, Idle, Live};
use crate::connection::ConnectOptions;
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
@ -15,6 +16,7 @@ use std::sync::Arc;
use std::time::Instant;
pub(crate) struct SharedPool<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
waiters: SegQueue<Waker>,
pub(super) size: AtomicU32,
@ -140,8 +142,12 @@ impl<DB: Database> SharedPool<DB> {
.map_err(|_| Error::PoolTimedOut)
}
pub(super) fn new_arc(options: PoolOptions<DB>) -> Arc<Self> {
pub(super) fn new_arc(
options: PoolOptions<DB>,
connect_options: <DB::Connection as Connection>::Options,
) -> Arc<Self> {
let pool = Self {
connect_options,
idle_conns: ArrayQueue::new(options.max_connections as usize),
waiters: SegQueue::new(),
size: AtomicU32::new(0),
@ -207,12 +213,7 @@ impl<DB: Database> SharedPool<DB> {
let timeout = super::deadline_as_timeout::<DB>(deadline)?;
// result here is `Result<Result<C, Error>, TimeoutError>`
match sqlx_rt::timeout(
timeout,
DB::Connection::connect_with(&self.options.connect_options),
)
.await
{
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
if let Some(callback) = &self.options.after_connect {

View File

@ -20,7 +20,7 @@
//! use sqlx::Pool;
//! use sqlx::postgres::Postgres;
//!
//! let pool = Pool::<Postgres>::new("postgres://").await?;
//! let pool = Pool::<Postgres>::connect("postgres://").await?;
//! ```
//!
//! For convenience, database-specific type aliases are provided:
@ -28,7 +28,7 @@
//! ```rust,ignore
//! use sqlx::mssql::MssqlPool;
//!
//! let pool = MssqlPool::new("mssql://").await?;
//! let pool = MssqlPool::connect("mssql://").await?;
//! ```
//!
//! # Using a connection pool
@ -46,11 +46,11 @@
//!
use self::inner::SharedPool;
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::transaction::Transaction;
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -76,14 +76,29 @@ impl<DB: Database> Pool<DB> {
/// Creates a new connection pool with a default pool configuration and
/// the given connection URI; and, immediately establishes one connection.
pub async fn connect(uri: &str) -> Result<Self, Error> {
PoolOptions::<DB>::new(uri)?.connect().await
PoolOptions::<DB>::new().connect(uri).await
}
/// Creates a new connection pool with a default pool configuration and
/// the given connection options; and, immediately establishes one connection.
pub async fn connect_with(
options: <DB::Connection as Connection>::Options,
) -> Result<Self, Error> {
PoolOptions::<DB>::new().connect_with(options).await
}
/// Creates a new connection pool with a default pool configuration and
/// the given connection URI; and, will establish a connections as the pool
/// starts to be used.
pub fn connect_lazy(uri: &str) -> Result<Self, Error> {
Ok(PoolOptions::<DB>::new(uri)?.connect_lazy())
PoolOptions::<DB>::new().connect_lazy(uri)
}
/// Creates a new connection pool with a default pool configuration and
/// the given connection options; and, will establish a connections as the pool
/// starts to be used.
pub fn connect_lazy_with(options: <DB::Connection as Connection>::Options) -> Self {
PoolOptions::<DB>::new().connect_lazy_with(options)
}
/// Retrieves a connection from the pool.

View File

@ -4,15 +4,12 @@ use crate::error::Error;
use crate::pool::inner::SharedPool;
use crate::pool::Pool;
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use sqlx_rt::spawn;
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
pub struct PoolOptions<DB: Database> {
pub(crate) connect_options: <DB::Connection as Connection>::Options,
pub(crate) test_before_acquire: bool,
pub(crate) after_connect: Option<
Box<
@ -37,31 +34,26 @@ pub struct PoolOptions<DB: Database> {
pub(crate) fair: bool,
}
fn default<DB: Database>(
connect_options: <DB::Connection as Connection>::Options,
) -> PoolOptions<DB> {
PoolOptions {
connect_options,
after_connect: None,
test_before_acquire: true,
before_acquire: None,
after_release: None,
max_connections: 10,
min_connections: 0,
connect_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
fair: true,
impl<DB: Database> Default for PoolOptions<DB> {
fn default() -> Self {
Self::new()
}
}
impl<DB: Database> PoolOptions<DB> {
pub fn new(uri: &str) -> Result<Self, Error> {
Ok(default(uri.parse()?))
}
pub fn new_with(options: <DB::Connection as Connection>::Options) -> Self {
default(options)
pub fn new() -> Self {
Self {
after_connect: None,
test_before_acquire: true,
before_acquire: None,
after_release: None,
max_connections: 10,
min_connections: 0,
connect_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
fair: true,
}
}
/// Set the maximum number of connections that this pool should maintain.
@ -154,7 +146,6 @@ impl<DB: Database> PoolOptions<DB> {
where
for<'c> F:
Fn(&'c mut DB::Connection) -> BoxFuture<'c, Result<(), Error>> + 'static + Send + Sync,
// Fut: Future<Output = Result<(), Error>> + Send,
{
self.after_connect = Some(Box::new(callback));
self
@ -162,10 +153,12 @@ impl<DB: Database> PoolOptions<DB> {
pub fn before_acquire<F, Fut>(mut self, callback: F) -> Self
where
F: Fn(&mut DB::Connection) -> Fut + 'static + Send + Sync,
Fut: Future<Output = Result<bool, Error>> + 'static + Send,
for<'c> F: Fn(&'c mut DB::Connection) -> BoxFuture<'c, Result<bool, Error>>
+ 'static
+ Send
+ Sync,
{
self.before_acquire = Some(Box::new(move |conn| callback(conn).boxed()));
self.before_acquire = Some(Box::new(callback));
self
}
@ -178,8 +171,16 @@ impl<DB: Database> PoolOptions<DB> {
}
/// Creates a new pool from this configuration and immediately establishes one connection.
pub async fn connect(self) -> Result<Pool<DB>, Error> {
let shared = SharedPool::new_arc(self);
pub async fn connect(self, uri: &str) -> Result<Pool<DB>, Error> {
self.connect_with(uri.parse()?).await
}
/// Creates a new pool from this configuration and immediately establishes one connection.
pub async fn connect_with(
self,
options: <DB::Connection as Connection>::Options,
) -> Result<Pool<DB>, Error> {
let shared = SharedPool::new_arc(self, options);
init_min_connections(&shared).await?;
@ -188,8 +189,14 @@ impl<DB: Database> PoolOptions<DB> {
/// Creates a new pool from this configuration and will establish a connections as the pool
/// starts to be used.
pub fn connect_lazy(self) -> Pool<DB> {
let shared = SharedPool::new_arc(self);
pub fn connect_lazy(self, uri: &str) -> Result<Pool<DB>, Error> {
Ok(self.connect_lazy_with(uri.parse()?))
}
/// Creates a new pool from this configuration and will establish a connections as the pool
/// starts to be used.
pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
let shared = SharedPool::new_arc(self, options);
let _ = spawn({
let shared = Arc::clone(&shared);
@ -230,7 +237,6 @@ impl<DB: Database> Debug for PoolOptions<DB> {
.field("max_lifetime", &self.max_lifetime)
.field("idle_timeout", &self.idle_timeout)
.field("test_before_acquire", &self.test_before_acquire)
.field("connect_options", &self.connect_options)
.finish()
}
}

View File

@ -34,11 +34,11 @@ impl PgListener {
pub async fn connect(uri: &str) -> Result<Self, Error> {
// Create a pool of 1 without timeouts (as they don't apply here)
// We only use the pool to handle re-connections
let pool = PoolOptions::<Postgres>::new(uri)?
let pool = PoolOptions::<Postgres>::new()
.max_connections(1)
.max_lifetime(None)
.idle_timeout(None)
.connect()
.connect(uri)
.await?;
Self::connect_with(&pool).await

View File

@ -26,11 +26,11 @@ where
{
setup_if_needed();
let pool = PoolOptions::<DB>::new(&env::var("DATABASE_URL")?)?
let pool = PoolOptions::<DB>::new()
.min_connections(0)
.max_connections(5)
.test_before_acquire(true)
.connect()
.connect(&env::var("DATABASE_URL")?)
.await?;
Ok(pool)

37
tests/any/pool.rs Normal file
View File

@ -0,0 +1,37 @@
use futures::{FutureExt, TryFutureExt};
use sqlx::any::AnyPoolOptions;
use sqlx::prelude::*;
use sqlx_test::new;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[sqlx_macros::test]
async fn pool_should_invoke_after_connect() -> anyhow::Result<()> {
let counter = Arc::new(AtomicUsize::new(0));
let pool = AnyPoolOptions::new()
.after_connect({
let counter = counter.clone();
move |conn| {
let counter = counter.clone();
Box::pin(async move {
counter.fetch_add(1, Ordering::SeqCst);
Ok(())
})
}
})
.connect(&dotenv::var("DATABASE_URL")?)
.await?;
let _ = pool.acquire().await?;
let _ = pool.acquire().await?;
let _ = pool.acquire().await?;
let _ = pool.acquire().await?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
Ok(())
}