use crate::{ backend::Backend, connection::Connection, error::Error, executor::Executor, params::IntoQueryParameters, row::{FromRow, Row}, }; use futures_channel::oneshot; use futures_core::{future::BoxFuture, stream::BoxStream}; use futures_util::{future::FutureExt, stream::StreamExt}; use std::{ future::Future, marker::PhantomData, ops::{Deref, DerefMut}, sync::{ atomic::{AtomicU32, AtomicUsize, Ordering}, Arc, }, time::{Duration, Instant}, }; use async_std::sync::{channel, Receiver, Sender}; use async_std::task; /// A pool of database connections. pub struct Pool(Arc>) where DB: Backend; impl Pool where DB: Backend, { /// Creates a connection pool with the default configuration. pub async fn new(url: &str) -> crate::Result { Ok(Pool(Arc::new( SharedPool::new(url, Options::default()).await?, ))) } /// Returns a [Builder] to configure a new connection pool. pub fn builder() -> Builder { Builder::new() } /// Retrieves a connection from the pool. /// /// Waits for at most the configured connection timeout before returning an error. pub async fn acquire(&self) -> crate::Result> { let live = self.0.acquire().await?; Ok(Connection::new(live, Some(Arc::clone(&self.0)))) } /// Attempts to retrieve a connection from the pool if there is one available. /// /// Returns `None` if there are no idle connections available in the pool. /// This method will not block waiting to establish a new connection. pub fn try_acquire(&self) -> Option> { let live = self.0.try_acquire()?; Some(Connection::new(live, Some(Arc::clone(&self.0)))) } /// Ends the use of a connection pool. Prevents any new connections /// and will close all active connections when they are returned to the pool. /// /// Does not resolve until all connections are closed. pub async fn close(&self) { unimplemented!() } /// Returns the number of connections currently being managed by the pool. pub fn size(&self) -> u32 { self.0.size.load(Ordering::Acquire) } /// Returns the number of idle connections. pub fn idle(&self) -> usize { self.0.pool_rx.len() } /// Returns the configured maximum pool size. pub fn max_size(&self) -> u32 { self.0.options.max_size } /// Returns the configured mimimum idle connection count. pub fn min_idle(&self) -> Option { self.0.options.min_idle } /// Returns the configured maximum connection lifetime. pub fn max_lifetime(&self) -> Option { self.0.options.max_lifetime } /// Returns the configured idle connection timeout. pub fn idle_timeout(&self) -> Option { self.0.options.idle_timeout } } /// Returns a new [Pool] tied to the same shared connection pool. impl Clone for Pool where DB: Backend, { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } } #[derive(Default)] pub struct Builder where DB: Backend, { phantom: PhantomData, options: Options, } impl Builder where DB: Backend, { fn new() -> Self { Self { phantom: PhantomData, options: Options::default(), } } pub fn max_size(mut self, max_size: u32) -> Self { self.options.max_size = max_size; self } pub fn min_idle(mut self, min_idle: impl Into>) -> Self { self.options.min_idle = min_idle.into(); self } pub fn max_lifetime(mut self, max_lifetime: impl Into>) -> Self { self.options.max_lifetime = max_lifetime.into(); self } pub fn idle_timeout(mut self, idle_timeout: impl Into>) -> Self { self.options.idle_timeout = idle_timeout.into(); self } pub async fn build(self, url: &str) -> crate::Result> { Ok(Pool(Arc::new(SharedPool::new(url, self.options).await?))) } } struct Options { max_size: u32, min_idle: Option, max_lifetime: Option, idle_timeout: Option, } impl Default for Options { fn default() -> Self { Self { max_size: 10, min_idle: None, max_lifetime: None, idle_timeout: None, } } } pub(crate) struct SharedPool where DB: Backend, { url: String, pool_rx: Receiver>, pool_tx: Sender>, size: AtomicU32, options: Options, } impl SharedPool where DB: Backend, { async fn new(url: &str, options: Options) -> crate::Result { // TODO: Establish [min_idle] connections let (pool_tx, pool_rx) = channel(options.max_size as usize); Ok(Self { url: url.to_owned(), pool_rx, pool_tx, size: AtomicU32::new(0), options, }) } #[inline] fn try_acquire(&self) -> Option> { Some(self.pool_rx.recv().now_or_never()??.live(&self.pool_tx)) } async fn acquire(&self) -> crate::Result> { if let Some(live) = self.try_acquire() { return Ok(live); } loop { let size = self.size.load(Ordering::Acquire); if size >= self.options.max_size { // Too many open connections // Wait until one is available // Waiters are not dropped unless the pool is dropped // which would drop this future return Ok(self .pool_rx .recv() .await .expect("waiter dropped without dropping pool") .live(&self.pool_tx)); } if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size { // Open a new connection and return directly let raw = DB::open(&self.url).await?; return Ok(Live::pooled(raw, &self.pool_tx)); } } } } impl Executor for Pool where DB: Backend, { type Backend = DB; fn execute<'c, 'q: 'c, A: 'c>( &'c mut self, query: &'q str, params: A, ) -> BoxFuture<'c, Result> where A: IntoQueryParameters + Send, { Box::pin(async move { <&Pool as Executor>::execute(&mut &*self, query, params).await }) } fn fetch<'c, 'q: 'c, T: 'c, A: 'c>( &'c mut self, query: &'q str, params: A, ) -> BoxStream<'c, Result> where A: IntoQueryParameters + Send, T: FromRow + Send + Unpin, { Box::pin(async_stream::try_stream! { let mut self_ = &*self; let mut s = <&Pool as Executor>::fetch(&mut self_, query, params); while let Some(row) = s.next().await.transpose()? { yield row; } drop(s); }) } fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>( &'c mut self, query: &'q str, params: A, ) -> BoxFuture<'c, Result, Error>> where A: IntoQueryParameters + Send, T: FromRow + Send, { Box::pin(async move { <&Pool as Executor>::fetch_optional(&mut &*self, query, params).await }) } } impl Executor for &'_ Pool where DB: Backend, { type Backend = DB; fn execute<'c, 'q: 'c, A: 'c>( &'c mut self, query: &'q str, params: A, ) -> BoxFuture<'c, Result> where A: IntoQueryParameters + Send, { Box::pin(async move { let mut live = self.0.acquire().await?; let result = live.execute(query, params.into_params()).await; result }) } fn fetch<'c, 'q: 'c, T: 'c, A: 'c>( &'c mut self, query: &'q str, params: A, ) -> BoxStream<'c, Result> where A: IntoQueryParameters + Send, T: FromRow + Send + Unpin, { Box::pin(async_stream::try_stream! { let mut live = self.0.acquire().await?; let mut s = live.fetch(query, params.into_params()); while let Some(row) = s.next().await.transpose()? { yield T::from_row(Row(row)); } }) } fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>( &'c mut self, query: &'q str, params: A, ) -> BoxFuture<'c, Result, Error>> where A: IntoQueryParameters + Send, T: FromRow + Send, { Box::pin(async move { Ok(self .0 .acquire() .await? .fetch_optional(query, params.into_params()) .await? .map(Row) .map(T::from_row)) }) } } struct Raw { pub(crate) inner: DB, pub(crate) created: Instant, } struct Idle where DB: Backend, { raw: Raw, #[allow(unused)] since: Instant, } impl Idle { fn live(self, pool_tx: &Sender>) -> Live { Live { raw: Some(self.raw), pool_tx: Some(pool_tx.clone()), } } } pub(crate) struct Live where DB: Backend, { raw: Option>, pool_tx: Option>>, } impl Live { pub fn unpooled(raw: DB) -> Self { Live { raw: Some(Raw { inner: raw, created: Instant::now(), }), pool_tx: None, } } fn pooled(raw: DB, pool_tx: &Sender>) -> Self { Live { raw: Some(Raw { inner: raw, created: Instant::now(), }), pool_tx: Some(pool_tx.clone()), } } pub fn release(mut self) { self.release_mut() } fn release_mut(&mut self) { // `.release_mut()` will be called twice if `.release()` is called if let (Some(raw), Some(pool_tx)) = (self.raw.take(), self.pool_tx.as_ref()) { pool_tx .send(Idle { raw, since: Instant::now(), }) .now_or_never() .expect("(bug) connection released into a full pool") } } } const DEREF_ERR: &str = "(bug) connection already released to pool"; impl Deref for Live { type Target = DB; fn deref(&self) -> &DB { &self.raw.as_ref().expect(DEREF_ERR).inner } } impl DerefMut for Live { fn deref_mut(&mut self) -> &mut DB { &mut self.raw.as_mut().expect(DEREF_ERR).inner } } impl Drop for Live { fn drop(&mut self) { self.release_mut() } }