diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index 052b59f2..732c1a8c 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -69,7 +69,7 @@ impl PoolConnection { /// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from. impl Drop for PoolConnection { fn drop(&mut self) { - if let Some(mut live) = self.live.take() { + if let Some(live) = self.live.take() { let pool = self.pool.clone(); spawn(async move { let mut floating = live.float(&pool); diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 46f37d91..d1c4e82f 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -7,14 +7,17 @@ use crate::pool::{deadline_as_timeout, PoolOptions}; use crossbeam_queue::{ArrayQueue, SegQueue}; use futures_core::task::{Poll, Waker}; use futures_util::future; -use sqlx_rt::{sleep, spawn, timeout}; use std::cmp; use std::mem; use std::ptr; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::{Arc, Weak}; use std::task::Context; -use std::time::Instant; +use std::time::{Duration, Instant}; + +/// Waiters should wake at least every this often to check if a connection has not come available +/// since they went to sleep. +const MIN_WAKE_PERIOD: Duration = Duration::from_millis(500); pub(crate) struct SharedPool { pub(super) connect_options: ::Options, @@ -26,6 +29,26 @@ pub(crate) struct SharedPool { } impl SharedPool { + pub(super) fn new_arc( + options: PoolOptions, + connect_options: ::Options, + ) -> Arc { + let pool = Self { + connect_options, + idle_conns: ArrayQueue::new(options.max_connections as usize), + waiters: SegQueue::new(), + size: AtomicU32::new(0), + is_closed: AtomicBool::new(false), + options, + }; + + let pool = Arc::new(pool); + + spawn_reaper(&pool); + + pool + } + pub(super) fn size(&self) -> u32 { self.size.load(Ordering::Acquire) } @@ -94,12 +117,7 @@ impl SharedPool { panic!("BUG: connection queue overflow in release()"); } - while let Some(waker) = self.waiters.pop() { - if let Some(waker) = waker.upgrade() { - waker.wake(); - break; - } - } + wake_one(&self.waiters); } /// Try to atomically increment the pool size for a new connection. @@ -125,68 +143,20 @@ impl SharedPool { None } - /// Wait for a connection, if either `size` drops below `max_connections` so we can - /// open a new connection, or if an idle connection is returned to the pool. - /// - /// Returns an error if `deadline` elapses before we are woken. - async fn wait_for_conn(&self, deadline: Instant) -> Result<(), Error> { - if self.is_closed() { - return Err(Error::PoolClosed); - } - - let mut waiter = None; - - timeout( - deadline_as_timeout::(deadline)?, - // `poll_fn` gets us easy access to a `Waker` that we can push to our queue - future::poll_fn(|cx| -> Poll<()> { - let waiter = waiter.get_or_insert_with(|| { - let waiter = Waiter::new(cx); - self.waiters.push(Arc::downgrade(&waiter)); - waiter - }); - - if waiter.is_woken() { - Poll::Ready(()) - } else { - Poll::Pending - } - }), - ) - .await - .map_err(|_| Error::PoolTimedOut) - } - - pub(super) fn new_arc( - options: PoolOptions, - connect_options: ::Options, - ) -> Arc { - let pool = Self { - connect_options, - idle_conns: ArrayQueue::new(options.max_connections as usize), - waiters: SegQueue::new(), - size: AtomicU32::new(0), - is_closed: AtomicBool::new(false), - options, - }; - - let pool = Arc::new(pool); - - spawn_reaper(&pool); - - pool - } - #[allow(clippy::needless_lifetimes)] pub(super) async fn acquire<'s>(&'s self) -> Result>, Error> { let start = Instant::now(); let deadline = start + self.options.connect_timeout; let mut waited = !self.options.fair; - let mut backoff = 0.01; + + // the strong ref of the `Weak` that we push to the queue + // initialized during the `timeout()` call below + // as long as we own this, we keep our place in line + let mut waiter = None; // Unless the pool has been closed ... while !self.is_closed() { - // Don't cut in line + // Don't cut in line unless no one is waiting if waited || self.waiters.is_empty() { // Attempt to immediately acquire a connection. This will return Some // if there is an idle connection in our channel. @@ -195,28 +165,40 @@ impl SharedPool { return Ok(live); } } - } - if let Some(guard) = self.try_increment_size() { - // pool has slots available; open a new connection - match self.connection(deadline, guard).await { - Ok(Some(conn)) => return Ok(conn), - // [size] is internally decremented on _retry_ and _error_ - Ok(None) => { - // If the connection is refused wait in exponentially - // increasing steps for the server to come up, capped by - // two seconds. - sqlx_rt::sleep(std::time::Duration::from_secs_f64(backoff)).await; - backoff = f64::min(backoff * 2.0, 2.0); - continue; - } - Err(e) => return Err(e), + // check if we can open a new connection + if let Some(guard) = self.try_increment_size() { + // pool has slots available; open a new connection + return self.connection(deadline, guard).await; } } // Wait for a connection to become available (or we are allowed to open a new one) - // Returns an error if `deadline` passes - self.wait_for_conn(deadline).await?; + let timeout_duration = cmp::min( + // Returns an error if `deadline` passes + deadline_as_timeout::(deadline)?, + MIN_WAKE_PERIOD, + ); + + sqlx_rt::timeout( + timeout_duration, + // `poll_fn` gets us easy access to a `Waker` that we can push to our queue + future::poll_fn(|cx| -> Poll<()> { + let waiter = waiter.get_or_insert_with(|| { + let waiter = Waiter::new(cx); + self.waiters.push(Arc::downgrade(&waiter)); + waiter + }); + + if waiter.is_woken() { + Poll::Ready(()) + } else { + Poll::Pending + } + }), + ) + .await + .ok(); // timeout is no longer fatal here; we check if the deadline expired above waited = true; } @@ -228,39 +210,51 @@ impl SharedPool { &'s self, deadline: Instant, guard: DecrementSizeGuard<'s>, - ) -> Result>>, Error> { + ) -> Result>, Error> { if self.is_closed() { return Err(Error::PoolClosed); } - let timeout = super::deadline_as_timeout::(deadline)?; + let mut backoff = Duration::from_millis(10); + let max_backoff = deadline_as_timeout::(deadline)? / 5; - // result here is `Result, TimeoutError>` - match sqlx_rt::timeout(timeout, self.connect_options.connect()).await { - // successfully established connection - Ok(Ok(mut raw)) => { - if let Some(callback) = &self.options.after_connect { - callback(&mut raw).await?; + loop { + let timeout = deadline_as_timeout::(deadline)?; + + // result here is `Result, TimeoutError>` + // if this block does not return, sleep for the backoff timeout and try again + match sqlx_rt::timeout(timeout, self.connect_options.connect()).await { + // successfully established connection + Ok(Ok(mut raw)) => { + if let Some(callback) = &self.options.after_connect { + callback(&mut raw).await?; + } + + return Ok(Floating::new_live(raw, guard)); } - Ok(Some(Floating::new_live(raw, guard))) + // an IO error while connecting is assumed to be the system starting up + Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => (), + + // TODO: Handle other database "boot period"s + + // [postgres] the database system is starting up + // TODO: Make this check actually check if this is postgres + Ok(Err(Error::Database(error))) if error.code().as_deref() == Some("57P03") => (), + + // Any other error while connection should immediately + // terminate and bubble the error up + Ok(Err(e)) => return Err(e), + + // timed out + Err(_) => return Err(Error::PoolTimedOut), } - // an IO error while connecting is assumed to be the system starting up - Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => Ok(None), - - // TODO: Handle other database "boot period"s - - // [postgres] the database system is starting up - // TODO: Make this check actually check if this is postgres - Ok(Err(Error::Database(error))) if error.code().as_deref() == Some("57P03") => Ok(None), - - // Any other error while connection should immediately - // terminate and bubble the error up - Ok(Err(e)) => Err(e), - - // timed out - Err(_) => Err(Error::PoolTimedOut), + // If the connection is refused wait in exponentially + // increasing steps for the server to come up, + // capped by a factor of the remaining time until the deadline + sqlx_rt::sleep(backoff).await; + backoff = cmp::min(backoff * 2, max_backoff); } } } @@ -334,7 +328,7 @@ fn spawn_reaper(pool: &Arc>) { let pool = Arc::clone(&pool); - spawn(async move { + sqlx_rt::spawn(async move { while !pool.is_closed.load(Ordering::Acquire) { // reap at most the current size minus the minimum idle let max_reaped = pool.size().saturating_sub(pool.options.min_connections); @@ -360,11 +354,21 @@ fn spawn_reaper(pool: &Arc>) { let _ = conn.close().await; } - sleep(period).await; + sqlx_rt::sleep(period).await; } }); } +fn wake_one(waiters: &SegQueue>) { + while let Some(weak) = waiters.pop() { + if let Some(waiter) = weak.upgrade() { + if waiter.wake() { + return; + } + } + } +} + /// RAII guard returned by `Pool::try_increment_size()` and others. /// /// Will decrement the pool size if dropped, to avoid semantically "leaking" connections @@ -399,11 +403,7 @@ impl Drop for DecrementSizeGuard<'_> { assert!(!self.dropped, "double-dropped!"); self.dropped = true; self.size.fetch_sub(1, Ordering::SeqCst); - if let Some(waker) = self.waiters.pop() { - if let Some(waker) = waker.upgrade() { - waker.wake(); - } - } + wake_one(&self.waiters); } } @@ -420,9 +420,20 @@ impl Waiter { }) } - fn wake(&self) { - self.woken.store(true, Ordering::Release); - self.waker.wake_by_ref(); + /// Wake this waiter if it has not previously been woken. + /// + /// Return `true` if this waiter was newly woken, or `false` if it was already woken. + fn wake(&self) -> bool { + // if we were the thread to flip this boolean from false to true + if let Ok(_) = self + .woken + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + { + self.waker.wake_by_ref(); + return true; + } + + false } fn is_woken(&self) -> bool { diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 8eb226e4..a1b07f37 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -235,16 +235,14 @@ async fn init_min_connections(pool: &SharedPool) -> Result<(), // this guard will prevent us from exceeding `max_size` if let Some(guard) = pool.try_increment_size() { // [connect] will raise an error when past deadline - // [connect] returns None if its okay to retry - if let Some(conn) = pool.connection(deadline, guard).await? { - let is_ok = pool - .idle_conns - .push(conn.into_idle().into_leakable()) - .is_ok(); + let conn = pool.connection(deadline, guard).await?; + let is_ok = pool + .idle_conns + .push(conn.into_idle().into_leakable()) + .is_ok(); - if !is_ok { - panic!("BUG: connection queue overflow in init_min_connections"); - } + if !is_ok { + panic!("BUG: connection queue overflow in init_min_connections"); } } } diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs index dee9062d..1612e180 100644 --- a/tests/postgres/postgres.rs +++ b/tests/postgres/postgres.rs @@ -6,7 +6,6 @@ use sqlx::postgres::{PgPoolOptions, PgRow, Postgres}; use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo}; use sqlx_test::{new, setup_if_needed}; use std::env; -use std::thread; use std::time::Duration; #[sqlx_macros::test] @@ -505,11 +504,7 @@ async fn it_can_drop_multiple_transactions() -> anyhow::Result<()> { #[ignore] #[sqlx_macros::test] async fn pool_smoke_test() -> anyhow::Result<()> { - #[cfg(any(feature = "_rt-tokio", feature = "_rt-actix"))] - use tokio::{task::spawn, time::sleep, time::timeout}; - - #[cfg(feature = "_rt-async-std")] - use async_std::{future::timeout, task::sleep, task::spawn}; + use futures::{future, task::Poll, Future}; eprintln!("starting pool"); @@ -523,7 +518,7 @@ async fn pool_smoke_test() -> anyhow::Result<()> { // spin up more tasks than connections available, and ensure we don't deadlock for i in 0..20 { let pool = pool.clone(); - spawn(async move { + sqlx_rt::spawn(async move { loop { if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await { eprintln!("pool task {} dying due to {}", i, e); @@ -535,26 +530,32 @@ async fn pool_smoke_test() -> anyhow::Result<()> { for _ in 0..5 { let pool = pool.clone(); - // we don't need async, just need this to run concurrently - // if we use `task::spawn()` we risk starving the event loop because we don't yield - thread::spawn(move || { + sqlx_rt::spawn(async move { while !pool.is_closed() { - // drop acquire() futures in a hot loop - // https://github.com/launchbadge/sqlx/issues/83 - drop(pool.acquire()); + let acquire = pool.acquire(); + futures::pin_mut!(acquire); + + // poll the acquire future once to put the waiter in the queue + future::poll_fn(move |cx| { + let _ = acquire.as_mut().poll(cx); + Poll::Ready(()) + }) + .await; + + sqlx_rt::yield_now().await; } }); } eprintln!("sleeping for 30 seconds"); - sleep(Duration::from_secs(30)).await; + sqlx_rt::sleep(Duration::from_secs(30)).await; // assert_eq!(pool.size(), 10); eprintln!("closing pool"); - timeout(Duration::from_secs(30), pool.close()).await?; + sqlx_rt::timeout(Duration::from_secs(30), pool.close()).await?; eprintln!("pool closed successfully");