Make Pool generic over Backend

This commit is contained in:
Ryan Leckey 2019-08-08 18:05:16 -07:00
parent acf78d5a43
commit fd4cc043ed
11 changed files with 112 additions and 46 deletions

View File

@ -1,6 +1,5 @@
#![feature(async_await)]
use futures::future;
use failure::Fallible;
use fake::{
faker::{
@ -10,7 +9,8 @@ use fake::{
},
Dummy, Fake, Faker,
};
use sqlx::pool::Pool;
use futures::future;
use sqlx::{pool::Pool, postgres::Postgres};
#[derive(Debug, Dummy)]
struct Contact {
@ -40,7 +40,7 @@ async fn main() -> Fallible<()> {
.user("postgres")
.database("sqlx__dev__contacts");
let pool = Pool::new(options);
let pool = Pool::<Postgres>::new(options);
{
let mut conn = pool.acquire().await?;

View File

@ -1 +1 @@
nightly-2019-08-01
nightly-2019-08-08

View File

@ -1,6 +1,8 @@
use crate::row::RawRow;
use crate::{connection::RawConnection, row::RawRow};
pub trait Backend {
type RawConnection: RawConnection;
type RawRow: RawRow;
/// The type used to represent metadata associated with a SQL type.

9
src/connection.rs Normal file
View File

@ -0,0 +1,9 @@
use crate::{backend::Backend, ConnectOptions};
use futures::future::BoxFuture;
use std::io;
pub trait RawConnection {
fn establish(options: ConnectOptions<'_>) -> BoxFuture<io::Result<Self>>
where
Self: Sized;
}

View File

@ -35,4 +35,5 @@ pub mod mariadb;
pub mod postgres;
// TODO: This module is not intended to be directly public
pub mod connection;
pub mod pool;

View File

@ -1,7 +1,7 @@
use crate::{postgres::Connection as C, ConnectOptions};
use super::connection::RawConnection;
use crate::{backend::Backend, ConnectOptions};
use crossbeam_queue::{ArrayQueue, SegQueue};
use futures::TryFutureExt;
use futures::channel::oneshot;
use futures::{channel::oneshot, TryFutureExt};
use std::{
io,
ops::{Deref, DerefMut},
@ -15,33 +15,51 @@ use std::{
// TODO: Reap old connections
// TODO: Clean up (a lot) and document what's going on
// TODO: sqlx::ConnectOptions needs to be removed and replaced with URIs everywhere
// TODO: Make Pool generic over Backend (requires a generic sqlx::Connection type)
#[derive(Clone)]
pub struct Pool {
inner: Arc<InnerPool>,
pub struct Pool<B>
where
B: Backend,
{
inner: Arc<InnerPool<B>>,
}
struct InnerPool {
impl<B> Clone for Pool<B>
where
B: Backend,
{
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
struct InnerPool<B>
where
B: Backend,
{
options: ConnectOptions<'static>,
idle: ArrayQueue<Idle>,
waiters: SegQueue<oneshot::Sender<Live>>,
idle: ArrayQueue<Idle<B>>,
waiters: SegQueue<oneshot::Sender<Live<B>>>,
total: AtomicUsize,
}
impl Pool {
pub fn new<'a>(options: ConnectOptions<'a>) -> Pool {
Pool {
impl<B> Pool<B>
where
B: Backend,
{
pub fn new<'a>(options: ConnectOptions<'a>) -> Self {
Self {
inner: Arc::new(InnerPool {
options: options.into_owned(),
idle: ArrayQueue::new(10),
total: AtomicUsize::new(0),
waiters: SegQueue::new()
waiters: SegQueue::new(),
}),
}
}
pub async fn acquire(&self) -> io::Result<Connection> {
pub async fn acquire(&self) -> io::Result<Connection<B>> {
self.inner
.acquire()
.map_ok(|live| Connection::new(live, &self.inner))
@ -49,8 +67,11 @@ impl Pool {
}
}
impl InnerPool {
async fn acquire(&self) -> io::Result<Live> {
impl<B> InnerPool<B>
where
B: Backend,
{
async fn acquire(&self) -> io::Result<Live<B>> {
if let Ok(idle) = self.idle.pop() {
log::debug!("acquire: found idle connection");
@ -79,7 +100,7 @@ impl InnerPool {
self.total.store(total + 1, Ordering::SeqCst);
log::debug!("acquire: no idle connections; establish new connection");
let connection = C::establish(self.options.clone()).await?;
let connection = B::RawConnection::establish(self.options.clone()).await?;
let live = Live {
connection,
since: Instant::now(),
@ -88,7 +109,7 @@ impl InnerPool {
Ok(live)
}
fn release(&self, mut connection: Live) {
fn release(&self, mut connection: Live<B>) {
while let Ok(waiter) = self.waiters.pop() {
connection = match waiter.send(connection) {
Ok(()) => {
@ -107,13 +128,20 @@ impl InnerPool {
}
// TODO: Need a better name here than [pool::Connection] ?
pub struct Connection {
connection: Option<Live>,
pool: Arc<InnerPool>,
pub struct Connection<B>
where
B: Backend,
{
connection: Option<Live<B>>,
pool: Arc<InnerPool<B>>,
}
impl Connection {
fn new(connection: Live, pool: &Arc<InnerPool>) -> Self {
impl<B> Connection<B>
where
B: Backend,
{
fn new(connection: Live<B>, pool: &Arc<InnerPool<B>>) -> Self {
Self {
connection: Some(connection),
pool: Arc::clone(pool),
@ -121,8 +149,11 @@ impl Connection {
}
}
impl Deref for Connection {
type Target = C;
impl<B> Deref for Connection<B>
where
B: Backend,
{
type Target = B::RawConnection;
#[inline]
fn deref(&self) -> &Self::Target {
@ -131,7 +162,10 @@ impl Deref for Connection {
}
}
impl DerefMut for Connection {
impl<B> DerefMut for Connection<B>
where
B: Backend,
{
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
// PANIC: Will not panic unless accessed after drop
@ -139,7 +173,10 @@ impl DerefMut for Connection {
}
}
impl Drop for Connection {
impl<B> Drop for Connection<B>
where
B: Backend,
{
fn drop(&mut self) {
log::debug!("release: dropping connection; store back in queue");
if let Some(connection) = self.connection.take() {
@ -148,12 +185,18 @@ impl Drop for Connection {
}
}
struct Idle {
connection: Live,
struct Idle<B>
where
B: Backend,
{
connection: Live<B>,
since: Instant,
}
struct Live {
connection: C,
struct Live<B>
where
B: Backend,
{
connection: B::RawConnection,
since: Instant,
}

View File

@ -1,4 +1,4 @@
use super::Connection;
use super::RawConnection;
use crate::{
postgres::protocol::{Authentication, Message, PasswordMessage, StartupMessage},
ConnectOptions,
@ -6,7 +6,7 @@ use crate::{
use std::{borrow::Cow, io};
pub async fn establish<'a, 'b: 'a>(
conn: &'a mut Connection,
conn: &'a mut RawConnection,
options: ConnectOptions<'b>,
) -> io::Result<()> {
let user = &*options.user.expect("user is required");

View File

@ -2,6 +2,7 @@ use super::protocol::{Encode, Message, Terminate};
use crate::ConnectOptions;
use bytes::{BufMut, BytesMut};
use futures::{
future::BoxFuture,
io::{AsyncReadExt, AsyncWrite, AsyncWriteExt},
ready,
task::{Context, Poll},
@ -16,7 +17,7 @@ mod get;
mod prepare;
mod select;
pub struct Connection {
pub struct RawConnection {
stream: TcpStream,
// Do we think that there is data in the read buffer to be decoded
@ -39,7 +40,7 @@ pub struct Connection {
secret_key: u32,
}
impl Connection {
impl RawConnection {
pub async fn establish(options: ConnectOptions<'_>) -> io::Result<Self> {
let stream = TcpStream::connect((&*options.host, options.port)).await?;
let mut conn = Self {
@ -135,3 +136,10 @@ impl Connection {
Ok(())
}
}
impl crate::connection::RawConnection for RawConnection {
#[inline]
fn establish(options: ConnectOptions<'_>) -> BoxFuture<io::Result<Self>> {
Box::pin(RawConnection::establish(options))
}
}

View File

@ -1,4 +1,4 @@
use super::Connection;
use super::RawConnection;
use crate::{
postgres::{
protocol::{self, BindValues},
@ -10,12 +10,12 @@ use crate::{
pub struct Prepare<'a, 'b> {
query: &'b str,
pub(super) connection: &'a mut Connection,
pub(super) connection: &'a mut RawConnection,
pub(super) bind: BindValues,
}
#[inline]
pub fn prepare<'a, 'b>(connection: &'a mut Connection, query: &'b str) -> Prepare<'a, 'b> {
pub fn prepare<'a, 'b>(connection: &'a mut RawConnection, query: &'b str) -> Prepare<'a, 'b> {
// TODO: Use a hash map to cache the parse
// TODO: Use named statements
Prepare {
@ -41,7 +41,7 @@ impl<'a, 'b> Prepare<'a, 'b> {
self
}
pub(super) fn finish(self) -> &'a mut Connection {
pub(super) fn finish(self) -> &'a mut RawConnection {
self.connection.write(protocol::Parse {
portal: "",
query: self.query,

View File

@ -2,7 +2,7 @@ use crate::backend::Backend;
mod connection;
pub use connection::Connection;
pub use connection::RawConnection;
mod protocol;
@ -11,6 +11,7 @@ pub mod types;
pub struct Postgres;
impl Backend for Postgres {
type RawConnection = RawConnection;
type RawRow = protocol::DataRow;
type TypeMetadata = types::TypeMetadata;
}

View File

@ -45,6 +45,7 @@ where
}
}
#[allow(unused)]
macro_rules! impl_from_row_tuple {
($B:ident: $( ($idx:tt) -> $T:ident, $ST:ident );+;) => {
impl<$($ST,)+ $($T,)+> crate::row::FromRow<$B, ($($ST,)+)> for ($($T,)+)
@ -60,6 +61,7 @@ macro_rules! impl_from_row_tuple {
};
}
#[allow(unused)]
macro_rules! impl_from_row_tuples_for_backend {
($B:ident) => {
impl_from_row_tuple!($B: