refactor(pool): use a unique ID per connection

This commit is contained in:
Austin Bonander 2024-11-08 15:50:51 -08:00
parent c6f4b9fe29
commit e95bb03e95
3 changed files with 110 additions and 67 deletions

View File

@ -7,6 +7,7 @@ use crate::rt::JoinHandle;
use crate::Error;
use ease_off::EaseOff;
use event_listener::{Event, EventListener};
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::ptr;
@ -246,6 +247,7 @@ impl<DB: Database> PoolConnector<DB> for DefaultConnector<DB> {
/// Metadata passed to [`PoolConnector::connect()`] for every connection attempt.
#[derive(Debug)]
#[non_exhaustive]
pub struct PoolConnectMetadata {
/// The instant at which the current connection task was started, including all attempts.
///
@ -253,13 +255,16 @@ pub struct PoolConnectMetadata {
pub start: Instant,
/// The number of attempts that have occurred so far.
pub num_attempts: usize,
/// The current size of the pool.
pub pool_size: usize,
/// The ID of the connection, unique for the pool.
pub connection_id: ConnectionId,
}
pub struct DynConnector<DB: Database> {
// We want to spawn the connection attempt as a task anyway
connect: Box<
dyn Fn(ConnectPermit<DB>, usize) -> JoinHandle<crate::Result<PoolConnection<DB>>>
dyn Fn(ConnectionId, ConnectPermit<DB>) -> JoinHandle<crate::Result<PoolConnection<DB>>>
+ Send
+ Sync
+ 'static,
@ -271,53 +276,92 @@ impl<DB: Database> DynConnector<DB> {
let connector = Arc::new(connector);
Self {
connect: Box::new(move |permit, size| {
crate::rt::spawn(connect_with_backoff(permit, connector.clone(), size))
connect: Box::new(move |id, permit| {
crate::rt::spawn(connect_with_backoff(id, permit, connector.clone()))
}),
}
}
pub fn connect(
&self,
id: ConnectionId,
permit: ConnectPermit<DB>,
size: usize,
) -> JoinHandle<crate::Result<PoolConnection<DB>>> {
(self.connect)(permit, size)
(self.connect)(id, permit)
}
}
pub struct ConnectionCounter {
connections: AtomicUsize,
count: AtomicUsize,
next_id: AtomicUsize,
connect_available: Event,
}
// NOTE(review): the ID is allocated together with the connect permit, so the retries inside
// `connect_with_backoff()` appear to share one ID, and a connection that is closed after a
// failed pre-acquire check hands its ID to the replacement `connect()` call. Confirm that
// "unique for every connection attempt" is the intended wording.
/// An opaque connection ID, unique for every connection attempt with the same pool.
///
/// IDs are handed out by `ConnectionCounter::try_acquire_permit()` from an atomic sequence
/// that starts at `1`, and are formatted via `Display` as the bare integer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConnectionId(usize);
impl ConnectionCounter {
pub fn new() -> Self {
Self {
connections: AtomicUsize::new(0),
count: AtomicUsize::new(0),
next_id: AtomicUsize::new(1),
connect_available: Event::new(),
}
}
pub fn connections(&self) -> usize {
self.connections.load(Ordering::Acquire)
self.count.load(Ordering::Acquire)
}
pub async fn drain(&self) {
while self.connections.load(Ordering::Acquire) > 0 {
while self.count.load(Ordering::Acquire) > 0 {
self.connect_available.listen().await;
}
}
/// Try to reserve a connection slot on this counter without waiting.
///
/// Returns `None` when the pool is configured as `fair` and other tasks are already
/// listening for a free slot, or when the counter has reached `max_connections`.
///
/// On success, returns the ID allocated for the new connection together with the permit
/// that holds the reserved slot.
pub fn try_acquire_permit<DB: Database>(
    &self,
    pool: &Arc<PoolInner<DB>>,
) -> Option<(ConnectionId, ConnectPermit<DB>)> {
    debug_assert!(ptr::addr_eq(self, &pool.counter));

    // In fair mode, never jump ahead of tasks that are already queued on the event.
    //
    // NOTE(review): `acquire_permit_self()` calls this again after being woken; if other
    // listeners are still registered at that point this returns `None` for the task that
    // was just notified — confirm that is the intended behavior.
    let must_queue = pool.options.fair && self.connect_available.total_listeners() > 0;
    if must_queue {
        return None;
    }

    // Bump the count only while it is still below the configured maximum.
    let reserved = self
        .count
        .fetch_update(Ordering::Release, Ordering::Acquire, |current| {
            if current < pool.options.max_connections {
                Some(current + 1)
            } else {
                None
            }
        });

    let Ok(previous) = reserved else {
        return None;
    };

    let size = previous + 1;
    tracing::trace!(target: "sqlx::pool::connect", size, "increased size");

    // Allocate the ID first, then build the permit that keeps the pool alive.
    let id = ConnectionId(self.next_id.fetch_add(1, Ordering::SeqCst));
    let permit = ConnectPermit {
        pool: Some(Arc::clone(pool)),
    };

    Some((id, permit))
}
/// Attempt to acquire a permit from both this instance, and the parent pool, if applicable.
///
/// Returns the permit, and the ID of the new connection.
pub async fn acquire_permit<DB: Database>(
&self,
pool: &Arc<PoolInner<DB>>,
) -> (usize, ConnectPermit<DB>) {
) -> (ConnectionId, ConnectPermit<DB>) {
// Check that `self` can increase size first before we check the parent.
let (size, permit) = self.acquire_permit_self(pool).await;
let acquired = self.acquire_permit_self(pool).await;
if let Some(parent) = &pool.options.parent_pool {
let (_, permit) = parent.0.counter.acquire_permit_self(&parent.0).await;
@ -326,7 +370,7 @@ impl ConnectionCounter {
permit.consume();
}
(size, permit)
acquired
}
// Separate method because `async fn`s cannot be recursive.
@ -334,38 +378,13 @@ impl ConnectionCounter {
async fn acquire_permit_self<DB: Database>(
&self,
pool: &Arc<PoolInner<DB>>,
) -> (usize, ConnectPermit<DB>) {
debug_assert!(ptr::addr_eq(self, &pool.counter));
let mut should_wait = pool.options.fair && self.connect_available.total_listeners() > 0;
) -> (ConnectionId, ConnectPermit<DB>) {
for attempt in 1usize.. {
if should_wait {
self.connect_available.listen().await;
if let Some(acquired) = self.try_acquire_permit(pool) {
return acquired;
}
let res = self.connections.fetch_update(
Ordering::Release,
Ordering::Acquire,
|connections| {
(connections < pool.options.max_connections).then_some(connections + 1)
},
);
if let Ok(prev_size) = res {
let size = prev_size + 1;
tracing::trace!(target: "sqlx::pool::connect", size, "increased size");
return (
prev_size + 1,
ConnectPermit {
pool: Some(Arc::clone(pool)),
},
);
}
should_wait = true;
self.connect_available.listen().await;
if attempt == 2 {
tracing::warn!(
@ -380,7 +399,7 @@ impl ConnectionCounter {
pub fn release_permit<DB: Database>(&self, pool: &PoolInner<DB>) {
debug_assert!(ptr::addr_eq(self, &pool.counter));
self.connections.fetch_sub(1, Ordering::Release);
self.count.fetch_sub(1, Ordering::Release);
self.connect_available.notify(1usize);
if let Some(parent) = &pool.options.parent_pool {
@ -415,16 +434,22 @@ impl<DB: Database> Drop for ConnectPermit<DB> {
}
}
impl Display for ConnectionId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}
#[tracing::instrument(
target = "sqlx::pool::connect",
skip_all,
fields(connection = size),
skip_all,
fields(%connection_id),
err
)]
async fn connect_with_backoff<DB: Database>(
connection_id: ConnectionId,
permit: ConnectPermit<DB>,
connector: Arc<impl PoolConnector<DB>>,
size: usize,
) -> crate::Result<PoolConnection<DB>> {
if permit.pool().is_closed() {
return Err(Error::PoolClosed);
@ -436,7 +461,8 @@ async fn connect_with_backoff<DB: Database>(
let meta = PoolConnectMetadata {
start: ease_off.started_at(),
num_attempts: attempt,
pool_size: size,
pool_size: permit.pool().size(),
connection_id,
};
let conn = ease_off
@ -445,7 +471,7 @@ async fn connect_with_backoff<DB: Database>(
.or_retry_if(|e| can_retry_error(e.inner()))?;
if let Some(conn) = conn {
return Ok(Floating::new_live(conn, permit).reattach());
return Ok(Floating::new_live(conn, connection_id, permit).reattach());
}
}

View File

@ -11,7 +11,7 @@ use crate::database::Database;
use crate::error::Error;
use super::inner::{is_beyond_max_lifetime, PoolInner};
use crate::pool::connect::ConnectPermit;
use crate::pool::connect::{ConnectPermit, ConnectionId};
use crate::pool::options::PoolConnectionMetadata;
const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);
@ -27,6 +27,7 @@ pub struct PoolConnection<DB: Database> {
/// A raw database connection plus the metadata the pool tracks alongside it.
pub(super) struct Live<DB: Database> {
/// The underlying database connection.
pub(super) raw: DB::Connection,
/// The pool-assigned ID that was passed to `Floating::new_live()` for this connection.
pub(super) id: ConnectionId,
/// Set to `Instant::now()` by `Floating::new_live()` when this value is constructed.
pub(super) created_at: Instant,
}
@ -247,10 +248,11 @@ impl<DB: Database> DerefMut for Idle<DB> {
}
impl<DB: Database> Floating<DB, Live<DB>> {
pub fn new_live(conn: DB::Connection, permit: ConnectPermit<DB>) -> Self {
pub fn new_live(conn: DB::Connection, id: ConnectionId, permit: ConnectPermit<DB>) -> Self {
Self {
inner: Live {
raw: conn,
id,
created_at: Instant::now(),
},
permit,
@ -381,17 +383,29 @@ impl<DB: Database> Floating<DB, Idle<DB>> {
}
}
pub async fn close(self) -> ConnectPermit<DB> {
pub async fn close(self) -> (ConnectionId, ConnectPermit<DB>) {
let connection_id = self.inner.live.id;
tracing::debug!(%connection_id, "closing connection (gracefully)");
if let Err(error) = self.inner.live.raw.close().await {
tracing::debug!(%error, "error occurred while closing the pool connection");
tracing::debug!(
%connection_id,
%error,
"error occurred while closing the pool connection"
);
}
self.permit
(connection_id, self.permit)
}
pub async fn close_hard(self) -> ConnectPermit<DB> {
pub async fn close_hard(self) -> (ConnectionId, ConnectPermit<DB>) {
let connection_id = self.inner.live.id;
tracing::debug!(%connection_id, "closing connection (hard)");
let _ = self.inner.live.raw.close_hard().await;
self.permit
(connection_id, self.permit)
}
pub fn metadata(&self) -> PoolConnectionMetadata {

View File

@ -12,7 +12,7 @@ use std::sync::Arc;
use std::task::ready;
use crate::logger::private_level_filter_to_trace_level;
use crate::pool::connect::{ConnectPermit, ConnectionCounter, DynConnector};
use crate::pool::connect::{ConnectPermit, ConnectionCounter, ConnectionId, DynConnector};
use crate::pool::idle::IdleQueue;
use crate::rt::JoinHandle;
use crate::{private_tracing_dynamic_event, rt};
@ -166,7 +166,7 @@ impl<DB: Database> PoolInner<DB> {
// Poll the task returned by `finish_acquire`
match ready!(before_acquire.poll_unpin(cx)) {
Some(Ok(conn)) => return Ready(Ok(conn)),
Some(Err(permit)) => {
Some(Err((id, permit))) => {
// We don't strictly need to poll `connect` here; all we really want to do
// is to check if it is `None`. But since currently there's no getter for that,
// it doesn't really hurt to just poll it here.
@ -175,7 +175,7 @@ impl<DB: Database> PoolInner<DB> {
// If we're not already attempting to connect,
// take the permit returned from closing the connection and
// attempt to open a new one.
connect = Some(self.connector.connect(permit, self.size())).into();
connect = Some(self.connector.connect(id, permit)).into();
}
// `permit` is dropped in these branches, allowing another task to use it
Ready(Some(res)) => return Ready(res),
@ -190,8 +190,8 @@ impl<DB: Database> PoolInner<DB> {
None => (),
}
if let Ready(Some((size, permit))) = acquire_connect_permit.poll_unpin(cx) {
connect = Some(self.connector.connect(permit, size)).into();
if let Ready(Some((id, permit))) = acquire_connect_permit.poll_unpin(cx) {
connect = Some(self.connector.connect(id, permit)).into();
}
if let Ready(Some(res)) = connect.poll_unpin(cx) {
@ -237,11 +237,11 @@ impl<DB: Database> PoolInner<DB> {
//
// If no extra permits are available then we shouldn't be trying to spin up
// connections anyway.
let Some((size, permit)) = self.counter.acquire_permit(self).now_or_never() else {
let Some((id, permit)) = self.counter.acquire_permit(self).now_or_never() else {
return Ok(());
};
let conn = self.connector.connect(permit, size).await?;
let conn = self.connector.connect(id, permit).await?;
// We skip `after_release` since the connection was never provided to user code
// besides inside `PoolConnector::connect()`, if they override it.
@ -297,13 +297,16 @@ fn is_beyond_idle_timeout<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<D
}
/// Execute `test_before_acquire` and/or `before_acquire` in a background task, if applicable.
///
///
/// Otherwise, immediately returns the connection.
fn finish_acquire<DB: Database>(
mut conn: Floating<DB, Idle<DB>>
) -> Either<JoinHandle<Result<PoolConnection<DB>, ConnectPermit<DB>>>, PoolConnection<DB>> {
mut conn: Floating<DB, Idle<DB>>,
) -> Either<
JoinHandle<Result<PoolConnection<DB>, (ConnectionId, ConnectPermit<DB>)>>,
PoolConnection<DB>,
> {
let pool = conn.permit.pool();
if pool.options.test_before_acquire || pool.options.before_acquire.is_some() {
// Spawn a task so the call may complete even if `acquire()` is cancelled.
return Either::Left(rt::spawn(async move {
@ -334,11 +337,11 @@ fn finish_acquire<DB: Database>(
Ok(true) => {}
}
}
Ok(conn.into_live().reattach())
}));
}
// No checks are configured, return immediately.
Either::Right(conn.into_live().reattach())
}