diff --git a/sqlx-core/src/backend.rs b/sqlx-core/src/backend.rs
index d656f122..4e1fb4ef 100644
--- a/sqlx-core/src/backend.rs
+++ b/sqlx-core/src/backend.rs
@@ -10,7 +10,6 @@ use futures_core::future::BoxFuture;
 /// important related traits as associated types.
 ///
 /// This trait is not intended to be used directly.
-/// Instead [sqlx::Connection] or [sqlx::Pool] should be used instead.
 pub trait Backend: Executor + HasTypeMetadata + Send + Sync + Sized + 'static {
diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs
index 1caab08f..09e705a1 100644
--- a/sqlx-core/src/connection.rs
+++ b/sqlx-core/src/connection.rs
@@ -20,19 +20,18 @@
 where
     DB: Backend,
 {
     live: Live<DB>,
-    pool: Option<Arc<SharedPool<DB>>>,
 }
 
 impl<DB> Connection<DB>
 where
     DB: Backend,
 {
-    pub(crate) fn new(live: Live<DB>, pool: Option<Arc<SharedPool<DB>>>) -> Self {
-        Self { live, pool }
+    pub(crate) fn new(live: Live<DB>) -> Self {
+        Self { live }
     }
 
     pub async fn open(url: &str) -> crate::Result<Self> {
-        Ok(Self::new(Live::unpooled(DB::open(url).await?), None))
+        Ok(Self::new(Live::unpooled(DB::open(url).await?)))
     }
 }
diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs
index 02e34491..08c59e53 100644
--- a/sqlx-core/src/error.rs
+++ b/sqlx-core/src/error.rs
@@ -4,6 +4,8 @@ use std::{
     io,
 };
 
+use async_std::future::TimeoutError;
+
 /// A convenient Result instantiation appropriate for SQLx.
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -35,6 +37,13 @@ pub enum Error {
     /// Context is provided by the included error message.
     Protocol(Box<str>),
 
+    /// A `Pool::acquire()` timed out due to connections not becoming available or
+    /// because another task encountered too many errors while trying to open a new connection.
+    TimedOut,
+
+    /// `Pool::close()` was called while we were waiting in `Pool::acquire()`.
+    PoolClosed,
+
     // TODO: Remove and replace with `#[non_exhaustive]` when possible
     #[doc(hidden)]
     __Nonexhaustive,
@@ -65,6 +74,10 @@ impl Display for Error {
             Error::Protocol(ref err) => f.write_str(err),
 
+            Error::TimedOut => f.write_str("timed out while waiting for an open connection"),
+
+            Error::PoolClosed => f.write_str("attempted to acquire a connection on a closed pool"),
+
             Error::__Nonexhaustive => unreachable!(),
         }
     }
@@ -77,6 +90,12 @@ impl From<io::Error> for Error {
     }
 }
 
+impl From<TimeoutError> for Error {
+    fn from(_: TimeoutError) -> Self {
+        Error::TimedOut
+    }
+}
+
 impl From<ProtocolError<'_>> for Error {
     #[inline]
     fn from(err: ProtocolError) -> Self {
diff --git a/sqlx-core/src/pool.rs b/sqlx-core/src/pool.rs
deleted file mode 100644
index b7a49283..00000000
--- a/sqlx-core/src/pool.rs
+++ /dev/null
@@ -1,447 +0,0 @@
-use crate::{
-    backend::Backend,
-    connection::Connection,
-    describe::Describe,
-    error::Error,
-    executor::Executor,
-    params::IntoQueryParameters,
-    row::{FromRow, Row},
-};
-use async_std::{
-    sync::{channel, Receiver, Sender},
-    task,
-};
-use futures_channel::oneshot;
-use futures_core::{future::BoxFuture, stream::BoxStream};
-use futures_util::{future::FutureExt, stream::StreamExt};
-use std::{
-    future::Future,
-    marker::PhantomData,
-    ops::{Deref, DerefMut},
-    sync::{
-        atomic::{AtomicU32, AtomicUsize, Ordering},
-        Arc,
-    },
-    time::{Duration, Instant},
-};
-
-/// A pool of database connections.
-pub struct Pool<DB>(Arc<SharedPool<DB>>)
-where
-    DB: Backend;
-
-impl<DB> Pool<DB>
-where
-    DB: Backend,
-{
-    /// Creates a connection pool with the default configuration.
-    pub async fn new(url: &str) -> crate::Result<Self> {
-        Ok(Pool(Arc::new(
-            SharedPool::new(url, Options::default()).await?,
-        )))
-    }
-
-    /// Returns a [Builder] to configure a new connection pool.
- pub fn builder() -> Builder { - Builder::new() - } - - /// Retrieves a connection from the pool. - /// - /// Waits for at most the configured connection timeout before returning an error. - pub async fn acquire(&self) -> crate::Result> { - let live = self.0.acquire().await?; - Ok(Connection::new(live, Some(Arc::clone(&self.0)))) - } - - /// Attempts to retrieve a connection from the pool if there is one available. - /// - /// Returns `None` if there are no idle connections available in the pool. - /// This method will not block waiting to establish a new connection. - pub fn try_acquire(&self) -> Option> { - let live = self.0.try_acquire()?; - Some(Connection::new(live, Some(Arc::clone(&self.0)))) - } - - /// Ends the use of a connection pool. Prevents any new connections - /// and will close all active connections when they are returned to the pool. - /// - /// Does not resolve until all connections are closed. - pub async fn close(&self) { - unimplemented!() - } - - /// Returns the number of connections currently being managed by the pool. - pub fn size(&self) -> u32 { - self.0.size.load(Ordering::Acquire) - } - - /// Returns the number of idle connections. - pub fn idle(&self) -> usize { - self.0.pool_rx.len() - } - - /// Returns the configured maximum pool size. - pub fn max_size(&self) -> u32 { - self.0.options.max_size - } - - /// Returns the configured mimimum idle connection count. - pub fn min_idle(&self) -> Option { - self.0.options.min_idle - } - - /// Returns the configured maximum connection lifetime. - pub fn max_lifetime(&self) -> Option { - self.0.options.max_lifetime - } - - /// Returns the configured idle connection timeout. - pub fn idle_timeout(&self) -> Option { - self.0.options.idle_timeout - } -} - -/// Returns a new [Pool] tied to the same shared connection pool. 
-impl Clone for Pool -where - DB: Backend, -{ - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } -} - -#[derive(Default)] -pub struct Builder -where - DB: Backend, -{ - phantom: PhantomData, - options: Options, -} - -impl Builder -where - DB: Backend, -{ - fn new() -> Self { - Self { - phantom: PhantomData, - options: Options::default(), - } - } - - pub fn max_size(mut self, max_size: u32) -> Self { - self.options.max_size = max_size; - self - } - - pub fn min_idle(mut self, min_idle: impl Into>) -> Self { - self.options.min_idle = min_idle.into(); - self - } - - pub fn max_lifetime(mut self, max_lifetime: impl Into>) -> Self { - self.options.max_lifetime = max_lifetime.into(); - self - } - - pub fn idle_timeout(mut self, idle_timeout: impl Into>) -> Self { - self.options.idle_timeout = idle_timeout.into(); - self - } - - pub async fn build(self, url: &str) -> crate::Result> { - Ok(Pool(Arc::new(SharedPool::new(url, self.options).await?))) - } -} - -struct Options { - max_size: u32, - min_idle: Option, - max_lifetime: Option, - idle_timeout: Option, -} - -impl Default for Options { - fn default() -> Self { - Self { - max_size: 10, - min_idle: None, - max_lifetime: None, - idle_timeout: None, - } - } -} - -pub(crate) struct SharedPool -where - DB: Backend, -{ - url: String, - pool_rx: Receiver>, - pool_tx: Sender>, - size: AtomicU32, - options: Options, -} - -impl SharedPool -where - DB: Backend, -{ - async fn new(url: &str, options: Options) -> crate::Result { - // TODO: Establish [min_idle] connections - - let (pool_tx, pool_rx) = channel(options.max_size as usize); - - Ok(Self { - url: url.to_owned(), - pool_rx, - pool_tx, - size: AtomicU32::new(0), - options, - }) - } - - #[inline] - fn try_acquire(&self) -> Option> { - Some(self.pool_rx.recv().now_or_never()??.live(&self.pool_tx)) - } - - async fn acquire(&self) -> crate::Result> { - if let Some(live) = self.try_acquire() { - return Ok(live); - } - - loop { - let size = self.size.load(Ordering::Acquire); - - if size >= self.options.max_size { - // Too many open connections - // Wait until one is available - - // Waiters are not dropped unless the pool is dropped - // which would drop this future - return Ok(self - .pool_rx - .recv() - .await - .expect("waiter dropped without dropping pool") - .live(&self.pool_tx)); - } - - if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size { - // Open a new connection and return directly - let raw = DB::open(&self.url).await?; - return Ok(Live::pooled(raw, &self.pool_tx)); - } - } - } -} - -impl Executor for Pool -where - DB: Backend, -{ - type Backend = DB; - - fn execute<'e, 'q: 'e, I: 'e>( - &'e mut self, - query: &'q str, - params: I, - ) -> BoxFuture<'e, crate::Result> - where - I: IntoQueryParameters + Send, - { - Box::pin(async move { <&Pool as Executor>::execute(&mut &*self, query, params).await }) - } - - fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( - &'e mut self, - query: &'q str, - params: I, - ) -> BoxStream<'e, crate::Result> - where - I: IntoQueryParameters + Send, - T: FromRow + Send + Unpin, - { - Box::pin(async_stream::try_stream! { - let mut self_ = &*self; - let mut s = <&Pool as Executor>::fetch(&mut self_, query, params); - - while let Some(row) = s.next().await.transpose()? 
{ - yield row; - } - }) - } - - fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( - &'e mut self, - query: &'q str, - params: I, - ) -> BoxFuture<'e, crate::Result>> - where - I: IntoQueryParameters + Send, - T: FromRow + Send, - { - Box::pin(async move { - <&Pool as Executor>::fetch_optional(&mut &*self, query, params).await - }) - } - - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - Box::pin(async move { <&Pool as Executor>::describe(&mut &*self, query).await }) - } -} - -impl Executor for &'_ Pool -where - DB: Backend, -{ - type Backend = DB; - - fn execute<'e, 'q: 'e, I: 'e>( - &'e mut self, - query: &'q str, - params: I, - ) -> BoxFuture<'e, crate::Result> - where - I: IntoQueryParameters + Send, - { - Box::pin(async move { self.0.acquire().await?.execute(query, params).await }) - } - - fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( - &'e mut self, - query: &'q str, - params: I, - ) -> BoxStream<'e, crate::Result> - where - I: IntoQueryParameters + Send, - T: FromRow + Send + Unpin, - { - Box::pin(async_stream::try_stream! { - let mut live = self.0.acquire().await?; - let mut s = live.fetch(query, params); - - while let Some(row) = s.next().await.transpose()? { - yield row; - } - }) - } - - fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( - &'e mut self, - query: &'q str, - params: I, - ) -> BoxFuture<'e, crate::Result>> - where - I: IntoQueryParameters + Send, - T: FromRow + Send, - { - Box::pin(async move { self.0.acquire().await?.fetch_optional(query, params).await }) - } - - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - Box::pin(async move { self.0.acquire().await?.describe(query).await }) - } -} - -struct Raw { - pub(crate) inner: DB, - pub(crate) created: Instant, -} - -struct Idle -where - DB: Backend, -{ - raw: Raw, - #[allow(unused)] - since: Instant, -} - -impl Idle { - fn live(self, pool_tx: &Sender>) -> Live { - Live { - raw: Some(self.raw), - pool_tx: Some(pool_tx.clone()), - } - } -} - -pub(crate) struct Live -where - DB: Backend, -{ - raw: Option>, - pool_tx: Option>>, -} - -impl Live { - pub fn unpooled(raw: DB) -> Self { - Live { - raw: Some(Raw { - inner: raw, - created: Instant::now(), - }), - pool_tx: None, - } - } - - fn pooled(raw: DB, pool_tx: &Sender>) -> Self { - Live { - raw: Some(Raw { - inner: raw, - created: Instant::now(), - }), - pool_tx: Some(pool_tx.clone()), - } - } - - pub fn release(mut self) { - self.release_mut() - } - - fn release_mut(&mut self) { - // `.release_mut()` will be called twice if `.release()` is called - if let (Some(raw), Some(pool_tx)) = (self.raw.take(), self.pool_tx.as_ref()) { - pool_tx - .send(Idle { - raw, - since: Instant::now(), - }) - .now_or_never() - .expect("(bug) connection released into a full pool") - } - } -} - -const DEREF_ERR: &str = "(bug) connection already released to pool"; - -impl Deref for Live { - type Target = DB; - - fn deref(&self) -> &DB { - &self.raw.as_ref().expect(DEREF_ERR).inner - } -} - -impl DerefMut for Live { - fn deref_mut(&mut self) -> &mut DB { - &mut self.raw.as_mut().expect(DEREF_ERR).inner - } -} - -impl Drop for Live { - fn drop(&mut self) { - self.release_mut() - } -} diff --git a/sqlx-core/src/pool/executor.rs b/sqlx-core/src/pool/executor.rs new file mode 100644 index 00000000..c384380b --- /dev/null +++ b/sqlx-core/src/pool/executor.rs @@ -0,0 +1,120 @@ +use crate::{ + backend::Backend, describe::Describe, executor::Executor, params::IntoQueryParameters, + pool::Pool, 
row::FromRow, +}; +use futures_core::{future::BoxFuture, stream::BoxStream}; +use futures_util::StreamExt; + +impl Executor for Pool +where + DB: Backend, +{ + type Backend = DB; + + fn execute<'e, 'q: 'e, I: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxFuture<'e, crate::Result> + where + I: IntoQueryParameters + Send, + { + Box::pin(async move { <&Pool as Executor>::execute(&mut &*self, query, params).await }) + } + + fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxStream<'e, crate::Result> + where + I: IntoQueryParameters + Send, + T: FromRow + Send + Unpin, + { + Box::pin(async_stream::try_stream! { + let mut self_ = &*self; + let mut s = <&Pool as Executor>::fetch(&mut self_, query, params); + + while let Some(row) = s.next().await.transpose()? { + yield row; + } + }) + } + + fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxFuture<'e, crate::Result>> + where + I: IntoQueryParameters + Send, + T: FromRow + Send, + { + Box::pin(async move { + <&Pool as Executor>::fetch_optional(&mut &*self, query, params).await + }) + } + + fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'e, crate::Result>> { + Box::pin(async move { <&Pool as Executor>::describe(&mut &*self, query).await }) + } +} + +impl Executor for &'_ Pool +where + DB: Backend, +{ + type Backend = DB; + + fn execute<'e, 'q: 'e, I: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxFuture<'e, crate::Result> + where + I: IntoQueryParameters + Send, + { + Box::pin(async move { self.0.acquire().await?.execute(query, params).await }) + } + + fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxStream<'e, crate::Result> + where + I: IntoQueryParameters + Send, + T: FromRow + Send + Unpin, + { + Box::pin(async_stream::try_stream! { + let mut live = self.0.acquire().await?; + let mut s = live.fetch(query, params); + + while let Some(row) = s.next().await.transpose()? 
{ + yield row; + } + }) + } + + fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>( + &'e mut self, + query: &'q str, + params: I, + ) -> BoxFuture<'e, crate::Result>> + where + I: IntoQueryParameters + Send, + T: FromRow + Send, + { + Box::pin(async move { self.0.acquire().await?.fetch_optional(query, params).await }) + } + + fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'e, crate::Result>> { + Box::pin(async move { self.0.acquire().await?.describe(query).await }) + } +} diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs new file mode 100644 index 00000000..71721c43 --- /dev/null +++ b/sqlx-core/src/pool/inner.rs @@ -0,0 +1,329 @@ +use crate::{ + backend::Backend, + connection::Connection, + error::Error, + executor::Executor, + params::IntoQueryParameters, + row::{FromRow, Row}, +}; +use futures_channel::oneshot; +use futures_core::{future::BoxFuture, stream::BoxStream}; +use futures_util::{ + future::{AbortHandle, AbortRegistration, FutureExt, TryFutureExt}, + stream::StreamExt, +}; +use std::{ + cmp, + future::Future, + marker::PhantomData, + ops::{Deref, DerefMut}, + sync::{ + atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +use async_std::{ + future::timeout, + sync::{channel, Receiver, Sender}, + task, +}; + +use super::Options; + +pub(crate) struct SharedPool +where + DB: Backend, +{ + url: String, + pool_rx: Receiver>, + pool_tx: Sender>, + size: AtomicU32, + closed: AtomicBool, + options: Options, +} + +impl SharedPool +where + DB: Backend, +{ + pub(crate) async fn new_arc(url: &str, options: Options) -> crate::Result> { + // TODO: Establish [min_idle] connections + + let (pool_tx, pool_rx) = channel(options.max_size as usize); + + let pool = Arc::new(Self { + url: url.to_owned(), + pool_rx, + pool_tx, + size: AtomicU32::new(0), + closed: AtomicBool::new(false), + options, + }); + + conn_reaper(&pool); + + Ok(pool) + } + + pub fn options(&self) -> &Options { + &self.options + } + + pub(crate) fn size(&self) -> u32 { + self.size.load(Ordering::Acquire) + } + + pub(crate) fn num_idle(&self) -> usize { + self.pool_rx.len() + } + + pub(crate) async fn close(&self) { + self.closed.store(true, Ordering::Release); + + while self.size.load(Ordering::Acquire) > 0 { + // don't block on the receiver because we own one Sender so it should never return + // `None`; a `select!()` would also work but that produces more complicated code + // and a timeout isn't necessarily appropriate + match self.pool_rx.recv().now_or_never() { + Some(Some(idle)) => { + idle.close().await; + self.size.fetch_sub(1, Ordering::AcqRel); + } + Some(None) => panic!("we own a Sender how did this happen"), + None => task::yield_now().await, + } + } + } + + #[inline] + pub(crate) fn try_acquire(&self) -> Option> { + if self.closed.load(Ordering::Acquire) { + return None; + } + + Some(self.pool_rx.recv().now_or_never()??.revive(&self.pool_tx)) + } + + pub(crate) async fn acquire(&self) -> crate::Result> { + let start = Instant::now(); + let deadline = start + self.options.connect_timeout; + + if let Some(live) = self.try_acquire() { + return Ok(live); + } + + while !self.closed.load(Ordering::Acquire) { + let size = self.size.load(Ordering::Acquire); + + if size >= self.options.max_size { + // Too many open connections + // Wait until one is available + + // get the time between the deadline and now and use that as our timeout + let max_wait = deadline + .checked_duration_since(Instant::now()) + 
.ok_or(Error::TimedOut)?; + + // don't sleep forever + let mut idle = match timeout(max_wait, self.pool_rx.recv()).await { + Ok(Some(idle)) => idle, + Ok(None) => panic!("this isn't possible, we own a `pool_tx`"), + // try our acquire logic again + Err(_) => continue, + }; + + if self.closed.load(Ordering::Acquire) { + idle.close().await; + self.size.fetch_sub(1, Ordering::AcqRel); + return Err(Error::PoolClosed); + } + + if should_reap(&idle, &self.options) { + // close the connection but don't really care about the result + idle.close().await; + } else { + match idle.raw.inner.ping().await { + Ok(_) => return Ok(idle.revive(&self.pool_tx)), + // an error here means the other end has hung up or we lost connectivity + // either way we're fine to just discard the connection + // the error itself here isn't necessarily unexpected so WARN is too strong + Err(e) => log::info!("ping on idle connection returned error: {}", e), + } + + // make sure the idle connection is gone explicitly before we open one + drop(idle); + } + + // while we're still at max size, acquire a new connection + return self.new_conn(deadline).await; + } + + if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size { + // Open a new connection and return directly + return self.new_conn(deadline).await; + } + } + + Err(Error::PoolClosed) + } + + async fn new_conn(&self, deadline: Instant) -> crate::Result> { + while Instant::now() < deadline { + if self.closed.load(Ordering::Acquire) { + self.size.fetch_sub(1, Ordering::AcqRel); + return Err(Error::PoolClosed); + } + + // result here is `Result, TimeoutError>` + match timeout(deadline - Instant::now(), DB::open(&self.url)).await { + Ok(Ok(raw)) => return Ok(Live::pooled(raw, &self.pool_tx)), + // error while connecting, this should definitely be logged + Ok(Err(e)) => log::warn!("error establishing a connection: {}", e), + // timed out + Err(_) => break, + } + } + + self.size.fetch_sub(1, Ordering::AcqRel); + Err(Error::TimedOut) + } +} + +struct Raw { + pub(crate) inner: DB, + pub(crate) created: Instant, +} + +struct Idle +where + DB: Backend, +{ + raw: Raw, + #[allow(unused)] + since: Instant, +} + +impl Idle { + fn revive(self, pool_tx: &Sender>) -> Live { + Live { + raw: Some(self.raw), + pool_tx: Some(pool_tx.clone()), + } + } + + async fn close(self) { + let _ = self.raw.inner.close().await; + } +} + +pub(crate) struct Live +where + DB: Backend, +{ + raw: Option>, + pool_tx: Option>>, +} + +impl Live { + pub fn unpooled(raw: DB) -> Self { + Live { + raw: Some(Raw { + inner: raw, + created: Instant::now(), + }), + pool_tx: None, + } + } + + fn pooled(raw: DB, pool_tx: &Sender>) -> Self { + Live { + raw: Some(Raw { + inner: raw, + created: Instant::now(), + }), + pool_tx: Some(pool_tx.clone()), + } + } + + fn release_mut(&mut self) { + // `.release_mut()` will be called twice if `.release()` is called + if let (Some(raw), Some(pool_tx)) = (self.raw.take(), self.pool_tx.as_ref()) { + pool_tx + .send(Idle { + raw, + since: Instant::now(), + }) + .now_or_never() + .expect("(bug) connection released into a full pool") + } + } +} + +const DEREF_ERR: &str = "(bug) connection already released to pool"; + +impl Deref for Live { + type Target = DB; + + fn deref(&self) -> &DB { + &self.raw.as_ref().expect(DEREF_ERR).inner + } +} + +impl DerefMut for Live { + fn deref_mut(&mut self) -> &mut DB { + &mut self.raw.as_mut().expect(DEREF_ERR).inner + } +} + +impl Drop for Live { + fn drop(&mut self) { + self.release_mut() + } +} + +fn should_reap(idle: &Idle, options: 
&Options) -> bool { + // check if idle connection was within max lifetime (or not set) + options.max_lifetime.map_or(true, |max| idle.raw.created.elapsed() < max) + // and if connection wasn't idle too long (or not set) + && options.idle_timeout.map_or(true, |timeout| idle.since.elapsed() < timeout) +} + +/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections +fn conn_reaper(pool: &Arc>) { + if pool.options.max_lifetime.is_some() || pool.options.idle_timeout.is_some() { + let pool = pool.clone(); + + let reap_period = cmp::min(pool.options.max_lifetime, pool.options.idle_timeout) + .expect("one of max_lifetime/idle_timeout should be `Some` at this point"); + + task::spawn(async move { + while !pool.closed.load(Ordering::AcqRel) { + // reap at most the current size minus the minimum idle + let max_reaped = pool + .size + .load(Ordering::Acquire) + .saturating_sub(pool.options.min_idle); + + // collect connections to reap + let (reap, keep) = (0..max_reaped) + // only connections waiting in the queue + .filter_map(|_| pool.pool_rx.recv().now_or_never()?) + .partition::, _>(|conn| should_reap(conn, &pool.options)); + + for conn in keep { + // return these connections to the pool first + pool.pool_tx.send(conn).await; + } + + for conn in reap { + conn.close().await; + pool.size.fetch_sub(1, Ordering::AcqRel); + } + + task::sleep(reap_period).await; + } + }); + } +} diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs new file mode 100644 index 00000000..4ecf074a --- /dev/null +++ b/sqlx-core/src/pool/mod.rs @@ -0,0 +1,128 @@ +use crate::{ + backend::Backend, connection::Connection, error::Error, executor::Executor, + params::IntoQueryParameters, row::FromRow, Row, +}; +use futures_channel::oneshot; +use futures_core::{future::BoxFuture, stream::BoxStream}; +use futures_util::{ + future::{AbortHandle, AbortRegistration, FutureExt, TryFutureExt}, + stream::StreamExt, +}; +use std::{ + cmp, + future::Future, + marker::PhantomData, + ops::{Deref, DerefMut}, + sync::{ + atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +use async_std::{ + future::timeout, + sync::{channel, Receiver, Sender}, + task, +}; + +pub(crate) use self::inner::{Live, SharedPool}; +use self::options::Options; + +pub use self::options::Builder; + +mod executor; +mod inner; +mod options; + +/// A pool of database connections. +pub struct Pool(Arc>) +where + DB: Backend; + +impl Pool +where + DB: Backend, +{ + /// Creates a connection pool with the default configuration. + pub async fn new(url: &str) -> crate::Result { + Self::with_options(url, Options::default()).await + } + + async fn with_options(url: &str, options: Options) -> crate::Result { + Ok(Pool(SharedPool::new_arc(url, options).await?)) + } + + /// Returns a [Builder] to configure a new connection pool. + pub fn builder() -> Builder { + Builder::new() + } + + /// Retrieves a connection from the pool. + /// + /// Waits for at most the configured connection timeout before returning an error. + pub async fn acquire(&self) -> crate::Result> { + self.0.acquire().await.map(Connection::new) + } + + /// Attempts to retrieve a connection from the pool if there is one available. + /// + /// Returns `None` if there are no idle connections available in the pool. + /// This method will not block waiting to establish a new connection. + pub fn try_acquire(&self) -> Option> { + self.0.try_acquire().map(Connection::new) + } + + /// Ends the use of a connection pool. 
Prevents any new connections
+    /// and will close all active connections when they are returned to the pool.
+    ///
+    /// Does not resolve until all connections are closed.
+    pub async fn close(&self) {
+        let _ = self.0.close().await;
+    }
+
+    /// Returns the number of connections currently being managed by the pool.
+    pub fn size(&self) -> u32 {
+        self.0.size()
+    }
+
+    /// Returns the number of idle connections.
+    pub fn idle(&self) -> usize {
+        self.0.num_idle()
+    }
+
+    /// Returns the configured maximum pool size.
+    pub fn max_size(&self) -> u32 {
+        self.0.options().max_size
+    }
+
+    /// Returns the maximum time spent acquiring a new connection before an error is returned.
+    pub fn connect_timeout(&self) -> Duration {
+        self.0.options().connect_timeout
+    }
+
+    /// Returns the configured minimum idle connection count.
+    pub fn min_idle(&self) -> u32 {
+        self.0.options().min_idle
+    }
+
+    /// Returns the configured maximum connection lifetime.
+    pub fn max_lifetime(&self) -> Option<Duration> {
+        self.0.options().max_lifetime
+    }
+
+    /// Returns the configured idle connection timeout.
+    pub fn idle_timeout(&self) -> Option<Duration> {
+        self.0.options().idle_timeout
+    }
+}
+
+/// Returns a new [Pool] tied to the same shared connection pool.
+impl<DB> Clone for Pool<DB>
+where
+    DB: Backend,
+{
+    fn clone(&self) -> Self {
+        Self(Arc::clone(&self.0))
+    }
+}
diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs
new file mode 100644
index 00000000..438d0fb1
--- /dev/null
+++ b/sqlx-core/src/pool/options.rs
@@ -0,0 +1,75 @@
+use std::{marker::PhantomData, time::Duration};
+
+use crate::Backend;
+
+use super::Pool;
+
+#[derive(Default)]
+pub struct Builder<DB>
+where
+    DB: Backend,
+{
+    phantom: PhantomData<DB>,
+    options: Options,
+}
+
+impl<DB> Builder<DB>
+where
+    DB: Backend,
+{
+    pub fn new() -> Self {
+        Self {
+            phantom: PhantomData,
+            options: Options::default(),
+        }
+    }
+
+    pub fn max_size(mut self, max_size: u32) -> Self {
+        self.options.max_size = max_size;
+        self
+    }
+
+    pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
+        self.options.connect_timeout = connect_timeout;
+        self
+    }
+
+    pub fn min_idle(mut self, min_idle: u32) -> Self {
+        self.options.min_idle = min_idle;
+        self
+    }
+
+    pub fn max_lifetime(mut self, max_lifetime: impl Into<Option<Duration>>) -> Self {
+        self.options.max_lifetime = max_lifetime.into();
+        self
+    }
+
+    pub fn idle_timeout(mut self, idle_timeout: impl Into<Option<Duration>>) -> Self {
+        self.options.idle_timeout = idle_timeout.into();
+        self
+    }
+
+    pub async fn build(self, url: &str) -> crate::Result<Pool<DB>> {
+        Pool::with_options(url, self.options).await
+    }
+}
+
+pub(crate) struct Options {
+    pub max_size: u32,
+    pub connect_timeout: Duration,
+    pub min_idle: u32,
+    pub max_lifetime: Option<Duration>,
+    pub idle_timeout: Option<Duration>,
+}
+
+impl Default for Options {
+    fn default() -> Self {
+        Self {
+            max_size: 10,
+            min_idle: 0,
+            connect_timeout: Duration::from_secs(30),
+            max_lifetime: None,
+            idle_timeout: None,
+        }
+    }
+}
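
Reviewer note (not part of the patch): the sketch below shows how the builder and the new error variants introduced above are expected to fit together. It is illustrative only; `Pg` is a placeholder for any type implementing `Backend`, and it assumes `Pool` and `Error` are re-exported from the crate root, as the rest of sqlx-core does.

    use std::time::Duration;

    use sqlx_core::{Error, Pool};

    async fn connect(url: &str) -> Result<Pool<Pg>, Error> {
        let pool = Pool::<Pg>::builder()
            .max_size(20)                               // at most 20 open connections
            .min_idle(2)                                // the reaper keeps at least 2 connections alive
            .connect_timeout(Duration::from_secs(5))    // acquire() gives up with Error::TimedOut after 5s
            .idle_timeout(Duration::from_secs(10 * 60)) // reap connections that sit idle for 10 minutes
            .build(url)
            .await?;

        match pool.acquire().await {
            // the connection is returned to the pool when it is dropped
            Ok(conn) => drop(conn),
            // no connection became available (or could be opened) before the deadline
            Err(Error::TimedOut) => { /* back off and retry, or surface the error */ }
            // close() was called while we were waiting
            Err(Error::PoolClosed) => { /* shutting down */ }
            Err(other) => return Err(other),
        }

        Ok(pool)
    }

Because `Pool<DB>` and `&Pool<DB>` also implement `Executor`, queries can be run against the pool directly; each call checks out a connection and returns it to the pool once the connection guard is dropped.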