From 8d9e949cc22dad166ef48cc0e17e11691d44ca91 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Sat, 23 Nov 2019 16:08:36 +0000 Subject: [PATCH] implement pool idle reaper, format relevant files --- sqlx-core/src/backend.rs | 2 +- sqlx-core/src/error.rs | 2 + sqlx-core/src/pool.rs | 129 ++++++++++++++++++++++++++------------- 3 files changed, 90 insertions(+), 43 deletions(-) diff --git a/sqlx-core/src/backend.rs b/sqlx-core/src/backend.rs index 62e15947..4610db8b 100644 --- a/sqlx-core/src/backend.rs +++ b/sqlx-core/src/backend.rs @@ -12,7 +12,7 @@ use futures_core::stream::BoxStream; /// Instead [sqlx::Connection] or [sqlx::Pool] should be used instead, /// which provide concurrent access and typed retrieval of results. #[async_trait] -pub trait Backend: HasTypeMetadata + Send + Sync + Sized { +pub trait Backend: HasTypeMetadata + Send + Sync + Sized + 'static { /// The concrete `QueryParameters` implementation for this backend. type QueryParameters: QueryParameters; diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs index b8cc9d7f..08c59e53 100644 --- a/sqlx-core/src/error.rs +++ b/sqlx-core/src/error.rs @@ -76,6 +76,8 @@ impl Display for Error { Error::TimedOut => f.write_str("timed out while waiting for an open connection"), + Error::PoolClosed => f.write_str("attempted to acquire a connection on a closed pool"), + Error::__Nonexhaustive => unreachable!(), } } diff --git a/sqlx-core/src/pool.rs b/sqlx-core/src/pool.rs index 6ef9302b..533ec5ea 100644 --- a/sqlx-core/src/pool.rs +++ b/sqlx-core/src/pool.rs @@ -8,18 +8,15 @@ use crate::{ }; use futures_channel::oneshot; use futures_core::{future::BoxFuture, stream::BoxStream}; -use futures_util::{future::{FutureExt, TryFutureExt}, stream::StreamExt}; use futures_util::future::{AbortHandle, AbortRegistration}; -use std::{ - future::Future, - marker::PhantomData, - ops::{Deref, DerefMut}, - sync::{ - atomic::{AtomicU32, AtomicUsize, AtomicBool, Ordering}, - Arc, - }, - time::{Duration, Instant}, +use futures_util::{ + future::{FutureExt, TryFutureExt}, + stream::StreamExt, }; +use std::{future::Future, marker::PhantomData, ops::{Deref, DerefMut}, sync::{ + atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}, + Arc, +}, time::{Duration, Instant}, cmp}; use async_std::future::timeout; use async_std::sync::{channel, Receiver, Sender}; @@ -36,9 +33,7 @@ where { /// Creates a connection pool with the default configuration. pub async fn new(url: &str) -> crate::Result { - Ok(Pool(Arc::new( - SharedPool::new(url, Options::default()).await?, - ))) + Ok(Pool(SharedPool::new_arc(url, Options::default()).await?)) } /// Returns a [Builder] to configure a new connection pool. @@ -91,8 +86,8 @@ where self.0.options.connect_timeout } - /// Returns the configured mimimum idle connection count. - pub fn min_idle(&self) -> Option { + /// Returns the configured minimum idle connection count. + pub fn min_idle(&self) -> u32 { self.0.options.min_idle } @@ -147,8 +142,8 @@ where self } - pub fn min_idle(mut self, min_idle: impl Into>) -> Self { - self.options.min_idle = min_idle.into(); + pub fn min_idle(mut self, min_idle: u32) -> Self { + self.options.min_idle = min_idle; self } @@ -163,14 +158,14 @@ where } pub async fn build(self, url: &str) -> crate::Result> { - Ok(Pool(Arc::new(SharedPool::new(url, self.options).await?))) + Ok(Pool(SharedPool::new_arc(url, self.options).await?)) } } struct Options { max_size: u32, connect_timeout: Duration, - min_idle: Option, + min_idle: u32, max_lifetime: Option, idle_timeout: Option, } @@ -179,7 +174,7 @@ impl Default for Options { fn default() -> Self { Self { max_size: 10, - min_idle: None, + min_idle: 0, connect_timeout: Duration::from_secs(30), max_lifetime: None, idle_timeout: None, @@ -203,19 +198,23 @@ impl SharedPool where DB: Backend, { - async fn new(url: &str, options: Options) -> crate::Result { + async fn new_arc(url: &str, options: Options) -> crate::Result> { // TODO: Establish [min_idle] connections let (pool_tx, pool_rx) = channel(options.max_size as usize); - Ok(Self { + let pool = Arc::new(Self { url: url.to_owned(), pool_rx, pool_tx, size: AtomicU32::new(0), closed: AtomicBool::new(false), options, - }) + }); + + conn_reaper(&pool); + + Ok(pool) } async fn close(&self) { @@ -227,9 +226,9 @@ where // and a timeout isn't necessarily appropriate match self.pool_rx.recv().now_or_never() { Some(Some(idle)) => { - let _ = idle.raw.inner.close().await; + idle.close().await; self.size.fetch_sub(1, Ordering::AcqRel); - }, + } Some(None) => panic!("we own a Sender how did this happen"), None => task::yield_now().await, } @@ -242,7 +241,7 @@ where return None; } - Some(self.pool_rx.recv().now_or_never()??.live(&self.pool_tx)) + Some(self.pool_rx.recv().now_or_never()??.revive(&self.pool_tx)) } async fn acquire(&self) -> crate::Result> { @@ -261,11 +260,12 @@ where // Wait until one is available // get the time between the deadline and now and use that as our timeout - let max_wait = deadline.checked_duration_since(Instant::now()) + let max_wait = deadline + .checked_duration_since(Instant::now()) .ok_or(Error::TimedOut)?; // don't sleep forever - let idle = match timeout(max_wait, self.pool_rx.recv()).await { + let mut idle = match timeout(max_wait, self.pool_rx.recv()).await { Ok(Some(idle)) => idle, Ok(None) => panic!("this isn't possible, we own a `pool_tx`"), // try our acquire logic again @@ -273,16 +273,15 @@ where }; if self.closed.load(Ordering::Acquire) { - let _ = idle.raw.inner.close().await; + idle.close().await; self.size.fetch_sub(1, Ordering::AcqRel); return Err(Error::PoolClosed); } - // check if idle connection was within max lifetime (or not set) - if self.options.max_lifetime.map_or(true, |max| idle.raw.created.elapsed() < max) - // and if connection wasn't idle too long (or not set) - && self.options.idle_timeout.map_or(true, |timeout| idle.since.elapsed() < timeout) - { + if should_reap(&idle, &self.options) { + // close the connection but don't really care about the result + idle.close().await; + } else { match idle.raw.inner.ping().await { Ok(_) => return Ok(idle.revive(&self.pool_tx)), // an error here means the other end has hung up or we lost connectivity @@ -290,21 +289,18 @@ where // the error itself here isn't necessarily unexpected so WARN is too strong Err(e) => log::info!("ping on idle connection returned error: {}", e), } - } else { - // close the connection but don't really care about the result - let _ = idle.raw.inner.close().await; + + // make sure the idle connection is gone explicitly before we open one + drop(idle); } - // either case, make sure the idle connection is gone explicitly before we open one - drop(idle); - // while we're still at max size, acquire a new connection - return self.new_conn(deadline).await + return self.new_conn(deadline).await; } if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size { // Open a new connection and return directly - return self.new_conn(deadline).await + return self.new_conn(deadline).await; } } @@ -469,6 +465,10 @@ impl Idle { pool_tx: Some(pool_tx.clone()), } } + + async fn close(self) { + let _ = self.raw.inner.close().await; + } } pub(crate) struct Live @@ -539,3 +539,48 @@ impl Drop for Live { self.release_mut() } } + +fn should_reap(idle: &Idle, options: &Options) -> bool { + // check if idle connection was within max lifetime (or not set) + options.max_lifetime.map_or(true, |max| idle.raw.created.elapsed() < max) + // and if connection wasn't idle too long (or not set) + && options.idle_timeout.map_or(true, |timeout| idle.since.elapsed() < timeout) +} + +/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections +fn conn_reaper(pool: &Arc>) { + if pool.options.max_lifetime.is_some() || pool.options.idle_timeout.is_some() { + let pool = pool.clone(); + + let reap_period = cmp::min(pool.options.max_lifetime, pool.options.idle_timeout) + .expect("one of max_lifetime/idle_timeout should be `Some` at this point"); + + task::spawn(async move { + while !pool.closed.load(Ordering::AcqRel) { + // reap at most the current size minus the minimum idle + let max_reaped = pool + .size + .load(Ordering::Acquire) + .saturating_sub(pool.options.min_idle); + + // collect connections to reap + let (reap, mut keep) = (0..max_reaped) + // only connections waiting in the queue + .filter_map(|_| pool.pool_rx.recv().now_or_never()?) + .partition::, _>(|conn| should_reap(conn, &pool.options)); + + for conn in keep { + // return these connections to the pool first + pool.pool_tx.send(conn).await; + } + + for conn in reap { + conn.close().await; + pool.size.fetch_sub(1, Ordering::AcqRel); + } + + task::sleep(reap_period).await; + } + }); + } +}