From 5f8fc6b752b20bbfbc80d1d381e469d48b05b561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Sampaio?= Date: Fri, 15 Aug 2025 20:20:51 -0400 Subject: [PATCH] `Pool.close`: close all connections before returning (#3952) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix race condition in pool close (#3217) * fix deadlock in the postgres/listen example * Only acquire as many permits as a child pool can offer --------- Co-authored-by: Adam Cigánek --- examples/postgres/listen/src/main.rs | 4 ++++ sqlx-core/src/pool/inner.rs | 25 +++++++++++++++---------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/examples/postgres/listen/src/main.rs b/examples/postgres/listen/src/main.rs index 587dab2a..fe39db73 100644 --- a/examples/postgres/listen/src/main.rs +++ b/examples/postgres/listen/src/main.rs @@ -68,6 +68,10 @@ async fn main() -> Result<(), Box> { } } + // The stream is holding one connection. It needs to be dropped to allow the connection to + // return to the pool, otherwise `pool.close()` would never return. + drop(stream); + pool.close().await; Ok(()) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 4254aee3..b698dc9d 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -97,19 +97,24 @@ impl PoolInner { self.mark_closed(); async move { - for permits in 1..=self.options.max_connections { - // Close any currently idle connections in the pool. - while let Some(idle) = self.idle_conns.pop() { - let _ = idle.live.float((*self).clone()).close().await; - } + // For child pools, we need to acquire permits we actually have rather than + // max_connections + let permits_to_acquire = if self.options.parent_pool.is_some() { + // Child pools start with 0 permits, so we acquire based on current size + self.size() + } else { + // Parent pools can acquire all max_connections permits + self.options.max_connections + }; - if self.size() == 0 { - break; - } + let _permits = self.semaphore.acquire(permits_to_acquire).await; - // Wait for all permits to be released. - let _permits = self.semaphore.acquire(permits).await; + while let Some(idle) = self.idle_conns.pop() { + let _ = idle.live.raw.close().await; } + + self.num_idle.store(0, Ordering::Release); + self.size.store(0, Ordering::Release); } }