mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-12-27 11:08:05 +00:00
Handle dropping wait_for_conn
If wait_for_conn is dropped before completing, release will call wake on a waker that noone is listining on. This leads to a state where waiting will gro indefinitly while all connections are idle. To fix this we turn waiting into a queue of unique Weak pointers. This way if wait_for_conn is dropped the pointer in waiting cannot be upgraded, we can use this as a signal that we shoud wake the next one instead.
This commit is contained in:
parent
ffc2c6c67a
commit
c7cf104a8f
@ -12,14 +12,14 @@ use std::cmp;
|
||||
use std::mem;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::task::Context;
|
||||
use std::time::Instant;
|
||||
|
||||
pub(crate) struct SharedPool<DB: Database> {
|
||||
pub(super) connect_options: <DB::Connection as Connection>::Options,
|
||||
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
|
||||
waiters: SegQueue<Arc<Waiter>>,
|
||||
waiters: SegQueue<Weak<Waiter>>,
|
||||
pub(super) size: AtomicU32,
|
||||
is_closed: AtomicBool,
|
||||
pub(super) options: PoolOptions<DB>,
|
||||
@ -42,7 +42,9 @@ impl<DB: Database> SharedPool<DB> {
|
||||
pub(super) async fn close(&self) {
|
||||
self.is_closed.store(true, Ordering::Release);
|
||||
while let Some(waker) = self.waiters.pop() {
|
||||
waker.wake();
|
||||
if let Some(waker) = waker.upgrade() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
// ensure we wait until the pool is actually closed
|
||||
@ -92,8 +94,11 @@ impl<DB: Database> SharedPool<DB> {
|
||||
panic!("BUG: connection queue overflow in release()");
|
||||
}
|
||||
|
||||
if let Some(waker) = self.waiters.pop() {
|
||||
waker.wake();
|
||||
while let Some(waker) = self.waiters.pop() {
|
||||
if let Some(waker) = waker.upgrade() {
|
||||
waker.wake();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,7 +142,7 @@ impl<DB: Database> SharedPool<DB> {
|
||||
future::poll_fn(|cx| -> Poll<()> {
|
||||
let waiter = waiter.get_or_insert_with(|| {
|
||||
let waiter = Waiter::new(cx);
|
||||
self.waiters.push(waiter.clone());
|
||||
self.waiters.push(Arc::downgrade(&waiter));
|
||||
waiter
|
||||
});
|
||||
|
||||
@ -358,7 +363,7 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
|
||||
/// (where the pool thinks it has more connections than it does).
|
||||
pub(in crate::pool) struct DecrementSizeGuard<'a> {
|
||||
size: &'a AtomicU32,
|
||||
waiters: &'a SegQueue<Arc<Waiter>>,
|
||||
waiters: &'a SegQueue<Weak<Waiter>>,
|
||||
dropped: bool,
|
||||
}
|
||||
|
||||
@ -387,7 +392,9 @@ impl Drop for DecrementSizeGuard<'_> {
|
||||
self.dropped = true;
|
||||
self.size.fetch_sub(1, Ordering::SeqCst);
|
||||
if let Some(waker) = self.waiters.pop() {
|
||||
waker.wake();
|
||||
if let Some(waker) = waker.upgrade() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user