more pool fixes (#1211)

* a task that is marked woken but didn't actually wake before being cancelled will instead wake the next task in the queue

* a task that wakes but doesn't get a connection will put itself back in the queue instead of waiting until it times out with no way to be woken

* the idle reaper now won't run if there are tasks waiting for a connection, and also uses
the proper `SharedPool::release()` to return validated connections to the pool so waiting tasks get woken

closes #622, #1210

(hopefully for good this time)

Signed-off-by: Austin Bonander <austin@launchbadge.com>
This commit is contained in:
Austin Bonander 2021-05-17 19:24:40 -07:00 committed by GitHub
parent 78a94240e6
commit 8f1d8c7e2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 100 additions and 60 deletions

View File

@ -15,14 +15,12 @@ use std::sync::{Arc, Weak};
use std::task::Context; use std::task::Context;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
/// Waiters should wake at least every this often to check if a connection has not come available type Waiters = SegQueue<Weak<WaiterInner>>;
/// since they went to sleep.
const MIN_WAKE_PERIOD: Duration = Duration::from_millis(500);
pub(crate) struct SharedPool<DB: Database> { pub(crate) struct SharedPool<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options, pub(super) connect_options: <DB::Connection as Connection>::Options,
pub(super) idle_conns: ArrayQueue<Idle<DB>>, pub(super) idle_conns: ArrayQueue<Idle<DB>>,
waiters: SegQueue<Weak<Waiter>>, waiters: Waiters,
pub(super) size: AtomicU32, pub(super) size: AtomicU32,
is_closed: AtomicBool, is_closed: AtomicBool,
pub(super) options: PoolOptions<DB>, pub(super) options: PoolOptions<DB>,
@ -152,7 +150,7 @@ impl<DB: Database> SharedPool<DB> {
// the strong ref of the `Weak<Waiter>` that we push to the queue // the strong ref of the `Weak<Waiter>` that we push to the queue
// initialized during the `timeout()` call below // initialized during the `timeout()` call below
// as long as we own this, we keep our place in line // as long as we own this, we keep our place in line
let mut waiter = None; let mut waiter: Option<Waiter<'_>> = None;
// Unless the pool has been closed ... // Unless the pool has been closed ...
while !self.is_closed() { while !self.is_closed() {
@ -173,24 +171,21 @@ impl<DB: Database> SharedPool<DB> {
} }
} }
// Wait for a connection to become available (or we are allowed to open a new one) if let Some(ref waiter) = waiter {
let timeout_duration = cmp::min( // return the waiter to the queue, note that this does put it to the back
// Returns an error if `deadline` passes // of the queue when it should ideally stay at the front
deadline_as_timeout::<DB>(deadline)?, self.waiters.push(Arc::downgrade(&waiter.inner));
MIN_WAKE_PERIOD, }
);
sqlx_rt::timeout( sqlx_rt::timeout(
timeout_duration, // Returns an error if `deadline` passes
deadline_as_timeout::<DB>(deadline)?,
// `poll_fn` gets us easy access to a `Waker` that we can push to our queue // `poll_fn` gets us easy access to a `Waker` that we can push to our queue
future::poll_fn(|cx| -> Poll<()> { future::poll_fn(|cx| -> Poll<()> {
let waiter = waiter.get_or_insert_with(|| { let waiter = waiter.get_or_insert_with(|| Waiter::push_new(cx, &self.waiters));
let waiter = Waiter::new(cx);
self.waiters.push(Arc::downgrade(&waiter));
waiter
});
if waiter.is_woken() { if waiter.is_woken() {
waiter.actually_woke = true;
Poll::Ready(()) Poll::Ready(())
} else { } else {
Poll::Pending Poll::Pending
@ -198,7 +193,11 @@ impl<DB: Database> SharedPool<DB> {
}), }),
) )
.await .await
.ok(); // timeout is no longer fatal here; we check if the deadline expired above .map_err(|_| Error::PoolTimedOut)?;
if let Some(ref mut waiter) = waiter {
waiter.reset();
}
waited = true; waited = true;
} }
@ -329,7 +328,18 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
let pool = Arc::clone(&pool); let pool = Arc::clone(&pool);
sqlx_rt::spawn(async move { sqlx_rt::spawn(async move {
while !pool.is_closed.load(Ordering::Acquire) { while !pool.is_closed() {
// only reap idle connections when no tasks are waiting
if pool.waiters.is_empty() {
do_reap(&pool).await;
}
sqlx_rt::sleep(period).await;
}
});
}
async fn do_reap<DB: Database>(pool: &SharedPool<DB>) {
// reap at most the current size minus the minimum idle // reap at most the current size minus the minimum idle
let max_reaped = pool.size().saturating_sub(pool.options.min_connections); let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
@ -342,24 +352,16 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
}); });
for conn in keep { for conn in keep {
// return these connections to the pool first // return valid connections to the pool first
let is_ok = pool.idle_conns.push(conn.into_leakable()).is_ok(); pool.release(conn.into_live());
if !is_ok {
panic!("BUG: connection queue overflow in spawn_reaper");
}
} }
for conn in reap { for conn in reap {
let _ = conn.close().await; let _ = conn.close().await;
} }
sqlx_rt::sleep(period).await;
}
});
} }
fn wake_one(waiters: &SegQueue<Weak<Waiter>>) { fn wake_one(waiters: &Waiters) {
while let Some(weak) = waiters.pop() { while let Some(weak) = waiters.pop() {
if let Some(waiter) = weak.upgrade() { if let Some(waiter) = weak.upgrade() {
if waiter.wake() { if waiter.wake() {
@ -375,7 +377,7 @@ fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
/// (where the pool thinks it has more connections than it does). /// (where the pool thinks it has more connections than it does).
pub(in crate::pool) struct DecrementSizeGuard<'a> { pub(in crate::pool) struct DecrementSizeGuard<'a> {
size: &'a AtomicU32, size: &'a AtomicU32,
waiters: &'a SegQueue<Weak<Waiter>>, waiters: &'a Waiters,
dropped: bool, dropped: bool,
} }
@ -407,19 +409,12 @@ impl Drop for DecrementSizeGuard<'_> {
} }
} }
/// Shared state for a task waiting on a pool connection.
///
/// Held behind an `Arc` so the waiter queue can store only a `Weak`
/// reference: if the waiting task is dropped, its queue entry simply
/// fails to upgrade and is discarded by the waker.
struct WaiterInner {
    // Flag set when this waiter is woken; the waiting task polls it via
    // `Waiter::is_woken()` to decide whether to stop pending.
    woken: AtomicBool,
    // Waker of the task that enqueued this entry, used to wake it when a
    // connection becomes available (or may be opened).
    waker: Waker,
}
impl Waiter { impl WaiterInner {
fn new(cx: &mut Context<'_>) -> Arc<Self> {
Arc::new(Self {
woken: AtomicBool::new(false),
waker: cx.waker().clone(),
})
}
/// Wake this waiter if it has not previously been woken. /// Wake this waiter if it has not previously been woken.
/// ///
/// Return `true` if this waiter was newly woken, or `false` if it was already woken. /// Return `true` if this waiter was newly woken, or `false` if it was already woken.
@ -435,8 +430,48 @@ impl Waiter {
false false
} }
}
/// RAII handle representing one task's place in the waiter queue.
///
/// Owns the strong reference to its `WaiterInner` (the queue holds only a
/// `Weak`), so dropping this handle invalidates the queue entry.
struct Waiter<'a> {
    inner: Arc<WaiterInner>,
    // The queue this waiter was pushed onto; needed on drop so an
    // unconsumed wakeup can be forwarded to the next waiter.
    queue: &'a Waiters,
    // True once the owning task has actually observed its own wakeup.
    // If the waiter is dropped woken-but-unobserved (e.g. the acquire
    // future was cancelled), `Drop` passes the wakeup along.
    actually_woke: bool,
}
impl<'a> Waiter<'a> {
fn push_new(cx: &mut Context<'_>, queue: &'a Waiters) -> Self {
let inner = Arc::new(WaiterInner {
woken: AtomicBool::new(false),
waker: cx.waker().clone(),
});
queue.push(Arc::downgrade(&inner));
Self {
inner,
queue,
actually_woke: false,
}
}
fn is_woken(&self) -> bool { fn is_woken(&self) -> bool {
self.woken.load(Ordering::Acquire) self.inner.woken.load(Ordering::Acquire)
}
fn reset(&mut self) {
self.inner
.woken
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.ok();
self.actually_woke = false;
}
}
impl Drop for Waiter<'_> {
fn drop(&mut self) {
// if we didn't actually wake to get a connection, wake the next task instead
if self.is_woken() && !self.actually_woke {
wake_one(self.queue);
}
} }
} }

View File

@ -509,26 +509,31 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
eprintln!("starting pool"); eprintln!("starting pool");
let pool = PgPoolOptions::new() let pool = PgPoolOptions::new()
.connect_timeout(Duration::from_secs(30)) .connect_timeout(Duration::from_secs(5))
.min_connections(5) .min_connections(1)
.max_connections(10) .max_connections(1)
.connect(&dotenv::var("DATABASE_URL")?) .connect(&dotenv::var("DATABASE_URL")?)
.await?; .await?;
// spin up more tasks than connections available, and ensure we don't deadlock // spin up more tasks than connections available, and ensure we don't deadlock
for i in 0..20 { for i in 0..200 {
let pool = pool.clone(); let pool = pool.clone();
sqlx_rt::spawn(async move { sqlx_rt::spawn(async move {
loop { loop {
if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await { if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await {
// normal error at termination of the test
if !matches!(e, sqlx::Error::PoolClosed) {
eprintln!("pool task {} dying due to {}", i, e); eprintln!("pool task {} dying due to {}", i, e);
break; break;
} }
} }
}
}); });
} }
for _ in 0..5 { // spawn a bunch of tasks that attempt to acquire but give up to ensure correct handling
// of cancellations
for _ in 0..50 {
let pool = pool.clone(); let pool = pool.clone();
sqlx_rt::spawn(async move { sqlx_rt::spawn(async move {
while !pool.is_closed() { while !pool.is_closed() {