Improve max_lifetime handling (#3065)

* Check max lifetime in return_to_pool, not on acquire

* Improve checks in background maintenance task

* Add tests

* Adjust test to fix
Author: Mirek Klimos
Date: 2024-03-04 16:39:59 -08:00
Committed by: GitHub
Parent: 27a49914ad
Commit: b4f6596b06

3 changed files with 98 additions and 49 deletions
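For context, `max_lifetime` is an ordinary `PoolOptions` setting; this change moves where it is enforced, not how it is configured. A minimal usage sketch, assuming the `any` driver, a reachable `DATABASE_URL`, and a tokio runtime (none of which this commit mandates):

    use std::time::Duration;
    use sqlx::any::AnyPoolOptions;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        sqlx::any::install_default_drivers();

        let pool = AnyPoolOptions::new()
            // After this commit, a connection past max_lifetime is closed when
            // it is returned to the pool or by the background maintenance task,
            // instead of being checked lazily on acquire.
            .max_lifetime(Duration::from_secs(30 * 60))
            .min_connections(3)
            .connect(&std::env::var("DATABASE_URL")?)
            .await?;

        let conn = pool.acquire().await?;
        drop(conn); // dropping returns the connection and triggers the lifetime check
        Ok(())
    }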


@@ -9,7 +9,7 @@ use crate::connection::Connection;
 use crate::database::Database;
 use crate::error::Error;
-use super::inner::{DecrementSizeGuard, PoolInner};
+use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
 use crate::pool::options::PoolConnectionMetadata;
 use std::future::Future;
@@ -239,6 +239,13 @@ impl<DB: Database> Floating<DB, Live<DB>> {
             return false;
         }

+        // If the connection is beyond max lifetime, close the connection and
+        // immediately create a new connection
+        if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
+            self.close().await;
+            return false;
+        }
+
        if let Some(test) = &self.guard.pool.options.after_release {
            let meta = self.metadata();
            match (test)(&mut self.inner.raw, meta).await {
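The max-lifetime check now sits at the top of `return_to_pool()`, ahead of the `after_release` callback, so an expired connection is closed without the callback ever running. A hedged sketch of where `after_release` fits in (the retirement policy here is illustrative, not part of this change):

    use std::time::Duration;
    use sqlx::any::{AnyConnectOptions, AnyPoolOptions};

    fn build_pool(opts: AnyConnectOptions) -> sqlx::AnyPool {
        AnyPoolOptions::new()
            .max_lifetime(Duration::from_secs(30 * 60))
            // Invoked when a connection is returned; Ok(false) closes it rather
            // than pooling it. Connections already past max_lifetime are closed
            // before this callback is consulted.
            .after_release(|_conn, meta| {
                Box::pin(async move {
                    // Illustrative policy: also retire connections older than 1h.
                    Ok(meta.age < Duration::from_secs(60 * 60))
                })
            })
            .connect_lazy_with(opts)
    }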


@@ -197,7 +197,7 @@ impl<DB: Database> PoolInner<DB> {
     }

     pub(super) fn release(&self, floating: Floating<DB, Live<DB>>) {
-        // `options.after_release` is invoked by `PoolConnection::release_to_pool()`.
+        // `options.after_release` and other checks are in `PoolConnection::return_to_pool()`.
        let Floating { inner: idle, guard } = floating.into_idle();
@@ -273,7 +273,7 @@ impl<DB: Database> PoolInner<DB> {
                 // `try_increment_size()`.
                 tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
                 // If so, we're likely in the current-thread runtime if it's Tokio
-                // and so we should yield to let any spawned release_to_pool() tasks
+                // and so we should yield to let any spawned return_to_pool() tasks
                 // execute.
                 crate::rt::yield_now().await;
                 continue;
@@ -417,7 +417,10 @@ impl<DB: Database> Drop for PoolInner<DB> {
 }

 /// Returns `true` if the connection has exceeded `options.max_lifetime` if set, `false` otherwise.
-fn is_beyond_max_lifetime<DB: Database>(live: &Live<DB>, options: &PoolOptions<DB>) -> bool {
+pub(super) fn is_beyond_max_lifetime<DB: Database>(
+    live: &Live<DB>,
+    options: &PoolOptions<DB>,
+) -> bool {
     options
         .max_lifetime
         .map_or(false, |max| live.created_at.elapsed() > max)
@@ -434,12 +437,6 @@ async fn check_idle_conn<DB: Database>(
     mut conn: Floating<DB, Idle<DB>>,
     options: &PoolOptions<DB>,
 ) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
-    // If the connection we pulled has expired, close the connection and
-    // immediately create a new connection
-    if is_beyond_max_lifetime(&conn, options) {
-        return Err(conn.close().await);
-    }
-
     if options.test_before_acquire {
         // Check that the connection is still live
         if let Err(error) = conn.ping().await {
@@ -503,22 +500,30 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
     crate::rt::spawn(async move {
         let _ = close_event
             .do_until(async {
-                let mut slept = true;
-
                 // If the last handle to the pool was dropped while we were sleeping
                 while let Some(pool) = pool_weak.upgrade() {
                     if pool.is_closed() {
                         return;
                     }

-                    // Don't run the reaper right away.
-                    if slept && !pool.idle_conns.is_empty() {
-                        do_reap(&pool).await;
-                    }
-
                     let next_run = Instant::now() + period;

-                    pool.min_connections_maintenance(Some(next_run)).await;
+                    // Go over all idle connections, check for idleness and lifetime,
+                    // and if we have fewer than min_connections after reaping a connection,
+                    // open a new one immediately. Note that other connections may be popped from
+                    // the queue in the meantime - that's fine, there is no harm in checking more
+                    for _ in 0..pool.num_idle() {
+                        if let Some(conn) = pool.try_acquire() {
+                            if is_beyond_idle_timeout(&conn, &pool.options)
+                                || is_beyond_max_lifetime(&conn, &pool.options)
+                            {
+                                let _ = conn.close().await;
+                                pool.min_connections_maintenance(Some(next_run)).await;
+                            } else {
+                                pool.release(conn.into_live());
+                            }
+                        }
+                    }

                     // Don't hold a reference to the pool while sleeping.
                     drop(pool);
@@ -530,37 +535,12 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
                         // `next_run` is in the past, just yield.
                         crate::rt::yield_now().await;
                     }
-
-                    slept = true;
                 }
             })
             .await;
     });
 }

-async fn do_reap<DB: Database>(pool: &Arc<PoolInner<DB>>) {
-    // reap at most the current size minus the minimum idle
-    let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
-
-    // collect connections to reap
-    let (reap, keep) = (0..max_reaped)
-        // only connections waiting in the queue
-        .filter_map(|_| pool.try_acquire())
-        .partition::<Vec<_>, _>(|conn| {
-            is_beyond_idle_timeout(conn, &pool.options)
-                || is_beyond_max_lifetime(conn, &pool.options)
-        });
-
-    for conn in keep {
-        // return valid connections to the pool first
-        pool.release(conn.into_live());
-    }
-
-    for conn in reap {
-        let _ = conn.close().await;
-    }
-}
-
 /// RAII guard returned by `Pool::try_increment_size()` and others.
 ///
 /// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
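The maintenance loop above replaces the removed batch `do_reap()`: each tick pops each idle connection at most once, closes it if it has outlived `max_lifetime` or sat idle past `idle_timeout`, and immediately tops the pool back up to `min_connections`. A standalone sketch of that pattern, with `IdleSlot`, the constants, and `reap()` as illustrative stand-ins rather than sqlx internals:

    use std::collections::VecDeque;
    use std::time::{Duration, Instant};

    const MAX_LIFETIME: Duration = Duration::from_secs(30 * 60);
    const IDLE_TIMEOUT: Duration = Duration::from_secs(10 * 60);

    struct IdleSlot {
        created_at: Instant,
        idle_since: Instant,
    }

    fn reap(queue: &mut VecDeque<IdleSlot>, min_connections: usize) {
        // `queue.len()` is read once, so each slot present at the start of the
        // tick is examined exactly once, mirroring `for _ in 0..pool.num_idle()`.
        for _ in 0..queue.len() {
            let Some(slot) = queue.pop_front() else { break };
            let expired = slot.created_at.elapsed() > MAX_LIFETIME
                || slot.idle_since.elapsed() > IDLE_TIMEOUT;
            if expired {
                // Dropping stands in for closing the connection; the real pool
                // calls min_connections_maintenance() here to replace it at once.
                drop(slot);
            } else {
                queue.push_back(slot); // still healthy: put it back
            }
        }
        // Refill to the configured floor (the min_connections_maintenance step).
        let now = Instant::now();
        while queue.len() < min_connections {
            queue.push_back(IdleSlot { created_at: now, idle_since: now });
        }
    }

    fn main() {
        let now = Instant::now();
        let mut queue = VecDeque::from([IdleSlot { created_at: now, idle_since: now }]);
        reap(&mut queue, 3);
        assert_eq!(queue.len(), 3);
    }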


@@ -1,9 +1,8 @@
 use sqlx::any::{AnyConnectOptions, AnyPoolOptions};
 use sqlx::Executor;
-use std::sync::atomic::AtomicI32;
 use std::sync::{
-    atomic::{AtomicUsize, Ordering},
-    Arc,
+    atomic::{AtomicI32, AtomicUsize, Ordering},
+    Arc, Mutex,
 };
 use std::time::Duration;
@@ -116,7 +115,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
             CREATE TEMPORARY TABLE conn_stats(
                 id int primary key,
                 before_acquire_calls int default 0,
-                after_release_calls int default 0 
+                after_release_calls int default 0
             );
             INSERT INTO conn_stats(id) VALUES ({});
             "#,
@@ -137,7 +136,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
                 // MySQL and MariaDB don't support UPDATE ... RETURNING
                 sqlx::query(
                     r#"
-                    UPDATE conn_stats 
+                    UPDATE conn_stats
                     SET before_acquire_calls = before_acquire_calls + 1
                     "#,
                 )
@@ -161,7 +160,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
             Box::pin(async move {
                 sqlx::query(
                     r#"
-                    UPDATE conn_stats 
+                    UPDATE conn_stats
                     SET after_release_calls = after_release_calls + 1
                     "#,
                 )
@@ -216,3 +215,66 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {

     Ok(())
 }
+
+#[sqlx_macros::test]
+async fn test_connection_maintenance() -> anyhow::Result<()> {
+    sqlx::any::install_default_drivers();
+    sqlx_test::setup_if_needed();
+    let conn_options: AnyConnectOptions = std::env::var("DATABASE_URL")?.parse()?;
+
+    let last_meta = Arc::new(Mutex::new(None));
+    let last_meta_ = last_meta.clone();
+    let pool = AnyPoolOptions::new()
+        .max_lifetime(Duration::from_millis(400))
+        .min_connections(3)
+        .before_acquire(move |_conn, _meta| {
+            *last_meta_.lock().unwrap() = Some(_meta);
+            Box::pin(async { Ok(true) })
+        })
+        .connect_lazy_with(conn_options);
+
+    // Open and release 5 connections
+    let conns = vec![
+        pool.acquire().await?,
+        pool.acquire().await?,
+        pool.acquire().await?,
+        pool.acquire().await?,
+        pool.acquire().await?,
+    ];
+    assert_eq!(pool.size(), 5);
+    assert_eq!(pool.num_idle(), 0);
+    for mut conn in conns {
+        conn.return_to_pool().await;
+    }
+    assert_eq!(pool.size(), 5);
+    assert_eq!(pool.num_idle(), 5);
+
+    // Wait for at least two iterations of maintenance task
+    sqlx_core::rt::sleep(Duration::from_secs(1)).await;
+
+    // Existing connections should have been closed due to max lifetime
+    // and the pool should have reopened min_connections new ones.
+    // One connection might be in the process of being replaced so we assert 2-3.
+    assert!(
+        pool.size() >= 2 && pool.size() <= 3,
+        "pool.size() = {}",
+        pool.size()
+    );
+
+    for _ in 0..2 {
+        // Check that the connection was both acquired from the pool AND is new
+        let _ = pool.acquire().await.expect("failed to acquire connection");
+        let meta = last_meta
+            .lock()
+            .unwrap()
+            .take()
+            .expect("expected a connection from the pool");
+        assert!(
+            meta.age < Duration::from_secs(1),
+            "expected a fresh connection (age {:?})",
+            meta.age
+        );
+    }
+
+    Ok(())
+}