diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index b9b78bb7..4a9fd61f 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -31,9 +31,9 @@ pub(super) struct Idle { } /// RAII wrapper for connections being handled by functions that may drop them -pub(super) struct Floating<'p, C> { +pub(super) struct Floating { pub(super) inner: C, - pub(super) guard: DecrementSizeGuard<'p>, + pub(super) guard: DecrementSizeGuard, } const DEREF_ERR: &str = "(bug) connection already released to pool"; @@ -89,7 +89,7 @@ impl PoolConnection { self.live .take() .expect("PoolConnection double-dropped") - .float(&self.pool) + .float(self.pool.clone()) .detach() } @@ -106,19 +106,20 @@ impl PoolConnection { /// /// This effectively runs the drop handler eagerly instead of spawning a task to do it. pub(crate) fn return_to_pool(&mut self) -> impl Future + Send + 'static { - // we want these to happen synchronously so the drop handler doesn't try to spawn a task anyway - // this also makes the returned future `'static` - let live = self.live.take(); - let pool = self.pool.clone(); + // float the connection in the pool before we move into the task + // in case the returned `Future` isn't executed, like if it's spawned into a dying runtime + // https://github.com/launchbadge/sqlx/issues/1396 + let floating = self.live.take().map(|live| live.float(self.pool.clone())); async move { - let mut floating = if let Some(live) = live { - live.float(&pool) + let mut floating = if let Some(floating) = floating { + floating } else { return; }; - // test the connection on-release to ensure it is still viable + // test the connection on-release to ensure it is still viable, + // and flush anything time-sensitive like transaction rollbacks // if an Executor future/stream is dropped during an `.await` call, the connection // is likely to be left in an inconsistent state, in which case it should not be // returned to the pool; also of course, if it was dropped due to an error @@ -135,7 +136,7 @@ impl PoolConnection { drop(floating); } else { // if the connection is still viable, release it to the pool - pool.release(floating); + floating.release(); } } } @@ -157,7 +158,7 @@ impl Drop for PoolConnection { } impl Live { - pub fn float(self, pool: &SharedPool) -> Floating<'_, Self> { + pub fn float(self, pool: Arc>) -> Floating { Floating { inner: self, // create a new guard from a previously leaked permit @@ -187,8 +188,8 @@ impl DerefMut for Idle { } } -impl<'s, DB: Database> Floating<'s, Live> { - pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<'s>) -> Self { +impl Floating> { + pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard) -> Self { Self { inner: Live { raw: conn, @@ -213,6 +214,10 @@ impl<'s, DB: Database> Floating<'s, Live> { } } + pub fn release(self) { + self.guard.pool.clone().release(self); + } + pub async fn close(self) -> Result<(), Error> { // `guard` is dropped as intended self.inner.raw.close().await @@ -222,7 +227,7 @@ impl<'s, DB: Database> Floating<'s, Live> { self.inner.raw } - pub fn into_idle(self) -> Floating<'s, Idle> { + pub fn into_idle(self) -> Floating> { Floating { inner: self.inner.into_idle(), guard: self.guard, @@ -230,11 +235,11 @@ impl<'s, DB: Database> Floating<'s, Live> { } } -impl<'s, DB: Database> Floating<'s, Idle> { +impl Floating> { pub fn from_idle( idle: Idle, - pool: &'s SharedPool, - permit: SemaphoreReleaser<'s>, + pool: Arc>, + permit: SemaphoreReleaser<'_>, ) -> Self { Self { inner: idle, @@ -246,14 +251,14 @@ impl<'s, DB: Database> Floating<'s, Idle> { self.live.raw.ping().await } - pub fn into_live(self) -> Floating<'s, Live> { + pub fn into_live(self) -> Floating> { Floating { inner: self.inner.live, guard: self.guard, } } - pub async fn close(self) -> DecrementSizeGuard<'s> { + pub async fn close(self) -> DecrementSizeGuard { // `guard` is dropped as intended if let Err(e) = self.inner.live.raw.close().await { log::debug!("error occurred while closing the pool connection: {}", e); @@ -262,7 +267,7 @@ impl<'s, DB: Database> Floating<'s, Idle> { } } -impl Deref for Floating<'_, C> { +impl Deref for Floating { type Target = C; fn deref(&self) -> &Self::Target { @@ -270,7 +275,7 @@ impl Deref for Floating<'_, C> { } } -impl DerefMut for Floating<'_, C> { +impl DerefMut for Floating { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index ed82ca05..4648805b 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -75,7 +75,7 @@ impl SharedPool { self.is_closed.load(Ordering::Acquire) } - pub(super) async fn close(&self) { + pub(super) async fn close(self: &Arc) { let already_closed = self.is_closed.swap(true, Ordering::AcqRel); if !already_closed { @@ -93,12 +93,12 @@ impl SharedPool { .await; while let Some(idle) = self.idle_conns.pop() { - let _ = idle.live.float(self).close().await; + let _ = idle.live.float((*self).clone()).close().await; } } #[inline] - pub(super) fn try_acquire(&self) -> Option>> { + pub(super) fn try_acquire(self: &Arc) -> Option>> { if self.is_closed() { return None; } @@ -108,17 +108,17 @@ impl SharedPool { } fn pop_idle<'a>( - &'a self, + self: &'a Arc, permit: SemaphoreReleaser<'a>, - ) -> Result>, SemaphoreReleaser<'a>> { + ) -> Result>, SemaphoreReleaser<'a>> { if let Some(idle) = self.idle_conns.pop() { - Ok(Floating::from_idle(idle, self, permit)) + Ok(Floating::from_idle(idle, (*self).clone(), permit)) } else { Err(permit) } } - pub(super) fn release(&self, mut floating: Floating<'_, Live>) { + pub(super) fn release(&self, mut floating: Floating>) { if let Some(test) = &self.options.after_release { if !test(&mut floating.raw) { // drop the connection and do not return it to the pool @@ -141,9 +141,9 @@ impl SharedPool { /// /// Returns `None` if we are at max_connections or if the pool is closed. pub(super) fn try_increment_size<'a>( - &'a self, + self: &'a Arc, permit: SemaphoreReleaser<'a>, - ) -> Result, SemaphoreReleaser<'a>> { + ) -> Result, SemaphoreReleaser<'a>> { match self .size .fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| { @@ -151,14 +151,14 @@ impl SharedPool { .filter(|size| size <= &self.options.max_connections) }) { // we successfully incremented the size - Ok(_) => Ok(DecrementSizeGuard::from_permit(self, permit)), + Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)), // the pool is at max capacity Err(_) => Err(permit), } } #[allow(clippy::needless_lifetimes)] - pub(super) async fn acquire<'s>(&'s self) -> Result>, Error> { + pub(super) async fn acquire(self: &Arc) -> Result>, Error> { if self.is_closed() { return Err(Error::PoolClosed); } @@ -206,11 +206,11 @@ impl SharedPool { .map_err(|_| Error::PoolTimedOut)? } - pub(super) async fn connection<'s>( - &'s self, + pub(super) async fn connection( + self: &Arc, deadline: Instant, - guard: DecrementSizeGuard<'s>, - ) -> Result>, Error> { + guard: DecrementSizeGuard, + ) -> Result>, Error> { if self.is_closed() { return Err(Error::PoolClosed); } @@ -275,10 +275,10 @@ fn is_beyond_idle(idle: &Idle, options: &PoolOptions) -> b .map_or(false, |timeout| idle.since.elapsed() > timeout) } -async fn check_conn<'s: 'p, 'p, DB: Database>( - mut conn: Floating<'s, Idle>, - options: &'p PoolOptions, -) -> Result>, DecrementSizeGuard<'s>> { +async fn check_conn( + mut conn: Floating>, + options: &PoolOptions, +) -> Result>, DecrementSizeGuard> { // If the connection we pulled has expired, close the connection and // immediately create a new connection if is_beyond_lifetime(&conn, options) { @@ -337,7 +337,7 @@ fn spawn_reaper(pool: &Arc>) { }); } -async fn do_reap(pool: &SharedPool) { +async fn do_reap(pool: &Arc>) { // reap at most the current size minus the minimum idle let max_reaped = pool.size().saturating_sub(pool.options.min_connections); @@ -363,39 +363,34 @@ async fn do_reap(pool: &SharedPool) { /// /// Will decrement the pool size if dropped, to avoid semantically "leaking" connections /// (where the pool thinks it has more connections than it does). -pub(in crate::pool) struct DecrementSizeGuard<'a> { - size: &'a AtomicU32, - semaphore: &'a Semaphore, +pub(in crate::pool) struct DecrementSizeGuard { + pub(crate) pool: Arc>, dropped: bool, } -impl<'a> DecrementSizeGuard<'a> { +impl DecrementSizeGuard { /// Create a new guard that will release a semaphore permit on-drop. - pub fn new_permit(pool: &'a SharedPool) -> Self { + pub fn new_permit(pool: Arc>) -> Self { Self { - size: &pool.size, - semaphore: &pool.semaphore, + pool, dropped: false, } } - pub fn from_permit( - pool: &'a SharedPool, - mut permit: SemaphoreReleaser<'a>, - ) -> Self { + pub fn from_permit(pool: Arc>, mut permit: SemaphoreReleaser<'_>) -> Self { // here we effectively take ownership of the permit permit.disarm(); Self::new_permit(pool) } /// Return `true` if the internal references point to the same fields in `SharedPool`. - pub fn same_pool(&self, pool: &'a SharedPool) -> bool { - ptr::eq(self.size, &pool.size) + pub fn same_pool(&self, pool: &SharedPool) -> bool { + ptr::eq(&*self.pool, pool) } /// Release the semaphore permit without decreasing the pool size. fn release_permit(self) { - self.semaphore.release(1); + self.pool.semaphore.release(1); self.cancel(); } @@ -404,13 +399,13 @@ impl<'a> DecrementSizeGuard<'a> { } } -impl Drop for DecrementSizeGuard<'_> { +impl Drop for DecrementSizeGuard { fn drop(&mut self) { assert!(!self.dropped, "double-dropped!"); self.dropped = true; - self.size.fetch_sub(1, Ordering::SeqCst); + self.pool.size.fetch_sub(1, Ordering::SeqCst); // and here we release the permit we got on construction - self.semaphore.release(1); + self.pool.semaphore.release(1); } } diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 32313808..3be566d7 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -228,7 +228,7 @@ impl PoolOptions { } } -async fn init_min_connections(pool: &SharedPool) -> Result<(), Error> { +async fn init_min_connections(pool: &Arc>) -> Result<(), Error> { for _ in 0..cmp::max(pool.options.min_connections, 1) { let deadline = Instant::now() + pool.options.connect_timeout; let permit = pool.semaphore.acquire(1).await;