fix(pool): ignore spurious wakeups when waiting for a connection

fixes #622
This commit is contained in:
Austin Bonander 2020-08-12 23:55:32 -07:00 committed by Ryan Leckey
parent 7a70717944
commit fa7981f68a
3 changed files with 145 additions and 10 deletions

View File

@ -13,12 +13,13 @@ use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::task::Context;
use std::time::Instant;
pub(crate) struct SharedPool<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
waiters: SegQueue<Waker>,
waiters: SegQueue<Arc<Waiter>>,
pub(super) size: AtomicU32,
is_closed: AtomicBool,
pub(super) options: PoolOptions<DB>,
@ -122,19 +123,22 @@ impl<DB: Database> SharedPool<DB> {
return Err(Error::PoolClosed);
}
let mut waker_pushed = false;
let mut waiter = None;
timeout(
deadline_as_timeout::<DB>(deadline)?,
// `poll_fn` gets us easy access to a `Waker` that we can push to our queue
future::poll_fn(|ctx| -> Poll<()> {
if !waker_pushed {
// only push the waker once
self.waiters.push(ctx.waker().to_owned());
waker_pushed = true;
Poll::Pending
} else {
future::poll_fn(|cx| -> Poll<()> {
let waiter = waiter.get_or_insert_with(|| {
let waiter = Waiter::new(cx);
self.waiters.push(waiter.clone());
waiter
});
if waiter.is_woken() {
Poll::Ready(())
} else {
Poll::Pending
}
}),
)
@ -346,7 +350,7 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
/// (where the pool thinks it has more connections than it does).
pub(in crate::pool) struct DecrementSizeGuard<'a> {
size: &'a AtomicU32,
waiters: &'a SegQueue<Waker>,
waiters: &'a SegQueue<Arc<Waiter>>,
dropped: bool,
}
@ -379,3 +383,26 @@ impl Drop for DecrementSizeGuard<'_> {
}
}
}
struct Waiter {
woken: AtomicBool,
waker: Waker,
}
impl Waiter {
fn new(cx: &mut Context<'_>) -> Arc<Self> {
Arc::new(Self {
woken: AtomicBool::new(false),
waker: cx.waker().clone(),
})
}
fn wake(&self) {
self.woken.store(true, Ordering::Release);
self.waker.wake_by_ref();
}
fn is_woken(&self) -> bool {
self.woken.load(Ordering::Acquire)
}
}

View File

@ -333,3 +333,57 @@ async fn it_can_prepare_then_execute() -> anyhow::Result<()> {
Ok(())
}
// repro is more reliable with the basic scheduler used by `#[tokio::test]`
#[cfg(feature = "runtime-tokio")]
#[tokio::test]
async fn test_issue_622() -> anyhow::Result<()> {
use std::time::Instant;
setup_if_needed();
let pool = MySqlPoolOptions::new()
.max_connections(1) // also fails with higher counts, e.g. 5
.connect(&std::env::var("DATABASE_URL").unwrap())
.await?;
println!("pool state: {:?}", pool);
let mut handles = vec![];
// given repro spawned 100 tasks but I found it reliably reproduced with 3
for i in 0..3 {
let pool = pool.clone();
handles.push(sqlx_rt::spawn(async move {
{
let mut conn = pool.acquire().await.unwrap();
let _ = sqlx::query("SELECT 1").fetch_one(&mut conn).await.unwrap();
// conn gets dropped here and should be returned to the pool
}
// (do some other work here without holding on to a connection)
// this actually fixes the issue, depending on the timeout used
// sqlx_rt::sleep(Duration::from_millis(500)).await;
{
let start = Instant::now();
match pool.acquire().await {
Ok(conn) => {
println!("{} acquire took {:?}", i, start.elapsed());
drop(conn);
}
Err(e) => panic!("{} acquire returned error: {} pool state: {:?}", i, e, pool),
}
}
Result::<(), anyhow::Error>::Ok(())
}));
}
futures::future::try_join_all(handles).await?;
Ok(())
}

View File

@ -709,3 +709,57 @@ async fn it_can_prepare_then_execute() -> anyhow::Result<()> {
Ok(())
}
// repro is more reliable with the basic scheduler used by `#[tokio::test]`
#[cfg(feature = "runtime-tokio")]
#[tokio::test]
async fn test_issue_622() -> anyhow::Result<()> {
use std::time::Instant;
setup_if_needed();
let pool = PgPoolOptions::new()
.max_connections(1) // also fails with higher counts, e.g. 5
.connect(&std::env::var("DATABASE_URL").unwrap())
.await?;
println!("pool state: {:?}", pool);
let mut handles = vec![];
// given repro spawned 100 tasks but I found it reliably reproduced with 3
for i in 0..3 {
let pool = pool.clone();
handles.push(sqlx_rt::spawn(async move {
{
let mut conn = pool.acquire().await.unwrap();
let _ = sqlx::query("SELECT 1").fetch_one(&mut conn).await.unwrap();
// conn gets dropped here and should be returned to the pool
}
// (do some other work here without holding on to a connection)
// this actually fixes the issue, depending on the timeout used
// sqlx_rt::sleep(Duration::from_millis(500)).await;
{
let start = Instant::now();
match pool.acquire().await {
Ok(conn) => {
println!("{} acquire took {:?}", i, start.elapsed());
drop(conn);
}
Err(e) => panic!("{} acquire returned error: {} pool state: {:?}", i, e, pool),
}
}
Result::<(), anyhow::Error>::Ok(())
}));
}
futures::future::try_join_all(handles).await?;
Ok(())
}