diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 8512508c..0eadf293 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -107,7 +107,7 @@ ease-off = { workspace = true, features = ["futures"] } pin-project-lite = "0.2.14" [dev-dependencies] -tokio = { version = "1", features = ["rt"] } +tokio = { version = "1", features = ["rt", "sync"] } [dev-dependencies.sqlx] # FIXME: https://github.com/rust-lang/cargo/issues/15622 diff --git a/sqlx-core/src/pool/connect.rs b/sqlx-core/src/pool/connect.rs index 7fdc8c47..ee805914 100644 --- a/sqlx-core/src/pool/connect.rs +++ b/sqlx-core/src/pool/connect.rs @@ -6,7 +6,7 @@ use crate::pool::PoolConnection; use crate::rt::JoinHandle; use crate::Error; use ease_off::EaseOff; -use event_listener::Event; +use event_listener::{listener, Event}; use std::fmt::{Display, Formatter}; use std::future::Future; use std::ptr; @@ -50,7 +50,7 @@ use std::io; /// let database_url = database_url.clone(); /// async move { /// println!( -/// "opening connection {}, attempt {}; elapsed time: {}", +/// "opening connection {}, attempt {}; elapsed time: {:?}", /// meta.pool_size, /// meta.num_attempts + 1, /// meta.start.elapsed() @@ -96,10 +96,10 @@ use std::io; /// /// let pool = PgPoolOptions::new() /// .connect_with_connector(move |meta: PoolConnectMetadata| { -/// let connect_opts_ = connect_opts.clone(); +/// let connect_opts = connect_opts_.clone(); /// async move { /// println!( -/// "opening connection {}, attempt {}; elapsed time: {}", +/// "opening connection {}, attempt {}; elapsed time: {:?}", /// meta.pool_size, /// meta.num_attempts + 1, /// meta.start.elapsed() @@ -318,7 +318,8 @@ impl ConnectionCounter { pub async fn drain(&self) { while self.count.load(Ordering::Acquire) > 0 { - self.connect_available.listen().await; + listener!(self.connect_available => permit_released); + permit_released.await; } } @@ -386,13 +387,14 @@ impl ConnectionCounter { return acquired; } - self.connect_available.listen().await; - if attempt == 2 { tracing::warn!( "unable to acquire a connect permit after sleeping; this may indicate a bug" ); } + + listener!(self.connect_available => connect_available); + connect_available.await; } panic!("BUG: was never able to acquire a connection despite waking many times") diff --git a/sqlx-core/src/pool/idle.rs b/sqlx-core/src/pool/idle.rs index 239313f7..8b07b8e7 100644 --- a/sqlx-core/src/pool/idle.rs +++ b/sqlx-core/src/pool/idle.rs @@ -8,6 +8,8 @@ use futures_util::FutureExt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use event_listener::listener; + pub struct IdleQueue { queue: ArrayQueue>, // Keep a separate count because `ArrayQueue::len()` loops until the head and tail pointers @@ -36,7 +38,8 @@ impl IdleQueue { for attempt in 1usize.. { if should_wait { - self.release_event.listen().await; + listener!(self.release_event => release_event); + release_event.await; } if let Some(conn) = self.try_acquire(pool) { diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 4a515342..5eb1d203 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -18,7 +18,7 @@ use crate::rt::JoinHandle; use crate::{private_tracing_dynamic_event, rt}; use either::Either; use futures_util::future::{self, OptionFuture}; -use futures_util::{select, FutureExt}; +use futures_util::{FutureExt}; use std::time::{Duration, Instant}; use tracing::Level; @@ -78,14 +78,19 @@ impl PoolInner { // Keep clearing the idle queue as connections are released until the count reaches zero. async move { - let mut drained = pin!(self.counter.drain()).fuse(); + let mut drained = pin!(self.counter.drain()); loop { - select! { - idle = self.idle.acquire(self) => { + let mut acquire_idle = pin!(self.idle.acquire(self)); + + // Not using `futures::select!{}` here because it requires a proc-macro dep, + // and frankly it's a little broken. + match future::select(drained.as_mut(), acquire_idle.as_mut()).await { + // *not* `either::Either`; they rolled their own + future::Either::Left(_) => break, + future::Either::Right((idle, _)) => { idle.close().await; - }, - () = drained.as_mut() => break, + } } } }