mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-12-29 21:00:54 +00:00
Refactor pool to fix liveness and consistently check
This commit is contained in:
parent
132f7b2944
commit
03251b719b
@ -18,7 +18,7 @@ pub enum Error {
|
||||
UrlParse(url::ParseError),
|
||||
|
||||
/// An error was returned by the database.
|
||||
Database(Box<dyn DatabaseError>),
|
||||
Database(Box<dyn DatabaseError + Send + Sync>),
|
||||
|
||||
/// No rows were returned by a query that expected to return at least one row.
|
||||
NotFound,
|
||||
@ -39,7 +39,7 @@ pub enum Error {
|
||||
|
||||
/// A [Pool::acquire] timed out due to connections not becoming available or
|
||||
/// because another task encountered too many errors while trying to open a new connection.
|
||||
PoolTimedOut,
|
||||
PoolTimedOut(Option<Box<dyn StdError + Send + Sync>>),
|
||||
|
||||
/// [Pool::close] was called while we were waiting in [Pool::acquire].
|
||||
PoolClosed,
|
||||
@ -58,6 +58,8 @@ impl StdError for Error {
|
||||
|
||||
Error::UrlParse(error) => Some(error),
|
||||
|
||||
Error::PoolTimedOut(Some(error)) => Some(&**error),
|
||||
|
||||
Error::Decode(DecodeError::Other(error)) => Some(&**error),
|
||||
|
||||
_ => None,
|
||||
@ -88,7 +90,13 @@ impl Display for Error {
|
||||
|
||||
Error::Protocol(ref err) => f.write_str(err),
|
||||
|
||||
Error::PoolTimedOut => f.write_str("timed out while waiting for an open connection"),
|
||||
Error::PoolTimedOut(Some(ref err)) => {
|
||||
write!(f, "timed out while waiting for an open connection: {}", err)
|
||||
}
|
||||
|
||||
Error::PoolTimedOut(None) => {
|
||||
write!(f, "timed out while waiting for an open connection")
|
||||
}
|
||||
|
||||
Error::PoolClosed => f.write_str("attempted to acquire a connection on a closed pool"),
|
||||
|
||||
|
||||
@ -1,22 +1,15 @@
|
||||
use std::{
|
||||
cmp,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU32, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
use std::cmp;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use async_std::{
|
||||
future::timeout,
|
||||
sync::{channel, Receiver, Sender},
|
||||
task,
|
||||
};
|
||||
use futures_util::future::FutureExt;
|
||||
|
||||
use crate::{error::Error, Connection, Database};
|
||||
use async_std::prelude::FutureExt as _;
|
||||
use async_std::sync::{channel, Receiver, Sender};
|
||||
use async_std::task;
|
||||
use futures_util::future::FutureExt as _;
|
||||
|
||||
use super::{Idle, Options, Raw};
|
||||
use crate::{error::Error, Connection, Database};
|
||||
|
||||
pub(super) struct SharedPool<DB>
|
||||
where
|
||||
@ -50,7 +43,7 @@ where
|
||||
|
||||
for _ in 0..pool.options.min_size {
|
||||
let raw = pool
|
||||
.new_conn(Instant::now() + pool.options.connect_timeout)
|
||||
.eventually_connect(Instant::now() + pool.options.connect_timeout)
|
||||
.await?;
|
||||
|
||||
pool_tx
|
||||
@ -120,89 +113,166 @@ where
|
||||
let start = Instant::now();
|
||||
let deadline = start + self.options.connect_timeout;
|
||||
|
||||
if let Some(raw) = self.try_acquire() {
|
||||
return Ok(raw);
|
||||
}
|
||||
|
||||
// Unless the pool has been closed ...
|
||||
while !self.closed.load(Ordering::Acquire) {
|
||||
let size = self.size.load(Ordering::Acquire);
|
||||
|
||||
if size >= self.options.max_size {
|
||||
// Attempt to immediately acquire a connection. This will return Some
|
||||
// if there is an idle connection in our channel.
|
||||
let mut idle = if let Some(idle) = self.pool_rx.recv().now_or_never() {
|
||||
let idle = match idle {
|
||||
Some(idle) => idle,
|
||||
|
||||
// This isn't possible. [Pool] owns the sender and [SharedPool]
|
||||
// owns the receiver.
|
||||
None => unreachable!(),
|
||||
};
|
||||
|
||||
idle
|
||||
} else if size >= self.options.max_size {
|
||||
// Too many open connections
|
||||
// Wait until one is available
|
||||
|
||||
// get the time between the deadline and now and use that as our timeout
|
||||
let max_wait = deadline
|
||||
let until = deadline
|
||||
.checked_duration_since(Instant::now())
|
||||
.ok_or(Error::PoolTimedOut)?;
|
||||
.ok_or(Error::PoolTimedOut(None))?;
|
||||
|
||||
// don't sleep forever
|
||||
let mut idle = match timeout(max_wait, self.pool_rx.recv()).await {
|
||||
let idle = match self.pool_rx.recv().timeout(until).await {
|
||||
// A connection was returned to the pool
|
||||
Ok(Some(idle)) => idle,
|
||||
Ok(None) => panic!("this isn't possible, we own a `pool_tx`"),
|
||||
// try our acquire logic again
|
||||
Err(_) => continue,
|
||||
|
||||
// This isn't possible. [Pool] owns the sender and [SharedPool]
|
||||
// owns the receiver.
|
||||
Ok(None) => unreachable!(),
|
||||
|
||||
// Timed out waiting for a connection
|
||||
// Error is not forwarded as its useless context
|
||||
Err(_) => {
|
||||
return Err(Error::PoolTimedOut(None));
|
||||
}
|
||||
};
|
||||
|
||||
if self.closed.load(Ordering::Acquire) {
|
||||
idle.close().await;
|
||||
self.size.fetch_sub(1, Ordering::AcqRel);
|
||||
return Err(Error::PoolClosed);
|
||||
idle
|
||||
} else if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size {
|
||||
// pool has slots available; open a new connection
|
||||
match self.connect(deadline).await {
|
||||
Ok(Some(conn)) => return Ok(conn),
|
||||
// [size] is internally decremented on _retry_ and _error_
|
||||
Ok(None) => continue,
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if should_reap(&idle, &self.options) {
|
||||
// close the connection but don't really care about the result
|
||||
idle.close().await;
|
||||
} else {
|
||||
match idle.raw.inner.ping().await {
|
||||
Ok(_) => return Ok(idle.raw),
|
||||
// an error here means the other end has hung up or we lost connectivity
|
||||
// either way we're fine to just discard the connection
|
||||
// the error itself here isn't necessarily unexpected so WARN is too strong
|
||||
Err(e) => log::info!("ping on idle connection returned error: {}", e),
|
||||
}
|
||||
// If pool was closed while waiting for a connection,
|
||||
// release the connection
|
||||
if self.closed.load(Ordering::Acquire) {
|
||||
idle.close().await;
|
||||
self.size.fetch_sub(1, Ordering::AcqRel);
|
||||
|
||||
// make sure the idle connection is gone explicitly before we open one
|
||||
drop(idle);
|
||||
}
|
||||
|
||||
// while we're still at max size, acquire a new connection
|
||||
return self.new_conn(deadline).await;
|
||||
return Err(Error::PoolClosed);
|
||||
}
|
||||
|
||||
if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size {
|
||||
// Open a new connection and return directly
|
||||
return self.new_conn(deadline).await;
|
||||
// If the connection we pulled has expired, close the connection and
|
||||
// immediately create a new connection
|
||||
if is_beyond_lifetime(&idle.raw, &self.options) {
|
||||
// close the connection but don't really care about the result
|
||||
let _ = idle.close().await;
|
||||
} else if self.options.test_on_acquire {
|
||||
// TODO: Check on acquire should be a configuration setting
|
||||
// Check that the connection is still live
|
||||
match idle.raw.inner.ping().await {
|
||||
// Connection still seems to respond
|
||||
Ok(_) => return Ok(idle.raw),
|
||||
|
||||
// an error here means the other end has hung up or we lost connectivity
|
||||
// either way we're fine to just discard the connection
|
||||
// the error itself here isn't necessarily unexpected so WARN is too strong
|
||||
Err(e) => log::info!("ping on idle connection returned error: {}", e),
|
||||
}
|
||||
|
||||
// make sure the idle connection is gone explicitly before we open one
|
||||
// this will close the resources for the stream on our side
|
||||
drop(idle);
|
||||
} else {
|
||||
// No need to re-connect
|
||||
return Ok(idle.raw);
|
||||
}
|
||||
|
||||
// while there is still room in the pool, acquire a new connection
|
||||
match self.connect(deadline).await {
|
||||
Ok(Some(conn)) => return Ok(conn),
|
||||
// [size] is internally decremented on _retry_ and _error_
|
||||
Ok(None) => continue,
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
Err(Error::PoolClosed)
|
||||
}
|
||||
|
||||
async fn new_conn(&self, deadline: Instant) -> crate::Result<Raw<DB>> {
|
||||
while Instant::now() < deadline {
|
||||
if self.closed.load(Ordering::Acquire) {
|
||||
self.size.fetch_sub(1, Ordering::AcqRel);
|
||||
return Err(Error::PoolClosed);
|
||||
}
|
||||
|
||||
// result here is `Result<Result<DB, Error>, TimeoutError>`
|
||||
match timeout(deadline - Instant::now(), DB::Connection::open(&self.url)).await {
|
||||
Ok(Ok(inner)) => {
|
||||
return Ok(Raw {
|
||||
inner,
|
||||
created: Instant::now(),
|
||||
})
|
||||
}
|
||||
// error while connecting, this should definitely be logged
|
||||
Ok(Err(e)) => log::warn!("error establishing a connection: {}", e),
|
||||
// timed out
|
||||
Err(_) => break,
|
||||
async fn eventually_connect(&self, deadline: Instant) -> crate::Result<Raw<DB>> {
|
||||
loop {
|
||||
// [connect] will raise an error when past deadline
|
||||
// [connect] returns None if its okay to retry
|
||||
if let Some(conn) = self.connect(deadline).await? {
|
||||
return Ok(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.size.fetch_sub(1, Ordering::AcqRel);
|
||||
Err(Error::PoolTimedOut)
|
||||
async fn connect(&self, deadline: Instant) -> crate::Result<Option<Raw<DB>>> {
|
||||
// FIXME: Code between `-` is duplicate with [acquire]
|
||||
// ---------------------------------
|
||||
|
||||
// get the time between the deadline and now and use that as our timeout
|
||||
let until = deadline
|
||||
.checked_duration_since(Instant::now())
|
||||
.ok_or(Error::PoolTimedOut(None))?;
|
||||
|
||||
// If pool was closed while waiting for a connection,
|
||||
// release the connection
|
||||
if self.closed.load(Ordering::Acquire) {
|
||||
self.size.fetch_sub(1, Ordering::AcqRel); // ?
|
||||
|
||||
return Err(Error::PoolClosed);
|
||||
}
|
||||
|
||||
// ---------------------------------
|
||||
|
||||
// result here is `Result<Result<DB, Error>, TimeoutError>`
|
||||
match DB::Connection::open(&self.url).timeout(until).await {
|
||||
// successfully established connection
|
||||
Ok(Ok(inner)) => {
|
||||
Ok(Some(Raw {
|
||||
inner,
|
||||
// remember when it was created so we can expire it
|
||||
// if there is a [max_lifetime] set
|
||||
created: Instant::now(),
|
||||
}))
|
||||
}
|
||||
|
||||
// IO error while connecting, this should definitely be logged
|
||||
// and we should attempt to retry
|
||||
Ok(Err(crate::Error::Io(e))) => {
|
||||
log::warn!("error establishing a connection: {}", e);
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
// Any other error while connection should immediately
|
||||
// terminate and bubble the error up
|
||||
Ok(Err(e)) => Err(e),
|
||||
|
||||
// timed out
|
||||
Err(e) => {
|
||||
self.size.fetch_sub(1, Ordering::AcqRel); // ?
|
||||
Err(Error::PoolTimedOut(Some(Box::new(e))))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -215,11 +285,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn should_reap<DB: Database>(idle: &Idle<DB>, options: &Options) -> bool {
|
||||
// check if idle connection was within max lifetime (or not set)
|
||||
options.max_lifetime.map_or(true, |max| idle.raw.created.elapsed() < max)
|
||||
// and if connection wasn't idle too long (or not set)
|
||||
&& options.idle_timeout.map_or(true, |timeout| idle.since.elapsed() < timeout)
|
||||
// NOTE: Function names here are bizzare. Helpful help would be appreciated.
|
||||
|
||||
fn is_beyond_lifetime<DB: Database>(raw: &Raw<DB>, options: &Options) -> bool {
|
||||
// check if connection was within max lifetime (or not set)
|
||||
options
|
||||
.max_lifetime
|
||||
.map_or(false, |max| raw.created.elapsed() > max)
|
||||
}
|
||||
|
||||
fn is_beyond_idle<DB: Database>(idle: &Idle<DB>, options: &Options) -> bool {
|
||||
// if connection wasn't idle too long (or not set)
|
||||
options
|
||||
.idle_timeout
|
||||
.map_or(false, |timeout| idle.since.elapsed() > timeout)
|
||||
}
|
||||
|
||||
/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections
|
||||
@ -250,7 +329,10 @@ where
|
||||
let (reap, keep) = (0..max_reaped)
|
||||
// only connections waiting in the queue
|
||||
.filter_map(|_| pool.pool_rx.recv().now_or_never()?)
|
||||
.partition::<Vec<_>, _>(|conn| should_reap(conn, &pool.options));
|
||||
.partition::<Vec<_>, _>(|conn| {
|
||||
is_beyond_idle(conn, &pool.options)
|
||||
|| is_beyond_lifetime(&conn.raw, &pool.options)
|
||||
});
|
||||
|
||||
for conn in keep {
|
||||
// return these connections to the pool first
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user