From fd4cc043ed1a394a2889bf372db45db193672677 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Thu, 8 Aug 2019 18:05:16 -0700 Subject: [PATCH] Make Pool generic over Backend --- examples/contacts/src/main.rs | 6 +- rust-toolchain | 2 +- src/backend.rs | 4 +- src/connection.rs | 9 +++ src/lib.rs | 1 + src/pool.rs | 107 +++++++++++++++++++-------- src/postgres/connection/establish.rs | 4 +- src/postgres/connection/mod.rs | 12 ++- src/postgres/connection/prepare.rs | 8 +- src/postgres/mod.rs | 3 +- src/row.rs | 2 + 11 files changed, 112 insertions(+), 46 deletions(-) create mode 100644 src/connection.rs diff --git a/examples/contacts/src/main.rs b/examples/contacts/src/main.rs index 164b1df4..7b6292b0 100644 --- a/examples/contacts/src/main.rs +++ b/examples/contacts/src/main.rs @@ -1,6 +1,5 @@ #![feature(async_await)] -use futures::future; use failure::Fallible; use fake::{ faker::{ @@ -10,7 +9,8 @@ use fake::{ }, Dummy, Fake, Faker, }; -use sqlx::pool::Pool; +use futures::future; +use sqlx::{pool::Pool, postgres::Postgres}; #[derive(Debug, Dummy)] struct Contact { @@ -40,7 +40,7 @@ async fn main() -> Fallible<()> { .user("postgres") .database("sqlx__dev__contacts"); - let pool = Pool::new(options); + let pool = Pool::::new(options); { let mut conn = pool.acquire().await?; diff --git a/rust-toolchain b/rust-toolchain index 4390c8a2..be303dd7 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2019-08-01 +nightly-2019-08-08 diff --git a/src/backend.rs b/src/backend.rs index 45451f6a..b4a8bcd2 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -1,6 +1,8 @@ -use crate::row::RawRow; +use crate::{connection::RawConnection, row::RawRow}; pub trait Backend { + type RawConnection: RawConnection; + type RawRow: RawRow; /// The type used to represent metadata associated with a SQL type. diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 00000000..18cb0f80 --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,9 @@ +use crate::{backend::Backend, ConnectOptions}; +use futures::future::BoxFuture; +use std::io; + +pub trait RawConnection { + fn establish(options: ConnectOptions<'_>) -> BoxFuture> + where + Self: Sized; +} diff --git a/src/lib.rs b/src/lib.rs index 0953eb08..a0b0e2cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,4 +35,5 @@ pub mod mariadb; pub mod postgres; // TODO: This module is not intended to be directly public +pub mod connection; pub mod pool; diff --git a/src/pool.rs b/src/pool.rs index cdece6f6..16cada9b 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -1,7 +1,7 @@ -use crate::{postgres::Connection as C, ConnectOptions}; +use super::connection::RawConnection; +use crate::{backend::Backend, ConnectOptions}; use crossbeam_queue::{ArrayQueue, SegQueue}; -use futures::TryFutureExt; -use futures::channel::oneshot; +use futures::{channel::oneshot, TryFutureExt}; use std::{ io, ops::{Deref, DerefMut}, @@ -15,33 +15,51 @@ use std::{ // TODO: Reap old connections // TODO: Clean up (a lot) and document what's going on // TODO: sqlx::ConnectOptions needs to be removed and replaced with URIs everywhere -// TODO: Make Pool generic over Backend (requires a generic sqlx::Connection type) -#[derive(Clone)] -pub struct Pool { - inner: Arc, +pub struct Pool +where + B: Backend, +{ + inner: Arc>, } -struct InnerPool { +impl Clone for Pool +where + B: Backend, +{ + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +struct InnerPool +where + B: Backend, +{ options: ConnectOptions<'static>, - idle: ArrayQueue, - waiters: SegQueue>, + idle: ArrayQueue>, + waiters: SegQueue>>, total: AtomicUsize, } -impl Pool { - pub fn new<'a>(options: ConnectOptions<'a>) -> Pool { - Pool { +impl Pool +where + B: Backend, +{ + pub fn new<'a>(options: ConnectOptions<'a>) -> Self { + Self { inner: Arc::new(InnerPool { options: options.into_owned(), idle: ArrayQueue::new(10), total: AtomicUsize::new(0), - waiters: SegQueue::new() + waiters: SegQueue::new(), }), } } - pub async fn acquire(&self) -> io::Result { + pub async fn acquire(&self) -> io::Result> { self.inner .acquire() .map_ok(|live| Connection::new(live, &self.inner)) @@ -49,8 +67,11 @@ impl Pool { } } -impl InnerPool { - async fn acquire(&self) -> io::Result { +impl InnerPool +where + B: Backend, +{ + async fn acquire(&self) -> io::Result> { if let Ok(idle) = self.idle.pop() { log::debug!("acquire: found idle connection"); @@ -79,7 +100,7 @@ impl InnerPool { self.total.store(total + 1, Ordering::SeqCst); log::debug!("acquire: no idle connections; establish new connection"); - let connection = C::establish(self.options.clone()).await?; + let connection = B::RawConnection::establish(self.options.clone()).await?; let live = Live { connection, since: Instant::now(), @@ -88,7 +109,7 @@ impl InnerPool { Ok(live) } - fn release(&self, mut connection: Live) { + fn release(&self, mut connection: Live) { while let Ok(waiter) = self.waiters.pop() { connection = match waiter.send(connection) { Ok(()) => { @@ -107,13 +128,20 @@ impl InnerPool { } // TODO: Need a better name here than [pool::Connection] ? -pub struct Connection { - connection: Option, - pool: Arc, + +pub struct Connection +where + B: Backend, +{ + connection: Option>, + pool: Arc>, } -impl Connection { - fn new(connection: Live, pool: &Arc) -> Self { +impl Connection +where + B: Backend, +{ + fn new(connection: Live, pool: &Arc>) -> Self { Self { connection: Some(connection), pool: Arc::clone(pool), @@ -121,8 +149,11 @@ impl Connection { } } -impl Deref for Connection { - type Target = C; +impl Deref for Connection +where + B: Backend, +{ + type Target = B::RawConnection; #[inline] fn deref(&self) -> &Self::Target { @@ -131,7 +162,10 @@ impl Deref for Connection { } } -impl DerefMut for Connection { +impl DerefMut for Connection +where + B: Backend, +{ #[inline] fn deref_mut(&mut self) -> &mut Self::Target { // PANIC: Will not panic unless accessed after drop @@ -139,7 +173,10 @@ impl DerefMut for Connection { } } -impl Drop for Connection { +impl Drop for Connection +where + B: Backend, +{ fn drop(&mut self) { log::debug!("release: dropping connection; store back in queue"); if let Some(connection) = self.connection.take() { @@ -148,12 +185,18 @@ impl Drop for Connection { } } -struct Idle { - connection: Live, +struct Idle +where + B: Backend, +{ + connection: Live, since: Instant, } -struct Live { - connection: C, +struct Live +where + B: Backend, +{ + connection: B::RawConnection, since: Instant, } diff --git a/src/postgres/connection/establish.rs b/src/postgres/connection/establish.rs index 822e8898..44409cd1 100644 --- a/src/postgres/connection/establish.rs +++ b/src/postgres/connection/establish.rs @@ -1,4 +1,4 @@ -use super::Connection; +use super::RawConnection; use crate::{ postgres::protocol::{Authentication, Message, PasswordMessage, StartupMessage}, ConnectOptions, @@ -6,7 +6,7 @@ use crate::{ use std::{borrow::Cow, io}; pub async fn establish<'a, 'b: 'a>( - conn: &'a mut Connection, + conn: &'a mut RawConnection, options: ConnectOptions<'b>, ) -> io::Result<()> { let user = &*options.user.expect("user is required"); diff --git a/src/postgres/connection/mod.rs b/src/postgres/connection/mod.rs index 02a11eaa..64ef3500 100644 --- a/src/postgres/connection/mod.rs +++ b/src/postgres/connection/mod.rs @@ -2,6 +2,7 @@ use super::protocol::{Encode, Message, Terminate}; use crate::ConnectOptions; use bytes::{BufMut, BytesMut}; use futures::{ + future::BoxFuture, io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}, ready, task::{Context, Poll}, @@ -16,7 +17,7 @@ mod get; mod prepare; mod select; -pub struct Connection { +pub struct RawConnection { stream: TcpStream, // Do we think that there is data in the read buffer to be decoded @@ -39,7 +40,7 @@ pub struct Connection { secret_key: u32, } -impl Connection { +impl RawConnection { pub async fn establish(options: ConnectOptions<'_>) -> io::Result { let stream = TcpStream::connect((&*options.host, options.port)).await?; let mut conn = Self { @@ -135,3 +136,10 @@ impl Connection { Ok(()) } } + +impl crate::connection::RawConnection for RawConnection { + #[inline] + fn establish(options: ConnectOptions<'_>) -> BoxFuture> { + Box::pin(RawConnection::establish(options)) + } +} diff --git a/src/postgres/connection/prepare.rs b/src/postgres/connection/prepare.rs index 1852cbfb..02e3e690 100644 --- a/src/postgres/connection/prepare.rs +++ b/src/postgres/connection/prepare.rs @@ -1,4 +1,4 @@ -use super::Connection; +use super::RawConnection; use crate::{ postgres::{ protocol::{self, BindValues}, @@ -10,12 +10,12 @@ use crate::{ pub struct Prepare<'a, 'b> { query: &'b str, - pub(super) connection: &'a mut Connection, + pub(super) connection: &'a mut RawConnection, pub(super) bind: BindValues, } #[inline] -pub fn prepare<'a, 'b>(connection: &'a mut Connection, query: &'b str) -> Prepare<'a, 'b> { +pub fn prepare<'a, 'b>(connection: &'a mut RawConnection, query: &'b str) -> Prepare<'a, 'b> { // TODO: Use a hash map to cache the parse // TODO: Use named statements Prepare { @@ -41,7 +41,7 @@ impl<'a, 'b> Prepare<'a, 'b> { self } - pub(super) fn finish(self) -> &'a mut Connection { + pub(super) fn finish(self) -> &'a mut RawConnection { self.connection.write(protocol::Parse { portal: "", query: self.query, diff --git a/src/postgres/mod.rs b/src/postgres/mod.rs index 06fa9a98..9bcf2f05 100644 --- a/src/postgres/mod.rs +++ b/src/postgres/mod.rs @@ -2,7 +2,7 @@ use crate::backend::Backend; mod connection; -pub use connection::Connection; +pub use connection::RawConnection; mod protocol; @@ -11,6 +11,7 @@ pub mod types; pub struct Postgres; impl Backend for Postgres { + type RawConnection = RawConnection; type RawRow = protocol::DataRow; type TypeMetadata = types::TypeMetadata; } diff --git a/src/row.rs b/src/row.rs index a6920d1a..1273f7e2 100644 --- a/src/row.rs +++ b/src/row.rs @@ -45,6 +45,7 @@ where } } +#[allow(unused)] macro_rules! impl_from_row_tuple { ($B:ident: $( ($idx:tt) -> $T:ident, $ST:ident );+;) => { impl<$($ST,)+ $($T,)+> crate::row::FromRow<$B, ($($ST,)+)> for ($($T,)+) @@ -60,6 +61,7 @@ macro_rules! impl_from_row_tuple { }; } +#[allow(unused)] macro_rules! impl_from_row_tuples_for_backend { ($B:ident) => { impl_from_row_tuple!($B: