feat: re-introduce Pool::begin and add Pool::try_begin to match Pool::try_acquire

This commit is contained in:
Ryan Leckey 2020-05-30 05:20:18 -07:00
parent 0a04abdb3e
commit afd831b0d3
No known key found for this signature in database
GPG Key ID: BBDFC5595030E7D3
5 changed files with 85 additions and 20 deletions

View File

@ -24,7 +24,10 @@ pub trait Connection: Send {
/// Begin a new transaction or establish a savepoint within the active transaction.
///
/// Returns a [`Transaction`] for controlling and tracking the new transaction.
fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database, Self>, Error>> {
fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database, Self>, Error>>
where
Self: Sized,
{
Transaction::begin(self)
}
@ -34,6 +37,7 @@ pub trait Connection: Send {
/// return an error, the transaction will be committed.
fn transaction<'c: 'f, 'f, T, E, F, Fut>(&'c mut self, f: F) -> BoxFuture<'f, Result<T, E>>
where
Self: Sized,
T: Send,
F: FnOnce(&mut <Self::Database as Database>::Connection) -> Fut + Send + 'f,
E: From<Error> + Send,

View File

@ -0,0 +1,40 @@
use std::ops::{Deref, DerefMut};
pub enum MaybeOwned<'a, T> {
Borrowed(&'a mut T),
Owned(T),
}
impl<'a, T> From<T> for MaybeOwned<'a, T> {
fn from(v: T) -> Self {
MaybeOwned::Owned(v)
}
}
impl<'a, T> From<&'a mut T> for MaybeOwned<'a, T> {
fn from(v: &'a mut T) -> Self {
MaybeOwned::Borrowed(v)
}
}
impl<'a, T> Deref for MaybeOwned<'a, T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
match self {
MaybeOwned::Borrowed(v) => v,
MaybeOwned::Owned(v) => v,
}
}
}
impl<'a, T> DerefMut for MaybeOwned<'a, T> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
MaybeOwned::Borrowed(v) => v,
MaybeOwned::Owned(v) => v,
}
}
}

View File

@ -1 +1,2 @@
pub mod maybe_owned;
pub mod ustr;

View File

@ -8,6 +8,7 @@ use std::{
use crate::database::Database;
use crate::error::Error;
use crate::transaction::Transaction;
use self::inner::SharedPool;
use self::options::Options;
@ -60,6 +61,22 @@ impl<DB: Database> Pool<DB> {
self.0.try_acquire().map(|conn| conn.attach(&self.0))
}
/// Retrieves a new connection and immediately begins a new transaction.
pub async fn begin(&self) -> Result<Transaction<'static, DB, PoolConnection<DB>>, Error> {
Ok(Transaction::begin(self.acquire().await?).await?)
}
/// Attempts to retrieve a new connection and immediately begins a new transaction if there
/// is one available.
pub async fn try_begin(
&self,
) -> Result<Option<Transaction<'static, DB, PoolConnection<DB>>>, Error> {
match self.try_acquire() {
Some(conn) => Transaction::begin(conn).await.map(Some),
None => Ok(None),
}
}
/// Ends the use of a connection pool. Prevents any new connections
/// and will close all active connections when they are returned to the pool.
///

View File

@ -8,6 +8,7 @@ use futures_util::{future, FutureExt};
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::ext::maybe_owned::MaybeOwned;
/// Generic management of database transactions.
///
@ -56,10 +57,10 @@ pub trait TransactionManager {
/// [`rollback`]: #method.rollback
pub struct Transaction<'c, DB, C = <DB as Database>::Connection>
where
DB: ?Sized + Database,
C: ?Sized + Connection<Database = DB>,
DB: Database,
C: Sized + Connection<Database = DB>,
{
connection: &'c mut C,
connection: MaybeOwned<'c, C>,
// the depth of ~this~ transaction, depth directly equates to how many times [`begin()`]
// was called without a corresponding [`commit()`] or [`rollback()`]
@ -68,10 +69,12 @@ where
impl<'c, DB, C> Transaction<'c, DB, C>
where
DB: ?Sized + Database,
C: ?Sized + Connection<Database = DB>,
DB: Database,
C: Sized + Connection<Database = DB>,
{
pub(crate) fn begin(conn: &'c mut C) -> BoxFuture<'c, Result<Self, Error>> {
pub(crate) fn begin(conn: impl Into<MaybeOwned<'c, C>>) -> BoxFuture<'c, Result<Self, Error>> {
let mut conn = conn.into();
Box::pin(async move {
let depth = conn.transaction_depth();
@ -85,20 +88,20 @@ where
}
/// Commits this transaction or savepoint.
pub async fn commit(self) -> Result<(), Error> {
pub async fn commit(mut self) -> Result<(), Error> {
DB::TransactionManager::commit(self.connection.get_mut(), self.depth).await
}
/// Aborts this transaction or savepoint.
pub async fn rollback(self) -> Result<(), Error> {
pub async fn rollback(mut self) -> Result<(), Error> {
DB::TransactionManager::rollback(self.connection.get_mut(), self.depth).await
}
}
impl<'c, DB, C> Connection for Transaction<'c, DB, C>
where
DB: ?Sized + Database,
C: ?Sized + Connection<Database = DB>,
DB: Database,
C: Sized + Connection<Database = DB>,
{
type Database = C::Database;
@ -137,7 +140,7 @@ where
#[allow(unused_macros)]
macro_rules! impl_executor_for_transaction {
($DB:ident, $Row:ident) => {
impl<'c, 't, C: ?Sized> crate::executor::Executor<'t>
impl<'c, 't, C: Sized> crate::executor::Executor<'t>
for &'t mut crate::transaction::Transaction<'c, $DB, C>
where
C: crate::connection::Connection<Database = $DB>,
@ -189,8 +192,8 @@ macro_rules! impl_executor_for_transaction {
impl<'c, DB, C> Debug for Transaction<'c, DB, C>
where
DB: ?Sized + Database,
C: ?Sized + Connection<Database = DB>,
DB: Database,
C: Sized + Connection<Database = DB>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
// TODO: Show the full type <..<..<..
@ -200,8 +203,8 @@ where
impl<'c, DB, C> Deref for Transaction<'c, DB, C>
where
DB: ?Sized + Database,
C: ?Sized + Connection<Database = DB>,
DB: Database,
C: Sized + Connection<Database = DB>,
{
type Target = <C::Database as Database>::Connection;
@ -213,8 +216,8 @@ where
impl<'c, DB, C> DerefMut for Transaction<'c, DB, C>
where
DB: ?Sized + Database,
C: ?Sized + Connection<Database = DB>,
DB: Database,
C: Sized + Connection<Database = DB>,
{
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
@ -224,8 +227,8 @@ where
impl<'c, DB, C> Drop for Transaction<'c, DB, C>
where
DB: ?Sized + Database,
C: ?Sized + Connection<Database = DB>,
DB: Database,
C: Sized + Connection<Database = DB>,
{
fn drop(&mut self) {
// starts a rollback operation