Add inherent helpers to executors

This commit is contained in:
Ryan Leckey 2019-08-22 22:23:15 -07:00
parent c5a3fc8d7e
commit 6c98ba01b8
5 changed files with 157 additions and 30 deletions

View File

@ -63,7 +63,7 @@ CREATE TABLE IF NOT EXISTS contacts (
.execute(&pool)
.await?;
sqlx::query("TRUNCATE contacts").execute(&pool).await?;
pool.execute("TRUNCATE contacts", ()).await?;
Ok(())
}
@ -78,18 +78,19 @@ async fn insert(pool: &PostgresPool, count: usize) -> Result<(), sqlx::Error> {
let (tx, rx) = channel::<()>();
tokio::spawn(async move {
sqlx::query(
pool.execute(
r#"
INSERT INTO contacts (name, username, password, email, phone)
VALUES ($1, $2, $3, $4, $5)
"#,
"#,
(
contact.name,
contact.username,
contact.password,
contact.email,
contact.phone,
)
)
.bind(contact.name)
.bind(contact.username)
.bind(contact.password)
.bind(contact.email)
.bind(contact.phone)
.execute(&pool)
.await
.unwrap();
@ -114,13 +115,12 @@ async fn select(pool: &PostgresPool, iterations: usize) -> Result<(), sqlx::Erro
for _ in 0..iterations {
// TODO: Once we have FromRow derives we can replace this with Vec<Contact>
let contacts: Vec<(String, String, String, String, String)> = sqlx::query(
let contacts: Vec<(String, String, String, String, String)> = pool.fetch(
r#"
SELECT name, username, password, email, phone
FROM contacts
"#,
"#, (),
)
.fetch(&pool)
.try_collect()
.await?;

View File

@ -49,9 +49,8 @@ async fn main() -> Fallible<()> {
}
async fn ensure_schema(conn: &mut Connection<Postgres>) -> Fallible<()> {
sqlx::query("BEGIN").execute(conn).await?;
conn.execute("BEGIN", ()).await?;
// language=sql
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS tasks (
@ -65,21 +64,19 @@ CREATE TABLE IF NOT EXISTS tasks (
.execute(conn)
.await?;
sqlx::query("COMMIT").execute(conn).await?;
conn.execute("COMMIT", ()).await?;
Ok(())
}
async fn print_all_tasks(conn: &mut Connection<Postgres>) -> Fallible<()> {
// language=sql
sqlx::query(
conn.fetch(
r#"
SELECT id, text
FROM tasks
WHERE done_at IS NULL
"#,
)
.fetch(conn)
.try_for_each(|(id, text): (i64, String)| {
// language=text
println!("{:>5} | {}", id, text);
@ -92,16 +89,7 @@ WHERE done_at IS NULL
}
async fn add_task(conn: &mut Connection<Postgres>, text: &str) -> Fallible<()> {
// language=sql
sqlx::query(
r#"
INSERT INTO tasks ( text )
VALUES ( $1 )
"#,
)
.bind(text)
.execute(conn)
.await?;
conn.execute("INSERT INTO tasks ( text ) VALUES ( $1 )", (text,)).await?;
Ok(())
}

View File

@ -1,5 +1,5 @@
use crate::{
backend::Backend, error::Error, executor::Executor, query::QueryParameters, row::FromSqlRow,
backend::Backend, error::Error, executor::Executor, query::{QueryParameters, IntoQueryParameters}, row::FromSqlRow,
};
use crossbeam_queue::SegQueue;
use crossbeam_utils::atomic::AtomicCell;
@ -79,6 +79,40 @@ where
async fn get(&self) -> ConnectionFairy<'_, DB> {
ConnectionFairy::new(&self.0, self.0.acquire().await)
}
#[inline]
pub async fn execute<A>(&self, query: &str, params: A) -> Result<u64, Error>
where
A: IntoQueryParameters<DB>,
{
Executor::execute(self, query, params.into()).await
}
#[inline]
pub fn fetch<'c, 'q: 'c, A: 'c, T: 'c>(
&'c self,
query: &'q str,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<DB> + Send,
T: FromSqlRow<DB> + Send + Unpin,
{
Executor::fetch(self, query, params.into())
}
#[inline]
pub async fn fetch_optional<'c, 'q: 'c, A: 'c, T: 'c>(
&'c self,
query: &'q str,
params: A,
) -> Result<Option<T>, Error>
where
A: IntoQueryParameters<DB> + Send,
T: FromSqlRow<DB>,
{
Executor::fetch_optional(self, query, params.into()).await
}
}
impl<DB> Executor for Connection<DB>

View File

@ -1,6 +1,6 @@
use crate::{
backend::Backend, connection::RawConnection, error::Error, executor::Executor,
query::QueryParameters, row::FromSqlRow,
query::{QueryParameters, IntoQueryParameters}, row::FromSqlRow,
};
use crossbeam_queue::{ArrayQueue, SegQueue};
use futures_channel::oneshot;
@ -58,6 +58,40 @@ where
},
}))
}
#[inline]
pub async fn execute<A>(&self, query: &str, params: A) -> Result<u64, Error>
where
A: IntoQueryParameters<DB>,
{
Executor::execute(self, query, params.into()).await
}
#[inline]
pub fn fetch<'c, 'q: 'c, A: 'c, T: 'c>(
&'c self,
query: &'q str,
params: A,
) -> BoxStream<'c, Result<T, Error>>
where
A: IntoQueryParameters<DB> + Send,
T: FromSqlRow<DB> + Send + Unpin,
{
Executor::fetch(self, query, params.into())
}
#[inline]
pub async fn fetch_optional<'c, 'q: 'c, A: 'c, T: 'c>(
&'c self,
query: &'q str,
params: A,
) -> Result<Option<T>, Error>
where
A: IntoQueryParameters<DB> + Send,
T: FromSqlRow<DB>,
{
Executor::fetch_optional(self, query, params.into()).await
}
}
struct SharedPool<DB>

View File

@ -18,6 +18,77 @@ pub trait QueryParameters: Send {
T: ToSql<Self::Backend>;
}
pub trait IntoQueryParameters<DB> where DB: Backend {
fn into(self) -> DB::QueryParameters;
}
#[allow(unused)]
macro_rules! impl_into_query_parameters {
($( ($idx:tt) -> $T:ident );+;) => {
impl<$($T,)+ DB> IntoQueryParameters<DB> for ($($T,)+)
where
DB: Backend,
$(DB: crate::types::HasSqlType<$T>,)+
$($T: crate::serialize::ToSql<DB>,)+
{
fn into(self) -> DB::QueryParameters {
let mut params = DB::QueryParameters::new();
$(params.bind(self.$idx);)+
params
}
}
};
}
impl<DB> IntoQueryParameters<DB> for ()
where
DB: Backend,
{
fn into(self) -> DB::QueryParameters {
DB::QueryParameters::new()
}
}
impl_into_query_parameters!(
(0) -> T1;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
(2) -> T3;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
);
impl_into_query_parameters!(
(0) -> T1;
(1) -> T2;
(2) -> T3;
(3) -> T4;
(4) -> T5;
(5) -> T6;
);
pub struct SqlQuery<'q, DB>
where
DB: Backend,