diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs
index 02e34491..b83bad7c 100644
--- a/sqlx-core/src/error.rs
+++ b/sqlx-core/src/error.rs
@@ -4,6 +4,8 @@ use std::{
     io,
 };
 
+use async_std::future::TimeoutError;
+
 /// A convenient Result instantiation appropriate for SQLx.
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -35,6 +37,10 @@ pub enum Error {
     /// Context is provided by the included error message.
     Protocol(Box<str>),
 
+    /// A `Pool::acquire()` timed out due to connections not becoming available or
+    /// because another task encountered too many errors while trying to open a new connection.
+    TimedOut,
+
     // TODO: Remove and replace with `#[non_exhaustive]` when possible
     #[doc(hidden)]
     __Nonexhaustive,
@@ -65,6 +71,8 @@ impl Display for Error {
 
             Error::Protocol(ref err) => f.write_str(err),
 
+            Error::TimedOut => f.write_str("timed out while waiting for an open connection"),
+
             Error::__Nonexhaustive => unreachable!(),
         }
     }
@@ -77,6 +85,12 @@ impl From<io::Error> for Error {
     }
 }
 
+impl From<TimeoutError> for Error {
+    fn from(_: TimeoutError) -> Self {
+        Error::TimedOut
+    }
+}
+
 impl From<ProtocolError<'_>> for Error {
     #[inline]
     fn from(err: ProtocolError) -> Self {
diff --git a/sqlx-core/src/pool.rs b/sqlx-core/src/pool.rs
index 0e004665..76a15347 100644
--- a/sqlx-core/src/pool.rs
+++ b/sqlx-core/src/pool.rs
@@ -8,7 +8,8 @@ use crate::{
 };
 use futures_channel::oneshot;
 use futures_core::{future::BoxFuture, stream::BoxStream};
-use futures_util::{future::FutureExt, stream::StreamExt};
+use futures_util::{future::{FutureExt, TryFutureExt}, stream::StreamExt};
+use futures_util::future::{AbortHandle, AbortRegistration};
 use std::{
     future::Future,
     marker::PhantomData,
@@ -20,6 +21,7 @@ use std::{
     time::{Duration, Instant},
 };
 
+use async_std::future::timeout;
 use async_std::sync::{channel, Receiver, Sender};
 use async_std::task;
 
@@ -84,6 +86,11 @@ where
         self.0.options.max_size
     }
 
+    /// Returns the maximum time spent acquiring a new connection before an error is returned.
+    pub fn connect_timeout(&self) -> Duration {
+        self.0.options.connect_timeout
+    }
+
     /// Returns the configured mimimum idle connection count.
     pub fn min_idle(&self) -> Option<u32> {
         self.0.options.min_idle
@@ -135,6 +142,11 @@ where
         self
     }
 
+    pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
+        self.options.connect_timeout = connect_timeout;
+        self
+    }
+
     pub fn min_idle(mut self, min_idle: impl Into<Option<u32>>) -> Self {
         self.options.min_idle = min_idle.into();
         self
@@ -157,6 +169,7 @@ where
 
 struct Options {
     max_size: u32,
+    connect_timeout: Duration,
     min_idle: Option<u32>,
     max_lifetime: Option<Duration>,
     idle_timeout: Option<Duration>,
@@ -167,6 +180,7 @@ impl Default for Options {
         Self {
             max_size: 10,
             min_idle: None,
+            connect_timeout: Duration::from_secs(30),
             max_lifetime: None,
             idle_timeout: None,
         }
@@ -208,6 +222,9 @@ where
     }
 
     async fn acquire(&self) -> crate::Result<Live<DB>> {
+        let start = Instant::now();
+        let deadline = start + self.options.connect_timeout;
+
         if let Some(live) = self.try_acquire() {
             return Ok(live);
         }
@@ -219,23 +236,64 @@ where
                 // Too many open connections
                 // Wait until one is available
 
-                // Waiters are not dropped unless the pool is dropped
-                // which would drop this future
-                return Ok(self
-                    .pool_rx
-                    .recv()
-                    .await
-                    .expect("waiter dropped without dropping pool")
-                    .live(&self.pool_tx));
+                // get the time between the deadline and now and use that as our timeout
+                let max_wait = deadline.checked_duration_since(Instant::now())
+                    .ok_or(Error::TimedOut)?;
+
+                // don't sleep forever
+                let idle = match timeout(max_wait, self.pool_rx.recv()).await {
+                    Ok(Some(idle)) => idle,
+                    Ok(None) => panic!("this isn't possible, we own a `pool_tx`"),
+                    // try our acquire logic again
+                    Err(_) => continue,
+                };
+
+                // check if idle connection was within max lifetime (or not set)
+                if self.options.max_lifetime.map_or(true, |max| idle.raw.created.elapsed() < max)
+                    // and if connection wasn't idle too long (or not set)
+                    && self.options.idle_timeout.map_or(true, |timeout| idle.since.elapsed() < timeout)
+                {
+                    match idle.raw.inner.ping().await {
+                        Ok(_) => return Ok(idle.revive(&self.pool_tx)),
+                        // an error here means the other end has hung up or we lost connectivity
+                        // either way we're fine to just discard the connection
+                        // the error itself here isn't necessarily unexpected so WARN is too strong
+                        Err(e) => log::info!("ping on idle connection returned error: {}", e),
+                    }
+                } else {
+                    // close the connection but don't really care about the result
+                    let _ = idle.raw.inner.close().await;
+                }
+
+                // either case, make sure the idle connection is gone explicitly before we open one
+                drop(idle);
+
+                // while we're still at max size, acquire a new connection
+                return self.new_conn(deadline).await
             }
 
             if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size {
                 // Open a new connection and return directly
-                let raw = DB::open(&self.url).await?;
-                return Ok(Live::pooled(raw, &self.pool_tx));
+                return self.new_conn(deadline).await
             }
         }
     }
+
+    async fn new_conn(&self, deadline: Instant) -> crate::Result<Live<DB>> {
+        while Instant::now() < deadline {
+            // result here is `Result<crate::Result<DB>, TimeoutError>`
+            match timeout(deadline - Instant::now(), DB::open(&self.url)).await {
+                Ok(Ok(raw)) => return Ok(Live::pooled(raw, &self.pool_tx)),
+                // error while connecting, this should definitely be logged
+                Ok(Err(e)) => log::warn!("error establishing a connection: {}", e),
+                // timed out
+                Err(_) => break,
+            }
+        }
+
+        self.size.fetch_sub(1, Ordering::AcqRel);
+        Err(Error::TimedOut)
+    }
 }
 
 impl<DB> Executor for Pool<DB>
@@ -368,7 +426,7 @@ where
 }
 
 impl<DB: Database> Idle<DB> {
-    fn live(self, pool_tx: &Sender<Idle<DB>>) -> Live<DB> {
+    fn revive(self, pool_tx: &Sender<Idle<DB>>) -> Live<DB> {
         Live {
             raw: Some(self.raw),
             pool_tx: Some(pool_tx.clone()),