Initial experiment with FromRow

This commit is contained in:
Ryan Leckey 2019-08-05 19:07:36 -07:00
parent bbc0d70366
commit 7dab021532
4 changed files with 117 additions and 56 deletions

View File

@ -1,9 +1,9 @@
#![feature(async_await)]
use futures::{future, TryStreamExt};
use sqlx::{postgres::Connection, ConnectOptions};
use std::io;
// TODO: ToSql and FromSql (to [de]serialize values from/to Rust and SQL)
// TODO: Connection strings ala postgres@localhost/sqlx_dev
#[runtime::main(runtime_tokio::Tokio)]
@ -21,63 +21,56 @@ async fn main() -> io::Result<()> {
)
.await?;
// println!(" :: drop database (if exists) sqlx__dev");
println!(" :: create database sqlx__dev (if not exists)");
// conn.prepare("DROP DATABASE IF EXISTS sqlx__dev")
// .execute()
// .await?;
conn.prepare("CREATE DATABASE IF NOT EXISTS sqlx__dev")
.execute()
.await?;
// println!(" :: create database sqlx__dev");
conn.close().await?;
// conn.prepare("CREATE DATABASE sqlx__dev").execute().await?;
let mut conn = Connection::establish(
ConnectOptions::new()
.host("127.0.0.1")
.port(5432)
.user("postgres")
.database("sqlx__dev"),
)
.await?;
// conn.close().await?;
println!(" :: create schema");
// let mut conn = Connection::establish(
// ConnectOptions::new()
// .host("127.0.0.1")
// .port(5432)
// .user("postgres")
// .database("sqlx__dev"),
// )
// .await?;
conn.prepare(
r#"
CREATE TABLE IF NOT EXISTS users (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL
);
"#,
)
.execute()
.await?;
// println!(" :: create schema");
println!(" :: insert");
// conn.prepare(
// r#"
// CREATE TABLE IF NOT EXISTS users (
// id BIGSERIAL PRIMARY KEY,
// name TEXT NOT NULL
// );
// "#,
// )
// .execute()
// .await?;
// println!(" :: insert");
let row = conn
.prepare("SELECT $1")
.bind("We're cooking with fire now")
let user_id: i64 = conn
.prepare("INSERT INTO users (name) VALUES ($1) RETURNING id")
.bind("Joe")
.get()
.await?
.unwrap();
.await?;
let value: String = row.get(0);
println!("insert {:?}", user_id);
println!(" - {}", value);
println!(" :: select");
// conn.prepare("SELECT id FROM users")
// .select()
// .try_for_each(|row| {
// let id = row.get(0);
conn.prepare("SELECT id, name FROM users")
.select()
.try_for_each(|(id, name): (i64, String)| {
println!("select {} -> {}", id, name);
// println!("select {:?}", id);
// future::ok(())
// })
// .await?;
future::ok(())
})
.await?;
conn.close().await?;

View File

@ -1,17 +1,30 @@
use super::prepare::Prepare;
use crate::{
postgres::protocol::{self, DataRow, Message},
row::Row,
row::{FromRow, Row},
types::SqlType,
};
use std::io;
// TODO: Think through how best to handle null _rows_ and null _values_
impl<'a, 'b> Prepare<'a, 'b> {
pub async fn get(self) -> io::Result<Option<Row>> {
#[inline]
pub async fn get<Record, T>(self) -> io::Result<T>
where
T: FromRow<Record>,
{
Ok(T::from_row(self.get_raw().await?.unwrap()))
}
// TODO: Better name?
// TODO: Should this be public?
async fn get_raw(self) -> io::Result<Option<Row>> {
let conn = self.finish();
conn.flush().await?;
let mut raw: Option<DataRow> = None;
let mut row: Option<Row> = None;
while let Some(message) = conn.receive().await? {
match message {
@ -22,15 +35,15 @@ impl<'a, 'b> Prepare<'a, 'b> {
// Indicates successful completion of a phase
}
Message::DataRow(data_row) => {
Message::DataRow(body) => {
// note: because we used `EXECUTE 1` this will only execute once
raw = Some(data_row);
row = Some(Row(body));
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
return Ok(raw.map(Row));
return Ok(row);
}
message => {

View File

@ -1,10 +1,25 @@
use super::prepare::Prepare;
use crate::postgres::protocol::{self, DataRow, Message};
use futures::{stream, Stream};
use crate::{
postgres::protocol::{self, DataRow, Message},
row::{FromRow, Row},
};
use futures::{stream, Stream, TryStreamExt};
use std::io;
impl<'a, 'b> Prepare<'a, 'b> {
pub fn select(self) -> impl Stream<Item = Result<DataRow, io::Error>> + 'a + Unpin {
#[inline]
pub fn select<Record: 'a, T: 'static>(
self,
) -> impl Stream<Item = Result<T, io::Error>> + 'a + Unpin
where
T: FromRow<Record>,
{
self.select_raw().map_ok(T::from_row)
}
// TODO: Better name?
// TODO: Should this be public?
fn select_raw(self) -> impl Stream<Item = Result<Row, io::Error>> + 'a + Unpin {
// FIXME: Manually implement Stream on a new type to avoid the unfold adapter
stream::unfold(self.finish(), |conn| {
Box::pin(async {
@ -28,7 +43,7 @@ impl<'a, 'b> Prepare<'a, 'b> {
}
Message::DataRow(row) => {
break Some((Ok(row), conn));
break Some((Ok(Row(row)), conn));
}
Message::CommandComplete(_) => {}

View File

@ -7,6 +7,7 @@ use crate::{
pub struct Row(pub(crate) DataRow);
impl Row {
#[inline]
pub fn get<ST, T>(&self, index: usize) -> T
where
ST: SqlType,
@ -15,3 +16,42 @@ impl Row {
T::from_sql(self.0.get(index).unwrap())
}
}
pub trait FromRow<Record> {
fn from_row(row: Row) -> Self;
}
impl<ST, T> FromRow<ST> for T
where
ST: SqlType,
T: FromSql<ST>,
{
#[inline]
fn from_row(row: Row) -> Self {
row.get::<ST, T>(0)
}
}
impl<ST1, T1> FromRow<(ST1,)> for (T1,)
where
ST1: SqlType,
T1: FromSql<ST1>,
{
#[inline]
fn from_row(row: Row) -> Self {
(row.get::<ST1, T1>(0),)
}
}
impl<ST1, ST2, T1, T2> FromRow<(ST1, ST2)> for (T1, T2)
where
ST1: SqlType,
ST2: SqlType,
T1: FromSql<ST1>,
T2: FromSql<ST2>,
{
#[inline]
fn from_row(row: Row) -> Self {
(row.get::<ST1, T1>(0), row.get::<ST2, T2>(1))
}
}