WIP pool changes

Austin Bonander 2024-10-18 19:46:36 -07:00
parent 0601c3a75d
commit 3ec35e06bd
13 changed files with 851 additions and 522 deletions

View File

@ -22,11 +22,10 @@ json = ["serde", "serde_json"]
# for conditional compilation
_rt-async-global-executor = ["async-global-executor", "_rt-async-io", "_rt-async-task"]
_rt-async-io = ["async-io", "async-fs"] # see note at async-fs declaration
_rt-async-std = ["async-std", "_rt-async-io"]
_rt-async-std = ["async-std", "_rt-async-io", "ease-off/async-io-2"]
_rt-async-task = ["async-task"]
_rt-smol = ["smol", "_rt-async-io", "_rt-async-task"]
_rt-tokio = ["tokio", "tokio-stream"]
_rt-tokio = ["tokio", "tokio-stream", "ease-off/tokio"]
_tls-native-tls = ["native-tls"]
_tls-rustls-aws-lc-rs = ["_tls-rustls", "rustls/aws-lc-rs", "webpki-roots"]
_tls-rustls-ring-webpki = ["_tls-rustls", "rustls/ring", "webpki-roots"]
@ -83,7 +82,6 @@ crossbeam-queue = "0.3.2"
either = "1.6.1"
futures-core = { version = "0.3.19", default-features = false }
futures-io = "0.3.24"
futures-intrusive = "0.5.0"
futures-util = { version = "0.3.19", default-features = false, features = ["alloc", "sink", "io"] }
log = { version = "0.4.18", default-features = false }
memchr = { version = "2.4.1", default-features = false }
@ -105,6 +103,9 @@ hashbrown = "0.16.0"
thiserror.workspace = true
ease-off = { workspace = true, features = ["futures"] }
pin-project-lite = "0.2.14"
[dev-dependencies]
tokio = { version = "1", features = ["rt"] }

View File

@ -11,6 +11,9 @@ use crate::database::Database;
use crate::type_info::TypeInfo;
use crate::types::Type;
#[cfg(doc)]
use crate::pool::{PoolConnector, PoolOptions};
/// A specialized `Result` type for SQLx.
pub type Result<T, E = Error> = ::std::result::Result<T, E>;
@ -110,6 +113,19 @@ pub enum Error {
#[error("attempted to acquire a connection on a closed pool")]
PoolClosed,
/// A custom error that may be returned from a [`PoolConnector`] implementation.
#[error("error returned from pool connector")]
PoolConnector {
#[source]
source: BoxDynError,
/// If `true`, `PoolConnector::connect()` is called again in an exponential backoff loop
/// up to [`PoolOptions::connect_timeout`].
///
/// See [`PoolConnector::connect()`] for details.
retryable: bool,
},
/// A background worker has crashed.
#[error("attempted to communicate with a crashed background worker")]
WorkerCrashed,
@ -228,11 +244,6 @@ pub trait DatabaseError: 'static + Send + Sync + StdError {
#[doc(hidden)]
fn into_error(self: Box<Self>) -> Box<dyn StdError + Send + Sync + 'static>;
#[doc(hidden)]
fn is_transient_in_connect_phase(&self) -> bool {
false
}
/// Returns the name of the constraint that triggered the error, if applicable.
/// If the error was caused by a conflict of a unique index, this will be the index name.
///
@ -270,6 +281,24 @@ pub trait DatabaseError: 'static + Send + Sync + StdError {
fn is_check_violation(&self) -> bool {
matches!(self.kind(), ErrorKind::CheckViolation)
}
/// Returns `true` if this error can be retried when connecting to the database.
///
/// Defaults to `false`.
///
/// For example, the Postgres driver overrides this to return `true` for the following error codes:
///
/// * `53300 too_many_connections`: returned when the maximum connections are exceeded
/// on the server. Assumed to be the result of a temporary overcommit
/// (e.g. an extra application replica being spun up to replace one that is going down).
/// * This error being consistently logged or returned is a likely indicator of a misconfiguration;
/// the sum of [`PoolOptions::max_connections`] for all replicas should not exceed
/// the maximum connections allowed by the server.
/// * `57P03 cannot_connect_now`: returned when the database server is still starting up
/// and the tcop component is not ready to accept connections yet.
fn is_retryable_connect_error(&self) -> bool {
false
}
}
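For illustration only (not part of this commit): the new `PoolConnector` variant above is intended to let a custom connector surface arbitrary errors and opt into the pool's retry loop. A minimal sketch, assuming the variant's fields remain publicly constructible as the documentation implies; `fetch_password()` is a hypothetical helper and Postgres is used only as an example. Such a function can be passed to the new `connect_with_connector()` methods added elsewhere in this commit.

use sqlx::pool::PoolConnectMetadata;
use sqlx::postgres::PgConnectOptions;
use sqlx::{ConnectOptions, PgConnection};

// Hypothetical credential lookup that may fail transiently.
async fn fetch_password() -> Result<String, std::io::Error> {
    Ok("hunter2".to_string())
}

async fn connect(_meta: PoolConnectMetadata) -> sqlx::Result<PgConnection> {
    // Flag the failure as retryable so the pool keeps calling this connector
    // with exponential backoff, up to `PoolOptions::connect_timeout`.
    let password = fetch_password().await.map_err(|e| sqlx::Error::PoolConnector {
        source: Box::new(e),
        retryable: true,
    })?;

    PgConnectOptions::new().password(&password).connect().await
}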
impl dyn DatabaseError {

View File

@ -0,0 +1,461 @@
use crate::connection::{ConnectOptions, Connection};
use crate::database::Database;
use crate::pool::connection::{Floating, Live};
use crate::pool::inner::PoolInner;
use crate::pool::PoolConnection;
use crate::rt::JoinHandle;
use crate::Error;
use ease_off::EaseOff;
use event_listener::{Event, EventListener};
use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tracing::Instrument;
use std::io;
/// Custom connect callback for [`Pool`][crate::pool::Pool].
///
/// Implemented for closures with the signature
/// `Fn(PoolConnectMetadata) -> impl Future<Output = sqlx::Result<impl Connection>>`.
///
/// See [`Self::connect()`] for details and implementation advice.
///
/// # Example: `after_connect` Replacement
/// The `after_connect` callback was removed in 0.9.0 as it was made redundant by this API.
///
/// This example uses Postgres but may be adapted to any driver.
///
/// ```rust,no_run
/// use std::sync::Arc;
/// use sqlx::PgConnection;
/// use sqlx::postgres::PgPoolOptions;
/// use sqlx::Connection;
///
/// # async fn _example() -> sqlx::Result<()> {
/// // `PoolConnector` is implemented for closures but has restrictions on returning borrows
/// // due to current language limitations.
/// //
/// // This example shows how to get around this using `Arc`.
/// let database_url: Arc<str> = "postgres://...".into();
///
/// let pool = PgPoolOptions::new()
/// .min_connections(5)
/// .max_connections(30)
/// .connect_with_connector(move |meta| {
/// let database_url = database_url.clone();
/// async move {
/// println!(
/// "opening connection {}, attempt {}; elapsed time: {}",
/// meta.pool_size,
/// meta.num_attempts + 1,
/// meta.start.elapsed()
/// );
///
/// let mut conn = PgConnection::connect(&database_url).await?;
///
/// // Override the time zone of the connection.
/// sqlx::raw_sql("SET TIME ZONE 'Europe/Berlin'").execute(&mut conn).await?;
///
/// Ok(conn)
/// }
/// })
/// .await?;
/// # Ok(())
/// # }
/// ```
///
/// # Example: `set_connect_options` Replacement
/// `set_connect_options` and `get_connect_options` were removed in 0.9.0 because they complicated
/// the pool internals. They can be reimplemented by capturing a mutex, or similar, in the callback.
///
/// This example uses Postgres and [`tokio::sync::RwLock`] but may be adapted to any driver,
/// or to `async-std`'s `RwLock`, respectively.
///
/// ```rust,no_run
/// use std::sync::Arc;
/// use tokio::sync::RwLock;
/// use sqlx::PgConnection;
/// use sqlx::postgres::PgConnectOptions;
/// use sqlx::postgres::PgPoolOptions;
/// use sqlx::ConnectOptions;
///
/// # async fn _example() -> sqlx::Result<()> {
/// // If you do not wish to hold the read lock during the connection attempt,
/// // you could store an `Arc<PgConnectOptions>` inside the lock and clone it out instead.
/// let connect_opts: Arc<RwLock<PgConnectOptions>> = Arc::new(RwLock::new("postgres://...".parse()?));
/// // We need a copy that will be captured by the closure.
/// let connect_opts_ = connect_opts.clone();
///
/// let pool = PgPoolOptions::new()
/// .connect_with_connector(move |meta| {
/// let connect_opts = connect_opts_.clone();
/// async move {
/// println!(
/// "opening connection {}, attempt {}; elapsed time: {}",
/// meta.pool_size,
/// meta.num_attempts + 1,
/// meta.start.elapsed()
/// );
///
/// connect_opts.read().await.connect().await
/// }
/// })
/// .await?;
///
/// // Close the connection that was previously opened by `connect_with_connector()`.
/// pool.acquire().await?.close().await?;
///
/// // Simulating a credential rotation
/// let mut write_connect_opts = connect_opts.write().await;
/// write_connect_opts
/// .set_username("new_username")
/// .set_password("new password");
///
/// // Should use the new credentials.
/// let mut conn = pool.acquire().await?;
///
/// # Ok(())
/// # }
/// ```
///
/// # Example: Custom Implementation
///
/// Custom implementations of `PoolConnector` trade a little bit of boilerplate for much
/// more flexibility. Thanks to the signature of `connect()`, they can return a `Future`
/// type that borrows from `self`.
///
/// This example uses Postgres but may be adapted to any driver.
///
/// ```rust,no_run
/// use sqlx::{PgConnection, Postgres};
/// use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
/// use sqlx_core::connection::ConnectOptions;
/// use sqlx_core::pool::{PoolConnectMetadata, PoolConnector};
///
/// struct MyConnector {
/// // A list of servers to connect to in a high-availability configuration.
/// host_ports: Vec<(String, u16)>,
/// username: String,
/// password: String,
/// }
///
/// impl PoolConnector<Postgres> for MyConnector {
/// // The desugaring of `async fn` is compatible with the signature of `connect()`.
/// async fn connect(&self, meta: PoolConnectMetadata) -> sqlx::Result<PgConnection> {
/// self.get_connect_options(meta.num_attempts)
/// .connect()
/// .await
/// }
/// }
///
/// impl MyConnector {
/// fn get_connect_options(&self, attempt: usize) -> PgConnectOptions {
/// // Select servers in a round-robin.
/// let (ref host, port) = self.host_ports[attempt % self.host_ports.len()];
///
/// PgConnectOptions::new()
/// .host(host)
/// .port(port)
/// .username(&self.username)
/// .password(&self.password)
/// }
/// }
///
/// # async fn _example() -> sqlx::Result<()> {
/// let pool = PgPoolOptions::new()
/// .max_connections(25)
/// .connect_with_connector(MyConnector {
/// host_ports: vec![
/// ("db1.postgres.cluster.local".into(), 5432),
/// ("db2.postgres.cluster.local".into(), 5432),
/// ("db3.postgres.cluster.local".into(), 5432),
/// ("db4.postgres.cluster.local".into(), 5432),
/// ],
/// username: "my_username".into(),
/// password: "my password".into(),
/// })
/// .await?;
///
/// let conn = pool.acquire().await?;
///
/// # Ok(())
/// # }
/// ```
pub trait PoolConnector<DB: Database>: Send + Sync + 'static {
/// Open a connection for the pool.
///
/// Any setup that must be done on the connection should be performed before it is returned.
///
/// If this method returns an error that is known to be retryable, it is called again
/// in an exponential backoff loop. Retryable errors include, but are not limited to:
///
/// * [`io::ErrorKind::ConnectionRefused`]
/// * Database errors for which
/// [`is_retryable_connect_error`][crate::error::DatabaseError::is_retryable_connect_error]
/// returns `true`.
/// * [`Error::PoolConnector`] with `retryable: true`.
/// This error kind is not returned internally and is designed to allow this method to return
/// arbitrary error types not otherwise supported.
///
/// Manual implementations of this method may also use the signature:
/// ```rust,ignore
/// async fn connect(
/// &self,
/// meta: PoolConnectMetadata
/// ) -> sqlx::Result<{PgConnection, MySqlConnection, SqliteConnection, etc.}>
/// ```
///
/// Note: the returned future must be `Send`.
fn connect(
&self,
meta: PoolConnectMetadata,
) -> impl Future<Output = crate::Result<DB::Connection>> + Send + '_;
}
impl<DB, F, Fut> PoolConnector<DB> for F
where
DB: Database,
F: Fn(PoolConnectMetadata) -> Fut + Send + Sync + 'static,
Fut: Future<Output = crate::Result<DB::Connection>> + Send + 'static,
{
fn connect(
&self,
meta: PoolConnectMetadata,
) -> impl Future<Output = crate::Result<DB::Connection>> + Send + '_ {
self(meta)
}
}
pub(crate) struct DefaultConnector<DB: Database>(
pub <<DB as Database>::Connection as Connection>::Options,
);
impl<DB: Database> PoolConnector<DB> for DefaultConnector<DB> {
fn connect(
&self,
_meta: PoolConnectMetadata,
) -> impl Future<Output = crate::Result<DB::Connection>> + Send + '_ {
self.0.connect()
}
}
/// Metadata passed to [`PoolConnector::connect()`] for every connection attempt.
#[derive(Debug)]
pub struct PoolConnectMetadata {
/// The instant at which the current connection task was started, including all attempts.
///
/// May be used for reporting purposes, or to implement a custom backoff.
pub start: Instant,
/// The number of attempts that have occurred so far.
pub num_attempts: usize,
/// The current size of the pool, including the connection currently being opened.
pub pool_size: usize,
}
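Because a plain `async fn` with this signature satisfies the closure-based `PoolConnector` impl above, the metadata can drive a custom backoff on top of the pool's own. A minimal sketch (the delay policy, the URL, and the use of `tokio::time::sleep` are assumptions, not part of this commit):

use std::time::Duration;
use sqlx::pool::PoolConnectMetadata;
use sqlx::postgres::PgConnectOptions;
use sqlx::{ConnectOptions, PgConnection};

async fn connect_with_extra_delay(meta: PoolConnectMetadata) -> sqlx::Result<PgConnection> {
    // Layer a small linear delay on top of the pool's exponential backoff,
    // based on how many attempts this connection task has already made.
    if meta.num_attempts > 1 {
        tokio::time::sleep(Duration::from_millis(50 * meta.num_attempts as u64)).await;
    }

    "postgres://localhost/mydb"
        .parse::<PgConnectOptions>()?
        .connect()
        .await
}

// Usage: `PgPoolOptions::new().connect_with_connector(connect_with_extra_delay)`.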
pub struct DynConnector<DB: Database> {
// We want to spawn the connection attempt as a task anyway
connect: Box<
dyn Fn(ConnectPermit<DB>, usize) -> JoinHandle<crate::Result<PoolConnection<DB>>>
+ Send
+ Sync
+ 'static,
>,
}
impl<DB: Database> DynConnector<DB> {
pub fn new(connector: impl PoolConnector<DB>) -> Self {
let connector = Arc::new(connector);
Self {
connect: Box::new(move |permit, size| {
crate::rt::spawn(connect_with_backoff(permit, connector.clone(), size))
}),
}
}
pub fn connect(
&self,
permit: ConnectPermit<DB>,
size: usize,
) -> JoinHandle<crate::Result<PoolConnection<DB>>> {
(self.connect)(permit, size)
}
}
pub struct ConnectionCounter {
connections: AtomicUsize,
connect_available: Event,
}
impl ConnectionCounter {
pub fn new() -> Self {
Self {
connections: AtomicUsize::new(0),
connect_available: Event::new(),
}
}
pub fn connections(&self) -> usize {
self.connections.load(Ordering::Acquire)
}
pub async fn drain(&self) {
while self.connections.load(Ordering::Acquire) > 0 {
self.connect_available.listen().await;
}
}
/// Attempt to acquire a permit from both this instance, and the parent pool, if applicable.
///
/// Returns the permit, and the current size of the pool.
pub async fn acquire_permit<DB: Database>(
&self,
pool: &Arc<PoolInner<DB>>,
) -> (usize, ConnectPermit<DB>) {
// Check that `self` can increase size first before we check the parent.
let (size, permit) = self.acquire_permit_self(pool).await;
if let Some(parent) = &pool.options.parent_pool {
let (_, permit) = parent.0.counter.acquire_permit_self(&parent.0).await;
// consume the parent permit
permit.consume();
}
(size, permit)
}
// Separate method because `async fn`s cannot be recursive.
/// Attempt to acquire a [`ConnectPermit`] from this instance and this instance only.
async fn acquire_permit_self<DB: Database>(
&self,
pool: &Arc<PoolInner<DB>>,
) -> (usize, ConnectPermit<DB>) {
debug_assert!(ptr::addr_eq(self, &pool.counter));
let mut should_wait = pool.options.fair && self.connect_available.total_listeners() > 0;
for attempt in 1usize.. {
if should_wait {
self.connect_available.listen().await;
}
let res = self.connections.fetch_update(
Ordering::Release,
Ordering::Acquire,
|connections| {
(connections < pool.options.max_connections).then_some(connections + 1)
},
);
if let Ok(prev_size) = res {
let size = prev_size + 1;
tracing::trace!(target: "sqlx::pool::connect", size, "increased size");
return (
prev_size + 1,
ConnectPermit {
pool: Some(Arc::clone(pool)),
},
);
}
should_wait = true;
if attempt == 2 {
tracing::warn!(
"unable to acquire a connect permit after sleeping; this may indicate a bug"
);
}
}
panic!("BUG: was never able to acquire a connection despite waking many times")
}
pub fn release_permit<DB: Database>(&self, pool: &PoolInner<DB>) {
debug_assert!(ptr::addr_eq(self, &pool.counter));
self.connections.fetch_sub(1, Ordering::Release);
self.connect_available.notify(1usize);
if let Some(parent) = &pool.options.parent_pool {
parent.0.counter.release_permit(&parent.0);
}
}
}
pub struct ConnectPermit<DB: Database> {
pool: Option<Arc<PoolInner<DB>>>,
}
impl<DB: Database> ConnectPermit<DB> {
pub fn float_existing(pool: Arc<PoolInner<DB>>) -> Self {
Self { pool: Some(pool) }
}
pub fn pool(&self) -> &Arc<PoolInner<DB>> {
self.pool.as_ref().unwrap()
}
pub fn consume(mut self) {
self.pool = None;
}
}
impl<DB: Database> Drop for ConnectPermit<DB> {
fn drop(&mut self) {
if let Some(pool) = self.pool.take() {
pool.counter.release_permit(&pool);
}
}
}
#[tracing::instrument(
target = "sqlx::pool::connect",
skip_all,
fields(connection = size),
err
)]
async fn connect_with_backoff<DB: Database>(
permit: ConnectPermit<DB>,
connector: Arc<impl PoolConnector<DB>>,
size: usize,
) -> crate::Result<PoolConnection<DB>> {
if permit.pool().is_closed() {
return Err(Error::PoolClosed);
}
let mut ease_off = EaseOff::start_timeout(permit.pool().options.connect_timeout);
for attempt in 1usize.. {
let meta = PoolConnectMetadata {
start: ease_off.started_at(),
num_attempts: attempt,
pool_size: size,
};
let conn = ease_off
.try_async(connector.connect(meta))
.await
.or_retry_if(|e| can_retry_error(e.inner()))?;
if let Some(conn) = conn {
return Ok(Floating::new_live(conn, permit).reattach());
}
}
Err(Error::PoolTimedOut)
}
fn can_retry_error(e: &Error) -> bool {
match e {
Error::Io(e) if e.kind() == io::ErrorKind::ConnectionRefused => true,
Error::Database(e) => e.is_retryable_connect_error(),
// Respect the `retryable` flag documented on `Error::PoolConnector`.
Error::PoolConnector { retryable, .. } => *retryable,
_ => false,
}
}
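A quick in-module test sketch of the classification above (illustrative only; the `retryable` arm reflects the contract documented on `PoolConnector::connect()`, and the error contents are arbitrary):

#[cfg(test)]
mod can_retry_error_tests {
    use super::*;

    #[test]
    fn classifies_connect_errors() {
        // Refused TCP connections are retried while the server comes up.
        assert!(can_retry_error(&Error::Io(std::io::Error::from(
            std::io::ErrorKind::ConnectionRefused,
        ))));
        // Custom connector errors are retried only when flagged as retryable.
        assert!(can_retry_error(&Error::PoolConnector {
            source: "simulated outage".into(),
            retryable: true,
        }));
        // Anything else bubbles up immediately.
        assert!(!can_retry_error(&Error::PoolClosed));
    }
}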

View File

@ -10,7 +10,8 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
use super::inner::{is_beyond_max_lifetime, PoolInner};
use crate::pool::connect::ConnectPermit;
use crate::pool::options::PoolConnectionMetadata;
const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);
@ -37,7 +38,7 @@ pub(super) struct Idle<DB: Database> {
/// RAII wrapper for connections being handled by functions that may drop them
pub(super) struct Floating<DB: Database, C> {
pub(super) inner: C,
pub(super) guard: DecrementSizeGuard<DB>,
pub(super) permit: ConnectPermit<DB>,
}
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
@ -127,6 +128,10 @@ impl<DB: Database> PoolConnection<DB> {
self.live.take().expect(EXPECT_MSG)
}
pub(super) fn into_floating(mut self) -> Floating<DB, Live<DB>> {
self.take_live().float(self.pool.clone())
}
/// Test the connection to make sure it is still live before returning it to the pool.
///
/// This effectively runs the drop handler eagerly instead of spawning a task to do it.
@ -215,7 +220,7 @@ impl<DB: Database> Live<DB> {
Floating {
inner: self,
// create a new guard from a previously leaked permit
guard: DecrementSizeGuard::new_permit(pool),
permit: ConnectPermit::float_existing(pool),
}
}
@ -242,22 +247,22 @@ impl<DB: Database> DerefMut for Idle<DB> {
}
impl<DB: Database> Floating<DB, Live<DB>> {
pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
pub fn new_live(conn: DB::Connection, permit: ConnectPermit<DB>) -> Self {
Self {
inner: Live {
raw: conn,
created_at: Instant::now(),
},
guard,
permit,
}
}
pub fn reattach(self) -> PoolConnection<DB> {
let Floating { inner, guard } = self;
let Floating { inner, permit } = self;
let pool = Arc::clone(&guard.pool);
let pool = Arc::clone(permit.pool());
guard.cancel();
permit.consume();
PoolConnection {
live: Some(inner),
close_on_drop: false,
@ -266,7 +271,7 @@ impl<DB: Database> Floating<DB, Live<DB>> {
}
pub fn release(self) {
self.guard.pool.clone().release(self);
self.permit.pool().clone().release(self);
}
/// Return the connection to the pool.
@ -274,19 +279,19 @@ impl<DB: Database> Floating<DB, Live<DB>> {
/// Returns `true` if the connection was successfully returned, `false` if it was closed.
async fn return_to_pool(mut self) -> bool {
// Immediately close the connection.
if self.guard.pool.is_closed() {
if self.permit.pool().is_closed() {
self.close().await;
return false;
}
// If the connection is beyond max lifetime, close the connection and
// immediately create a new connection
if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
if is_beyond_max_lifetime(&self.inner, &self.permit.pool().options) {
self.close().await;
return false;
}
if let Some(test) = &self.guard.pool.options.after_release {
if let Some(test) = &self.permit.pool().options.after_release {
let meta = self.metadata();
match (test)(&mut self.inner.raw, meta).await {
Ok(true) => (),
@ -345,7 +350,7 @@ impl<DB: Database> Floating<DB, Live<DB>> {
pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
Floating {
inner: self.inner.into_idle(),
guard: self.guard,
permit: self.permit,
}
}
@ -358,14 +363,10 @@ impl<DB: Database> Floating<DB, Live<DB>> {
}
impl<DB: Database> Floating<DB, Idle<DB>> {
pub fn from_idle(
idle: Idle<DB>,
pool: Arc<PoolInner<DB>>,
permit: AsyncSemaphoreReleaser<'_>,
) -> Self {
pub fn from_idle(idle: Idle<DB>, pool: Arc<PoolInner<DB>>) -> Self {
Self {
inner: idle,
guard: DecrementSizeGuard::from_permit(pool, permit),
permit: ConnectPermit::float_existing(pool),
}
}
@ -376,21 +377,21 @@ impl<DB: Database> Floating<DB, Idle<DB>> {
pub fn into_live(self) -> Floating<DB, Live<DB>> {
Floating {
inner: self.inner.live,
guard: self.guard,
permit: self.permit,
}
}
pub async fn close(self) -> DecrementSizeGuard<DB> {
pub async fn close(self) -> ConnectPermit<DB> {
if let Err(error) = self.inner.live.raw.close().await {
tracing::debug!(%error, "error occurred while closing the pool connection");
}
self.guard
self.permit
}
pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
pub async fn close_hard(self) -> ConnectPermit<DB> {
let _ = self.inner.live.raw.close_hard().await;
self.guard
self.permit
}
pub fn metadata(&self) -> PoolConnectionMetadata {

View File

@ -0,0 +1,97 @@
use crate::connection::Connection;
use crate::database::Database;
use crate::pool::connection::{Floating, Idle, Live};
use crate::pool::inner::PoolInner;
use crossbeam_queue::ArrayQueue;
use event_listener::Event;
use futures_util::FutureExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
pub struct IdleQueue<DB: Database> {
queue: ArrayQueue<Idle<DB>>,
// Keep a separate count because `ArrayQueue::len()` loops until the head and tail pointers
// stop changing, which may never happen at high contention.
len: AtomicUsize,
release_event: Event,
fair: bool,
}
impl<DB: Database> IdleQueue<DB> {
pub fn new(fair: bool, cap: usize) -> Self {
Self {
queue: ArrayQueue::new(cap),
len: AtomicUsize::new(0),
release_event: Event::new(),
fair,
}
}
pub fn len(&self) -> usize {
self.len.load(Ordering::Acquire)
}
pub async fn acquire(&self, pool: &Arc<PoolInner<DB>>) -> Floating<DB, Idle<DB>> {
let mut should_wait = self.fair && self.release_event.total_listeners() > 0;
for attempt in 1usize.. {
if should_wait {
self.release_event.listen().await;
}
if let Some(conn) = self.try_acquire(pool) {
return conn;
}
should_wait = true;
if attempt == 2 {
tracing::warn!(
"unable to acquire a connection after sleeping; this may indicate a bug"
);
}
}
panic!("BUG: was never able to acquire a connection despite waking many times")
}
pub fn try_acquire(&self, pool: &Arc<PoolInner<DB>>) -> Option<Floating<DB, Idle<DB>>> {
self.len
.fetch_update(Ordering::Release, Ordering::Acquire, |len| {
len.checked_sub(1)
})
.ok()
.and_then(|_| {
let conn = self.queue.pop()?;
Some(Floating::from_idle(conn, Arc::clone(pool)))
})
}
pub fn release(&self, conn: Floating<DB, Live<DB>>) {
let Floating {
inner: conn,
permit,
} = conn.into_idle();
self.queue
.push(conn)
.unwrap_or_else(|_| panic!("BUG: idle queue capacity exceeded"));
self.len.fetch_add(1, Ordering::Release);
self.release_event.notify(1usize);
// Don't decrease the size.
permit.consume();
}
pub fn drain(&self, pool: &PoolInner<DB>) {
while let Some(conn) = self.queue.pop() {
// Hopefully will send at least a TCP FIN packet.
conn.live.raw.close_hard().now_or_never();
pool.counter.release_permit(pool);
}
}
}

View File

@ -1,33 +1,29 @@
use super::connection::{Floating, Idle, Live};
use crate::connection::ConnectOptions;
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions};
use crossbeam_queue::ArrayQueue;
use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};
use crate::pool::{CloseEvent, Pool, PoolConnection, PoolConnector, PoolOptions};
use std::cmp;
use std::future::{self, Future};
use std::pin::pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::task::Poll;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::ready;
use crate::logger::private_level_filter_to_trace_level;
use crate::pool::options::PoolConnectionMetadata;
use crate::pool::connect::{ConnectPermit, ConnectionCounter, DynConnector};
use crate::pool::idle::IdleQueue;
use crate::private_tracing_dynamic_event;
use futures_util::future::{self, OptionFuture};
use futures_util::FutureExt;
use std::time::{Duration, Instant};
use tracing::Level;
pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
pub(super) semaphore: AsyncSemaphore,
pub(super) size: AtomicUsize,
pub(super) num_idle: AtomicUsize,
pub(super) connector: DynConnector<DB>,
pub(super) counter: ConnectionCounter,
pub(super) idle: IdleQueue<DB>,
is_closed: AtomicBool,
pub(super) on_closed: event_listener::Event,
pub(super) options: PoolOptions<DB>,
@ -38,25 +34,12 @@ pub(crate) struct PoolInner<DB: Database> {
impl<DB: Database> PoolInner<DB> {
pub(super) fn new_arc(
options: PoolOptions<DB>,
connect_options: <DB::Connection as Connection>::Options,
connector: impl PoolConnector<DB>,
) -> Arc<Self> {
let capacity = options.max_connections as usize;
let semaphore_capacity = if let Some(parent) = &options.parent_pool {
assert!(options.max_connections <= parent.options().max_connections);
assert_eq!(options.fair, parent.options().fair);
// The child pool must steal permits from the parent
0
} else {
capacity
};
let pool = Self {
connect_options: RwLock::new(Arc::new(connect_options)),
idle_conns: ArrayQueue::new(capacity),
semaphore: AsyncSemaphore::new(options.fair, semaphore_capacity),
size: AtomicUsize::new(0),
num_idle: AtomicUsize::new(0),
connector: DynConnector::new(connector),
counter: ConnectionCounter::new(),
idle: IdleQueue::new(options.fair, options.max_connections),
is_closed: AtomicBool::new(false),
on_closed: event_listener::Event::new(),
acquire_time_level: private_level_filter_to_trace_level(options.acquire_time_level),
@ -72,16 +55,11 @@ impl<DB: Database> PoolInner<DB> {
}
pub(super) fn size(&self) -> usize {
self.size.load(Ordering::Acquire)
self.counter.connections()
}
pub(super) fn num_idle(&self) -> usize {
// We don't use `self.idle_conns.len()` as it waits for the internal
// head and tail pointers to stop changing for a moment before calculating the length,
// which may take a long time at high levels of churn.
//
// By maintaining our own atomic count, we avoid that issue entirely.
self.num_idle.load(Ordering::Acquire)
self.idle.len()
}
pub(super) fn is_closed(&self) -> bool {
@ -97,24 +75,11 @@ impl<DB: Database> PoolInner<DB> {
self.mark_closed();
async move {
// For child pools, we need to acquire permits we actually have rather than
// max_connections
let permits_to_acquire = if self.options.parent_pool.is_some() {
// Child pools start with 0 permits, so we acquire based on current size
self.size()
} else {
// Parent pools can acquire all max_connections permits
self.options.max_connections
};
let _permits = self.semaphore.acquire(permits_to_acquire).await;
while let Some(idle) = self.idle_conns.pop() {
let _ = idle.live.raw.close().await;
while let Some(idle) = self.idle.try_acquire(self) {
idle.close().await;
}
self.num_idle.store(0, Ordering::Release);
self.size.store(0, Ordering::Release);
self.counter.drain().await;
}
}
@ -124,56 +89,6 @@ impl<DB: Database> PoolInner<DB> {
}
}
/// Attempt to pull a permit from `self.semaphore` or steal one from the parent.
///
/// If we steal a permit from the parent but *don't* open a connection,
/// it should be returned to the parent.
async fn acquire_permit(self: &Arc<Self>) -> Result<AsyncSemaphoreReleaser<'_>, Error> {
let parent = self
.parent()
// If we're already at the max size, we shouldn't try to steal from the parent.
// This is just going to cause unnecessary churn in `acquire()`.
.filter(|_| self.size() < self.options.max_connections);
let mut acquire_self = pin!(self.semaphore.acquire(1).fuse());
let mut close_event = pin!(self.close_event());
if let Some(parent) = parent {
let mut acquire_parent = pin!(parent.0.semaphore.acquire(1));
let mut parent_close_event = pin!(parent.0.close_event());
let mut poll_parent = false;
future::poll_fn(|cx| {
if close_event.as_mut().poll(cx).is_ready() {
return Poll::Ready(Err(Error::PoolClosed));
}
if parent_close_event.as_mut().poll(cx).is_ready() {
// Propagate the parent's close event to the child.
self.mark_closed();
return Poll::Ready(Err(Error::PoolClosed));
}
if let Poll::Ready(permit) = acquire_self.as_mut().poll(cx) {
return Poll::Ready(Ok(permit));
}
// Don't try the parent right away.
if poll_parent {
acquire_parent.as_mut().poll(cx).map(Ok)
} else {
poll_parent = true;
cx.waker().wake_by_ref();
Poll::Pending
}
})
.await
} else {
close_event.do_until(acquire_self).await
}
}
fn parent(&self) -> Option<&Pool<DB>> {
self.options.parent_pool.as_ref()
}
@ -184,117 +99,103 @@ impl<DB: Database> PoolInner<DB> {
return None;
}
let permit = self.semaphore.try_acquire(1)?;
self.pop_idle(permit).ok()
}
fn pop_idle<'a>(
self: &'a Arc<Self>,
permit: AsyncSemaphoreReleaser<'a>,
) -> Result<Floating<DB, Idle<DB>>, AsyncSemaphoreReleaser<'a>> {
if let Some(idle) = self.idle_conns.pop() {
self.num_idle.fetch_sub(1, Ordering::AcqRel);
Ok(Floating::from_idle(idle, (*self).clone(), permit))
} else {
Err(permit)
}
self.idle.try_acquire(self)
}
pub(super) fn release(&self, floating: Floating<DB, Live<DB>>) {
// `options.after_release` and other checks are in `PoolConnection::return_to_pool()`.
let Floating { inner: idle, guard } = floating.into_idle();
if self.idle_conns.push(idle).is_err() {
panic!("BUG: connection queue overflow in release()");
}
// NOTE: we need to make sure we drop the permit *after* we push to the idle queue
// don't decrease the size
guard.release_permit();
self.num_idle.fetch_add(1, Ordering::AcqRel);
self.idle.release(floating);
}
/// Try to atomically increment the pool size for a new connection.
///
/// Returns `Err` if the pool is at max capacity already or is closed.
pub(super) fn try_increment_size<'a>(
self: &'a Arc<Self>,
permit: AsyncSemaphoreReleaser<'a>,
) -> Result<DecrementSizeGuard<DB>, AsyncSemaphoreReleaser<'a>> {
let result = self
.size
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
if self.is_closed() {
return None;
}
size.checked_add(1)
.filter(|size| size <= &self.options.max_connections)
});
match result {
// we successfully incremented the size
Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)),
// the pool is at max capacity or is closed
Err(_) => Err(permit),
}
}
pub(super) async fn acquire(self: &Arc<Self>) -> Result<Floating<DB, Live<DB>>, Error> {
pub(super) async fn acquire(self: &Arc<Self>) -> Result<PoolConnection<DB>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
let acquire_started_at = Instant::now();
let deadline = acquire_started_at + self.options.acquire_timeout;
let acquired = crate::rt::timeout(
self.options.acquire_timeout,
async {
loop {
// Handles the close-event internally
let permit = self.acquire_permit().await?;
let mut close_event = pin!(self.close_event());
let mut deadline = pin!(crate::rt::sleep(self.options.acquire_timeout));
let mut acquire_idle = pin!(self.idle.acquire(self).fuse());
let mut check_idle = pin!(OptionFuture::from(None));
let mut acquire_connect_permit = pin!(OptionFuture::from(Some(
self.counter.acquire_permit(self).fuse()
)));
let mut connect = OptionFuture::from(None);
// The internal state machine of `acquire()`.
//
// * The initial state is racing to acquire either an idle connection or a new `ConnectPermit`.
// * If we acquire a `ConnectPermit`, we begin the connection loop (with backoff)
// as implemented by `DynConnector`.
// * If we acquire an idle connection, we then start polling `check_idle_conn()`.
let acquired = future::poll_fn(|cx| {
use std::task::Poll::*;
// First attempt to pop a connection from the idle queue.
let guard = match self.pop_idle(permit) {
// Then, check that we can use it...
Ok(conn) => match check_idle_conn(conn, &self.options).await {
// All good!
Ok(live) => return Ok(live),
// if the connection isn't usable for one reason or another,
// we get the `DecrementSizeGuard` back to open a new one
Err(guard) => guard,
},
Err(permit) => if let Ok(guard) = self.try_increment_size(permit) {
// we can open a new connection
guard
} else {
// This can happen for a child pool that's at its connection limit,
// or if the pool was closed between `acquire_permit()` and
// `try_increment_size()`.
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
// If so, we're likely in the current-thread runtime if it's Tokio,
// and so we should yield to let any spawned return_to_pool() tasks
// execute.
crate::rt::yield_now().await;
continue;
}
};
// Attempt to connect...
return self.connect(deadline, guard).await;
}
// First check if the pool is already closed,
// or register for a wakeup if it gets closed.
if let Ready(()) = close_event.poll_unpin(cx) {
return Ready(Err(Error::PoolClosed));
}
)
.await
.map_err(|_| Error::PoolTimedOut)??;
// Then check if our deadline has elapsed, or schedule a wakeup for when that happens.
if let Ready(()) = deadline.poll_unpin(cx) {
return Ready(Err(Error::PoolTimedOut));
}
// Attempt to acquire a connection from the idle queue.
if let Ready(idle) = acquire_idle.poll_unpin(cx) {
check_idle.set(Some(check_idle_conn(idle, &self.options)).into());
}
// If we acquired an idle connection, run any checks that need to be done.
//
// Includes `test_on_acquire` and the `before_acquire` callback, if set.
//
// We don't want to race this step if it's already running because canceling it
// will result in the potentially unnecessary closure of a connection.
//
// Instead, we just wait and see what happens. If we already started connecting,
// that'll happen concurrently.
match ready!(check_idle.poll_unpin(cx)) {
// The `.reattach()` call errors with "type annotations needed" if not qualified.
Some(Ok(live)) => return Ready(Ok(Floating::reattach(live))),
Some(Err(permit)) => {
// We don't strictly need to poll `connect` here; all we really want to do
// is to check if it is `None`. But since currently there's no getter for that,
// it doesn't really hurt to just poll it here.
match connect.poll_unpin(cx) {
Ready(None) => {
// If we're not already attempting to connect,
// take the permit returned from closing the connection and
// attempt to open a new one.
connect = Some(self.connector.connect(permit, self.size())).into();
}
// `permit` is dropped in these branches, allowing another task to use it
Ready(Some(res)) => return Ready(res),
Pending => (),
}
// Attempt to acquire another idle connection concurrently to opening a new one.
acquire_idle.set(self.idle.acquire(self).fuse());
// Annoyingly, `OptionFuture` doesn't fuse to `None` on its own
check_idle.set(None.into());
}
None => (),
}
if let Ready(Some((size, permit))) = acquire_connect_permit.poll_unpin(cx) {
connect = Some(self.connector.connect(permit, size)).into();
}
if let Ready(Some(res)) = connect.poll_unpin(cx) {
// RFC: suppress errors here?
return Ready(res);
}
Pending
})
.await?;
let acquired_after = acquire_started_at.elapsed();
@ -322,102 +223,29 @@ impl<DB: Database> PoolInner<DB> {
Ok(acquired)
}
pub(super) async fn connect(
self: &Arc<Self>,
deadline: Instant,
guard: DecrementSizeGuard<DB>,
) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
let mut backoff = Duration::from_millis(10);
let max_backoff = deadline_as_timeout(deadline)? / 5;
loop {
let timeout = deadline_as_timeout(deadline)?;
// clone the connect options arc so it can be used without holding the RwLockReadGuard
// across an async await point
let connect_options = self
.connect_options
.read()
.expect("write-lock holder panicked")
.clone();
// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
match crate::rt::timeout(timeout, connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
let meta = PoolConnectionMetadata {
age: Duration::ZERO,
idle_for: Duration::ZERO,
};
let res = if let Some(callback) = &self.options.after_connect {
callback(&mut raw, meta).await
} else {
Ok(())
};
match res {
Ok(()) => return Ok(Floating::new_live(raw, guard)),
Err(error) => {
tracing::error!(%error, "error returned from after_connect");
// The connection is broken, don't try to close nicely.
let _ = raw.close_hard().await;
// Fall through to the backoff.
}
}
}
// an IO error while connecting is assumed to be the system starting up
Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => (),
// We got a transient database error, retry.
Ok(Err(Error::Database(error))) if error.is_transient_in_connect_phase() => (),
// Any other error while connection should immediately
// terminate and bubble the error up
Ok(Err(e)) => return Err(e),
// timed out
Err(_) => return Err(Error::PoolTimedOut),
}
// If the connection is refused, wait in exponentially
// increasing steps for the server to come up,
// capped by a factor of the remaining time until the deadline
crate::rt::sleep(backoff).await;
backoff = cmp::min(backoff * 2, max_backoff);
}
}
/// Try to maintain `min_connections`, returning any errors (including `PoolTimedOut`).
pub async fn try_min_connections(self: &Arc<Self>, deadline: Instant) -> Result<(), Error> {
while self.size() < self.options.min_connections {
// Don't wait for a semaphore permit.
//
// If no extra permits are available then we shouldn't be trying to spin up
// connections anyway.
let Some(permit) = self.semaphore.try_acquire(1) else {
return Ok(());
};
crate::rt::timeout_at(deadline, async {
while self.size() < self.options.min_connections {
// Don't wait for a connect permit.
//
// If no extra permits are available then we shouldn't be trying to spin up
// connections anyway.
let Some((size, permit)) = self.counter.acquire_permit(self).now_or_never() else {
return Ok(());
};
// We must always obey `max_connections`.
let Some(guard) = self.try_increment_size(permit).ok() else {
return Ok(());
};
let conn = self.connector.connect(permit, size).await?;
// We skip `after_release` since the connection was never provided to user code
// besides `after_connect`, if they set it.
self.release(self.connect(deadline, guard).await?);
}
// We skip `after_release` since the connection was never provided to user code
// besides inside `PoolConnector::connect()`, if they override it.
self.release(conn.into_floating());
}
Ok(())
Ok(())
})
.await
.unwrap_or_else(|_| Err(Error::PoolTimedOut))
}
/// Attempt to maintain `min_connections`, logging if unable.
@ -441,11 +269,7 @@ impl<DB: Database> PoolInner<DB> {
impl<DB: Database> Drop for PoolInner<DB> {
fn drop(&mut self) {
self.mark_closed();
if let Some(parent) = &self.options.parent_pool {
// Release the stolen permits.
parent.0.semaphore.release(self.semaphore.permits());
}
self.idle.drain(self);
}
}
@ -469,7 +293,7 @@ fn is_beyond_idle_timeout<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<D
async fn check_idle_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
) -> Result<Floating<DB, Live<DB>>, ConnectPermit<DB>> {
if options.test_before_acquire {
// Check that the connection is still live
if let Err(error) = conn.ping().await {
@ -573,51 +397,3 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
.await;
});
}
/// RAII guard returned by `Pool::try_increment_size()` and others.
///
/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
/// (where the pool thinks it has more connections than it does).
pub(in crate::pool) struct DecrementSizeGuard<DB: Database> {
pub(crate) pool: Arc<PoolInner<DB>>,
cancelled: bool,
}
impl<DB: Database> DecrementSizeGuard<DB> {
/// Create a new guard that will release a semaphore permit on-drop.
pub fn new_permit(pool: Arc<PoolInner<DB>>) -> Self {
Self {
pool,
cancelled: false,
}
}
pub fn from_permit(pool: Arc<PoolInner<DB>>, permit: AsyncSemaphoreReleaser<'_>) -> Self {
// here we effectively take ownership of the permit
permit.disarm();
Self::new_permit(pool)
}
/// Release the semaphore permit without decreasing the pool size.
///
/// If the permit was stolen from the pool's parent, it will be returned to the child's semaphore.
fn release_permit(self) {
self.pool.semaphore.release(1);
self.cancel();
}
pub fn cancel(mut self) {
self.cancelled = true;
}
}
impl<DB: Database> Drop for DecrementSizeGuard<DB> {
fn drop(&mut self) {
if !self.cancelled {
self.pool.size.fetch_sub(1, Ordering::AcqRel);
// and here we release the permit we got on construction
self.pool.semaphore.release(1);
}
}
}

View File

@ -71,6 +71,7 @@ use crate::error::Error;
use crate::sql_str::SqlSafeStr;
use crate::transaction::Transaction;
pub use self::connect::{PoolConnectMetadata, PoolConnector};
pub use self::connection::PoolConnection;
use self::inner::PoolInner;
#[doc(hidden)]
@ -83,8 +84,11 @@ mod executor;
#[macro_use]
pub mod maybe;
mod connect;
mod connection;
mod inner;
mod idle;
mod options;
/// An asynchronous pool of SQLx database connections.
@ -356,7 +360,7 @@ impl<DB: Database> Pool<DB> {
/// returning it.
pub fn acquire(&self) -> impl Future<Output = Result<PoolConnection<DB>, Error>> + 'static {
let shared = self.0.clone();
async move { shared.acquire().await.map(|conn| conn.reattach()) }
async move { shared.acquire().await }
}
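Worth noting from the signature above: the returned future is `'static` and does not borrow the pool, so it can be spawned directly as a task. A minimal sketch, assuming the Tokio runtime and the Postgres driver:

async fn acquire_in_background(pool: sqlx::PgPool) -> sqlx::Result<()> {
    // The future produced by `acquire()` owns its own handle to the pool internals,
    // so nothing needs to be cloned and moved into the task by hand.
    let handle = tokio::spawn(pool.acquire());
    let _conn = handle.await.expect("acquire task panicked")?;
    Ok(())
}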
/// Attempts to retrieve a connection from the pool if there is one available.
@ -541,28 +545,6 @@ impl<DB: Database> Pool<DB> {
self.0.num_idle()
}
/// Gets a clone of the connection options for this pool
pub fn connect_options(&self) -> Arc<<DB::Connection as Connection>::Options> {
self.0
.connect_options
.read()
.expect("write-lock holder panicked")
.clone()
}
/// Updates the connection options this pool will use when opening any future connections. Any
/// existing open connection in the pool will be left as-is.
pub fn set_connect_options(&self, connect_options: <DB::Connection as Connection>::Options) {
// technically write() could also panic if the current thread already holds the lock,
// but because this method can't be re-entered by the same thread that shouldn't be a problem
let mut guard = self
.0
.connect_options
.write()
.expect("write-lock holder panicked");
*guard = Arc::new(connect_options);
}
/// Get the options for this pool
pub fn options(&self) -> &PoolOptions<DB> {
&self.0.options

View File

@ -1,8 +1,9 @@
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::pool::connect::DefaultConnector;
use crate::pool::inner::PoolInner;
use crate::pool::Pool;
use crate::pool::{Pool, PoolConnector};
use futures_core::future::BoxFuture;
use log::LevelFilter;
use std::fmt::{self, Debug, Formatter};
@ -44,14 +45,6 @@ use std::time::{Duration, Instant};
/// the perspectives of both API designer and consumer.
pub struct PoolOptions<DB: Database> {
pub(crate) test_before_acquire: bool,
pub(crate) after_connect: Option<
Arc<
dyn Fn(&mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'_, Result<(), Error>>
+ 'static
+ Send
+ Sync,
>,
>,
pub(crate) before_acquire: Option<
Arc<
dyn Fn(
@ -79,6 +72,7 @@ pub struct PoolOptions<DB: Database> {
pub(crate) acquire_slow_level: LevelFilter,
pub(crate) acquire_slow_threshold: Duration,
pub(crate) acquire_timeout: Duration,
pub(crate) connect_timeout: Duration,
pub(crate) min_connections: usize,
pub(crate) max_lifetime: Option<Duration>,
pub(crate) idle_timeout: Option<Duration>,
@ -94,7 +88,6 @@ impl<DB: Database> Clone for PoolOptions<DB> {
fn clone(&self) -> Self {
PoolOptions {
test_before_acquire: self.test_before_acquire,
after_connect: self.after_connect.clone(),
before_acquire: self.before_acquire.clone(),
after_release: self.after_release.clone(),
max_connections: self.max_connections,
@ -102,6 +95,7 @@ impl<DB: Database> Clone for PoolOptions<DB> {
acquire_slow_threshold: self.acquire_slow_threshold,
acquire_slow_level: self.acquire_slow_level,
acquire_timeout: self.acquire_timeout,
connect_timeout: self.connect_timeout,
min_connections: self.min_connections,
max_lifetime: self.max_lifetime,
idle_timeout: self.idle_timeout,
@ -143,7 +137,6 @@ impl<DB: Database> PoolOptions<DB> {
pub fn new() -> Self {
Self {
// User-specifiable routines
after_connect: None,
before_acquire: None,
after_release: None,
test_before_acquire: true,
@ -158,6 +151,7 @@ impl<DB: Database> PoolOptions<DB> {
// to not flag typical time to add a new connection to a pool.
acquire_slow_threshold: Duration::from_secs(2),
acquire_timeout: Duration::from_secs(30),
connect_timeout: Duration::from_secs(2 * 60),
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
fair: true,
@ -268,6 +262,23 @@ impl<DB: Database> PoolOptions<DB> {
self.acquire_timeout
}
/// Set the maximum amount of time to spend attempting to open a connection.
///
/// This timeout happens independently of [`acquire_timeout`][Self::acquire_timeout].
///
/// If shorter than `acquire_timeout`, this will cause the last connection attempt to be cut
/// short when this timeout elapses.
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = timeout;
self
}
/// Get the maximum amount of time to spend attempting to open a connection.
///
/// This timeout happens independently of [`acquire_timeout`][Self::acquire_timeout].
pub fn get_connect_timeout(&self) -> Duration {
self.connect_timeout
}
/// Set the maximum lifetime of individual connections.
///
/// Any connection with a lifetime greater than this will be closed.
@ -339,57 +350,6 @@ impl<DB: Database> PoolOptions<DB> {
self
}
/// Perform an asynchronous action after connecting to the database.
///
/// If the operation returns with an error then the error is logged, the connection is closed
/// and a new one is opened in its place and the callback is invoked again.
///
/// This occurs in a backoff loop to avoid high CPU usage and spamming logs during a transient
/// error condition.
///
/// Note that this may be called for internally opened connections, such as when maintaining
/// [`min_connections`][Self::min_connections], that are then immediately returned to the pool
/// without invoking [`after_release`][Self::after_release].
///
/// # Example: Additional Parameters
/// This callback may be used to set additional configuration parameters
/// that are not exposed by the database's `ConnectOptions`.
///
/// This example is written for PostgreSQL but can likely be adapted to other databases.
///
/// ```no_run
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use sqlx::Executor;
/// use sqlx::postgres::PgPoolOptions;
///
/// let pool = PgPoolOptions::new()
/// .after_connect(|conn, _meta| Box::pin(async move {
/// // When directly invoking `Executor` methods,
/// // it is possible to execute multiple statements with one call.
/// conn.execute("SET application_name = 'your_app'; SET search_path = 'my_schema';")
/// .await?;
///
/// Ok(())
/// }))
/// .connect("postgres:// …").await?;
/// # Ok(())
/// # }
/// ```
///
/// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self].
pub fn after_connect<F>(mut self, callback: F) -> Self
where
// We're passing the `PoolConnectionMetadata` here mostly for future-proofing.
// `age` and `idle_for` are obviously not useful for fresh connections.
for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<(), Error>>
+ 'static
+ Send
+ Sync,
{
self.after_connect = Some(Arc::new(callback));
self
}
/// Perform an asynchronous action on a previously idle connection before giving it out.
///
/// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains
@ -537,11 +497,25 @@ impl<DB: Database> PoolOptions<DB> {
pub async fn connect_with(
self,
options: <DB::Connection as Connection>::Options,
) -> Result<Pool<DB>, Error> {
self.connect_with_connector(DefaultConnector(options)).await
}
/// Create a new pool from this `PoolOptions` and immediately open at least one connection.
///
/// This ensures the configuration is correct.
///
/// The total number of connections opened is <code>max(1, [min_connections][Self::min_connections])</code>.
///
/// See [PoolConnector] for examples.
pub async fn connect_with_connector(
self,
connector: impl PoolConnector<DB>,
) -> Result<Pool<DB>, Error> {
// Don't take longer than `acquire_timeout` starting from when this is called.
let deadline = Instant::now() + self.acquire_timeout;
let inner = PoolInner::new_arc(self, options);
let inner = PoolInner::new_arc(self, connector);
if inner.options.min_connections > 0 {
// If the idle reaper is spawned then this will race with the call from that task
@ -552,7 +526,7 @@ impl<DB: Database> PoolOptions<DB> {
// If `min_connections` is nonzero then we'll likely just pull a connection
// from the idle queue here, but it should at least get tested first.
let conn = inner.acquire().await?;
inner.release(conn);
inner.release(conn.into_floating());
Ok(Pool(inner))
}
@ -578,7 +552,11 @@ impl<DB: Database> PoolOptions<DB> {
/// optimistically establish that many connections for the pool.
pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
// `min_connections` is guaranteed by the idle reaper now.
Pool(PoolInner::new_arc(self, options))
self.connect_lazy_with_connector(DefaultConnector(options))
}
/// Create a new pool from this `PoolOptions` and the given [`PoolConnector`],
/// but don't open any connections right now.
pub fn connect_lazy_with_connector(self, connector: impl PoolConnector<DB>) -> Pool<DB> {
Pool(PoolInner::new_arc(self, connector))
}
}
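To put the new option in context (Postgres and the URL are placeholders; the values are arbitrary): `connect_timeout` bounds a single connection task across all of its backoff attempts, while `acquire_timeout` bounds how long one `acquire()` call waits. A minimal sketch:

use std::time::Duration;
use sqlx::postgres::PgPoolOptions;

async fn build_pool() -> sqlx::Result<sqlx::PgPool> {
    PgPoolOptions::new()
        .max_connections(10)
        // How long a single `acquire()` call may wait for an idle or new connection.
        .acquire_timeout(Duration::from_secs(30))
        // Total budget for opening one connection, including all retries.
        .connect_timeout(Duration::from_secs(120))
        .connect("postgres://localhost/mydb")
        .await
}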

View File

@ -2,7 +2,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::{Duration, Instant};
use cfg_if::cfg_if;
@ -51,6 +51,29 @@ pub async fn timeout<F: Future>(duration: Duration, f: F) -> Result<F::Output, T
}
}
pub async fn timeout_at<F: Future>(deadline: Instant, f: F) -> Result<F::Output, TimeoutError> {
#[cfg(feature = "_rt-tokio")]
if rt_tokio::available() {
return tokio::time::timeout_at(deadline.into(), f)
.await
.map_err(|_| TimeoutError(()));
}
#[cfg(feature = "_rt-async-std")]
{
let Some(duration) = deadline.checked_duration_since(Instant::now()) else {
return Err(TimeoutError(()));
};
async_std::future::timeout(duration, f)
.await
.map_err(|_| TimeoutError(()))
}
#[cfg(not(feature = "_rt-async-std"))]
missing_rt((deadline, f))
}
pub async fn sleep(duration: Duration) {
#[cfg(feature = "_rt-tokio")]
if rt_tokio::available() {

View File

@ -1,5 +1,4 @@
use std::future::Future;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::OnceLock;
use std::time::Duration;
@ -108,27 +107,11 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
.max_connections(20)
// Immediately close master connections. Tokio's I/O streams don't like hopping runtimes.
.after_release(|_conn, _| Box::pin(async move { Ok(false) }))
.connect_lazy_with(master_opts);
.connect_lazy_with(master_opts.clone());
let master_pool = match once_lock_try_insert_polyfill(&MASTER_POOL, pool) {
Ok(inserted) => inserted,
Err((existing, pool)) => {
// Sanity checks.
assert_eq!(
existing.connect_options().host,
pool.connect_options().host,
"DATABASE_URL changed at runtime, host differs"
);
assert_eq!(
existing.connect_options().database,
pool.connect_options().database,
"DATABASE_URL changed at runtime, database differs"
);
existing
}
};
let master_pool = MASTER_POOL
.try_insert(pool)
.unwrap_or_else(|(existing, _pool)| existing);
let mut conn = master_pool.acquire().await?;
@ -144,7 +127,7 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
-- BLOB/TEXT columns can only be used as index keys with a prefix length:
-- https://dev.mysql.com/doc/refman/8.4/en/column-indexes.html#column-indexes-prefix
primary key(db_name(63))
);
);
"#,
)
.await?;
@ -172,11 +155,7 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
// Close connections ASAP if left in the idle queue.
.idle_timeout(Some(Duration::from_secs(1)))
.parent(master_pool.clone()),
connect_opts: master_pool
.connect_options()
.deref()
.clone()
.database(&db_name),
connect_opts: master_opts.database(&db_name),
db_name,
})
}

View File

@ -186,7 +186,7 @@ impl DatabaseError for PgDatabaseError {
self
}
fn is_transient_in_connect_phase(&self) -> bool {
fn is_retryable_connect_error(&self) -> bool {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
[
// too_many_connections

View File

@ -127,6 +127,11 @@ impl PgConnectOptions {
self
}
/// Identical to [`Self::host()`], but through a mutable reference.
pub fn set_host(&mut self, host: &str) -> &mut Self {
host.clone_into(&mut self.host);
self
}
/// Sets the port to connect to at the server host.
///
/// The default port for PostgreSQL is `5432`.
@ -143,6 +148,12 @@ impl PgConnectOptions {
self
}
/// Identical to [`Self::port()`], but through a mutable reference.
pub fn set_port(&mut self, port: u16) -> &mut Self {
self.port = port;
self
}
/// Sets a custom path to a directory containing a unix domain socket,
/// switching the connection method from TCP to the corresponding socket.
///
@ -169,6 +180,12 @@ impl PgConnectOptions {
self
}
/// Identical to [`Self::username()`], but through a mutable reference.
pub fn set_username(&mut self, username: &str) -> &mut Self {
username.clone_into(&mut self.username);
self
}
/// Sets the password to use if the server demands password authentication.
///
/// # Example
@ -184,6 +201,12 @@ impl PgConnectOptions {
self
}
/// Identical to [`Self::password()`], but through a mutable reference.
pub fn set_password(&mut self, password: &str) -> &mut Self {
self.password = Some(password.to_owned());
self
}
/// Sets the database name. Defaults to be the same as the user name.
///
/// # Example

View File

@ -1,5 +1,4 @@
use std::future::Future;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::OnceLock;
use std::time::Duration;
@ -101,27 +100,11 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
.max_connections(20)
// Immediately close master connections. Tokio's I/O streams don't like hopping runtimes.
.after_release(|_conn, _| Box::pin(async move { Ok(false) }))
.connect_lazy_with(master_opts);
.connect_lazy_with(master_opts.clone());
let master_pool = match once_lock_try_insert_polyfill(&MASTER_POOL, pool) {
Ok(inserted) => inserted,
Err((existing, pool)) => {
// Sanity checks.
assert_eq!(
existing.connect_options().host,
pool.connect_options().host,
"DATABASE_URL changed at runtime, host differs"
);
assert_eq!(
existing.connect_options().database,
pool.connect_options().database,
"DATABASE_URL changed at runtime, database differs"
);
existing
}
};
let master_pool = MASTER_POOL
.try_insert(pool)
.unwrap_or_else(|(existing, _pool)| existing);
let mut conn = master_pool.acquire().await?;
@ -177,11 +160,7 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
// Close connections ASAP if left in the idle queue.
.idle_timeout(Some(Duration::from_secs(1)))
.parent(master_pool.clone()),
connect_opts: master_pool
.connect_options()
.deref()
.clone()
.database(&db_name),
connect_opts: master_opts.database(&db_name),
db_name,
})
}