fix(pool): prevent calls to acquire() from cutting in line

add a few more checks for `is_closed`
This commit is contained in:
Austin Bonander 2020-06-29 19:24:05 -07:00 committed by Ryan Leckey
parent 0824723765
commit 8c2acaa258

View File

@ -55,6 +55,10 @@ impl<DB: Database> SharedPool<DB> {
#[inline] #[inline]
pub(super) fn try_acquire(&self) -> Option<Floating<'_, Live<DB>>> { pub(super) fn try_acquire(&self) -> Option<Floating<'_, Live<DB>>> {
// don't cut in line
if !self.waiters.is_empty() {
return None;
}
Some(self.pop_idle()?.into_live()) Some(self.pop_idle()?.into_live())
} }
@ -78,8 +82,12 @@ impl<DB: Database> SharedPool<DB> {
/// Try to atomically increment the pool size for a new connection. /// Try to atomically increment the pool size for a new connection.
/// ///
/// Returns `None` if we are at max_size. /// Returns `None` if we are at max_size or if the pool is closed.
fn try_increment_size(&self) -> Option<DecrementSizeGuard<'_>> { fn try_increment_size(&self) -> Option<DecrementSizeGuard<'_>> {
if self.is_closed() {
return None;
}
let mut size = self.size(); let mut size = self.size();
while size < self.options.max_size { while size < self.options.max_size {
@ -100,6 +108,10 @@ impl<DB: Database> SharedPool<DB> {
/// ///
/// Returns an error if `deadline` elapses before we are woken. /// Returns an error if `deadline` elapses before we are woken.
async fn wait_for_conn(&self, deadline: Instant) -> Result<(), Error> { async fn wait_for_conn(&self, deadline: Instant) -> Result<(), Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
let mut waker_pushed = false; let mut waker_pushed = false;
timeout( timeout(
@ -146,17 +158,20 @@ impl<DB: Database> SharedPool<DB> {
pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<DB>>, Error> { pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<DB>>, Error> {
let start = Instant::now(); let start = Instant::now();
let deadline = start + self.options.connect_timeout; let deadline = start + self.options.connect_timeout;
let mut waited = false;
// Unless the pool has been closed ... // Unless the pool has been closed ...
while !self.is_closed() { while !self.is_closed() {
// Don't cut in line
if waited || self.waiters.is_empty() {
// Attempt to immediately acquire a connection. This will return Some // Attempt to immediately acquire a connection. This will return Some
// if there is an idle connection in our channel. // if there is an idle connection in our channel.
if let Ok(conn) = self.idle_conns.pop() { if let Some(conn) = self.pop_idle() {
let conn = Floating::from_idle(conn, self);
if let Some(live) = check_conn(conn, &self.options).await { if let Some(live) = check_conn(conn, &self.options).await {
return Ok(live); return Ok(live);
} }
} }
}
if let Some(guard) = self.try_increment_size() { if let Some(guard) = self.try_increment_size() {
// pool has slots available; open a new connection // pool has slots available; open a new connection
@ -171,6 +186,8 @@ impl<DB: Database> SharedPool<DB> {
// Wait for a connection to become available (or we are allowed to open a new one) // Wait for a connection to become available (or we are allowed to open a new one)
// Returns an error if `deadline` passes // Returns an error if `deadline` passes
self.wait_for_conn(deadline).await?; self.wait_for_conn(deadline).await?;
waited = true;
} }
Err(Error::PoolClosed) Err(Error::PoolClosed)