diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index da9b0add..ec86c615 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -18,7 +18,7 @@ use crate::rt::JoinHandle; use crate::{private_tracing_dynamic_event, rt}; use either::Either; use futures_util::future::{self, OptionFuture}; -use futures_util::FutureExt; +use futures_util::{select, FutureExt}; use std::time::{Duration, Instant}; use tracing::Level; @@ -76,12 +76,18 @@ impl PoolInner { pub(super) fn close(self: &Arc) -> impl Future + '_ { self.mark_closed(); + // Keep clearing the idle queue as connections are released until the count reaches zero. async move { - while let Some(idle) = self.idle.try_acquire(self) { - idle.close().await; - } + let mut drained = pin!(self.counter.drain()); - self.counter.drain().await; + loop { + select! { + idle = self.idle.acquire(self) => { + idle.close().await; + }, + () = drained.as_mut() => break, + } + } } } @@ -117,7 +123,7 @@ impl PoolInner { let acquire_started_at = Instant::now(); let mut close_event = pin!(self.close_event()); - let mut deadline = pin!(crate::rt::sleep(self.options.acquire_timeout)); + let mut deadline = pin!(rt::sleep(self.options.acquire_timeout)); let mut acquire_idle = pin!(self.idle.acquire(self).fuse()); let mut before_acquire = OptionFuture::from(None); let mut acquire_connect_permit = pin!(OptionFuture::from(Some( @@ -131,6 +137,9 @@ impl PoolInner { // * If we acquire a `ConnectPermit`, we begin the connection loop (with backoff) // as implemented by `DynConnector`. // * If we acquire an idle connection, we then start polling `check_idle_conn()`. + // + // This doesn't quite fit into `select!{}` because the set of futures that may be polled + // at a given time is dynamic, so it's actually simpler to hand-roll it. let acquired = future::poll_fn(|cx| { use std::task::Poll::*;