Minor fixes and run rustfmt

This commit is contained in:
Ryan Leckey 2019-12-03 00:22:02 -08:00
parent f16c805f4c
commit 871183d23b
13 changed files with 61 additions and 74 deletions

View File

@ -1,6 +1,6 @@
use sqlx::{FromRow, Pool, Postgres};
use sqlx::{Pool, Postgres};
use std::env;
use tide::{http::StatusCode, Request, Response, ResultExt};
use tide::{Request, Response};
#[async_std::main]
async fn main() -> anyhow::Result<()> {
@ -34,13 +34,14 @@ async fn register(mut req: Request<Pool<Postgres>>) -> Response {
let mut pool = req.state();
// TODO: Handle the unwrap
let (user_id,): (i64,) =
sqlx::query("INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id")
.bind(body.username)
.bind(body.email)
.fetch_one(&mut pool)
.await
.unwrap();
let (user_id,): (i64,) = sqlx::query!(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
&*body.username,
&*body.email
)
.fetch_one(&mut pool)
.await
.unwrap();
#[derive(serde::Serialize)]
struct RegisterResponseBody {

View File

@ -2,8 +2,8 @@ use async_std::io::{
prelude::{ReadExt, WriteExt},
Read, Write,
};
use std::io;
use bitflags::_core::mem::MaybeUninit;
use std::io;
pub struct BufStream<S> {
pub(crate) stream: S,
@ -21,8 +21,8 @@ pub struct BufStream<S> {
}
impl<S> BufStream<S>
where
S: Read + Write + Unpin,
where
S: Read + Write + Unpin,
{
pub fn new(stream: S) -> Self {
Self {

View File

@ -1,4 +1,4 @@
#![recursion_limit="256"]
#![recursion_limit = "256"]
#![allow(unused_imports)]
#[macro_use]

View File

@ -1,13 +1,11 @@
use super::MariaDb;
use crate::mariadb::protocol::ResultRow;
use crate::mariadb::query::MariaDbQueryParameters;
use crate::{
backend::Backend,
describe::{Describe, ResultField},
mariadb::{protocol::ResultRow, query::MariaDbQueryParameters},
url::Url,
};
use futures_core::stream::BoxStream;
use futures_core::future::BoxFuture;
use crate::url::Url;
use futures_core::{future::BoxFuture, stream::BoxStream};
impl Backend for MariaDb {
type QueryParameters = MariaDbQueryParameters;
@ -24,9 +22,7 @@ impl Backend for MariaDb {
}
fn close(self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(async move {
self.close().await
})
Box::pin(async move { self.close().await })
}
}

View File

@ -9,6 +9,7 @@ use crate::{
},
query::MariaDbQueryParameters,
},
url::Url,
Error, Result,
};
use async_std::net::TcpStream;
@ -17,7 +18,6 @@ use std::{
io,
net::{IpAddr, SocketAddr},
};
use crate::url::Url;
pub struct MariaDb {
pub(crate) stream: BufStream<TcpStream>,
@ -208,9 +208,7 @@ impl MariaDb {
Ok(ok)
}
pub(super) async fn column_definitions(
&mut self
) -> Result<Vec<ColumnDefinitionPacket>> {
pub(super) async fn column_definitions(&mut self) -> Result<Vec<ColumnDefinitionPacket>> {
let packet = self.receive().await?;
// A Resultset starts with a [ColumnCountPacket] which is a single field that encodes
@ -222,7 +220,7 @@ impl MariaDb {
// TODO: This information was *already* returned by PREPARE .., is there a way to suppress generation
let mut columns = vec![];
for _ in 0..column_count {
let column =ColumnDefinitionPacket::decode(self.receive().await?)?;
let column = ColumnDefinitionPacket::decode(self.receive().await?)?;
columns.push(column);
}

View File

@ -3,9 +3,9 @@ use crate::{
connection::MariaDb,
protocol::{Capabilities, HandshakeResponsePacket, InitialHandshakePacket},
},
url::Url,
Result,
};
use crate::url::Url;
pub(crate) async fn establish(conn: &mut MariaDb, url: &Url) -> Result<()> {
let initial = InitialHandshakePacket::decode(conn.receive().await?)?;

View File

@ -3,12 +3,14 @@ use crate::{
backend::Backend,
describe::{Describe, ResultField},
executor::Executor,
params::{IntoQueryParameters, QueryParameters},
mariadb::protocol::{
Capabilities, ColumnCountPacket, ColumnDefinitionPacket, ComStmtExecute, EofPacket,
ErrPacket, OkPacket, ResultRow, StmtExecFlag,
mariadb::{
protocol::{
Capabilities, ColumnCountPacket, ColumnDefinitionPacket, ComStmtExecute, EofPacket,
ErrPacket, OkPacket, ResultRow, StmtExecFlag,
},
query::MariaDbQueryParameters,
},
mariadb::query::MariaDbQueryParameters,
params::{IntoQueryParameters, QueryParameters},
row::FromRow,
url::Url,
};
@ -76,34 +78,34 @@ impl Executor for MariaDb {
{
let params = params.into_params();
Box::pin(async_stream::try_stream! {
let prepare = self.send_prepare(query).await?;
self.send_execute(prepare.statement_id, params).await?;
Box::pin(async_stream::try_stream! {
let prepare = self.send_prepare(query).await?;
self.send_execute(prepare.statement_id, params).await?;
let columns = self.column_definitions().await?;
let capabilities = self.capabilities;
let columns = self.column_definitions().await?;
let capabilities = self.capabilities;
loop {
let packet = self.receive().await?;
if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
// NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
// but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
if !capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
let _eof = EofPacket::decode(packet)?;
} else {
let _ok = OkPacket::decode(packet, capabilities)?;
}
loop {
let packet = self.receive().await?;
if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
// NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
// but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
if !capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
let _eof = EofPacket::decode(packet)?;
} else {
let _ok = OkPacket::decode(packet, capabilities)?;
}
break;
} else if packet[0] == 0xFF {
let _err = ErrPacket::decode(packet)?;
panic!("ErrPacket received");
} else {
let row = ResultRow::decode(packet, &columns)?;
yield FromRow::from_row(row);
}
}
})
break;
} else if packet[0] == 0xFF {
let _err = ErrPacket::decode(packet)?;
panic!("ErrPacket received");
} else {
let row = ResultRow::decode(packet, &columns)?;
yield FromRow::from_row(row);
}
}
})
}
fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
@ -146,7 +148,7 @@ impl Executor for MariaDb {
}
Ok(row)
})
})
}
fn describe<'e, 'q: 'e>(

View File

@ -5,7 +5,7 @@ use crate::{
mariadb::protocol::{FieldType, ParameterFlag},
types::HasSqlType,
};
use byteorder::{LittleEndian, ByteOrder};
use byteorder::{ByteOrder, LittleEndian};
impl HasSqlType<i16> for MariaDb {
#[inline]

View File

@ -2,12 +2,11 @@ use super::{connection::Step, Postgres};
use crate::{
backend::Backend,
describe::{Describe, ResultField},
postgres::protocol::DataRow,
params::QueryParameters,
postgres::{protocol::DataRow, query::PostgresQueryParameters},
url::Url,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
use crate::postgres::query::PostgresQueryParameters;
impl Backend for Postgres {
type QueryParameters = PostgresQueryParameters;

View File

@ -1,7 +1,9 @@
use crate::{
io::{Buf, BufStream},
postgres::{
error::PostgresDatabaseError,
protocol::{self, Decode, Encode, Message},
query::PostgresQueryParameters,
},
};
use async_std::net::TcpStream;
@ -10,8 +12,6 @@ use std::{
io,
net::{Shutdown, SocketAddr},
};
use crate::postgres::query::PostgresQueryParameters;
use crate::postgres::error::PostgresDatabaseError;
pub struct Postgres {
stream: BufStream<TcpStream>,
@ -75,12 +75,10 @@ impl Postgres {
self.stream.flush().await?;
while let Some(message) = self.receive().await? {
println!("recv!?");
match message {
Message::Authentication(auth) => {
match *auth {
protocol::Authentication::Ok => {
println!("no auth?");
// Do nothing. No password is needed to continue.
}
@ -128,8 +126,6 @@ impl Postgres {
}
}
println!("done");
Ok(())
}

View File

@ -13,6 +13,4 @@ pub mod protocol;
pub mod types;
pub use self::{
connection::Postgres
};
pub use self::connection::Postgres;

View File

@ -188,11 +188,10 @@ where
Ok(quote! {{
#params
sqlx::Query::<#backend_path, _, (#(#output_types),*,), _> {
sqlx::Query::<#backend_path, _, (#(#output_types),*,)> {
query: #query,
input: params,
output: ::core::marker::PhantomData,
target: ::core::marker::PhantomData,
backend: ::core::marker::PhantomData,
}
}}

View File

@ -26,5 +26,3 @@ macro_rules! test {
test!(mysql_bool: bool: "false" == false, "true" == true);
test!(mysql_long: i32: "2141512" == 2141512_i32);