diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index fff4ef3d..8512508c 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -22,11 +22,10 @@ json = ["serde", "serde_json"] # for conditional compilation _rt-async-global-executor = ["async-global-executor", "_rt-async-io", "_rt-async-task"] _rt-async-io = ["async-io", "async-fs"] # see note at async-fs declaration -_rt-async-std = ["async-std", "_rt-async-io"] +_rt-async-std = ["async-std", "_rt-async-io", "ease-off/async-io-2"] _rt-async-task = ["async-task"] _rt-smol = ["smol", "_rt-async-io", "_rt-async-task"] -_rt-tokio = ["tokio", "tokio-stream"] - +_rt-tokio = ["tokio", "tokio-stream", "ease-off/tokio"] _tls-native-tls = ["native-tls"] _tls-rustls-aws-lc-rs = ["_tls-rustls", "rustls/aws-lc-rs", "webpki-roots"] _tls-rustls-ring-webpki = ["_tls-rustls", "rustls/ring", "webpki-roots"] @@ -83,7 +82,6 @@ crossbeam-queue = "0.3.2" either = "1.6.1" futures-core = { version = "0.3.19", default-features = false } futures-io = "0.3.24" -futures-intrusive = "0.5.0" futures-util = { version = "0.3.19", default-features = false, features = ["alloc", "sink", "io"] } log = { version = "0.4.18", default-features = false } memchr = { version = "2.4.1", default-features = false } @@ -105,6 +103,9 @@ hashbrown = "0.16.0" thiserror.workspace = true +ease-off = { workspace = true, features = ["futures"] } +pin-project-lite = "0.2.14" + [dev-dependencies] tokio = { version = "1", features = ["rt"] } diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs index 8c6f424c..ff416ed2 100644 --- a/sqlx-core/src/error.rs +++ b/sqlx-core/src/error.rs @@ -11,6 +11,9 @@ use crate::database::Database; use crate::type_info::TypeInfo; use crate::types::Type; +#[cfg(doc)] +use crate::pool::{PoolConnector, PoolOptions}; + /// A specialized `Result` type for SQLx. pub type Result = ::std::result::Result; @@ -110,6 +113,19 @@ pub enum Error { #[error("attempted to acquire a connection on a closed pool")] PoolClosed, + /// A custom error that may be returned from a [`PoolConnector`] implementation. + #[error("error returned from pool connector")] + PoolConnector { + #[source] + source: BoxDynError, + + /// If `true`, `PoolConnector::connect()` is called again in an exponential backoff loop + /// up to [`PoolOptions::connect_timeout`]. + /// + /// See [`PoolConnector::connect()`] for details. + retryable: bool, + }, + /// A background worker has crashed. #[error("attempted to communicate with a crashed background worker")] WorkerCrashed, @@ -228,11 +244,6 @@ pub trait DatabaseError: 'static + Send + Sync + StdError { #[doc(hidden)] fn into_error(self: Box) -> Box; - #[doc(hidden)] - fn is_transient_in_connect_phase(&self) -> bool { - false - } - /// Returns the name of the constraint that triggered the error, if applicable. /// If the error was caused by a conflict of a unique index, this will be the index name. /// @@ -270,6 +281,24 @@ pub trait DatabaseError: 'static + Send + Sync + StdError { fn is_check_violation(&self) -> bool { matches!(self.kind(), ErrorKind::CheckViolation) } + + /// Returns `true` if this error can be retried when connecting to the database. + /// + /// Defaults to `false`. + /// + /// For example, the Postgres driver overrides this to return `true` for the following error codes: + /// + /// * `53300 too_many_connections`: returned when the maximum connections are exceeded + /// on the server. Assumed to be the result of a temporary overcommit + /// (e.g. an extra application replica being spun up to replace one that is going down). + /// * This error being consistently logged or returned is a likely indicator of a misconfiguration; + /// the sum of [`PoolOptions::max_connections`] for all replicas should not exceed + /// the maximum connections allowed by the server. + /// * `57P03 cannot_connect_now`: returned when the database server is still starting up + /// and the tcop component is not ready to accept connections yet. + fn is_retryable_connect_error(&self) -> bool { + false + } } impl dyn DatabaseError { diff --git a/sqlx-core/src/pool/connect.rs b/sqlx-core/src/pool/connect.rs new file mode 100644 index 00000000..29e59caf --- /dev/null +++ b/sqlx-core/src/pool/connect.rs @@ -0,0 +1,461 @@ +use crate::connection::{ConnectOptions, Connection}; +use crate::database::Database; +use crate::pool::connection::{Floating, Live}; +use crate::pool::inner::PoolInner; +use crate::pool::PoolConnection; +use crate::rt::JoinHandle; +use crate::Error; +use ease_off::EaseOff; +use event_listener::{Event, EventListener}; +use std::future::Future; +use std::pin::Pin; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tracing::Instrument; + +use std::io; + +/// Custom connect callback for [`Pool`][crate::pool::Pool]. +/// +/// Implemented for closures with the signature +/// `Fn(PoolConnectMetadata) -> impl Future>`. +/// +/// See [`Self::connect()`] for details and implementation advice. +/// +/// # Example: `after_connect` Replacement +/// The `after_connect` callback was removed in 0.9.0 as it was redundant to this API. +/// +/// This example uses Postgres but may be adapted to any driver. +/// +/// ```rust,no_run +/// use std::sync::Arc; +/// use sqlx::PgConnection; +/// use sqlx::postgres::PgPoolOptions; +/// use sqlx::Connection; +/// +/// # async fn _example() -> sqlx::Result<()> { +/// // `PoolConnector` is implemented for closures but has restrictions on returning borrows +/// // due to current language limitations. +/// // +/// // This example shows how to get around this using `Arc`. +/// let database_url: Arc = "postgres://...".into(); +/// +/// let pool = PgPoolOptions::new() +/// .min_connections(5) +/// .max_connections(30) +/// .connect_with_connector(move |meta| { +/// let database_url = database_url.clone(); +/// async move { +/// println!( +/// "opening connection {}, attempt {}; elapsed time: {}", +/// meta.pool_size, +/// meta.num_attempts + 1, +/// meta.start.elapsed() +/// ); +/// +/// let mut conn = PgConnection::connect(&database_url).await?; +/// +/// // Override the time zone of the connection. +/// sqlx::raw_sql("SET TIME ZONE 'Europe/Berlin'").await?; +/// +/// Ok(conn) +/// } +/// }) +/// .await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// # Example: `set_connect_options` Replacement +/// `set_connect_options` and `get_connect_options` were removed in 0.9.0 because they complicated +/// the pool internals. They can be reimplemented by capturing a mutex, or similar, in the callback. +/// +/// This example uses Postgres and [`tokio::sync::Mutex`] but may be adapted to any driver +/// or `async-std`, respectively. +/// +/// ```rust,no_run +/// use std::sync::Arc; +/// use tokio::sync::{Mutex, RwLock}; +/// use sqlx::PgConnection; +/// use sqlx::postgres::PgConnectOptions; +/// use sqlx::postgres::PgPoolOptions; +/// use sqlx::ConnectOptions; +/// +/// # async fn _example() -> sqlx::Result<()> { +/// // If you do not wish to hold the lock during the connection attempt, +/// // you could use `Arc` instead. +/// let connect_opts: Arc> = Arc::new(RwLock::new("postgres://...".parse()?)); +/// // We need a copy that will be captured by the closure. +/// let connect_opts_ = connect_opts.clone(); +/// +/// let pool = PgPoolOptions::new() +/// .connect_with_connector(move |meta| { +/// let connect_opts_ = connect_opts.clone(); +/// async move { +/// println!( +/// "opening connection {}, attempt {}; elapsed time: {}", +/// meta.pool_size, +/// meta.num_attempts + 1, +/// meta.start.elapsed() +/// ); +/// +/// connect_opts.read().await.connect().await +/// } +/// }) +/// .await?; +/// +/// // Close the connection that was previously opened by `connect_with_connector()`. +/// pool.acquire().await?.close().await?; +/// +/// // Simulating a credential rotation +/// let mut write_connect_opts = connect_opts.write().await; +/// write_connect_opts +/// .set_username("new_username") +/// .set_password("new password"); +/// +/// // Should use the new credentials. +/// let mut conn = pool.acquire().await?; +/// +/// # Ok(()) +/// # } +/// ``` +/// +/// # Example: Custom Implementation +/// +/// Custom implementations of `PoolConnector` trade a little bit of boilerplate for much +/// more flexibility. Thanks to the signature of `connect()`, they can return a `Future` +/// type that borrows from `self`. +/// +/// This example uses Postgres but may be adapted to any driver. +/// +/// ```rust,no_run +/// use sqlx::{PgConnection, Postgres}; +/// use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; +/// use sqlx_core::connection::ConnectOptions; +/// use sqlx_core::pool::{PoolConnectMetadata, PoolConnector}; +/// +/// struct MyConnector { +/// // A list of servers to connect to in a high-availability configuration. +/// host_ports: Vec<(String, u16)>, +/// username: String, +/// password: String, +/// } +/// +/// impl PoolConnector for MyConnector { +/// // The desugaring of `async fn` is compatible with the signature of `connect()`. +/// async fn connect(&self, meta: PoolConnectMetadata) -> sqlx::Result { +/// self.get_connect_options(meta.num_attempts) +/// .connect() +/// .await +/// } +/// } +/// +/// impl MyConnector { +/// fn get_connect_options(&self, attempt: usize) -> PgConnectOptions { +/// // Select servers in a round-robin. +/// let (ref host, port) = self.host_ports[attempt % self.host_ports.len()]; +/// +/// PgConnectOptions::new() +/// .host(host) +/// .port(port) +/// .username(&self.username) +/// .password(&self.password) +/// } +/// } +/// +/// # async fn _example() -> sqlx::Result<()> { +/// let pool = PgPoolOptions::new() +/// .max_connections(25) +/// .connect_with_connector(MyConnector { +/// host_ports: vec![ +/// ("db1.postgres.cluster.local".into(), 5432), +/// ("db2.postgres.cluster.local".into(), 5432), +/// ("db3.postgres.cluster.local".into(), 5432), +/// ("db4.postgres.cluster.local".into(), 5432), +/// ], +/// username: "my_username".into(), +/// password: "my password".into(), +/// }) +/// .await?; +/// +/// let conn = pool.acquire().await?; +/// +/// # Ok(()) +/// # } +/// ``` +pub trait PoolConnector: Send + Sync + 'static { + /// Open a connection for the pool. + /// + /// Any setup that must be done on the connection should be performed before it is returned. + /// + /// If this method returns an error that is known to be retryable, it is called again + /// in an exponential backoff loop. Retryable errors include, but are not limited to: + /// + /// * [`io::ErrorKind::ConnectionRefused`] + /// * Database errors for which + /// [`is_retryable_connect_error`][crate::error::DatabaseError::is_retryable_connect_error] + /// returns `true`. + /// * [`Error::PoolConnector`] with `retryable: true`. + /// This error kind is not returned internally and is designed to allow this method to return + /// arbitrary error types not otherwise supported. + /// + /// Manual implementations of this method may also use the signature: + /// ```rust,ignore + /// async fn connect( + /// &self, + /// meta: PoolConnectMetadata + /// ) -> sqlx::Result<{PgConnection, MySqlConnection, SqliteConnection, etc.}> + /// ``` + /// + /// Note: the returned future must be `Send`. + fn connect( + &self, + meta: PoolConnectMetadata, + ) -> impl Future> + Send + '_; +} + +impl PoolConnector for F +where + DB: Database, + F: Fn(PoolConnectMetadata) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, +{ + fn connect( + &self, + meta: PoolConnectMetadata, + ) -> impl Future> + Send + '_ { + self(meta) + } +} + +pub(crate) struct DefaultConnector( + pub <::Connection as Connection>::Options, +); + +impl PoolConnector for DefaultConnector { + fn connect( + &self, + _meta: PoolConnectMetadata, + ) -> impl Future> + Send + '_ { + self.0.connect() + } +} + +/// Metadata passed to [`PoolConnector::connect()`] for every connection attempt. +#[derive(Debug)] +pub struct PoolConnectMetadata { + /// The instant at which the current connection task was started, including all attempts. + /// + /// May be used for reporting purposes, or to implement a custom backoff. + pub start: Instant, + /// The number of attempts that have occurred so far. + pub num_attempts: usize, + pub pool_size: usize, +} + +pub struct DynConnector { + // We want to spawn the connection attempt as a task anyway + connect: Box< + dyn Fn(ConnectPermit, usize) -> JoinHandle>> + + Send + + Sync + + 'static, + >, +} + +impl DynConnector { + pub fn new(connector: impl PoolConnector) -> Self { + let connector = Arc::new(connector); + + Self { + connect: Box::new(move |permit, size| { + crate::rt::spawn(connect_with_backoff(permit, connector.clone(), size)) + }), + } + } + + pub fn connect( + &self, + permit: ConnectPermit, + size: usize, + ) -> JoinHandle>> { + (self.connect)(permit, size) + } +} + +pub struct ConnectionCounter { + connections: AtomicUsize, + connect_available: Event, +} + +impl ConnectionCounter { + pub fn new() -> Self { + Self { + connections: AtomicUsize::new(0), + connect_available: Event::new(), + } + } + + pub fn connections(&self) -> usize { + self.connections.load(Ordering::Acquire) + } + + pub async fn drain(&self) { + while self.connections.load(Ordering::Acquire) > 0 { + self.connect_available.listen().await; + } + } + + /// Attempt to acquire a permit from both this instance, and the parent pool, if applicable. + /// + /// Returns the permit, and the current size of the pool. + pub async fn acquire_permit( + &self, + pool: &Arc>, + ) -> (usize, ConnectPermit) { + // Check that `self` can increase size first before we check the parent. + let (size, permit) = self.acquire_permit_self(pool).await; + + if let Some(parent) = &pool.options.parent_pool { + let (_, permit) = parent.0.counter.acquire_permit_self(&parent.0).await; + + // consume the parent permit + permit.consume(); + } + + (size, permit) + } + + // Separate method because `async fn`s cannot be recursive. + /// Attempt to acquire a [`ConnectPermit`] from this instance and this instance only. + async fn acquire_permit_self( + &self, + pool: &Arc>, + ) -> (usize, ConnectPermit) { + debug_assert!(ptr::addr_eq(self, &pool.counter)); + + let mut should_wait = pool.options.fair && self.connect_available.total_listeners() > 0; + + for attempt in 1usize.. { + if should_wait { + self.connect_available.listen().await; + } + + let res = self.connections.fetch_update( + Ordering::Release, + Ordering::Acquire, + |connections| { + (connections < pool.options.max_connections).then_some(connections + 1) + }, + ); + + if let Ok(prev_size) = res { + let size = prev_size + 1; + + tracing::trace!(target: "sqlx::pool::connect", size, "increased size"); + + return ( + prev_size + 1, + ConnectPermit { + pool: Some(Arc::clone(pool)), + }, + ); + } + + should_wait = true; + + if attempt == 2 { + tracing::warn!( + "unable to acquire a connect permit after sleeping; this may indicate a bug" + ); + } + } + + panic!("BUG: was never able to acquire a connection despite waking many times") + } + + pub fn release_permit(&self, pool: &PoolInner) { + debug_assert!(ptr::addr_eq(self, &pool.counter)); + + self.connections.fetch_sub(1, Ordering::Release); + self.connect_available.notify(1usize); + + if let Some(parent) = &pool.options.parent_pool { + parent.0.counter.release_permit(&parent.0); + } + } +} + +pub struct ConnectPermit { + pool: Option>>, +} + +impl ConnectPermit { + pub fn float_existing(pool: Arc>) -> Self { + Self { pool: Some(pool) } + } + + pub fn pool(&self) -> &Arc> { + self.pool.as_ref().unwrap() + } + + pub fn consume(mut self) { + self.pool = None; + } +} + +impl Drop for ConnectPermit { + fn drop(&mut self) { + if let Some(pool) = self.pool.take() { + pool.counter.release_permit(&pool); + } + } +} + +#[tracing::instrument( + target = "sqlx::pool::connect", + skip_all, + fields(connection = size), + err +)] +async fn connect_with_backoff( + permit: ConnectPermit, + connector: Arc>, + size: usize, +) -> crate::Result> { + if permit.pool().is_closed() { + return Err(Error::PoolClosed); + } + + let mut ease_off = EaseOff::start_timeout(permit.pool().options.connect_timeout); + + for attempt in 1usize.. { + let meta = PoolConnectMetadata { + start: ease_off.started_at(), + num_attempts: attempt, + pool_size: size, + }; + + let conn = ease_off + .try_async(connector.connect(meta)) + .await + .or_retry_if(|e| can_retry_error(e.inner()))?; + + if let Some(conn) = conn { + return Ok(Floating::new_live(conn, permit).reattach()); + } + } + + Err(Error::PoolTimedOut) +} + +fn can_retry_error(e: &Error) -> bool { + match e { + Error::Io(e) if e.kind() == io::ErrorKind::ConnectionRefused => true, + Error::Database(e) => e.is_retryable_connect_error(), + _ => false, + } +} diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index 7912b12a..48d124e3 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -10,7 +10,8 @@ use crate::connection::Connection; use crate::database::Database; use crate::error::Error; -use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner}; +use super::inner::{is_beyond_max_lifetime, PoolInner}; +use crate::pool::connect::ConnectPermit; use crate::pool::options::PoolConnectionMetadata; const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5); @@ -37,7 +38,7 @@ pub(super) struct Idle { /// RAII wrapper for connections being handled by functions that may drop them pub(super) struct Floating { pub(super) inner: C, - pub(super) guard: DecrementSizeGuard, + pub(super) permit: ConnectPermit, } const EXPECT_MSG: &str = "BUG: inner connection already taken!"; @@ -127,6 +128,10 @@ impl PoolConnection { self.live.take().expect(EXPECT_MSG) } + pub(super) fn into_floating(mut self) -> Floating> { + self.take_live().float(self.pool.clone()) + } + /// Test the connection to make sure it is still live before returning it to the pool. /// /// This effectively runs the drop handler eagerly instead of spawning a task to do it. @@ -215,7 +220,7 @@ impl Live { Floating { inner: self, // create a new guard from a previously leaked permit - guard: DecrementSizeGuard::new_permit(pool), + permit: ConnectPermit::float_existing(pool), } } @@ -242,22 +247,22 @@ impl DerefMut for Idle { } impl Floating> { - pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard) -> Self { + pub fn new_live(conn: DB::Connection, permit: ConnectPermit) -> Self { Self { inner: Live { raw: conn, created_at: Instant::now(), }, - guard, + permit, } } pub fn reattach(self) -> PoolConnection { - let Floating { inner, guard } = self; + let Floating { inner, permit } = self; - let pool = Arc::clone(&guard.pool); + let pool = Arc::clone(permit.pool()); - guard.cancel(); + permit.consume(); PoolConnection { live: Some(inner), close_on_drop: false, @@ -266,7 +271,7 @@ impl Floating> { } pub fn release(self) { - self.guard.pool.clone().release(self); + self.permit.pool().clone().release(self); } /// Return the connection to the pool. @@ -274,19 +279,19 @@ impl Floating> { /// Returns `true` if the connection was successfully returned, `false` if it was closed. async fn return_to_pool(mut self) -> bool { // Immediately close the connection. - if self.guard.pool.is_closed() { + if self.permit.pool().is_closed() { self.close().await; return false; } // If the connection is beyond max lifetime, close the connection and // immediately create a new connection - if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) { + if is_beyond_max_lifetime(&self.inner, &self.permit.pool().options) { self.close().await; return false; } - if let Some(test) = &self.guard.pool.options.after_release { + if let Some(test) = &self.permit.pool().options.after_release { let meta = self.metadata(); match (test)(&mut self.inner.raw, meta).await { Ok(true) => (), @@ -345,7 +350,7 @@ impl Floating> { pub fn into_idle(self) -> Floating> { Floating { inner: self.inner.into_idle(), - guard: self.guard, + permit: self.permit, } } @@ -358,14 +363,10 @@ impl Floating> { } impl Floating> { - pub fn from_idle( - idle: Idle, - pool: Arc>, - permit: AsyncSemaphoreReleaser<'_>, - ) -> Self { + pub fn from_idle(idle: Idle, pool: Arc>) -> Self { Self { inner: idle, - guard: DecrementSizeGuard::from_permit(pool, permit), + permit: ConnectPermit::float_existing(pool), } } @@ -376,21 +377,21 @@ impl Floating> { pub fn into_live(self) -> Floating> { Floating { inner: self.inner.live, - guard: self.guard, + permit: self.permit, } } - pub async fn close(self) -> DecrementSizeGuard { + pub async fn close(self) -> ConnectPermit { if let Err(error) = self.inner.live.raw.close().await { tracing::debug!(%error, "error occurred while closing the pool connection"); } - self.guard + self.permit } - pub async fn close_hard(self) -> DecrementSizeGuard { + pub async fn close_hard(self) -> ConnectPermit { let _ = self.inner.live.raw.close_hard().await; - self.guard + self.permit } pub fn metadata(&self) -> PoolConnectionMetadata { diff --git a/sqlx-core/src/pool/idle.rs b/sqlx-core/src/pool/idle.rs new file mode 100644 index 00000000..239313f7 --- /dev/null +++ b/sqlx-core/src/pool/idle.rs @@ -0,0 +1,97 @@ +use crate::connection::Connection; +use crate::database::Database; +use crate::pool::connection::{Floating, Idle, Live}; +use crate::pool::inner::PoolInner; +use crossbeam_queue::ArrayQueue; +use event_listener::Event; +use futures_util::FutureExt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +pub struct IdleQueue { + queue: ArrayQueue>, + // Keep a separate count because `ArrayQueue::len()` loops until the head and tail pointers + // stop changing, which may never happen at high contention. + len: AtomicUsize, + release_event: Event, + fair: bool, +} + +impl IdleQueue { + pub fn new(fair: bool, cap: usize) -> Self { + Self { + queue: ArrayQueue::new(cap), + len: AtomicUsize::new(0), + release_event: Event::new(), + fair, + } + } + + pub fn len(&self) -> usize { + self.len.load(Ordering::Acquire) + } + + pub async fn acquire(&self, pool: &Arc>) -> Floating> { + let mut should_wait = self.fair && self.release_event.total_listeners() > 0; + + for attempt in 1usize.. { + if should_wait { + self.release_event.listen().await; + } + + if let Some(conn) = self.try_acquire(pool) { + return conn; + } + + should_wait = true; + + if attempt == 2 { + tracing::warn!( + "unable to acquire a connection after sleeping; this may indicate a bug" + ); + } + } + + panic!("BUG: was never able to acquire a connection despite waking many times") + } + + pub fn try_acquire(&self, pool: &Arc>) -> Option>> { + self.len + .fetch_update(Ordering::Release, Ordering::Acquire, |len| { + len.checked_sub(1) + }) + .ok() + .and_then(|_| { + let conn = self.queue.pop()?; + + Some(Floating::from_idle(conn, Arc::clone(pool))) + }) + } + + pub fn release(&self, conn: Floating>) { + let Floating { + inner: conn, + permit, + } = conn.into_idle(); + + self.queue + .push(conn) + .unwrap_or_else(|_| panic!("BUG: idle queue capacity exceeded")); + + self.len.fetch_add(1, Ordering::Release); + + self.release_event.notify(1usize); + + // Don't decrease the size. + permit.consume(); + } + + pub fn drain(&self, pool: &PoolInner) { + while let Some(conn) = self.queue.pop() { + // Hopefully will send at least a TCP FIN packet. + conn.live.raw.close_hard().now_or_never(); + + pool.counter.release_permit(pool); + } + } +} diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 7c6a84ad..c366bb07 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -1,33 +1,29 @@ use super::connection::{Floating, Idle, Live}; -use crate::connection::ConnectOptions; -use crate::connection::Connection; use crate::database::Database; use crate::error::Error; -use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions}; -use crossbeam_queue::ArrayQueue; - -use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser}; +use crate::pool::{CloseEvent, Pool, PoolConnection, PoolConnector, PoolOptions}; use std::cmp; use std::future::{self, Future}; use std::pin::pin; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock}; -use std::task::Poll; +use std::pin::pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::task::ready; use crate::logger::private_level_filter_to_trace_level; -use crate::pool::options::PoolConnectionMetadata; +use crate::pool::connect::{ConnectPermit, ConnectionCounter, DynConnector}; +use crate::pool::idle::IdleQueue; use crate::private_tracing_dynamic_event; +use futures_util::future::{self, OptionFuture}; use futures_util::FutureExt; use std::time::{Duration, Instant}; use tracing::Level; pub(crate) struct PoolInner { - pub(super) connect_options: RwLock::Options>>, - pub(super) idle_conns: ArrayQueue>, - pub(super) semaphore: AsyncSemaphore, - pub(super) size: AtomicUsize, - pub(super) num_idle: AtomicUsize, + pub(super) connector: DynConnector, + pub(super) counter: ConnectionCounter, + pub(super) idle: IdleQueue, is_closed: AtomicBool, pub(super) on_closed: event_listener::Event, pub(super) options: PoolOptions, @@ -38,25 +34,12 @@ pub(crate) struct PoolInner { impl PoolInner { pub(super) fn new_arc( options: PoolOptions, - connect_options: ::Options, + connector: impl PoolConnector, ) -> Arc { - let capacity = options.max_connections as usize; - - let semaphore_capacity = if let Some(parent) = &options.parent_pool { - assert!(options.max_connections <= parent.options().max_connections); - assert_eq!(options.fair, parent.options().fair); - // The child pool must steal permits from the parent - 0 - } else { - capacity - }; - let pool = Self { - connect_options: RwLock::new(Arc::new(connect_options)), - idle_conns: ArrayQueue::new(capacity), - semaphore: AsyncSemaphore::new(options.fair, semaphore_capacity), - size: AtomicUsize::new(0), - num_idle: AtomicUsize::new(0), + connector: DynConnector::new(connector), + counter: ConnectionCounter::new(), + idle: IdleQueue::new(options.fair, options.max_connections), is_closed: AtomicBool::new(false), on_closed: event_listener::Event::new(), acquire_time_level: private_level_filter_to_trace_level(options.acquire_time_level), @@ -72,16 +55,11 @@ impl PoolInner { } pub(super) fn size(&self) -> usize { - self.size.load(Ordering::Acquire) + self.counter.connections() } pub(super) fn num_idle(&self) -> usize { - // We don't use `self.idle_conns.len()` as it waits for the internal - // head and tail pointers to stop changing for a moment before calculating the length, - // which may take a long time at high levels of churn. - // - // By maintaining our own atomic count, we avoid that issue entirely. - self.num_idle.load(Ordering::Acquire) + self.idle.len() } pub(super) fn is_closed(&self) -> bool { @@ -97,24 +75,11 @@ impl PoolInner { self.mark_closed(); async move { - // For child pools, we need to acquire permits we actually have rather than - // max_connections - let permits_to_acquire = if self.options.parent_pool.is_some() { - // Child pools start with 0 permits, so we acquire based on current size - self.size() - } else { - // Parent pools can acquire all max_connections permits - self.options.max_connections - }; - - let _permits = self.semaphore.acquire(permits_to_acquire).await; - - while let Some(idle) = self.idle_conns.pop() { - let _ = idle.live.raw.close().await; + while let Some(idle) = self.idle.try_acquire(self) { + idle.close().await; } - self.num_idle.store(0, Ordering::Release); - self.size.store(0, Ordering::Release); + self.counter.drain().await; } } @@ -124,56 +89,6 @@ impl PoolInner { } } - /// Attempt to pull a permit from `self.semaphore` or steal one from the parent. - /// - /// If we steal a permit from the parent but *don't* open a connection, - /// it should be returned to the parent. - async fn acquire_permit(self: &Arc) -> Result, Error> { - let parent = self - .parent() - // If we're already at the max size, we shouldn't try to steal from the parent. - // This is just going to cause unnecessary churn in `acquire()`. - .filter(|_| self.size() < self.options.max_connections); - - let mut acquire_self = pin!(self.semaphore.acquire(1).fuse()); - let mut close_event = pin!(self.close_event()); - - if let Some(parent) = parent { - let mut acquire_parent = pin!(parent.0.semaphore.acquire(1)); - let mut parent_close_event = pin!(parent.0.close_event()); - - let mut poll_parent = false; - - future::poll_fn(|cx| { - if close_event.as_mut().poll(cx).is_ready() { - return Poll::Ready(Err(Error::PoolClosed)); - } - - if parent_close_event.as_mut().poll(cx).is_ready() { - // Propagate the parent's close event to the child. - self.mark_closed(); - return Poll::Ready(Err(Error::PoolClosed)); - } - - if let Poll::Ready(permit) = acquire_self.as_mut().poll(cx) { - return Poll::Ready(Ok(permit)); - } - - // Don't try the parent right away. - if poll_parent { - acquire_parent.as_mut().poll(cx).map(Ok) - } else { - poll_parent = true; - cx.waker().wake_by_ref(); - Poll::Pending - } - }) - .await - } else { - close_event.do_until(acquire_self).await - } - } - fn parent(&self) -> Option<&Pool> { self.options.parent_pool.as_ref() } @@ -184,117 +99,103 @@ impl PoolInner { return None; } - let permit = self.semaphore.try_acquire(1)?; - - self.pop_idle(permit).ok() - } - - fn pop_idle<'a>( - self: &'a Arc, - permit: AsyncSemaphoreReleaser<'a>, - ) -> Result>, AsyncSemaphoreReleaser<'a>> { - if let Some(idle) = self.idle_conns.pop() { - self.num_idle.fetch_sub(1, Ordering::AcqRel); - Ok(Floating::from_idle(idle, (*self).clone(), permit)) - } else { - Err(permit) - } + self.idle.try_acquire(self) } pub(super) fn release(&self, floating: Floating>) { // `options.after_release` and other checks are in `PoolConnection::return_to_pool()`. - - let Floating { inner: idle, guard } = floating.into_idle(); - - if self.idle_conns.push(idle).is_err() { - panic!("BUG: connection queue overflow in release()"); - } - - // NOTE: we need to make sure we drop the permit *after* we push to the idle queue - // don't decrease the size - guard.release_permit(); - - self.num_idle.fetch_add(1, Ordering::AcqRel); + self.idle.release(floating); } - /// Try to atomically increment the pool size for a new connection. - /// - /// Returns `Err` if the pool is at max capacity already or is closed. - pub(super) fn try_increment_size<'a>( - self: &'a Arc, - permit: AsyncSemaphoreReleaser<'a>, - ) -> Result, AsyncSemaphoreReleaser<'a>> { - let result = self - .size - .fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| { - if self.is_closed() { - return None; - } - - size.checked_add(1) - .filter(|size| size <= &self.options.max_connections) - }); - - match result { - // we successfully incremented the size - Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)), - // the pool is at max capacity or is closed - Err(_) => Err(permit), - } - } - - pub(super) async fn acquire(self: &Arc) -> Result>, Error> { + pub(super) async fn acquire(self: &Arc) -> Result, Error> { if self.is_closed() { return Err(Error::PoolClosed); } let acquire_started_at = Instant::now(); - let deadline = acquire_started_at + self.options.acquire_timeout; - let acquired = crate::rt::timeout( - self.options.acquire_timeout, - async { - loop { - // Handles the close-event internally - let permit = self.acquire_permit().await?; + let mut close_event = pin!(self.close_event()); + let mut deadline = pin!(crate::rt::sleep(self.options.acquire_timeout)); + let mut acquire_idle = pin!(self.idle.acquire(self).fuse()); + let mut check_idle = pin!(OptionFuture::from(None)); + let mut acquire_connect_permit = pin!(OptionFuture::from(Some( + self.counter.acquire_permit(self).fuse() + ))); + let mut connect = OptionFuture::from(None); + // The internal state machine of `acquire()`. + // + // * The initial state is racing to acquire either an idle connection or a new `ConnectPermit`. + // * If we acquire a `ConnectPermit`, we begin the connection loop (with backoff) + // as implemented by `DynConnector`. + // * If we acquire an idle connection, we then start polling `check_idle_conn()`. + let acquired = future::poll_fn(|cx| { + use std::task::Poll::*; - // First attempt to pop a connection from the idle queue. - let guard = match self.pop_idle(permit) { - - // Then, check that we can use it... - Ok(conn) => match check_idle_conn(conn, &self.options).await { - - // All good! - Ok(live) => return Ok(live), - - // if the connection isn't usable for one reason or another, - // we get the `DecrementSizeGuard` back to open a new one - Err(guard) => guard, - }, - Err(permit) => if let Ok(guard) = self.try_increment_size(permit) { - // we can open a new connection - guard - } else { - // This can happen for a child pool that's at its connection limit, - // or if the pool was closed between `acquire_permit()` and - // `try_increment_size()`. - tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying"); - // If so, we're likely in the current-thread runtime if it's Tokio, - // and so we should yield to let any spawned return_to_pool() tasks - // execute. - crate::rt::yield_now().await; - continue; - } - }; - - // Attempt to connect... - return self.connect(deadline, guard).await; - } + // First check if the pool is already closed, + // or register for a wakeup if it gets closed. + if let Ready(()) = close_event.poll_unpin(cx) { + return Ready(Err(Error::PoolClosed)); } - ) - .await - .map_err(|_| Error::PoolTimedOut)??; + + // Then check if our deadline has elapsed, or schedule a wakeup for when that happens. + if let Ready(()) = deadline.poll_unpin(cx) { + return Ready(Err(Error::PoolTimedOut)); + } + + // Attempt to acquire a connection from the idle queue. + if let Ready(idle) = acquire_idle.poll_unpin(cx) { + check_idle.set(Some(check_idle_conn(idle, &self.options)).into()); + } + + // If we acquired an idle connection, run any checks that need to be done. + // + // Includes `test_on_acquire` and the `before_acquire` callback, if set. + // + // We don't want to race this step if it's already running because canceling it + // will result in the potentially unnecessary closure of a connection. + // + // Instead, we just wait and see what happens. If we already started connecting, + // that'll happen concurrently. + match ready!(check_idle.poll_unpin(cx)) { + // The `.reattach()` call errors with "type annotations needed" if not qualified. + Some(Ok(live)) => return Ready(Ok(Floating::reattach(live))), + Some(Err(permit)) => { + // We don't strictly need to poll `connect` here; all we really want to do + // is to check if it is `None`. But since currently there's no getter for that, + // it doesn't really hurt to just poll it here. + match connect.poll_unpin(cx) { + Ready(None) => { + // If we're not already attempting to connect, + // take the permit returned from closing the connection and + // attempt to open a new one. + connect = Some(self.connector.connect(permit, self.size())).into(); + } + // `permit` is dropped in these branches, allowing another task to use it + Ready(Some(res)) => return Ready(res), + Pending => (), + } + + // Attempt to acquire another idle connection concurrently to opening a new one. + acquire_idle.set(self.idle.acquire(self).fuse()); + // Annoyingly, `OptionFuture` doesn't fuse to `None` on its own + check_idle.set(None.into()); + } + None => (), + } + + if let Ready(Some((size, permit))) = acquire_connect_permit.poll_unpin(cx) { + connect = Some(self.connector.connect(permit, size)).into(); + } + + if let Ready(Some(res)) = connect.poll_unpin(cx) { + // RFC: suppress errors here? + return Ready(res); + } + + Pending + }) + .await?; let acquired_after = acquire_started_at.elapsed(); @@ -322,102 +223,29 @@ impl PoolInner { Ok(acquired) } - pub(super) async fn connect( - self: &Arc, - deadline: Instant, - guard: DecrementSizeGuard, - ) -> Result>, Error> { - if self.is_closed() { - return Err(Error::PoolClosed); - } - - let mut backoff = Duration::from_millis(10); - let max_backoff = deadline_as_timeout(deadline)? / 5; - - loop { - let timeout = deadline_as_timeout(deadline)?; - - // clone the connect options arc so it can be used without holding the RwLockReadGuard - // across an async await point - let connect_options = self - .connect_options - .read() - .expect("write-lock holder panicked") - .clone(); - - // result here is `Result, TimeoutError>` - // if this block does not return, sleep for the backoff timeout and try again - match crate::rt::timeout(timeout, connect_options.connect()).await { - // successfully established connection - Ok(Ok(mut raw)) => { - // See comment on `PoolOptions::after_connect` - let meta = PoolConnectionMetadata { - age: Duration::ZERO, - idle_for: Duration::ZERO, - }; - - let res = if let Some(callback) = &self.options.after_connect { - callback(&mut raw, meta).await - } else { - Ok(()) - }; - - match res { - Ok(()) => return Ok(Floating::new_live(raw, guard)), - Err(error) => { - tracing::error!(%error, "error returned from after_connect"); - // The connection is broken, don't try to close nicely. - let _ = raw.close_hard().await; - - // Fall through to the backoff. - } - } - } - - // an IO error while connecting is assumed to be the system starting up - Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => (), - - // We got a transient database error, retry. - Ok(Err(Error::Database(error))) if error.is_transient_in_connect_phase() => (), - - // Any other error while connection should immediately - // terminate and bubble the error up - Ok(Err(e)) => return Err(e), - - // timed out - Err(_) => return Err(Error::PoolTimedOut), - } - - // If the connection is refused, wait in exponentially - // increasing steps for the server to come up, - // capped by a factor of the remaining time until the deadline - crate::rt::sleep(backoff).await; - backoff = cmp::min(backoff * 2, max_backoff); - } - } - /// Try to maintain `min_connections`, returning any errors (including `PoolTimedOut`). pub async fn try_min_connections(self: &Arc, deadline: Instant) -> Result<(), Error> { - while self.size() < self.options.min_connections { - // Don't wait for a semaphore permit. - // - // If no extra permits are available then we shouldn't be trying to spin up - // connections anyway. - let Some(permit) = self.semaphore.try_acquire(1) else { - return Ok(()); - }; + crate::rt::timeout_at(deadline, async { + while self.size() < self.options.min_connections { + // Don't wait for a connect permit. + // + // If no extra permits are available then we shouldn't be trying to spin up + // connections anyway. + let Some((size, permit)) = self.counter.acquire_permit(self).now_or_never() else { + return Ok(()); + }; - // We must always obey `max_connections`. - let Some(guard) = self.try_increment_size(permit).ok() else { - return Ok(()); - }; + let conn = self.connector.connect(permit, size).await?; - // We skip `after_release` since the connection was never provided to user code - // besides `after_connect`, if they set it. - self.release(self.connect(deadline, guard).await?); - } + // We skip `after_release` since the connection was never provided to user code + // besides inside `PollConnector::connect()`, if they override it. + self.release(conn.into_floating()); + } - Ok(()) + Ok(()) + }) + .await + .unwrap_or_else(|_| Err(Error::PoolTimedOut)) } /// Attempt to maintain `min_connections`, logging if unable. @@ -441,11 +269,7 @@ impl PoolInner { impl Drop for PoolInner { fn drop(&mut self) { self.mark_closed(); - - if let Some(parent) = &self.options.parent_pool { - // Release the stolen permits. - parent.0.semaphore.release(self.semaphore.permits()); - } + self.idle.drain(self); } } @@ -469,7 +293,7 @@ fn is_beyond_idle_timeout(idle: &Idle, options: &PoolOptions( mut conn: Floating>, options: &PoolOptions, -) -> Result>, DecrementSizeGuard> { +) -> Result>, ConnectPermit> { if options.test_before_acquire { // Check that the connection is still live if let Err(error) = conn.ping().await { @@ -573,51 +397,3 @@ fn spawn_maintenance_tasks(pool: &Arc>) { .await; }); } - -/// RAII guard returned by `Pool::try_increment_size()` and others. -/// -/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections -/// (where the pool thinks it has more connections than it does). -pub(in crate::pool) struct DecrementSizeGuard { - pub(crate) pool: Arc>, - cancelled: bool, -} - -impl DecrementSizeGuard { - /// Create a new guard that will release a semaphore permit on-drop. - pub fn new_permit(pool: Arc>) -> Self { - Self { - pool, - cancelled: false, - } - } - - pub fn from_permit(pool: Arc>, permit: AsyncSemaphoreReleaser<'_>) -> Self { - // here we effectively take ownership of the permit - permit.disarm(); - Self::new_permit(pool) - } - - /// Release the semaphore permit without decreasing the pool size. - /// - /// If the permit was stolen from the pool's parent, it will be returned to the child's semaphore. - fn release_permit(self) { - self.pool.semaphore.release(1); - self.cancel(); - } - - pub fn cancel(mut self) { - self.cancelled = true; - } -} - -impl Drop for DecrementSizeGuard { - fn drop(&mut self) { - if !self.cancelled { - self.pool.size.fetch_sub(1, Ordering::AcqRel); - - // and here we release the permit we got on construction - self.pool.semaphore.release(1); - } - } -} diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index 6e540894..0caa1161 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -71,6 +71,7 @@ use crate::error::Error; use crate::sql_str::SqlSafeStr; use crate::transaction::Transaction; +pub use self::connect::{PoolConnectMetadata, PoolConnector}; pub use self::connection::PoolConnection; use self::inner::PoolInner; #[doc(hidden)] @@ -83,8 +84,11 @@ mod executor; #[macro_use] pub mod maybe; +mod connect; mod connection; mod inner; + +mod idle; mod options; /// An asynchronous pool of SQLx database connections. @@ -356,7 +360,7 @@ impl Pool { /// returning it. pub fn acquire(&self) -> impl Future, Error>> + 'static { let shared = self.0.clone(); - async move { shared.acquire().await.map(|conn| conn.reattach()) } + async move { shared.acquire().await } } /// Attempts to retrieve a connection from the pool if there is one available. @@ -541,28 +545,6 @@ impl Pool { self.0.num_idle() } - /// Gets a clone of the connection options for this pool - pub fn connect_options(&self) -> Arc<::Options> { - self.0 - .connect_options - .read() - .expect("write-lock holder panicked") - .clone() - } - - /// Updates the connection options this pool will use when opening any future connections. Any - /// existing open connection in the pool will be left as-is. - pub fn set_connect_options(&self, connect_options: ::Options) { - // technically write() could also panic if the current thread already holds the lock, - // but because this method can't be re-entered by the same thread that shouldn't be a problem - let mut guard = self - .0 - .connect_options - .write() - .expect("write-lock holder panicked"); - *guard = Arc::new(connect_options); - } - /// Get the options for this pool pub fn options(&self) -> &PoolOptions { &self.0.options diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 2c2c2e18..9775799f 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -1,8 +1,9 @@ use crate::connection::Connection; use crate::database::Database; use crate::error::Error; +use crate::pool::connect::DefaultConnector; use crate::pool::inner::PoolInner; -use crate::pool::Pool; +use crate::pool::{Pool, PoolConnector}; use futures_core::future::BoxFuture; use log::LevelFilter; use std::fmt::{self, Debug, Formatter}; @@ -44,14 +45,6 @@ use std::time::{Duration, Instant}; /// the perspectives of both API designer and consumer. pub struct PoolOptions { pub(crate) test_before_acquire: bool, - pub(crate) after_connect: Option< - Arc< - dyn Fn(&mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'_, Result<(), Error>> - + 'static - + Send - + Sync, - >, - >, pub(crate) before_acquire: Option< Arc< dyn Fn( @@ -79,6 +72,7 @@ pub struct PoolOptions { pub(crate) acquire_slow_level: LevelFilter, pub(crate) acquire_slow_threshold: Duration, pub(crate) acquire_timeout: Duration, + pub(crate) connect_timeout: Duration, pub(crate) min_connections: usize, pub(crate) max_lifetime: Option, pub(crate) idle_timeout: Option, @@ -94,7 +88,6 @@ impl Clone for PoolOptions { fn clone(&self) -> Self { PoolOptions { test_before_acquire: self.test_before_acquire, - after_connect: self.after_connect.clone(), before_acquire: self.before_acquire.clone(), after_release: self.after_release.clone(), max_connections: self.max_connections, @@ -102,6 +95,7 @@ impl Clone for PoolOptions { acquire_slow_threshold: self.acquire_slow_threshold, acquire_slow_level: self.acquire_slow_level, acquire_timeout: self.acquire_timeout, + connect_timeout: self.connect_timeout, min_connections: self.min_connections, max_lifetime: self.max_lifetime, idle_timeout: self.idle_timeout, @@ -143,7 +137,6 @@ impl PoolOptions { pub fn new() -> Self { Self { // User-specifiable routines - after_connect: None, before_acquire: None, after_release: None, test_before_acquire: true, @@ -158,6 +151,7 @@ impl PoolOptions { // to not flag typical time to add a new connection to a pool. acquire_slow_threshold: Duration::from_secs(2), acquire_timeout: Duration::from_secs(30), + connect_timeout: Duration::from_secs(2 * 60), idle_timeout: Some(Duration::from_secs(10 * 60)), max_lifetime: Some(Duration::from_secs(30 * 60)), fair: true, @@ -268,6 +262,23 @@ impl PoolOptions { self.acquire_timeout } + /// Set the maximum amount of time to spend attempting to open a connection. + /// + /// This timeout happens independently of [`acquire_timeout`][Self::acquire_timeout]. + /// + /// If shorter than `acquire_timeout`, this will cause the last connec + pub fn connect_timeout(mut self, timeout: Duration) -> Self { + self.connect_timeout = timeout; + self + } + + /// Get the maximum amount of time to spend attempting to open a connection. + /// + /// This timeout happens independently of [`acquire_timeout`][Self::acquire_timeout]. + pub fn get_connect_timeout(&self) -> Duration { + self.connect_timeout + } + /// Set the maximum lifetime of individual connections. /// /// Any connection with a lifetime greater than this will be closed. @@ -339,57 +350,6 @@ impl PoolOptions { self } - /// Perform an asynchronous action after connecting to the database. - /// - /// If the operation returns with an error then the error is logged, the connection is closed - /// and a new one is opened in its place and the callback is invoked again. - /// - /// This occurs in a backoff loop to avoid high CPU usage and spamming logs during a transient - /// error condition. - /// - /// Note that this may be called for internally opened connections, such as when maintaining - /// [`min_connections`][Self::min_connections], that are then immediately returned to the pool - /// without invoking [`after_release`][Self::after_release]. - /// - /// # Example: Additional Parameters - /// This callback may be used to set additional configuration parameters - /// that are not exposed by the database's `ConnectOptions`. - /// - /// This example is written for PostgreSQL but can likely be adapted to other databases. - /// - /// ```no_run - /// # async fn f() -> Result<(), Box> { - /// use sqlx::Executor; - /// use sqlx::postgres::PgPoolOptions; - /// - /// let pool = PgPoolOptions::new() - /// .after_connect(|conn, _meta| Box::pin(async move { - /// // When directly invoking `Executor` methods, - /// // it is possible to execute multiple statements with one call. - /// conn.execute("SET application_name = 'your_app'; SET search_path = 'my_schema';") - /// .await?; - /// - /// Ok(()) - /// })) - /// .connect("postgres:// …").await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self]. - pub fn after_connect(mut self, callback: F) -> Self - where - // We're passing the `PoolConnectionMetadata` here mostly for future-proofing. - // `age` and `idle_for` are obviously not useful for fresh connections. - for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<(), Error>> - + 'static - + Send - + Sync, - { - self.after_connect = Some(Arc::new(callback)); - self - } - /// Perform an asynchronous action on a previously idle connection before giving it out. /// /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains @@ -537,11 +497,25 @@ impl PoolOptions { pub async fn connect_with( self, options: ::Options, + ) -> Result, Error> { + self.connect_with_connector(DefaultConnector(options)).await + } + + /// Create a new pool from this `PoolOptions` and immediately open at least one connection. + /// + /// This ensures the configuration is correct. + /// + /// The total number of connections opened is max(1, [min_connections][Self::min_connections]). + /// + /// See [PoolConnector] for examples. + pub async fn connect_with_connector( + self, + connector: impl PoolConnector, ) -> Result, Error> { // Don't take longer than `acquire_timeout` starting from when this is called. let deadline = Instant::now() + self.acquire_timeout; - let inner = PoolInner::new_arc(self, options); + let inner = PoolInner::new_arc(self, connector); if inner.options.min_connections > 0 { // If the idle reaper is spawned then this will race with the call from that task @@ -552,7 +526,7 @@ impl PoolOptions { // If `min_connections` is nonzero then we'll likely just pull a connection // from the idle queue here, but it should at least get tested first. let conn = inner.acquire().await?; - inner.release(conn); + inner.release(conn.into_floating()); Ok(Pool(inner)) } @@ -578,7 +552,11 @@ impl PoolOptions { /// optimistically establish that many connections for the pool. pub fn connect_lazy_with(self, options: ::Options) -> Pool { // `min_connections` is guaranteed by the idle reaper now. - Pool(PoolInner::new_arc(self, options)) + self.connect_lazy_with_connector(DefaultConnector(options)) + } + + pub fn connect_lazy_with_connector(self, connector: impl PoolConnector) -> Pool { + Pool(PoolInner::new_arc(self, connector)) } } diff --git a/sqlx-core/src/rt/mod.rs b/sqlx-core/src/rt/mod.rs index 273a1bfc..1da096d9 100644 --- a/sqlx-core/src/rt/mod.rs +++ b/sqlx-core/src/rt/mod.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; +use std::time::{Duration, Instant}; use cfg_if::cfg_if; @@ -51,6 +51,29 @@ pub async fn timeout(duration: Duration, f: F) -> Result(deadline: Instant, f: F) -> Result { + #[cfg(feature = "_rt-tokio")] + if rt_tokio::available() { + return tokio::time::timeout_at(deadline.into(), f) + .await + .map_err(|_| TimeoutError(())); + } + + #[cfg(feature = "_rt-async-std")] + { + let Some(duration) = deadline.checked_duration_since(Instant::now()) else { + return Err(TimeoutError(())); + }; + + async_std::future::timeout(duration, f) + .await + .map_err(|_| TimeoutError(())) + } + + #[cfg(not(feature = "_rt-async-std"))] + missing_rt((duration, f)) +} + pub async fn sleep(duration: Duration) { #[cfg(feature = "_rt-tokio")] if rt_tokio::available() { diff --git a/sqlx-mysql/src/testing/mod.rs b/sqlx-mysql/src/testing/mod.rs index f509f9da..c27dda3c 100644 --- a/sqlx-mysql/src/testing/mod.rs +++ b/sqlx-mysql/src/testing/mod.rs @@ -1,5 +1,4 @@ use std::future::Future; -use std::ops::Deref; use std::str::FromStr; use std::sync::OnceLock; use std::time::Duration; @@ -108,27 +107,11 @@ async fn test_context(args: &TestArgs) -> Result, Error> { .max_connections(20) // Immediately close master connections. Tokio's I/O streams don't like hopping runtimes. .after_release(|_conn, _| Box::pin(async move { Ok(false) })) - .connect_lazy_with(master_opts); + .connect_lazy_with(master_opts.clone()); - let master_pool = match once_lock_try_insert_polyfill(&MASTER_POOL, pool) { - Ok(inserted) => inserted, - Err((existing, pool)) => { - // Sanity checks. - assert_eq!( - existing.connect_options().host, - pool.connect_options().host, - "DATABASE_URL changed at runtime, host differs" - ); - - assert_eq!( - existing.connect_options().database, - pool.connect_options().database, - "DATABASE_URL changed at runtime, database differs" - ); - - existing - } - }; + let master_pool = MASTER_POOL + .try_insert(pool) + .unwrap_or_else(|(existing, _pool)| existing); let mut conn = master_pool.acquire().await?; @@ -144,7 +127,7 @@ async fn test_context(args: &TestArgs) -> Result, Error> { -- BLOB/TEXT columns can only be used as index keys with a prefix length: -- https://dev.mysql.com/doc/refman/8.4/en/column-indexes.html#column-indexes-prefix primary key(db_name(63)) - ); + ); "#, ) .await?; @@ -172,11 +155,7 @@ async fn test_context(args: &TestArgs) -> Result, Error> { // Close connections ASAP if left in the idle queue. .idle_timeout(Some(Duration::from_secs(1))) .parent(master_pool.clone()), - connect_opts: master_pool - .connect_options() - .deref() - .clone() - .database(&db_name), + connect_opts: master_opts.database(&db_name), db_name, }) } diff --git a/sqlx-postgres/src/error.rs b/sqlx-postgres/src/error.rs index 7b5a03f2..ac99bd13 100644 --- a/sqlx-postgres/src/error.rs +++ b/sqlx-postgres/src/error.rs @@ -186,7 +186,7 @@ impl DatabaseError for PgDatabaseError { self } - fn is_transient_in_connect_phase(&self) -> bool { + fn is_retryable_connect_error(&self) -> bool { // https://www.postgresql.org/docs/current/errcodes-appendix.html [ // too_many_connections diff --git a/sqlx-postgres/src/options/mod.rs b/sqlx-postgres/src/options/mod.rs index efbc4398..00ae1597 100644 --- a/sqlx-postgres/src/options/mod.rs +++ b/sqlx-postgres/src/options/mod.rs @@ -127,6 +127,11 @@ impl PgConnectOptions { self } + /// Identical to [Self::host()], but through a mutable reference. + pub fn set_host(&mut self, host: &str) { + host.clone_into(&mut self.host); + } + /// Sets the port to connect to at the server host. /// /// The default port for PostgreSQL is `5432`. @@ -143,6 +148,12 @@ impl PgConnectOptions { self } + /// Identical to [`Self::port()`], but through a mutable reference. + pub fn set_port(&mut self, port: u16) -> &mut Self { + self.port = port; + self + } + /// Sets a custom path to a directory containing a unix domain socket, /// switching the connection method from TCP to the corresponding socket. /// @@ -169,6 +180,12 @@ impl PgConnectOptions { self } + /// Identical to [`Self::username()`], but through a mutable reference. + pub fn set_username(&mut self, username: &str) -> &mut Self { + username.clone_into(&mut self.username); + self + } + /// Sets the password to use if the server demands password authentication. /// /// # Example @@ -184,6 +201,12 @@ impl PgConnectOptions { self } + /// Identical to [`Self::password()`]. but through a mutable reference. + pub fn set_password(&mut self, password: &str) -> &mut Self { + self.password = Some(password.to_owned()); + self + } + /// Sets the database name. Defaults to be the same as the user name. /// /// # Example diff --git a/sqlx-postgres/src/testing/mod.rs b/sqlx-postgres/src/testing/mod.rs index 3e1cf0dd..a7f6a549 100644 --- a/sqlx-postgres/src/testing/mod.rs +++ b/sqlx-postgres/src/testing/mod.rs @@ -1,5 +1,4 @@ use std::future::Future; -use std::ops::Deref; use std::str::FromStr; use std::sync::OnceLock; use std::time::Duration; @@ -101,27 +100,11 @@ async fn test_context(args: &TestArgs) -> Result, Error> { .max_connections(20) // Immediately close master connections. Tokio's I/O streams don't like hopping runtimes. .after_release(|_conn, _| Box::pin(async move { Ok(false) })) - .connect_lazy_with(master_opts); + .connect_lazy_with(master_opts.clone()); - let master_pool = match once_lock_try_insert_polyfill(&MASTER_POOL, pool) { - Ok(inserted) => inserted, - Err((existing, pool)) => { - // Sanity checks. - assert_eq!( - existing.connect_options().host, - pool.connect_options().host, - "DATABASE_URL changed at runtime, host differs" - ); - - assert_eq!( - existing.connect_options().database, - pool.connect_options().database, - "DATABASE_URL changed at runtime, database differs" - ); - - existing - } - }; + let master_pool = MASTER_POOL + .try_insert(pool) + .unwrap_or_else(|(existing, _pool)| existing); let mut conn = master_pool.acquire().await?; @@ -177,11 +160,7 @@ async fn test_context(args: &TestArgs) -> Result, Error> { // Close connections ASAP if left in the idle queue. .idle_timeout(Some(Duration::from_secs(1))) .parent(master_pool.clone()), - connect_opts: master_pool - .connect_options() - .deref() - .clone() - .database(&db_name), + connect_opts: master_opts.database(&db_name), db_name, }) }