Collapse into 1 crate

This commit is contained in:
Ryan Leckey
2019-07-25 23:41:50 -07:00
parent 0703a1b91a
commit 04f56570a4
41 changed files with 69 additions and 284 deletions

View File

@@ -1,22 +0,0 @@
[package]
name = "sqlx-postgres"
version = "0.0.0"
authors = ["Ryan Leckey <leckey.ryan@gmail.com>"]
license = "MIT OR Apache-2.0"
description = "PostgreSQL database driver for dbx."
edition = "2018"
[dependencies]
sqlx-core = { path = "../sqlx-core" }
sqlx-postgres-protocol = { path = "../sqlx-postgres-protocol" }
runtime = "=0.3.0-alpha.6"
futures-preview = "=0.3.0-alpha.17"
byteorder = "1.3.2"
log = "0.4.7"
hex = "0.3.2"
bytes = "0.4.12"
memchr = "2.2.1"
md-5 = "0.8.0"
[dev-dependencies]
matches = "0.1.8"

View File

@@ -1,77 +0,0 @@
use super::Connection;
use sqlx_core::ConnectOptions;
use sqlx_postgres_protocol::{Authentication, Message, PasswordMessage, StartupMessage};
use std::io;
pub async fn establish<'a, 'b: 'a>(
conn: &'a mut Connection,
options: ConnectOptions<'b>,
) -> io::Result<()> {
// See this doc for more runtime parameters
// https://www.postgresql.org/docs/12/runtime-config-client.html
let params = &[
// FIXME: ConnectOptions user and database need to be required parameters and error
// before they get here
("user", options.user.expect("user is required")),
("database", options.database.expect("database is required")),
// Sets the display format for date and time values,
// as well as the rules for interpreting ambiguous date input values.
("DateStyle", "ISO, MDY"),
// Sets the display format for interval values.
("IntervalStyle", "iso_8601"),
// Sets the time zone for displaying and interpreting time stamps.
("TimeZone", "UTC"),
// Adjust postgres to return percise values for floats
// NOTE: This is default in postgres 12+
("extra_float_digits", "3"),
// Sets the client-side encoding (character set).
("client_encoding", "UTF-8"),
];
let message = StartupMessage::new(params);
conn.send(message);
conn.flush().await?;
while let Some(message) = conn.receive().await? {
match message {
Message::Authentication(Authentication::Ok) => {
// Do nothing; server is just telling us that
// there is no password needed
}
Message::Authentication(Authentication::CleartextPassword) => {
// FIXME: Should error early (before send) if the user did not supply a password
conn.send(PasswordMessage::cleartext(
options.password.unwrap_or_default(),
));
conn.flush().await?;
}
Message::Authentication(Authentication::Md5Password { salt }) => {
// FIXME: Should error early (before send) if the user did not supply a password
conn.send(PasswordMessage::md5(
options.password.unwrap_or_default(),
options.user.unwrap_or_default(),
salt,
));
conn.flush().await?;
}
Message::BackendKeyData(body) => {
conn.process_id = body.process_id();
conn.secret_key = body.secret_key();
}
Message::ReadyForQuery(_) => {
break;
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
Ok(())
}

View File

@@ -1,48 +0,0 @@
use super::prepare::Prepare;
use sqlx_postgres_protocol::{self as proto, Execute, Message, Sync};
use std::io;
impl<'a> Prepare<'a> {
pub async fn execute(self) -> io::Result<u64> {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
self.connection.flush().await?;
let mut rows = 0;
while let Some(message) = self.connection.receive().await? {
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(_) => {
// This is EXECUTE so we are ignoring any potential results
}
Message::CommandComplete(body) => {
rows = body.rows();
}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
return Ok(rows);
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
// FIXME: This is an end-of-file error. How we should bubble this up here?
unreachable!()
}
}

View File

@@ -1,49 +0,0 @@
use super::prepare::Prepare;
use sqlx_postgres_protocol::{self as proto, DataRow, Execute, Message, Sync};
use std::io;
impl<'a> Prepare<'a> {
pub async fn get(self) -> io::Result<Option<DataRow>> {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
self.connection.flush().await?;
let mut row: Option<DataRow> = None;
while let Some(message) = self.connection.receive().await? {
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(data_row) => {
// we only care about the first result.
if row.is_none() {
row = Some(data_row);
}
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
return Ok(row);
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
// FIXME: This is an end-of-file error. How we should bubble this up here?
unreachable!()
}
}

View File

@@ -1,187 +0,0 @@
use bytes::{BufMut, BytesMut};
use futures::{
io::{AsyncReadExt, AsyncWrite, AsyncWriteExt},
ready,
task::{Context, Poll},
Future,
};
use runtime::net::TcpStream;
use sqlx_core::ConnectOptions;
use sqlx_postgres_protocol::{Encode, Message, Terminate};
use std::{fmt::Debug, io, pin::Pin};
mod establish;
mod execute;
mod get;
mod prepare;
mod select;
pub struct Connection {
pub(super) stream: TcpStream,
// Do we think that there is data in the read buffer to be decoded
stream_readable: bool,
// Have we reached end-of-file (been disconnected)
stream_eof: bool,
// Buffer used when sending outgoing messages
wbuf: Vec<u8>,
// Buffer used when reading incoming messages
// TODO: Evaluate if we _really_ want to use BytesMut here
rbuf: BytesMut,
// Process ID of the Backend
process_id: u32,
// Backend-unique key to use to send a cancel query message to the server
secret_key: u32,
}
impl Connection {
pub async fn establish(options: ConnectOptions<'_>) -> io::Result<Self> {
let stream = TcpStream::connect((options.host, options.port)).await?;
let mut conn = Self {
wbuf: Vec::with_capacity(1024),
rbuf: BytesMut::with_capacity(1024 * 8),
stream,
stream_readable: false,
stream_eof: false,
process_id: 0,
secret_key: 0,
};
establish::establish(&mut conn, options).await?;
Ok(conn)
}
pub fn prepare(&mut self, query: &str) -> prepare::Prepare {
prepare::prepare(self, query)
}
pub async fn close(mut self) -> io::Result<()> {
self.send(Terminate);
self.flush().await?;
self.stream.close().await?;
Ok(())
}
// Wait and return the next message to be received from Postgres.
async fn receive(&mut self) -> io::Result<Option<Message>> {
loop {
if self.stream_eof {
// Reached end-of-file on a previous read call.
return Ok(None);
}
if self.stream_readable {
loop {
match Message::decode(&mut self.rbuf)? {
Some(Message::ParameterStatus(_body)) => {
// TODO: not sure what to do with these yet
}
Some(Message::Response(_body)) => {
// TODO: Transform Errors+ into an error type and return
// TODO: Log all others
}
Some(message) => {
return Ok(Some(message));
}
None => {
// Not enough data in the read buffer to parse a message
self.stream_readable = true;
break;
}
}
}
}
// Ensure there is at least 32-bytes of space available
// in the read buffer so we can safely detect end-of-file
self.rbuf.reserve(32);
// SAFE: Read data in directly to buffer without zero-initializing the data.
// Postgres is a self-describing format and the TCP frames encode
// length headers. We will never attempt to decode more than we
// received.
let n = self.stream.read(unsafe { self.rbuf.bytes_mut() }).await?;
// SAFE: After we read in N bytes, we can tell the buffer that it actually
// has that many bytes MORE for the decode routines to look at
unsafe { self.rbuf.advance_mut(n) }
if n == 0 {
self.stream_eof = true;
}
self.stream_readable = true;
}
}
fn send<T>(&mut self, message: T)
where
T: Encode + Debug,
{
log::trace!("encode {:?}", message);
// TODO: Encoding should not be fallible
message.encode(&mut self.wbuf).unwrap();
}
async fn flush(&mut self) -> io::Result<()> {
// TODO: Find some other way to print a Vec<u8> as an ASCII escaped string
log::trace!("send {:?}", bytes::Bytes::from(&*self.wbuf));
WriteAllVec::new(&mut self.stream, &mut self.wbuf).await?;
self.stream.flush().await?;
Ok(())
}
}
// Derived from: https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.16/src/futures_util/io/write_all.rs.html#10-13
// With alterations to be more efficient if we're writing from a mutable vector
// that we can erase
// TODO: Move to Core under 'sqlx_core::io' perhaps?
// TODO: Perhaps the futures project wants this?
pub struct WriteAllVec<'a, W: ?Sized + Unpin> {
writer: &'a mut W,
buf: &'a mut Vec<u8>,
}
impl<W: ?Sized + Unpin> Unpin for WriteAllVec<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVec<'a, W> {
pub(super) fn new(writer: &'a mut W, buf: &'a mut Vec<u8>) -> Self {
WriteAllVec { writer, buf }
}
}
impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAllVec<'_, W> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = &mut *self;
while !this.buf.is_empty() {
let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?;
this.buf.truncate(this.buf.len() - n);
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
}
Poll::Ready(Ok(()))
}
}

View File

@@ -1,39 +0,0 @@
use super::Connection;
use sqlx_postgres_protocol::{self as proto, Parse};
pub struct Prepare<'a> {
pub(super) connection: &'a mut Connection,
pub(super) bind_state: (usize, usize),
pub(super) bind_values: usize,
}
#[inline]
pub fn prepare<'a, 'b>(connection: &'a mut Connection, query: &'b str) -> Prepare<'a> {
// TODO: Use a hash map to cache the parse
// TODO: Use named statements
connection.send(Parse::new("", query, &[]));
let bind_state = proto::bind::header(&mut connection.wbuf, "", "", &[]);
Prepare {
connection,
bind_state,
bind_values: 0,
}
}
impl<'a> Prepare<'a> {
#[inline]
pub fn bind<'b>(mut self, value: &'b [u8]) -> Self {
proto::bind::value(&mut self.connection.wbuf, value);
self.bind_values += 1;
self
}
#[inline]
pub fn bind_null<'b>(mut self) -> Self {
proto::bind::value_null(&mut self.connection.wbuf);
self.bind_values += 1;
self
}
}

View File

@@ -1,59 +0,0 @@
use super::prepare::Prepare;
use futures::{stream, Stream};
use sqlx_postgres_protocol::{self as proto, DataRow, Execute, Message, Sync};
use std::io;
impl<'a> Prepare<'a> {
pub fn select(self) -> impl Stream<Item = Result<DataRow, io::Error>> + 'a + Unpin {
proto::bind::trailer(
&mut self.connection.wbuf,
self.bind_state,
self.bind_values,
&[],
);
self.connection.send(Execute::new("", 0));
self.connection.send(Sync);
// FIXME: Manually implement Stream on a new type to avoid the unfold adapter
stream::unfold(self.connection, |conn| {
Box::pin(async {
if !conn.wbuf.is_empty() {
if let Err(e) = conn.flush().await {
return Some((Err(e), conn));
}
}
loop {
let message = match conn.receive().await {
Ok(Some(message)) => message,
// FIXME: This is an end-of-file error. How we should bubble this up here?
Ok(None) => unreachable!(),
Err(e) => return Some((Err(e), conn)),
};
match message {
Message::BindComplete | Message::ParseComplete => {
// Indicates successful completion of a phase
}
Message::DataRow(row) => {
break Some((Ok(row), conn));
}
Message::CommandComplete(_) => {}
Message::ReadyForQuery(_) => {
// Successful completion of the whole cycle
break None;
}
message => {
unimplemented!("received {:?} unimplemented message", message);
}
}
}
})
})
}
}

View File

@@ -1,5 +0,0 @@
#![feature(non_exhaustive, async_await)]
#![allow(clippy::needless_lifetimes)]
mod connection;
pub use connection::Connection;