mirror of
https://github.com/launchbadge/sqlx.git
synced 2026-03-10 07:39:56 +00:00
fix(pool): spawn task for before_acquire
This commit is contained in:
parent
3d6764f592
commit
c6f4b9fe29
@ -14,7 +14,9 @@ use std::task::ready;
|
||||
use crate::logger::private_level_filter_to_trace_level;
|
||||
use crate::pool::connect::{ConnectPermit, ConnectionCounter, DynConnector};
|
||||
use crate::pool::idle::IdleQueue;
|
||||
use crate::private_tracing_dynamic_event;
|
||||
use crate::rt::JoinHandle;
|
||||
use crate::{private_tracing_dynamic_event, rt};
|
||||
use either::Either;
|
||||
use futures_util::future::{self, OptionFuture};
|
||||
use futures_util::FutureExt;
|
||||
use std::time::{Duration, Instant};
|
||||
@ -117,7 +119,7 @@ impl<DB: Database> PoolInner<DB> {
|
||||
let mut close_event = pin!(self.close_event());
|
||||
let mut deadline = pin!(crate::rt::sleep(self.options.acquire_timeout));
|
||||
let mut acquire_idle = pin!(self.idle.acquire(self).fuse());
|
||||
let mut check_idle = pin!(OptionFuture::from(None));
|
||||
let mut before_acquire = OptionFuture::from(None);
|
||||
let mut acquire_connect_permit = pin!(OptionFuture::from(Some(
|
||||
self.counter.acquire_permit(self).fuse()
|
||||
)));
|
||||
@ -145,21 +147,25 @@ impl<DB: Database> PoolInner<DB> {
|
||||
|
||||
// Attempt to acquire a connection from the idle queue.
|
||||
if let Ready(idle) = acquire_idle.poll_unpin(cx) {
|
||||
check_idle.set(Some(check_idle_conn(idle, &self.options)).into());
|
||||
// If we acquired an idle connection, run any checks that need to be done.
|
||||
//
|
||||
// Includes `test_on_acquire` and the `before_acquire` callback, if set.
|
||||
match finish_acquire(idle) {
|
||||
// There are checks needed to be done, so they're spawned as a task
|
||||
// to be cancellation-safe.
|
||||
Either::Left(check_task) => {
|
||||
before_acquire = Some(check_task).into();
|
||||
}
|
||||
// The connection is ready to go.
|
||||
Either::Right(conn) => {
|
||||
return Ready(Ok(conn));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we acquired an idle connection, run any checks that need to be done.
|
||||
//
|
||||
// Includes `test_on_acquire` and the `before_acquire` callback, if set.
|
||||
//
|
||||
// We don't want to race this step if it's already running because canceling it
|
||||
// will result in the potentially unnecessary closure of a connection.
|
||||
//
|
||||
// Instead, we just wait and see what happens. If we already started connecting,
|
||||
// that'll happen concurrently.
|
||||
match ready!(check_idle.poll_unpin(cx)) {
|
||||
// The `.reattach()` call errors with "type annotations needed" if not qualified.
|
||||
Some(Ok(live)) => return Ready(Ok(Floating::reattach(live))),
|
||||
// Poll the task returned by `finish_acquire`
|
||||
match ready!(before_acquire.poll_unpin(cx)) {
|
||||
Some(Ok(conn)) => return Ready(Ok(conn)),
|
||||
Some(Err(permit)) => {
|
||||
// We don't strictly need to poll `connect` here; all we really want to do
|
||||
// is to check if it is `None`. But since currently there's no getter for that,
|
||||
@ -179,7 +185,7 @@ impl<DB: Database> PoolInner<DB> {
|
||||
// Attempt to acquire another idle connection concurrently to opening a new one.
|
||||
acquire_idle.set(self.idle.acquire(self).fuse());
|
||||
// Annoyingly, `OptionFuture` doesn't fuse to `None` on its own
|
||||
check_idle.set(None.into());
|
||||
before_acquire = None.into();
|
||||
}
|
||||
None => (),
|
||||
}
|
||||
@ -290,42 +296,51 @@ fn is_beyond_idle_timeout<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<D
|
||||
.is_some_and(|timeout| idle.idle_since.elapsed() > timeout)
|
||||
}
|
||||
|
||||
async fn check_idle_conn<DB: Database>(
|
||||
mut conn: Floating<DB, Idle<DB>>,
|
||||
options: &PoolOptions<DB>,
|
||||
) -> Result<Floating<DB, Live<DB>>, ConnectPermit<DB>> {
|
||||
if options.test_before_acquire {
|
||||
// Check that the connection is still live
|
||||
if let Err(error) = conn.ping().await {
|
||||
// an error here means the other end has hung up or we lost connectivity
|
||||
// either way we're fine to just discard the connection
|
||||
// the error itself here isn't necessarily unexpected so WARN is too strong
|
||||
tracing::info!(%error, "ping on idle connection returned error");
|
||||
// connection is broken so don't try to close nicely
|
||||
return Err(conn.close_hard().await);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(test) = &options.before_acquire {
|
||||
let meta = conn.metadata();
|
||||
match test(&mut conn.live.raw, meta).await {
|
||||
Ok(false) => {
|
||||
// connection was rejected by user-defined hook, close nicely
|
||||
return Err(conn.close().await);
|
||||
}
|
||||
|
||||
Err(error) => {
|
||||
tracing::warn!(%error, "error from `before_acquire`");
|
||||
/// Execute `test_before_acquire` and/or `before_acquire` in a background task, if applicable.
|
||||
///
|
||||
/// Otherwise, immediately returns the connection.
|
||||
fn finish_acquire<DB: Database>(
|
||||
mut conn: Floating<DB, Idle<DB>>
|
||||
) -> Either<JoinHandle<Result<PoolConnection<DB>, ConnectPermit<DB>>>, PoolConnection<DB>> {
|
||||
let pool = conn.permit.pool();
|
||||
|
||||
if pool.options.test_before_acquire || pool.options.before_acquire.is_some() {
|
||||
// Spawn a task so the call may complete even if `acquire()` is cancelled.
|
||||
return Either::Left(rt::spawn(async move {
|
||||
// Check that the connection is still live
|
||||
if let Err(error) = conn.ping().await {
|
||||
// an error here means the other end has hung up or we lost connectivity
|
||||
// either way we're fine to just discard the connection
|
||||
// the error itself here isn't necessarily unexpected so WARN is too strong
|
||||
tracing::info!(%error, "ping on idle connection returned error");
|
||||
// connection is broken so don't try to close nicely
|
||||
return Err(conn.close_hard().await);
|
||||
}
|
||||
|
||||
Ok(true) => {}
|
||||
}
|
||||
}
|
||||
if let Some(test) = &conn.permit.pool().options.before_acquire {
|
||||
let meta = conn.metadata();
|
||||
match test(&mut conn.inner.live.raw, meta).await {
|
||||
Ok(false) => {
|
||||
// connection was rejected by user-defined hook, close nicely
|
||||
return Err(conn.close().await);
|
||||
}
|
||||
|
||||
// No need to re-connect; connection is alive or we don't care
|
||||
Ok(conn.into_live())
|
||||
Err(error) => {
|
||||
tracing::warn!(%error, "error from `before_acquire`");
|
||||
// connection is broken so don't try to close nicely
|
||||
return Err(conn.close_hard().await);
|
||||
}
|
||||
|
||||
Ok(true) => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(conn.into_live().reattach())
|
||||
}));
|
||||
}
|
||||
|
||||
// No checks are configured, return immediately.
|
||||
Either::Right(conn.into_live().reattach())
|
||||
}
|
||||
|
||||
fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
|
||||
@ -340,7 +355,7 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
|
||||
|
||||
(None, None) => {
|
||||
if pool.options.min_connections > 0 {
|
||||
crate::rt::spawn(async move {
|
||||
rt::spawn(async move {
|
||||
if let Some(pool) = pool_weak.upgrade() {
|
||||
pool.min_connections_maintenance(None).await;
|
||||
}
|
||||
@ -354,7 +369,7 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
|
||||
// Immediately cancel this task if the pool is closed.
|
||||
let mut close_event = pool.close_event();
|
||||
|
||||
crate::rt::spawn(async move {
|
||||
rt::spawn(async move {
|
||||
let _ = close_event
|
||||
.do_until(async {
|
||||
// If the last handle to the pool was dropped while we were sleeping
|
||||
@ -387,10 +402,10 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
|
||||
|
||||
if let Some(duration) = next_run.checked_duration_since(Instant::now()) {
|
||||
// `async-std` doesn't have a `sleep_until()`
|
||||
crate::rt::sleep(duration).await;
|
||||
rt::sleep(duration).await;
|
||||
} else {
|
||||
// `next_run` is in the past, just yield.
|
||||
crate::rt::yield_now().await;
|
||||
rt::yield_now().await;
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user