diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs index 4cc05c36..2c16ec0a 100644 --- a/sqlx-core/src/error.rs +++ b/sqlx-core/src/error.rs @@ -18,7 +18,7 @@ pub enum Error { UrlParse(url::ParseError), /// An error was returned by the database. - Database(Box), + Database(Box), /// No rows were returned by a query that expected to return at least one row. NotFound, @@ -39,7 +39,7 @@ pub enum Error { /// A [Pool::acquire] timed out due to connections not becoming available or /// because another task encountered too many errors while trying to open a new connection. - PoolTimedOut, + PoolTimedOut(Option>), /// [Pool::close] was called while we were waiting in [Pool::acquire]. PoolClosed, @@ -58,6 +58,8 @@ impl StdError for Error { Error::UrlParse(error) => Some(error), + Error::PoolTimedOut(Some(error)) => Some(&**error), + Error::Decode(DecodeError::Other(error)) => Some(&**error), _ => None, @@ -88,7 +90,13 @@ impl Display for Error { Error::Protocol(ref err) => f.write_str(err), - Error::PoolTimedOut => f.write_str("timed out while waiting for an open connection"), + Error::PoolTimedOut(Some(ref err)) => { + write!(f, "timed out while waiting for an open connection: {}", err) + } + + Error::PoolTimedOut(None) => { + write!(f, "timed out while waiting for an open connection") + } Error::PoolClosed => f.write_str("attempted to acquire a connection on a closed pool"), diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 5c30cd74..c3e0350a 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -1,22 +1,15 @@ -use std::{ - cmp, - sync::{ - atomic::{AtomicBool, AtomicU32, Ordering}, - Arc, - }, - time::Instant, -}; +use std::cmp; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::Instant; -use async_std::{ - future::timeout, - sync::{channel, Receiver, Sender}, - task, -}; -use futures_util::future::FutureExt; - -use crate::{error::Error, Connection, Database}; +use async_std::prelude::FutureExt as _; +use async_std::sync::{channel, Receiver, Sender}; +use async_std::task; +use futures_util::future::FutureExt as _; use super::{Idle, Options, Raw}; +use crate::{error::Error, Connection, Database}; pub(super) struct SharedPool where @@ -50,7 +43,7 @@ where for _ in 0..pool.options.min_size { let raw = pool - .new_conn(Instant::now() + pool.options.connect_timeout) + .eventually_connect(Instant::now() + pool.options.connect_timeout) .await?; pool_tx @@ -120,89 +113,166 @@ where let start = Instant::now(); let deadline = start + self.options.connect_timeout; - if let Some(raw) = self.try_acquire() { - return Ok(raw); - } - + // Unless the pool has been closed ... while !self.closed.load(Ordering::Acquire) { let size = self.size.load(Ordering::Acquire); - if size >= self.options.max_size { + // Attempt to immediately acquire a connection. This will return Some + // if there is an idle connection in our channel. + let mut idle = if let Some(idle) = self.pool_rx.recv().now_or_never() { + let idle = match idle { + Some(idle) => idle, + + // This isn't possible. [Pool] owns the sender and [SharedPool] + // owns the receiver. + None => unreachable!(), + }; + + idle + } else if size >= self.options.max_size { // Too many open connections // Wait until one is available // get the time between the deadline and now and use that as our timeout - let max_wait = deadline + let until = deadline .checked_duration_since(Instant::now()) - .ok_or(Error::PoolTimedOut)?; + .ok_or(Error::PoolTimedOut(None))?; // don't sleep forever - let mut idle = match timeout(max_wait, self.pool_rx.recv()).await { + let idle = match self.pool_rx.recv().timeout(until).await { + // A connection was returned to the pool Ok(Some(idle)) => idle, - Ok(None) => panic!("this isn't possible, we own a `pool_tx`"), - // try our acquire logic again - Err(_) => continue, + + // This isn't possible. [Pool] owns the sender and [SharedPool] + // owns the receiver. + Ok(None) => unreachable!(), + + // Timed out waiting for a connection + // Error is not forwarded as its useless context + Err(_) => { + return Err(Error::PoolTimedOut(None)); + } }; - if self.closed.load(Ordering::Acquire) { - idle.close().await; - self.size.fetch_sub(1, Ordering::AcqRel); - return Err(Error::PoolClosed); + idle + } else if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size { + // pool has slots available; open a new connection + match self.connect(deadline).await { + Ok(Some(conn)) => return Ok(conn), + // [size] is internally decremented on _retry_ and _error_ + Ok(None) => continue, + Err(e) => return Err(e), } + } else { + continue; + }; - if should_reap(&idle, &self.options) { - // close the connection but don't really care about the result - idle.close().await; - } else { - match idle.raw.inner.ping().await { - Ok(_) => return Ok(idle.raw), - // an error here means the other end has hung up or we lost connectivity - // either way we're fine to just discard the connection - // the error itself here isn't necessarily unexpected so WARN is too strong - Err(e) => log::info!("ping on idle connection returned error: {}", e), - } + // If pool was closed while waiting for a connection, + // release the connection + if self.closed.load(Ordering::Acquire) { + idle.close().await; + self.size.fetch_sub(1, Ordering::AcqRel); - // make sure the idle connection is gone explicitly before we open one - drop(idle); - } - - // while we're still at max size, acquire a new connection - return self.new_conn(deadline).await; + return Err(Error::PoolClosed); } - if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size { - // Open a new connection and return directly - return self.new_conn(deadline).await; + // If the connection we pulled has expired, close the connection and + // immediately create a new connection + if is_beyond_lifetime(&idle.raw, &self.options) { + // close the connection but don't really care about the result + let _ = idle.close().await; + } else if self.options.test_on_acquire { + // TODO: Check on acquire should be a configuration setting + // Check that the connection is still live + match idle.raw.inner.ping().await { + // Connection still seems to respond + Ok(_) => return Ok(idle.raw), + + // an error here means the other end has hung up or we lost connectivity + // either way we're fine to just discard the connection + // the error itself here isn't necessarily unexpected so WARN is too strong + Err(e) => log::info!("ping on idle connection returned error: {}", e), + } + + // make sure the idle connection is gone explicitly before we open one + // this will close the resources for the stream on our side + drop(idle); + } else { + // No need to re-connect + return Ok(idle.raw); + } + + // while there is still room in the pool, acquire a new connection + match self.connect(deadline).await { + Ok(Some(conn)) => return Ok(conn), + // [size] is internally decremented on _retry_ and _error_ + Ok(None) => continue, + Err(e) => return Err(e), } } Err(Error::PoolClosed) } - async fn new_conn(&self, deadline: Instant) -> crate::Result> { - while Instant::now() < deadline { - if self.closed.load(Ordering::Acquire) { - self.size.fetch_sub(1, Ordering::AcqRel); - return Err(Error::PoolClosed); - } - - // result here is `Result, TimeoutError>` - match timeout(deadline - Instant::now(), DB::Connection::open(&self.url)).await { - Ok(Ok(inner)) => { - return Ok(Raw { - inner, - created: Instant::now(), - }) - } - // error while connecting, this should definitely be logged - Ok(Err(e)) => log::warn!("error establishing a connection: {}", e), - // timed out - Err(_) => break, + async fn eventually_connect(&self, deadline: Instant) -> crate::Result> { + loop { + // [connect] will raise an error when past deadline + // [connect] returns None if its okay to retry + if let Some(conn) = self.connect(deadline).await? { + return Ok(conn); } } + } - self.size.fetch_sub(1, Ordering::AcqRel); - Err(Error::PoolTimedOut) + async fn connect(&self, deadline: Instant) -> crate::Result>> { + // FIXME: Code between `-` is duplicate with [acquire] + // --------------------------------- + + // get the time between the deadline and now and use that as our timeout + let until = deadline + .checked_duration_since(Instant::now()) + .ok_or(Error::PoolTimedOut(None))?; + + // If pool was closed while waiting for a connection, + // release the connection + if self.closed.load(Ordering::Acquire) { + self.size.fetch_sub(1, Ordering::AcqRel); // ? + + return Err(Error::PoolClosed); + } + + // --------------------------------- + + // result here is `Result, TimeoutError>` + match DB::Connection::open(&self.url).timeout(until).await { + // successfully established connection + Ok(Ok(inner)) => { + Ok(Some(Raw { + inner, + // remember when it was created so we can expire it + // if there is a [max_lifetime] set + created: Instant::now(), + })) + } + + // IO error while connecting, this should definitely be logged + // and we should attempt to retry + Ok(Err(crate::Error::Io(e))) => { + log::warn!("error establishing a connection: {}", e); + + Ok(None) + } + + // Any other error while connection should immediately + // terminate and bubble the error up + Ok(Err(e)) => Err(e), + + // timed out + Err(e) => { + self.size.fetch_sub(1, Ordering::AcqRel); // ? + Err(Error::PoolTimedOut(Some(Box::new(e)))) + } + } } } @@ -215,11 +285,20 @@ where } } -fn should_reap(idle: &Idle, options: &Options) -> bool { - // check if idle connection was within max lifetime (or not set) - options.max_lifetime.map_or(true, |max| idle.raw.created.elapsed() < max) - // and if connection wasn't idle too long (or not set) - && options.idle_timeout.map_or(true, |timeout| idle.since.elapsed() < timeout) +// NOTE: Function names here are bizzare. Helpful help would be appreciated. + +fn is_beyond_lifetime(raw: &Raw, options: &Options) -> bool { + // check if connection was within max lifetime (or not set) + options + .max_lifetime + .map_or(false, |max| raw.created.elapsed() > max) +} + +fn is_beyond_idle(idle: &Idle, options: &Options) -> bool { + // if connection wasn't idle too long (or not set) + options + .idle_timeout + .map_or(false, |timeout| idle.since.elapsed() > timeout) } /// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections @@ -250,7 +329,10 @@ where let (reap, keep) = (0..max_reaped) // only connections waiting in the queue .filter_map(|_| pool.pool_rx.recv().now_or_never()?) - .partition::, _>(|conn| should_reap(conn, &pool.options)); + .partition::, _>(|conn| { + is_beyond_idle(conn, &pool.options) + || is_beyond_lifetime(&conn.raw, &pool.options) + }); for conn in keep { // return these connections to the pool first