diff --git a/sqlx-core/src/pool/connect.rs b/sqlx-core/src/pool/connect.rs index 24d82493a..edd39a941 100644 --- a/sqlx-core/src/pool/connect.rs +++ b/sqlx-core/src/pool/connect.rs @@ -1,15 +1,12 @@ use crate::connection::{ConnectOptions, Connection}; use crate::database::Database; use crate::pool::connection::ConnectionInner; -use crate::pool::inner::PoolInner; use crate::pool::{Pool, PoolConnection}; use crate::rt::JoinHandle; use crate::{rt, Error}; -use ease_off::EaseOff; use event_listener::{listener, Event}; use std::fmt::{Display, Formatter}; use std::future::Future; -use std::ptr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -17,7 +14,6 @@ use std::time::Instant; use crate::pool::connection_set::DisconnectedSlot; #[cfg(doc)] use crate::pool::PoolOptions; -use crate::sync::AsyncMutexGuard; use ease_off::core::EaseOffCore; use std::ops::ControlFlow; use std::pin::{pin, Pin}; @@ -438,12 +434,6 @@ impl ConnectTaskShared { } } -pub struct ConnectionCounter { - count: AtomicUsize, - next_id: AtomicUsize, - connect_available: Event, -} - /// An opaque connection ID, unique for every connection attempt with the same pool. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct ConnectionId(usize); @@ -456,141 +446,6 @@ impl ConnectionId { } } -impl ConnectionCounter { - pub fn new() -> Self { - Self { - count: AtomicUsize::new(0), - next_id: AtomicUsize::new(1), - connect_available: Event::new(), - } - } - - pub fn connections(&self) -> usize { - self.count.load(Ordering::Acquire) - } - - pub async fn drain(&self) { - while self.count.load(Ordering::Acquire) > 0 { - listener!(self.connect_available => permit_released); - permit_released.await; - } - } - - /// Attempt to acquire a permit from both this instance, and the parent pool, if applicable. - /// - /// Returns the permit, and the ID of the new connection. - pub fn try_acquire_permit( - &self, - pool: &Arc>, - ) -> Option<(ConnectionId, ConnectPermit)> { - debug_assert!(ptr::addr_eq(self, &pool.counter)); - - // Don't skip the queue. - if pool.options.fair && self.connect_available.total_listeners() > 0 { - return None; - } - - let prev_size = self - .count - .fetch_update(Ordering::Release, Ordering::Acquire, |connections| { - (connections < pool.options.max_connections).then_some(connections + 1) - }) - .ok()?; - - let size = prev_size + 1; - - tracing::trace!(target: "sqlx::pool::connect", size, "increased size"); - - Some(( - ConnectionId(self.next_id.fetch_add(1, Ordering::SeqCst)), - ConnectPermit { - pool: Some(Arc::clone(pool)), - }, - )) - } - - /// Attempt to acquire a permit from both this instance, and the parent pool, if applicable. - /// - /// Returns the permit, and the current size of the pool. - pub async fn acquire_permit( - &self, - pool: &Arc>, - ) -> (ConnectionId, ConnectPermit) { - // Check that `self` can increase size first before we check the parent. - let acquired = self.acquire_permit_self(pool).await; - - if let Some(parent) = pool.parent() { - let (_, permit) = parent.0.counter.acquire_permit_self(&parent.0).await; - - // consume the parent permit - permit.consume(); - } - - acquired - } - - // Separate method because `async fn`s cannot be recursive. - /// Attempt to acquire a [`ConnectPermit`] from this instance and this instance only. - async fn acquire_permit_self( - &self, - pool: &Arc>, - ) -> (ConnectionId, ConnectPermit) { - for attempt in 1usize.. { - if let Some(acquired) = self.try_acquire_permit(pool) { - return acquired; - } - - if attempt == 2 { - tracing::warn!( - "unable to acquire a connect permit after sleeping; this may indicate a bug" - ); - } - - listener!(self.connect_available => connect_available); - connect_available.await; - } - - panic!("BUG: was never able to acquire a connection despite waking many times") - } - - pub fn release_permit(&self, pool: &PoolInner) { - debug_assert!(ptr::addr_eq(self, &pool.counter)); - - self.count.fetch_sub(1, Ordering::Release); - self.connect_available.notify(1usize); - - if let Some(parent) = &pool.options.parent_pool { - parent.0.counter.release_permit(&parent.0); - } - } -} - -pub struct ConnectPermit { - pool: Option>>, -} - -impl ConnectPermit { - pub fn float_existing(pool: Arc>) -> Self { - Self { pool: Some(pool) } - } - - pub fn pool(&self) -> &Arc> { - self.pool.as_ref().unwrap() - } - - pub fn consume(mut self) { - self.pool = None; - } -} - -impl Drop for ConnectPermit { - fn drop(&mut self) { - if let Some(pool) = self.pool.take() { - pool.counter.release_permit(&pool); - } - } -} - impl Display for ConnectionId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { Display::fmt(&self.0, f) diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index 1103374cc..2696819cf 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -10,10 +10,10 @@ use crate::database::Database; use crate::error::Error; use super::inner::PoolInner; -use crate::pool::connect::{ConnectPermit, ConnectTaskShared, ConnectionId}; +use crate::pool::connect::ConnectionId; use crate::pool::connection_set::{ConnectedSlot, DisconnectedSlot}; use crate::pool::options::PoolConnectionMetadata; -use crate::pool::{Pool, PoolOptions}; +use crate::pool::PoolOptions; use crate::rt; const RETURN_TO_POOL_TIMEOUT: Duration = Duration::from_secs(5); diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 1d76f25a4..b92764ce5 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -14,7 +14,7 @@ use std::task::{Context, Poll}; use crate::connection::Connection; use crate::ext::future::race; use crate::logger::private_level_filter_to_trace_level; -use crate::pool::connect::{ConnectTaskShared, ConnectionCounter, ConnectionId, DynConnector}; +use crate::pool::connect::{ConnectTaskShared, ConnectionId, DynConnector}; use crate::pool::connection_set::{ConnectedSlot, ConnectionSet, DisconnectedSlot}; use crate::{private_tracing_dynamic_event, rt}; use event_listener::listener; @@ -22,12 +22,10 @@ use futures_util::future::{self}; use std::time::{Duration, Instant}; use tracing::{Instrument, Level}; -const GRACEFUL_CLOSE_TIMEOUT: Duration = Duration::from_secs(5); const TEST_BEFORE_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(60); pub(crate) struct PoolInner { pub(super) connector: DynConnector, - pub(super) counter: ConnectionCounter, pub(super) connections: ConnectionSet>, is_closed: AtomicBool, pub(super) on_closed: event_listener::Event, @@ -43,7 +41,6 @@ impl PoolInner { ) -> Arc { let pool = Arc::new(Self { connector: DynConnector::new(connector), - counter: ConnectionCounter::new(), connections: ConnectionSet::new(options.min_connections..=options.max_connections), is_closed: AtomicBool::new(false), on_closed: event_listener::Event::new(), @@ -90,10 +87,6 @@ impl PoolInner { } } - pub(super) fn parent(&self) -> Option<&Pool> { - self.options.parent_pool.as_ref() - } - #[inline] pub(super) fn try_acquire(self: &Arc) -> Option>> { if self.is_closed() { diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 9c838f754..2bbd28967 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -77,8 +77,6 @@ pub struct PoolOptions { pub(crate) max_lifetime: Option, pub(crate) idle_timeout: Option, pub(crate) fair: bool, - - pub(crate) parent_pool: Option>, } // Manually implement `Clone` to avoid a trait bound issue. @@ -100,7 +98,6 @@ impl Clone for PoolOptions { max_lifetime: self.max_lifetime, idle_timeout: self.idle_timeout, fair: self.fair, - parent_pool: self.parent_pool.clone(), } } } @@ -155,7 +152,6 @@ impl PoolOptions { idle_timeout: Some(Duration::from_secs(10 * 60)), max_lifetime: Some(Duration::from_secs(30 * 60)), fair: true, - parent_pool: None, } } @@ -460,19 +456,6 @@ impl PoolOptions { self } - /// Set the parent `Pool` from which the new pool will inherit its semaphore. - /// - /// This is currently an internal-only API. - /// - /// ### Panics - /// If `self.max_connections` is greater than the setting the given pool was created with, - /// or `self.fair` differs from the setting the given pool was created with. - #[doc(hidden)] - pub fn parent(mut self, pool: Pool) -> Self { - self.parent_pool = Some(pool); - self - } - /// Create a new pool from this `PoolOptions` and immediately open at least one connection. /// /// This ensures the configuration is correct. diff --git a/sqlx-core/src/testing/mod.rs b/sqlx-core/src/testing/mod.rs index 17022b465..e3f67729a 100644 --- a/sqlx-core/src/testing/mod.rs +++ b/sqlx-core/src/testing/mod.rs @@ -66,6 +66,7 @@ pub struct TestArgs { pub test_path: &'static str, pub migrator: Option<&'static Migrator>, pub fixtures: &'static [TestFixture], + pub max_connections: usize, } pub trait TestFn { @@ -158,6 +159,10 @@ impl TestArgs { test_path, migrator: None, fixtures: &[], + // Don't allow a single test to take all the connections. + // Most tests shouldn't require more than 4 connections concurrently, + // or else they're likely doing too much in one test. + max_connections: 4, } } @@ -168,6 +173,10 @@ impl TestArgs { pub fn fixtures(&mut self, fixtures: &'static [TestFixture]) { self.fixtures = fixtures; } + + pub fn max_connections(&mut self, n: usize) { + self.max_connections = n; + } } impl TestTermination for () { diff --git a/sqlx-mysql/src/testing/mod.rs b/sqlx-mysql/src/testing/mod.rs index f532dcc5a..910c295a3 100644 --- a/sqlx-mysql/src/testing/mod.rs +++ b/sqlx-mysql/src/testing/mod.rs @@ -152,8 +152,7 @@ async fn test_context(args: &TestArgs) -> Result, Error> { // or else they're likely doing too much in one test. .max_connections(5) // Close connections ASAP if left in the idle queue. - .idle_timeout(Some(Duration::from_secs(1))) - .parent(master_pool.clone()), + .idle_timeout(Some(Duration::from_secs(1))), connect_opts: master_opts.database(&db_name), db_name, }) diff --git a/sqlx-postgres/src/testing/mod.rs b/sqlx-postgres/src/testing/mod.rs index 70b00b635..1b8502aae 100644 --- a/sqlx-postgres/src/testing/mod.rs +++ b/sqlx-postgres/src/testing/mod.rs @@ -152,13 +152,9 @@ async fn test_context(args: &TestArgs) -> Result, Error> { Ok(TestContext { pool_opts: PoolOptions::new() - // Don't allow a single test to take all the connections. - // Most tests shouldn't require more than 5 connections concurrently, - // or else they're likely doing too much in one test. - .max_connections(5) + .max_connections(args.max_connections) // Close connections ASAP if left in the idle queue. - .idle_timeout(Some(Duration::from_secs(1))) - .parent(master_pool.clone()), + .idle_timeout(Some(Duration::from_secs(1))), connect_opts: master_opts.database(&db_name), db_name, })