fix(pool): always ping connection on release to see if it's still viable

Signed-off-by: Austin Bonander <austin@launchbadge.com>
This commit is contained in:
Austin Bonander 2021-02-01 21:54:32 -08:00 committed by Ryan Leckey
parent 68d404568e
commit 0ed524d65c
2 changed files with 24 additions and 19 deletions

View File

@ -71,26 +71,29 @@ impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(mut live) = self.live.take() {
let pool = self.pool.clone();
spawn(async move {
let mut floating = live.float(&pool);
if live.raw.should_flush() {
spawn(async move {
// flush the connection (will immediately return if not needed) before
// we fully release to the pool
if let Err(e) = live.raw.flush().await {
log::error!("error occurred while flushing the connection: {}", e);
// test the connection on-release to ensure it is still viable
// if an Executor future/stream is dropped during an `.await` call, the connection
// is likely to be left in an inconsistent state, in which case it should not be
// returned to the pool; also of course, if it was dropped due to an error
// this is simply a band-aid as SQLx-next (0.6) connections should be able
// to recover from cancellations
if let Err(e) = floating.raw.ping().await {
log::warn!(
"error occurred while testing the connection on-release: {}",
e
);
// we now consider the connection to be broken
// close the connection and drop from the pool
let _ = live.float(&pool).into_idle().close().await;
} else {
// after we have flushed successfully, release to the pool
pool.release(live.float(&pool));
}
});
} else {
// nothing to flush, release immediately outside of a spawn
pool.release(live.float(&pool));
}
// we now consider the connection to be broken; just drop it to close
// trying to close gracefully might cause something weird to happen
drop(floating);
} else {
// if the connection is still viable, release it to th epool
pool.release(floating);
}
});
}
}
}

View File

@ -29,7 +29,9 @@ async fn pool_should_invoke_after_connect() -> anyhow::Result<()> {
let _ = pool.acquire().await?;
let _ = pool.acquire().await?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
// since connections are released asynchronously,
// `.after_connect()` may be called more than once
assert!(counter.load(Ordering::SeqCst) >= 1);
Ok(())
}