mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-12-29 21:00:54 +00:00
fix(pool): tweaks and fixes
This commit is contained in:
parent
4d73193c39
commit
782011bec4
@ -107,7 +107,7 @@ ease-off = { workspace = true, features = ["futures"] }
|
||||
pin-project-lite = "0.2.14"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["rt"] }
|
||||
tokio = { version = "1", features = ["rt", "sync"] }
|
||||
|
||||
[dev-dependencies.sqlx]
|
||||
# FIXME: https://github.com/rust-lang/cargo/issues/15622
|
||||
|
||||
@ -6,7 +6,7 @@ use crate::pool::PoolConnection;
|
||||
use crate::rt::JoinHandle;
|
||||
use crate::Error;
|
||||
use ease_off::EaseOff;
|
||||
use event_listener::Event;
|
||||
use event_listener::{listener, Event};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::future::Future;
|
||||
use std::ptr;
|
||||
@ -50,7 +50,7 @@ use std::io;
|
||||
/// let database_url = database_url.clone();
|
||||
/// async move {
|
||||
/// println!(
|
||||
/// "opening connection {}, attempt {}; elapsed time: {}",
|
||||
/// "opening connection {}, attempt {}; elapsed time: {:?}",
|
||||
/// meta.pool_size,
|
||||
/// meta.num_attempts + 1,
|
||||
/// meta.start.elapsed()
|
||||
@ -96,10 +96,10 @@ use std::io;
|
||||
///
|
||||
/// let pool = PgPoolOptions::new()
|
||||
/// .connect_with_connector(move |meta: PoolConnectMetadata| {
|
||||
/// let connect_opts_ = connect_opts.clone();
|
||||
/// let connect_opts = connect_opts_.clone();
|
||||
/// async move {
|
||||
/// println!(
|
||||
/// "opening connection {}, attempt {}; elapsed time: {}",
|
||||
/// "opening connection {}, attempt {}; elapsed time: {:?}",
|
||||
/// meta.pool_size,
|
||||
/// meta.num_attempts + 1,
|
||||
/// meta.start.elapsed()
|
||||
@ -318,7 +318,8 @@ impl ConnectionCounter {
|
||||
|
||||
pub async fn drain(&self) {
|
||||
while self.count.load(Ordering::Acquire) > 0 {
|
||||
self.connect_available.listen().await;
|
||||
listener!(self.connect_available => permit_released);
|
||||
permit_released.await;
|
||||
}
|
||||
}
|
||||
|
||||
@ -386,13 +387,14 @@ impl ConnectionCounter {
|
||||
return acquired;
|
||||
}
|
||||
|
||||
self.connect_available.listen().await;
|
||||
|
||||
if attempt == 2 {
|
||||
tracing::warn!(
|
||||
"unable to acquire a connect permit after sleeping; this may indicate a bug"
|
||||
);
|
||||
}
|
||||
|
||||
listener!(self.connect_available => connect_available);
|
||||
connect_available.await;
|
||||
}
|
||||
|
||||
panic!("BUG: was never able to acquire a connection despite waking many times")
|
||||
|
||||
@ -8,6 +8,8 @@ use futures_util::FutureExt;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use event_listener::listener;
|
||||
|
||||
pub struct IdleQueue<DB: Database> {
|
||||
queue: ArrayQueue<Idle<DB>>,
|
||||
// Keep a separate count because `ArrayQueue::len()` loops until the head and tail pointers
|
||||
@ -36,7 +38,8 @@ impl<DB: Database> IdleQueue<DB> {
|
||||
|
||||
for attempt in 1usize.. {
|
||||
if should_wait {
|
||||
self.release_event.listen().await;
|
||||
listener!(self.release_event => release_event);
|
||||
release_event.await;
|
||||
}
|
||||
|
||||
if let Some(conn) = self.try_acquire(pool) {
|
||||
|
||||
@ -18,7 +18,7 @@ use crate::rt::JoinHandle;
|
||||
use crate::{private_tracing_dynamic_event, rt};
|
||||
use either::Either;
|
||||
use futures_util::future::{self, OptionFuture};
|
||||
use futures_util::{select, FutureExt};
|
||||
use futures_util::{FutureExt};
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::Level;
|
||||
|
||||
@ -78,14 +78,19 @@ impl<DB: Database> PoolInner<DB> {
|
||||
|
||||
// Keep clearing the idle queue as connections are released until the count reaches zero.
|
||||
async move {
|
||||
let mut drained = pin!(self.counter.drain()).fuse();
|
||||
let mut drained = pin!(self.counter.drain());
|
||||
|
||||
loop {
|
||||
select! {
|
||||
idle = self.idle.acquire(self) => {
|
||||
let mut acquire_idle = pin!(self.idle.acquire(self));
|
||||
|
||||
// Not using `futures::select!{}` here because it requires a proc-macro dep,
|
||||
// and frankly it's a little broken.
|
||||
match future::select(drained.as_mut(), acquire_idle.as_mut()).await {
|
||||
// *not* `either::Either`; they rolled their own
|
||||
future::Either::Left(_) => break,
|
||||
future::Either::Right((idle, _)) => {
|
||||
idle.close().await;
|
||||
},
|
||||
() = drained.as_mut() => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user