mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-09-27 13:01:43 +00:00
Split up RawQuery into QueryParameters and &str
This commit is contained in:
parent
459a091f74
commit
c5a3fc8d7e
@ -5,7 +5,7 @@ extern crate criterion;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use criterion::{BatchSize, Criterion};
|
||||
use sqlx::pg::protocol::Message;
|
||||
use sqlx::postgres::protocol::Message;
|
||||
|
||||
const MESSAGE_DATA_ROW: &[u8] = b"D\0\0\0\x19\0\x02\0\0\0\x08\0\0\0\0\0\0\0\x05\0\0\0\x03whaD\0\0\0\xd6\0\x02\0\0\0\x08\0\0\0\0\0\0\0\x07\0\0\0\xc0Spicy jalapeno bacon ipsum dolor amet doner venison ground round burgdoggen salami fatback jerky sirloin t-bone beef. Ribeye chuck tenderloin pastrami short loin capicola beef tri-tip venison.";
|
||||
const MESSAGE_COMMAND_COMPLETE: &[u8] = b"C\0\0\0\rSELECT 4\0";
|
||||
@ -25,7 +25,7 @@ fn bench(c: &mut Criterion, name: &'static str, input: &'static [u8]) {
|
||||
b.iter_batched(
|
||||
|| buf.clone(),
|
||||
|mut buf| {
|
||||
while let Some(_body) = Message::decode(&mut buf).unwrap() {}
|
||||
while let Some(_body) = Message::decode(&mut buf) {}
|
||||
assert!(buf.is_empty());
|
||||
},
|
||||
BatchSize::LargeInput,
|
||||
|
@ -10,10 +10,8 @@ use fake::{
|
||||
Dummy, Fake, Faker,
|
||||
};
|
||||
use futures::{channel::oneshot::channel, future, stream::TryStreamExt};
|
||||
use sqlx::{Postgres, Pool};
|
||||
use std::{
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use sqlx::{Pool, Postgres};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
type PostgresPool = Pool<Postgres>;
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
use failure::Fallible;
|
||||
use futures::{future, TryStreamExt};
|
||||
use sqlx::{Postgres, Connection};
|
||||
use sqlx::{Connection, Postgres};
|
||||
use structopt::StructOpt;
|
||||
|
||||
#[derive(StructOpt, Debug)]
|
||||
@ -26,7 +26,8 @@ async fn main() -> Fallible<()> {
|
||||
|
||||
let opt = Options::from_args();
|
||||
|
||||
let mut conn = Connection::<Postgres>::establish("postgres://postgres@127.0.0.1/sqlx__dev").await?;
|
||||
let mut conn =
|
||||
Connection::<Postgres>::establish("postgres://postgres@127.0.0.1/sqlx__dev").await?;
|
||||
|
||||
ensure_schema(&mut conn).await?;
|
||||
|
||||
|
@ -1,14 +1,4 @@
|
||||
pub(crate) use self::internal::BackendAssocRawQuery;
|
||||
use crate::{connection::RawConnection, query::RawQuery, row::Row};
|
||||
|
||||
mod internal {
|
||||
pub trait BackendAssocRawQuery<'q, DB>
|
||||
where
|
||||
DB: super::Backend,
|
||||
{
|
||||
type RawQuery: super::RawQuery<'q, Backend = DB>;
|
||||
}
|
||||
}
|
||||
use crate::{connection::RawConnection, query::QueryParameters, row::Row};
|
||||
|
||||
/// A database backend.
|
||||
///
|
||||
@ -16,7 +6,8 @@ mod internal {
|
||||
/// e.g., implementing `ToSql for Uuid` differently for MySQL and Postgres) and
|
||||
/// to query capabilities within a database backend (e.g., with a specific
|
||||
/// `Connection` can we `bind` a `i64`?).
|
||||
pub trait Backend: Sized + for<'q> BackendAssocRawQuery<'q, Self> {
|
||||
pub trait Backend: Sized {
|
||||
type QueryParameters: QueryParameters<Backend = Self>;
|
||||
type RawConnection: RawConnection<Backend = Self>;
|
||||
type Row: Row<Backend = Self>;
|
||||
}
|
||||
|
@ -1,4 +1,6 @@
|
||||
use crate::{backend::Backend, executor::Executor, query::RawQuery, row::FromSqlRow, error::Error};
|
||||
use crate::{
|
||||
backend::Backend, error::Error, executor::Executor, query::QueryParameters, row::FromSqlRow,
|
||||
};
|
||||
use crossbeam_queue::SegQueue;
|
||||
use crossbeam_utils::atomic::AtomicCell;
|
||||
use futures_channel::oneshot::{channel, Sender};
|
||||
@ -26,23 +28,23 @@ pub trait RawConnection: Send {
|
||||
/// and clean up not fully closed connections.
|
||||
fn finalize<'c>(&'c mut self) -> BoxFuture<'c, Result<(), Error>>;
|
||||
|
||||
fn execute<'c, 'q, Q: 'q>(&'c mut self, query: Q) -> BoxFuture<'c, Result<u64, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>;
|
||||
|
||||
fn fetch<'c, 'q, Q: 'q>(
|
||||
fn execute<'c>(
|
||||
&'c mut self,
|
||||
query: Q,
|
||||
) -> BoxStream<'c, Result<<Self::Backend as Backend>::Row, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>;
|
||||
query: &str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<u64, Error>>;
|
||||
|
||||
fn fetch_optional<'c, 'q, Q: 'q>(
|
||||
fn fetch<'c>(
|
||||
&'c mut self,
|
||||
query: Q,
|
||||
) -> BoxFuture<'c, Result<Option<<Self::Backend as Backend>::Row>, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>;
|
||||
query: &str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxStream<'c, Result<<Self::Backend as Backend>::Row, Error>>;
|
||||
|
||||
fn fetch_optional<'c>(
|
||||
&'c mut self,
|
||||
query: &str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<Option<<Self::Backend as Backend>::Row>, Error>>;
|
||||
}
|
||||
|
||||
pub struct Connection<DB>(Arc<SharedConnection<DB>>)
|
||||
@ -85,24 +87,28 @@ where
|
||||
{
|
||||
type Backend = DB;
|
||||
|
||||
fn execute<'c, 'q, Q: 'q + 'c>(&'c self, query: Q) -> BoxFuture<'c, Result<u64, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
{
|
||||
fn execute<'c, 'q: 'c>(
|
||||
&'c self,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<u64, Error>> {
|
||||
Box::pin(async move {
|
||||
let mut conn = self.get().await;
|
||||
conn.execute(query).await
|
||||
conn.execute(query, params).await
|
||||
})
|
||||
}
|
||||
|
||||
fn fetch<'c, 'q, T: 'c, Q: 'q + 'c>(&'c self, query: Q) -> BoxStream<'c, Result<T, Error>>
|
||||
fn fetch<'c, 'q: 'c, T: 'c>(
|
||||
&'c self,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxStream<'c, Result<T, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
T: FromSqlRow<Self::Backend> + Send + Unpin,
|
||||
{
|
||||
Box::pin(async_stream::try_stream! {
|
||||
let mut conn = self.get().await;
|
||||
let mut s = conn.fetch(query);
|
||||
let mut s = conn.fetch(query, params);
|
||||
|
||||
while let Some(row) = s.next().await.transpose()? {
|
||||
yield T::from_row(row);
|
||||
@ -110,17 +116,17 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn fetch_optional<'c, 'q, T: 'c, Q: 'q + 'c>(
|
||||
fn fetch_optional<'c, 'q: 'c, T: 'c>(
|
||||
&'c self,
|
||||
query: Q,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<Option<T>, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
T: FromSqlRow<Self::Backend>,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let mut conn = self.get().await;
|
||||
let row = conn.fetch_optional(query).await?;
|
||||
let row = conn.fetch_optional(query, params).await?;
|
||||
|
||||
Ok(row.map(T::from_row))
|
||||
})
|
||||
|
21
src/error.rs
21
src/error.rs
@ -1,18 +1,20 @@
|
||||
use std::io;
|
||||
use std::error::Error as StdError;
|
||||
use std::fmt::{self, Display};
|
||||
use std::{
|
||||
error::Error as StdError,
|
||||
fmt::{self, Display},
|
||||
io,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// Error communicating with the database backend.
|
||||
///
|
||||
///
|
||||
/// Some reasons for this to be caused:
|
||||
///
|
||||
/// - [io::ErrorKind::ConnectionRefused] - Database backend is most likely behind a firewall.
|
||||
/// - [io::ErrorKind::ConnectionReset] - Database backend dropped the client connection (perhaps from an administrator action).
|
||||
/// - [io::ErrorKind::InvalidData] - Unexpected or invalid data was encountered. This would indicate that we received data that we were not
|
||||
/// expecting or it was in a format we did not understand. This generally means either there is a programming error in a SQLx driver or
|
||||
/// something with the connection or the database backend itself is corrupted. Additional details are provided along with the
|
||||
/// - [io::ErrorKind::InvalidData] - Unexpected or invalid data was encountered. This would indicate that we received data that we were not
|
||||
/// expecting or it was in a format we did not understand. This generally means either there is a programming error in a SQLx driver or
|
||||
/// something with the connection or the database backend itself is corrupted. Additional details are provided along with the
|
||||
/// error.
|
||||
///
|
||||
Io(io::Error),
|
||||
@ -25,12 +27,11 @@ pub enum Error {
|
||||
|
||||
// TODO: Remove and replace with `#[non_exhaustive]` when possible
|
||||
#[doc(hidden)]
|
||||
__Nonexhaustive
|
||||
__Nonexhaustive,
|
||||
}
|
||||
|
||||
// TODO: Forward causes where present
|
||||
impl StdError for Error {
|
||||
}
|
||||
impl StdError for Error {}
|
||||
|
||||
// TODO: Don't just forward to debug
|
||||
impl Display for Error {
|
||||
|
@ -1,25 +1,30 @@
|
||||
use crate::{backend::Backend, query::RawQuery, row::FromSqlRow, error::Error};
|
||||
use crate::{backend::Backend, error::Error, query::QueryParameters, row::FromSqlRow};
|
||||
use futures_core::{future::BoxFuture, stream::BoxStream};
|
||||
use std::io;
|
||||
|
||||
pub trait Executor: Send {
|
||||
type Backend: Backend;
|
||||
|
||||
fn execute<'c, 'q, Q: 'q + 'c>(&'c self, query: Q) -> BoxFuture<'c, Result<u64, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>;
|
||||
fn execute<'c, 'q: 'c>(
|
||||
&'c self,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<u64, Error>>;
|
||||
|
||||
fn fetch<'c, 'q, T: 'c, Q: 'q + 'c>(&'c self, query: Q) -> BoxStream<'c, Result<T, Error>>
|
||||
fn fetch<'c, 'q: 'c, T: 'c>(
|
||||
&'c self,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxStream<'c, Result<T, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
T: FromSqlRow<Self::Backend> + Send + Unpin;
|
||||
|
||||
fn fetch_optional<'c, 'q, T: 'c, Q: 'q + 'c>(
|
||||
fn fetch_optional<'c, 'q: 'c, T: 'c>(
|
||||
&'c self,
|
||||
query: Q,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<Option<T>, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
T: FromSqlRow<Self::Backend>;
|
||||
}
|
||||
|
||||
@ -30,29 +35,33 @@ where
|
||||
type Backend = E::Backend;
|
||||
|
||||
#[inline]
|
||||
fn execute<'c, 'q, Q: 'q + 'c>(&'c self, query: Q) -> BoxFuture<'c, Result<u64, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
{
|
||||
(*self).execute(query)
|
||||
fn execute<'c, 'q: 'c>(
|
||||
&'c self,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<u64, Error>> {
|
||||
(*self).execute(query, params)
|
||||
}
|
||||
|
||||
fn fetch<'c, 'q, T: 'c, Q: 'q + 'c>(&'c self, query: Q) -> BoxStream<'c, Result<T, Error>>
|
||||
fn fetch<'c, 'q: 'c, T: 'c>(
|
||||
&'c self,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxStream<'c, Result<T, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
T: FromSqlRow<Self::Backend> + Send + Unpin,
|
||||
{
|
||||
(*self).fetch(query)
|
||||
(*self).fetch(query, params)
|
||||
}
|
||||
|
||||
fn fetch_optional<'c, 'q, T: 'c, Q: 'q + 'c>(
|
||||
fn fetch_optional<'c, 'q: 'c, T: 'c>(
|
||||
&'c self,
|
||||
query: Q,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<Option<T>, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
T: FromSqlRow<Self::Backend>,
|
||||
{
|
||||
(*self).fetch_optional(query)
|
||||
(*self).fetch_optional(query, params)
|
||||
}
|
||||
}
|
||||
|
@ -30,15 +30,14 @@ pub mod postgres;
|
||||
pub use self::postgres::Postgres;
|
||||
|
||||
mod connection;
|
||||
pub mod error;
|
||||
mod executor;
|
||||
mod pool;
|
||||
mod query;
|
||||
pub mod error;
|
||||
|
||||
pub use self::{
|
||||
error::Error,
|
||||
connection::Connection,
|
||||
error::Error,
|
||||
pool::Pool,
|
||||
query::{query, SqlQuery},
|
||||
};
|
||||
|
||||
|
32
src/pool.rs
32
src/pool.rs
@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
error::Error,
|
||||
backend::Backend, connection::RawConnection, executor::Executor, query::RawQuery, row::FromSqlRow,
|
||||
backend::Backend, connection::RawConnection, error::Error, executor::Executor,
|
||||
query::QueryParameters, row::FromSqlRow,
|
||||
};
|
||||
use crossbeam_queue::{ArrayQueue, SegQueue};
|
||||
use futures_channel::oneshot;
|
||||
@ -129,27 +129,31 @@ where
|
||||
{
|
||||
type Backend = DB;
|
||||
|
||||
fn execute<'c, 'q, Q: 'q + 'c>(&'c self, query: Q) -> BoxFuture<'c, Result<u64, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
{
|
||||
fn execute<'c, 'q: 'c>(
|
||||
&'c self,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<u64, Error>> {
|
||||
Box::pin(async move {
|
||||
let live = self.0.acquire().await?;
|
||||
let mut conn = PooledConnection::new(&self.0, live);
|
||||
|
||||
conn.execute(query).await
|
||||
conn.execute(query, params).await
|
||||
})
|
||||
}
|
||||
|
||||
fn fetch<'c, 'q, T: 'c, Q: 'q + 'c>(&'c self, query: Q) -> BoxStream<'c, Result<T, Error>>
|
||||
fn fetch<'c, 'q: 'c, T: 'c>(
|
||||
&'c self,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxStream<'c, Result<T, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
T: FromSqlRow<Self::Backend> + Send + Unpin,
|
||||
{
|
||||
Box::pin(async_stream::try_stream! {
|
||||
let live = self.0.acquire().await?;
|
||||
let mut conn = PooledConnection::new(&self.0, live);
|
||||
let mut s = conn.fetch(query);
|
||||
let mut s = conn.fetch(query, params);
|
||||
|
||||
while let Some(row) = s.next().await.transpose()? {
|
||||
yield T::from_row(row);
|
||||
@ -157,18 +161,18 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn fetch_optional<'c, 'q, T: 'c, Q: 'q + 'c>(
|
||||
fn fetch_optional<'c, 'q: 'c, T: 'c>(
|
||||
&'c self,
|
||||
query: Q,
|
||||
query: &'q str,
|
||||
params: <Self::Backend as Backend>::QueryParameters,
|
||||
) -> BoxFuture<'c, Result<Option<T>, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
T: FromSqlRow<Self::Backend>,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let live = self.0.acquire().await?;
|
||||
let mut conn = PooledConnection::new(&self.0, live);
|
||||
let row = conn.fetch_optional(query).await?;
|
||||
let row = conn.fetch_optional(query, params).await?;
|
||||
|
||||
Ok(row.map(T::from_row))
|
||||
})
|
||||
|
@ -1,12 +1,9 @@
|
||||
use crate::backend::{Backend, BackendAssocRawQuery};
|
||||
use crate::backend::Backend;
|
||||
|
||||
pub struct Postgres;
|
||||
|
||||
impl<'q> BackendAssocRawQuery<'q, Postgres> for Postgres {
|
||||
type RawQuery = super::PostgresRawQuery<'q>;
|
||||
}
|
||||
|
||||
impl Backend for Postgres {
|
||||
type QueryParameters = super::PostgresQueryParameters;
|
||||
type RawConnection = super::PostgresRawConnection;
|
||||
type Row = super::PostgresRow;
|
||||
}
|
||||
|
@ -1,10 +1,15 @@
|
||||
use super::PostgresRawConnection;
|
||||
use crate::postgres::protocol::{Authentication, Message, PasswordMessage, StartupMessage};
|
||||
use crate::{
|
||||
error::Error,
|
||||
postgres::protocol::{Authentication, Message, PasswordMessage, StartupMessage},
|
||||
};
|
||||
use std::io;
|
||||
use crate::error::Error;
|
||||
use url::Url;
|
||||
|
||||
pub async fn establish<'a, 'b: 'a>(conn: &'a mut PostgresRawConnection, url: &'b Url) -> Result<(), Error> {
|
||||
pub async fn establish<'a, 'b: 'a>(
|
||||
conn: &'a mut PostgresRawConnection,
|
||||
url: &'b Url,
|
||||
) -> Result<(), Error> {
|
||||
let user = url.username();
|
||||
let password = url.password().unwrap_or("");
|
||||
let database = url.path().trim_start_matches('/');
|
||||
|
@ -1,6 +1,5 @@
|
||||
use super::PostgresRawConnection;
|
||||
use crate::postgres::protocol::Message;
|
||||
use crate::error::Error;
|
||||
use crate::{error::Error, postgres::protocol::Message};
|
||||
use std::io;
|
||||
|
||||
pub async fn execute(conn: &mut PostgresRawConnection) -> Result<u64, Error> {
|
||||
|
@ -1,7 +1,6 @@
|
||||
use super::{PostgresRawConnection, PostgresRow};
|
||||
use crate::postgres::protocol::Message;
|
||||
use crate::{error::Error, postgres::protocol::Message};
|
||||
use futures_core::stream::Stream;
|
||||
use crate::error::Error;
|
||||
use std::io;
|
||||
|
||||
pub fn fetch<'a>(
|
||||
|
@ -1,9 +1,10 @@
|
||||
use super::{PostgresRawConnection, PostgresRow};
|
||||
use crate::postgres::protocol::Message;
|
||||
use crate::error::Error;
|
||||
use crate::{error::Error, postgres::protocol::Message};
|
||||
use std::io;
|
||||
|
||||
pub async fn fetch_optional<'a>(conn: &'a mut PostgresRawConnection) -> Result<Option<PostgresRow>, Error> {
|
||||
pub async fn fetch_optional<'a>(
|
||||
conn: &'a mut PostgresRawConnection,
|
||||
) -> Result<Option<PostgresRow>, Error> {
|
||||
conn.flush().await?;
|
||||
|
||||
let mut row: Option<PostgresRow> = None;
|
||||
|
@ -1,9 +1,8 @@
|
||||
use super::{
|
||||
protocol::{Encode, Message, Terminate},
|
||||
Postgres, PostgresRow,
|
||||
protocol::{self, Encode, Message, Terminate},
|
||||
Postgres, PostgresQueryParameters, PostgresRow,
|
||||
};
|
||||
use crate::error::Error;
|
||||
use crate::{connection::RawConnection, query::RawQuery};
|
||||
use crate::{connection::RawConnection, error::Error, query::QueryParameters};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use futures_core::{future::BoxFuture, stream::BoxStream};
|
||||
use std::{
|
||||
@ -122,7 +121,11 @@ impl PostgresRawConnection {
|
||||
// Postgres is a self-describing format and the TCP frames encode
|
||||
// length headers. We will never attempt to decode more than we
|
||||
// received.
|
||||
let n = self.stream.read(unsafe { self.rbuf.bytes_mut() }).await.map_err(Error::Io)?;
|
||||
let n = self
|
||||
.stream
|
||||
.read(unsafe { self.rbuf.bytes_mut() })
|
||||
.await
|
||||
.map_err(Error::Io)?;
|
||||
|
||||
// SAFE: After we read in N bytes, we can tell the buffer that it actually
|
||||
// has that many bytes MORE for the decode routines to look at
|
||||
@ -161,33 +164,59 @@ impl RawConnection for PostgresRawConnection {
|
||||
Box::pin(self.finalize())
|
||||
}
|
||||
|
||||
fn execute<'c, 'q, Q: 'q>(&'c mut self, query: Q) -> BoxFuture<'c, Result<u64, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
{
|
||||
query.finish(self);
|
||||
fn execute<'c>(
|
||||
&'c mut self,
|
||||
query: &str,
|
||||
params: PostgresQueryParameters,
|
||||
) -> BoxFuture<'c, Result<u64, Error>> {
|
||||
finish(self, query, params, 0);
|
||||
|
||||
Box::pin(execute::execute(self))
|
||||
}
|
||||
|
||||
fn fetch<'c, 'q, Q: 'q>(&'c mut self, query: Q) -> BoxStream<'c, Result<PostgresRow, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
{
|
||||
query.finish(self);
|
||||
fn fetch<'c>(
|
||||
&'c mut self,
|
||||
query: &str,
|
||||
params: PostgresQueryParameters,
|
||||
) -> BoxStream<'c, Result<PostgresRow, Error>> {
|
||||
finish(self, query, params, 0);
|
||||
|
||||
Box::pin(fetch::fetch(self))
|
||||
}
|
||||
|
||||
fn fetch_optional<'c, 'q, Q: 'q>(
|
||||
fn fetch_optional<'c>(
|
||||
&'c mut self,
|
||||
query: Q,
|
||||
) -> BoxFuture<'c, Result<Option<PostgresRow>, Error>>
|
||||
where
|
||||
Q: RawQuery<'q, Backend = Self::Backend>,
|
||||
{
|
||||
query.finish(self);
|
||||
query: &str,
|
||||
params: PostgresQueryParameters,
|
||||
) -> BoxFuture<'c, Result<Option<PostgresRow>, Error>> {
|
||||
finish(self, query, params, 1);
|
||||
|
||||
Box::pin(fetch_optional::fetch_optional(self))
|
||||
}
|
||||
}
|
||||
|
||||
fn finish(conn: &mut PostgresRawConnection, query: &str, params: PostgresQueryParameters, limit: i32) {
|
||||
conn.write(protocol::Parse {
|
||||
portal: "",
|
||||
query,
|
||||
param_types: &*params.types,
|
||||
});
|
||||
|
||||
conn.write(protocol::Bind {
|
||||
portal: "",
|
||||
statement: "",
|
||||
formats: &[1], // [BINARY]
|
||||
// TODO: Early error if there is more than i16
|
||||
values_len: params.types.len() as i16,
|
||||
values: &*params.buf,
|
||||
result_formats: &[1], // [BINARY]
|
||||
});
|
||||
|
||||
// TODO: Make limit be 1 for fetch_optional
|
||||
conn.write(protocol::Execute {
|
||||
portal: "",
|
||||
limit,
|
||||
});
|
||||
|
||||
conn.write(protocol::Sync);
|
||||
}
|
||||
|
@ -6,4 +6,7 @@ mod query;
|
||||
mod row;
|
||||
pub mod types;
|
||||
|
||||
pub use self::{backend::Postgres, connection::PostgresRawConnection, query::PostgresRawQuery, row::PostgresRow};
|
||||
pub use self::{
|
||||
backend::Postgres, connection::PostgresRawConnection, query::PostgresQueryParameters,
|
||||
row::PostgresRow,
|
||||
};
|
||||
|
@ -1,4 +1,4 @@
|
||||
//! Low level PostgreSQL protocol. Defines the encoding and decoding of the messages communicated
|
||||
//! Low level PostgreSQL protocol. Defines the encoding and decoding of the messages communicated
|
||||
//! to and from the database server.
|
||||
|
||||
mod bind;
|
||||
|
@ -3,28 +3,24 @@ use super::{
|
||||
Postgres, PostgresRawConnection,
|
||||
};
|
||||
use crate::{
|
||||
query::RawQuery,
|
||||
query::QueryParameters,
|
||||
serialize::{IsNull, ToSql},
|
||||
types::HasSqlType,
|
||||
};
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
|
||||
pub struct PostgresRawQuery<'q> {
|
||||
limit: i32,
|
||||
query: &'q str,
|
||||
pub struct PostgresQueryParameters {
|
||||
// OIDs of the bind parameters
|
||||
types: Vec<u32>,
|
||||
pub(super) types: Vec<u32>,
|
||||
// Write buffer for serializing bind values
|
||||
buf: Vec<u8>,
|
||||
pub(super) buf: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<'q> RawQuery<'q> for PostgresRawQuery<'q> {
|
||||
impl QueryParameters for PostgresQueryParameters {
|
||||
type Backend = Postgres;
|
||||
|
||||
fn new(query: &'q str) -> Self {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
limit: 0,
|
||||
query,
|
||||
// Estimates for average number of bind parameters were
|
||||
// chosen from sampling from internal projects
|
||||
types: Vec::with_capacity(4),
|
||||
@ -32,7 +28,7 @@ impl<'q> RawQuery<'q> for PostgresRawQuery<'q> {
|
||||
}
|
||||
}
|
||||
|
||||
fn bind<T>(mut self, value: T) -> Self
|
||||
fn bind<T>(&mut self, value: T)
|
||||
where
|
||||
Self: Sized,
|
||||
Self::Backend: HasSqlType<T>,
|
||||
@ -56,33 +52,5 @@ impl<'q> RawQuery<'q> for PostgresRawQuery<'q> {
|
||||
|
||||
// Write-back the len to the beginning of this frame (not including the len of len)
|
||||
BigEndian::write_i32(&mut self.buf[pos..], len as i32);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
fn finish(self, conn: &mut PostgresRawConnection) {
|
||||
conn.write(protocol::Parse {
|
||||
portal: "",
|
||||
query: self.query,
|
||||
param_types: &*self.types,
|
||||
});
|
||||
|
||||
conn.write(protocol::Bind {
|
||||
portal: "",
|
||||
statement: "",
|
||||
formats: &[1], // [BINARY]
|
||||
// TODO: Early error if there is more than i16
|
||||
values_len: self.types.len() as i16,
|
||||
values: &*self.buf,
|
||||
result_formats: &[1], // [BINARY]
|
||||
});
|
||||
|
||||
// TODO: Make limit be 1 for fetch_optional
|
||||
conn.write(protocol::Execute {
|
||||
portal: "",
|
||||
limit: self.limit,
|
||||
});
|
||||
|
||||
conn.write(protocol::Sync);
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
use super::{Postgres, PostgresTypeMetadata, PostgresTypeFormat};
|
||||
use super::{Postgres, PostgresTypeFormat, PostgresTypeMetadata};
|
||||
use crate::{
|
||||
deserialize::FromSql,
|
||||
serialize::{IsNull, ToSql},
|
||||
|
@ -1,4 +1,4 @@
|
||||
use super::{Postgres, PostgresTypeMetadata, PostgresTypeFormat};
|
||||
use super::{Postgres, PostgresTypeFormat, PostgresTypeMetadata};
|
||||
use crate::{
|
||||
deserialize::FromSql,
|
||||
serialize::{IsNull, ToSql},
|
||||
|
@ -1,6 +1,6 @@
|
||||
//! PostgreSQL types.
|
||||
//!
|
||||
//! The following types are supported by this crate,
|
||||
//! The following types are supported by this crate,
|
||||
//! along with the corresponding Postgres types:
|
||||
//!
|
||||
//! ### Standard
|
||||
@ -39,10 +39,10 @@ pub enum PostgresTypeFormat {
|
||||
Binary = 1,
|
||||
}
|
||||
|
||||
/// Provides the OIDs for a SQL type and the expected format to be used for
|
||||
/// Provides the OIDs for a SQL type and the expected format to be used for
|
||||
/// transmission between Rust and PostgreSQL.
|
||||
///
|
||||
/// While the BINARY format is preferred in most cases, there are scenarios
|
||||
/// While the BINARY format is preferred in most cases, there are scenarios
|
||||
/// where only the TEXT format may be available for a type.
|
||||
pub struct PostgresTypeMetadata {
|
||||
pub format: PostgresTypeFormat,
|
||||
|
@ -1,4 +1,4 @@
|
||||
use super::{Postgres, PostgresTypeMetadata, PostgresTypeFormat};
|
||||
use super::{Postgres, PostgresTypeFormat, PostgresTypeMetadata};
|
||||
use crate::{
|
||||
deserialize::FromSql,
|
||||
serialize::{IsNull, ToSql},
|
||||
|
38
src/query.rs
38
src/query.rs
@ -1,32 +1,29 @@
|
||||
use crate::{
|
||||
backend::{Backend, BackendAssocRawQuery},
|
||||
executor::Executor,
|
||||
row::FromSqlRow,
|
||||
serialize::ToSql,
|
||||
error::Error,
|
||||
backend::Backend, error::Error, executor::Executor, row::FromSqlRow, serialize::ToSql,
|
||||
types::HasSqlType,
|
||||
};
|
||||
use futures_core::{future::BoxFuture, stream::BoxStream};
|
||||
use std::io;
|
||||
|
||||
pub trait RawQuery<'q>: Sized + Send + Sync {
|
||||
pub trait QueryParameters: Send {
|
||||
type Backend: Backend;
|
||||
|
||||
fn new(query: &'q str) -> Self;
|
||||
fn new() -> Self
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
fn bind<T>(self, value: T) -> Self
|
||||
fn bind<T>(&mut self, value: T)
|
||||
where
|
||||
Self::Backend: HasSqlType<T>,
|
||||
T: ToSql<Self::Backend>;
|
||||
|
||||
fn finish(self, conn: &mut <Self::Backend as Backend>::RawConnection);
|
||||
}
|
||||
|
||||
pub struct SqlQuery<'q, DB>
|
||||
where
|
||||
DB: Backend,
|
||||
{
|
||||
inner: <DB as BackendAssocRawQuery<'q, DB>>::RawQuery,
|
||||
query: &'q str,
|
||||
params: DB::QueryParameters,
|
||||
}
|
||||
|
||||
impl<'q, DB> SqlQuery<'q, DB>
|
||||
@ -36,7 +33,8 @@ where
|
||||
#[inline]
|
||||
pub fn new(query: &'q str) -> Self {
|
||||
Self {
|
||||
inner: <DB as BackendAssocRawQuery<'q, DB>>::RawQuery::new(query),
|
||||
query,
|
||||
params: DB::QueryParameters::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,7 +44,7 @@ where
|
||||
DB: HasSqlType<T>,
|
||||
T: ToSql<DB>,
|
||||
{
|
||||
self.inner = self.inner.bind(value);
|
||||
self.params.bind(value);
|
||||
self
|
||||
}
|
||||
|
||||
@ -56,9 +54,8 @@ where
|
||||
pub fn execute<E>(self, executor: &'q E) -> BoxFuture<'q, Result<u64, Error>>
|
||||
where
|
||||
E: Executor<Backend = DB>,
|
||||
<DB as BackendAssocRawQuery<'q, DB>>::RawQuery: 'q,
|
||||
{
|
||||
executor.execute(self.inner)
|
||||
executor.execute(self.query, self.params)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@ -66,19 +63,20 @@ where
|
||||
where
|
||||
E: Executor<Backend = DB>,
|
||||
T: FromSqlRow<DB> + Send + Unpin,
|
||||
<DB as BackendAssocRawQuery<'q, DB>>::RawQuery: 'q,
|
||||
{
|
||||
executor.fetch(self.inner)
|
||||
executor.fetch(self.query, self.params)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn fetch_optional<E, T: 'q>(self, executor: &'q E) -> BoxFuture<'q, Result<Option<T>, Error>>
|
||||
pub fn fetch_optional<E, T: 'q>(
|
||||
self,
|
||||
executor: &'q E,
|
||||
) -> BoxFuture<'q, Result<Option<T>, Error>>
|
||||
where
|
||||
E: Executor<Backend = DB>,
|
||||
T: FromSqlRow<DB>,
|
||||
<DB as BackendAssocRawQuery<'q, DB>>::RawQuery: 'q,
|
||||
{
|
||||
executor.fetch_optional(self.inner)
|
||||
executor.fetch_optional(self.query, self.params)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,17 +16,17 @@ pub enum IsNull {
|
||||
///
|
||||
/// The data must be written to the buffer in the expected format for the given backend.
|
||||
///
|
||||
/// When possible, implementations of this trait should prefer using an existing implementation,
|
||||
/// When possible, implementations of this trait should prefer using an existing implementation,
|
||||
/// rather than writing to `out` directly.
|
||||
pub trait ToSql<DB: Backend> {
|
||||
/// Writes the value of `self` into `buf` as the expected format for the given backend.
|
||||
///
|
||||
/// The return value indicates if this value should be represented as `NULL`.
|
||||
/// The return value indicates if this value should be represented as `NULL`.
|
||||
/// If this is the case, implementations **must not** write anything to `out`.
|
||||
fn to_sql(self, buf: &mut Vec<u8>) -> IsNull;
|
||||
}
|
||||
|
||||
/// [ToSql] is implemented for `Option<T>` where `T` implements `ToSql`. An `Option<T>`
|
||||
/// [ToSql] is implemented for `Option<T>` where `T` implements `ToSql`. An `Option<T>`
|
||||
/// represents a nullable SQL value.
|
||||
impl<T, DB> ToSql<DB> for Option<T>
|
||||
where
|
||||
|
Loading…
x
Reference in New Issue
Block a user