break up pool.rs into multiple files

This commit is contained in:
Austin Bonander 2019-11-25 11:21:18 -08:00
parent 8d9e949cc2
commit 279e329f27
4 changed files with 344 additions and 291 deletions

View File

@ -17,20 +17,19 @@ where
DB: Backend,
{
live: Live<DB>,
pool: Option<Arc<SharedPool<DB>>>,
}
impl<DB> Connection<DB>
where
DB: Backend,
{
pub(crate) fn new(live: Live<DB>, pool: Option<Arc<SharedPool<DB>>>) -> Self {
Self { live, pool }
pub(crate) fn new(live: Live<DB>) -> Self {
Self { live }
}
pub async fn open(url: &str) -> crate::Result<Self> {
let raw = DB::open(url).await?;
Ok(Self::new(Live::unpooled(raw), None))
Ok(Self::new(Live::unpooled(raw)))
}
/// Verifies a connection to the database is still alive.

View File

@ -13,174 +13,23 @@ use futures_util::{
future::{FutureExt, TryFutureExt},
stream::StreamExt,
};
use std::{future::Future, marker::PhantomData, ops::{Deref, DerefMut}, sync::{
atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
Arc,
}, time::{Duration, Instant}, cmp};
use std::{
cmp,
future::Future,
marker::PhantomData,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use async_std::future::timeout;
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
/// A pool of database connections.
pub struct Pool<DB>(Arc<SharedPool<DB>>)
where
DB: Backend;
impl<DB> Pool<DB>
where
DB: Backend,
{
/// Creates a connection pool with the default configuration.
pub async fn new(url: &str) -> crate::Result<Self> {
Ok(Pool(SharedPool::new_arc(url, Options::default()).await?))
}
/// Returns a [Builder] to configure a new connection pool.
pub fn builder() -> Builder<DB> {
Builder::new()
}
/// Retrieves a connection from the pool.
///
/// Waits for at most the configured connection timeout before returning an error.
pub async fn acquire(&self) -> crate::Result<Connection<DB>> {
let live = self.0.acquire().await?;
Ok(Connection::new(live, Some(Arc::clone(&self.0))))
}
/// Attempts to retrieve a connection from the pool if there is one available.
///
/// Returns `None` if there are no idle connections available in the pool.
/// This method will not block waiting to establish a new connection.
pub fn try_acquire(&self) -> Option<Connection<DB>> {
let live = self.0.try_acquire()?;
Some(Connection::new(live, Some(Arc::clone(&self.0))))
}
/// Ends the use of a connection pool. Prevents any new connections
/// and will close all active connections when they are returned to the pool.
///
/// Does not resolve until all connections are closed.
pub async fn close(&self) {
let _ = self.0.close().await;
}
/// Returns the number of connections currently being managed by the pool.
pub fn size(&self) -> u32 {
self.0.size.load(Ordering::Acquire)
}
/// Returns the number of idle connections.
pub fn idle(&self) -> usize {
self.0.pool_rx.len()
}
/// Returns the configured maximum pool size.
pub fn max_size(&self) -> u32 {
self.0.options.max_size
}
/// Returns the maximum time spent acquiring a new connection before an error is returned.
pub fn connect_timeout(&self) -> Duration {
self.0.options.connect_timeout
}
/// Returns the configured minimum idle connection count.
pub fn min_idle(&self) -> u32 {
self.0.options.min_idle
}
/// Returns the configured maximum connection lifetime.
pub fn max_lifetime(&self) -> Option<Duration> {
self.0.options.max_lifetime
}
/// Returns the configured idle connection timeout.
pub fn idle_timeout(&self) -> Option<Duration> {
self.0.options.idle_timeout
}
}
/// Returns a new [Pool] tied to the same shared connection pool.
impl<DB> Clone for Pool<DB>
where
DB: Backend,
{
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
#[derive(Default)]
pub struct Builder<DB>
where
DB: Backend,
{
phantom: PhantomData<DB>,
options: Options,
}
impl<DB> Builder<DB>
where
DB: Backend,
{
fn new() -> Self {
Self {
phantom: PhantomData,
options: Options::default(),
}
}
pub fn max_size(mut self, max_size: u32) -> Self {
self.options.max_size = max_size;
self
}
pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
self.options.connect_timeout = connect_timeout;
self
}
pub fn min_idle(mut self, min_idle: u32) -> Self {
self.options.min_idle = min_idle;
self
}
pub fn max_lifetime(mut self, max_lifetime: impl Into<Option<Duration>>) -> Self {
self.options.max_lifetime = max_lifetime.into();
self
}
pub fn idle_timeout(mut self, idle_timeout: impl Into<Option<Duration>>) -> Self {
self.options.idle_timeout = idle_timeout.into();
self
}
pub async fn build(self, url: &str) -> crate::Result<Pool<DB>> {
Ok(Pool(SharedPool::new_arc(url, self.options).await?))
}
}
struct Options {
max_size: u32,
connect_timeout: Duration,
min_idle: u32,
max_lifetime: Option<Duration>,
idle_timeout: Option<Duration>,
}
impl Default for Options {
fn default() -> Self {
Self {
max_size: 10,
min_idle: 0,
connect_timeout: Duration::from_secs(30),
max_lifetime: None,
idle_timeout: None,
}
}
}
use super::Options;
pub(crate) struct SharedPool<DB>
where
@ -198,7 +47,7 @@ impl<DB> SharedPool<DB>
where
DB: Backend,
{
async fn new_arc(url: &str, options: Options) -> crate::Result<Arc<Self>> {
pub(crate) async fn new_arc(url: &str, options: Options) -> crate::Result<Arc<Self>> {
// TODO: Establish [min_idle] connections
let (pool_tx, pool_rx) = channel(options.max_size as usize);
@ -217,7 +66,19 @@ where
Ok(pool)
}
async fn close(&self) {
pub fn options(&self) -> &Options {
&self.options
}
pub(crate) fn size(&self) -> u32 {
self.size.load(Ordering::Acquire)
}
pub(crate) fn num_idle(&self) -> usize {
self.pool_rx.len()
}
pub(crate) async fn close(&self) {
self.closed.store(true, Ordering::Release);
while self.size.load(Ordering::Acquire) > 0 {
@ -236,7 +97,7 @@ where
}
#[inline]
fn try_acquire(&self) -> Option<Live<DB>> {
pub(crate) fn try_acquire(&self) -> Option<Live<DB>> {
if self.closed.load(Ordering::Acquire) {
return None;
}
@ -244,7 +105,7 @@ where
Some(self.pool_rx.recv().now_or_never()??.revive(&self.pool_tx))
}
async fn acquire(&self) -> crate::Result<Live<DB>> {
pub(crate) async fn acquire(&self) -> crate::Result<Live<DB>> {
let start = Instant::now();
let deadline = start + self.options.connect_timeout;
@ -329,121 +190,6 @@ where
}
}
impl<DB> Executor for Pool<DB>
where
DB: Backend,
{
type Backend = DB;
fn execute<'c, 'q: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxFuture<'c, Result<u64, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move { <&Pool<DB> as Executor>::execute(&mut &*self, query, params).await })
}
fn fetch<'c, 'q: 'c, T: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend> + Send + Unpin,
{
Box::pin(async_stream::try_stream! {
let mut self_ = &*self;
let mut s = <&Pool<DB> as Executor>::fetch(&mut self_, query, params);
while let Some(row) = s.next().await.transpose()? {
yield row;
}
drop(s);
})
}
fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxFuture<'c, Result<Option<T>, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend> + Send,
{
Box::pin(async move {
<&Pool<DB> as Executor>::fetch_optional(&mut &*self, query, params).await
})
}
}
impl<DB> Executor for &'_ Pool<DB>
where
DB: Backend,
{
type Backend = DB;
fn execute<'c, 'q: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxFuture<'c, Result<u64, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move {
let mut live = self.0.acquire().await?;
let result = live.execute(query, params.into_params()).await;
result
})
}
fn fetch<'c, 'q: 'c, T: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend> + Send + Unpin,
{
Box::pin(async_stream::try_stream! {
let mut live = self.0.acquire().await?;
let mut s = live.fetch(query, params.into_params());
while let Some(row) = s.next().await.transpose()? {
yield T::from_row(Row(row));
}
})
}
fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxFuture<'c, Result<Option<T>, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend> + Send,
{
Box::pin(async move {
Ok(self
.0
.acquire()
.await?
.fetch_optional(query, params.into_params())
.await?
.map(Row)
.map(T::from_row))
})
}
}
struct Raw<DB> {
pub(crate) inner: DB,
pub(crate) created: Instant,
@ -500,10 +246,6 @@ impl<DB: Backend> Live<DB> {
}
}
pub fn release(mut self) {
self.release_mut()
}
fn release_mut(&mut self) {
// `.release_mut()` will be called twice if `.release()` is called
if let (Some(raw), Some(pool_tx)) = (self.raw.take(), self.pool_tx.as_ref()) {
@ -564,7 +306,7 @@ fn conn_reaper<DB: Backend>(pool: &Arc<SharedPool<DB>>) {
.saturating_sub(pool.options.min_idle);
// collect connections to reap
let (reap, mut keep) = (0..max_reaped)
let (reap, keep) = (0..max_reaped)
// only connections waiting in the queue
.filter_map(|_| pool.pool_rx.recv().now_or_never()?)
.partition::<Vec<_>, _>(|conn| should_reap(conn, &pool.options));

237
sqlx-core/src/pool/mod.rs Normal file
View File

@ -0,0 +1,237 @@
use crate::{
backend::Backend, connection::Connection, error::Error, executor::Executor,
query::IntoQueryParameters, row::FromRow, Row,
};
use futures_channel::oneshot;
use futures_core::{future::BoxFuture, stream::BoxStream};
use futures_util::future::{AbortHandle, AbortRegistration};
use futures_util::{
future::{FutureExt, TryFutureExt},
stream::StreamExt,
};
use std::{
cmp,
future::Future,
marker::PhantomData,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use async_std::future::timeout;
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
pub(crate) use self::inner::{Live, SharedPool};
use self::options::Options;
pub use self::options::Builder;
use crate::params::IntoQueryParameters;
mod inner;
mod options;
/// A pool of database connections.
pub struct Pool<DB>(Arc<SharedPool<DB>>)
where
DB: Backend;
impl<DB> Pool<DB>
where
DB: Backend,
{
/// Creates a connection pool with the default configuration.
pub async fn new(url: &str) -> crate::Result<Self> {
Self::with_options(url, Options::default()).await
}
async fn with_options(url: &str, options: Options) -> crate::Result<Self> {
Ok(Pool(SharedPool::new_arc(url, options).await?))
}
/// Returns a [Builder] to configure a new connection pool.
pub fn builder() -> Builder<DB> {
Builder::new()
}
/// Retrieves a connection from the pool.
///
/// Waits for at most the configured connection timeout before returning an error.
pub async fn acquire(&self) -> crate::Result<Connection<DB>> {
self.0.acquire().await.map(Connection::new)
}
/// Attempts to retrieve a connection from the pool if there is one available.
///
/// Returns `None` if there are no idle connections available in the pool.
/// This method will not block waiting to establish a new connection.
pub fn try_acquire(&self) -> Option<Connection<DB>> {
self.0.try_acquire().map(Connection::new)
}
/// Ends the use of a connection pool. Prevents any new connections
/// and will close all active connections when they are returned to the pool.
///
/// Does not resolve until all connections are closed.
pub async fn close(&self) {
let _ = self.0.close().await;
}
/// Returns the number of connections currently being managed by the pool.
pub fn size(&self) -> u32 {
self.0.size()
}
/// Returns the number of idle connections.
pub fn idle(&self) -> usize {
self.0.num_idle()
}
/// Returns the configured maximum pool size.
pub fn max_size(&self) -> u32 {
self.0.options().max_size
}
/// Returns the maximum time spent acquiring a new connection before an error is returned.
pub fn connect_timeout(&self) -> Duration {
self.0.options().connect_timeout
}
/// Returns the configured minimum idle connection count.
pub fn min_idle(&self) -> u32 {
self.0.options().min_idle
}
/// Returns the configured maximum connection lifetime.
pub fn max_lifetime(&self) -> Option<Duration> {
self.0.options().max_lifetime
}
/// Returns the configured idle connection timeout.
pub fn idle_timeout(&self) -> Option<Duration> {
self.0.options().idle_timeout
}
}
/// Returns a new [Pool] tied to the same shared connection pool.
impl<DB> Clone for Pool<DB>
where
DB: Backend,
{
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
impl<DB> Executor for Pool<DB>
where
DB: Backend,
{
type Backend = DB;
fn execute<'c, 'q: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxFuture<'c, Result<u64, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move { <&Pool<DB> as Executor>::execute(&mut &*self, query, params).await })
}
fn fetch<'c, 'q: 'c, T: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend> + Send + Unpin,
{
Box::pin(async_stream::try_stream! {
let mut self_ = &*self;
let mut s = <&Pool<DB> as Executor>::fetch(&mut self_, query, params);
while let Some(row) = s.next().await.transpose()? {
yield row;
}
drop(s);
})
}
fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxFuture<'c, Result<Option<T>, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend> + Send,
{
Box::pin(async move {
<&Pool<DB> as Executor>::fetch_optional(&mut &*self, query, params).await
})
}
}
impl<DB> Executor for &'_ Pool<DB>
where
DB: Backend,
{
type Backend = DB;
fn execute<'c, 'q: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxFuture<'c, Result<u64, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move {
let mut live = self.0.acquire().await?;
let result = live.execute(query, params.into_params()).await;
result
})
}
fn fetch<'c, 'q: 'c, T: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend> + Send + Unpin,
{
Box::pin(async_stream::try_stream! {
let mut live = self.0.acquire().await?;
let mut s = live.fetch(query, params.into_params());
while let Some(row) = s.next().await.transpose()? {
yield T::from_row(Row(row));
}
})
}
fn fetch_optional<'c, 'q: 'c, T: 'c, A: 'c>(
&'c mut self,
query: &'q str,
params: A,
) -> BoxFuture<'c, Result<Option<T>, Error>>
where
A: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend> + Send,
{
Box::pin(async move {
let mut live = self.0.acquire().await?;
let row = live.fetch_optional(query, params.into_params()).await?;
Ok(row.map(Row).map(T::from_row))
})
}
}

View File

@ -0,0 +1,75 @@
use std::{marker::PhantomData, time::Duration};
use crate::Backend;
use super::Pool;
#[derive(Default)]
pub struct Builder<DB>
where
DB: Backend,
{
phantom: PhantomData<DB>,
options: Options,
}
impl<DB> Builder<DB>
where
DB: Backend,
{
pub fn new() -> Self {
Self {
phantom: PhantomData,
options: Options::default(),
}
}
pub fn max_size(mut self, max_size: u32) -> Self {
self.options.max_size = max_size;
self
}
pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
self.options.connect_timeout = connect_timeout;
self
}
pub fn min_idle(mut self, min_idle: u32) -> Self {
self.options.min_idle = min_idle;
self
}
pub fn max_lifetime(mut self, max_lifetime: impl Into<Option<Duration>>) -> Self {
self.options.max_lifetime = max_lifetime.into();
self
}
pub fn idle_timeout(mut self, idle_timeout: impl Into<Option<Duration>>) -> Self {
self.options.idle_timeout = idle_timeout.into();
self
}
pub async fn build(self, url: &str) -> crate::Result<Pool<DB>> {
Pool::with_options(url, self.options).await
}
}
pub(crate) struct Options {
pub max_size: u32,
pub connect_timeout: Duration,
pub min_idle: u32,
pub max_lifetime: Option<Duration>,
pub idle_timeout: Option<Duration>,
}
impl Default for Options {
fn default() -> Self {
Self {
max_size: 10,
min_idle: 0,
connect_timeout: Duration::from_secs(30),
max_lifetime: None,
idle_timeout: None,
}
}
}