Initial low-level connection interface

This commit is contained in:
Ryan Leckey
2019-06-07 15:34:21 -07:00
parent d29b24abf0
commit 7fd4918c48
13 changed files with 442 additions and 6 deletions

8
.editorconfig Normal file
View File

@@ -0,0 +1,8 @@
root = true
[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
indent_style = space
indent_size = 4

6
.gitignore vendored
View File

@@ -1,3 +1,3 @@
/target
**/*.rs.bk
Cargo.lock
.idea/
target/
Cargo.lock

View File

@@ -1,9 +1,20 @@
[workspace]
members = [
".",
"mason-postgres"
]
[package]
name = "dbx"
version = "0.1.0"
name = "mason"
version = "0.0.0"
authors = ["Ryan Leckey <leckey.ryan@gmail.com>"]
license = "MIT OR Apache-2.0"
description = "Asynchronous and expressive database client in pure Rust."
edition = "2018"
[dependencies]
runtime = "=0.3.0-alpha.4"
mason-postgres = { path = "mason-postgres" }
failure = "0.1"
env_logger = "0.6.1"
bytes = "0.4.12"

View File

@@ -2,7 +2,8 @@
_Asynchronous and expressive database client in pure Rust_
This is an experiment being worked on in stages. The first stage
will be an untyped query builder.
will be a very low-level, generic database driver (hopefully) capable of basic execution of
simple queries.
## License

17
mason-postgres/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "mason-postgres"
version = "0.0.0"
authors = ["Ryan Leckey <leckey.ryan@gmail.com>"]
license = "MIT OR Apache-2.0"
description = "PostgreSQL database driver for dbx."
edition = "2018"
[dependencies]
runtime = "=0.3.0-alpha.4"
futures-preview = "=0.3.0-alpha.16"
failure = "0.1"
byteorder = "1.3.1"
log = "0.4"
hex = "0.3.2"
bytes = "0.4.12"
memchr = "2.2.0"

145
mason-postgres/src/lib.rs Normal file
View File

@@ -0,0 +1,145 @@
#![feature(non_exhaustive, async_await)]
#![allow(clippy::needless_lifetimes)]
use crate::protocol::{client, server};
use bytes::BytesMut;
use futures::{
channel::mpsc,
io::{AsyncRead, AsyncReadExt, AsyncWriteExt, WriteHalf},
SinkExt, StreamExt,
};
use runtime::{net::TcpStream, task::JoinHandle};
use std::io;
pub mod protocol;
pub struct Connection {
buf: Vec<u8>,
writer: WriteHalf<TcpStream>,
incoming: mpsc::Receiver<server::Message>,
reader: Option<JoinHandle<io::Result<()>>>,
}
impl Connection {
pub async fn open(address: &str) -> io::Result<Self> {
let stream = TcpStream::connect(address).await?;
let (mut reader, writer) = stream.split();
// FIXME: What's a good buffer size here?
let (mut tx, rx) = mpsc::channel(1024);
let reader = runtime::spawn(async move {
let mut buf = BytesMut::with_capacity(0);
let mut len = 0;
'reader: loop {
if len == buf.len() {
unsafe {
buf.reserve(32);
buf.set_len(buf.capacity());
reader.initializer().initialize(&mut buf[len..]);
}
}
let num = reader.read(&mut buf[len..]).await?;
if num > 0 {
len += num;
}
while len > 0 && !buf.is_empty() {
let size = buf.len();
let msg = server::Message::deserialize(&mut buf)?;
let removed = size - buf.len();
len -= removed;
match msg {
Some(server::Message::ParameterStatus(body)) => {
// FIXME: Proper log
log::info!("{:?}", body);
}
Some(msg) => {
tx.send(msg).await.unwrap();
}
None => {
// We have _some_ amount of data but not enough to
// deserialize anything
break;
}
}
}
// FIXME: This probably doesn't make sense
if len == 0 && !buf.is_empty() {
// Hit end-of-stream
break 'reader;
}
}
Ok(())
});
Ok(Self {
// FIXME: What's a good buffer size here?
buf: Vec::with_capacity(1024),
writer,
reader: Some(reader),
incoming: rx,
})
}
pub async fn startup<'a, 'b: 'a>(
&'a mut self,
user: &'b str,
_password: &'b str,
database: &'b str,
) -> io::Result<()> {
let params = [("user", user), ("database", database)];
let msg = client::StartupMessage { params: &params };
msg.serialize(&mut self.buf);
self.writer.write_all(&self.buf).await?;
self.buf.clear();
self.writer.flush().await?;
// FIXME: We _actually_ want to wait until ReadyForQuery, ErrorResponse, AuthX, etc.
while let Some(message) = self.incoming.next().await {
match message {
server::Message::AuthenticationOk => {
// Do nothing; server is just telling us "you're in"
}
server::Message::ReadyForQuery(_) => {
// Good to go
break;
}
_ => {}
}
}
Ok(())
}
pub async fn terminate(&mut self) -> io::Result<()> {
let msg = client::Terminate {};
msg.serialize(&mut self.buf);
self.writer.write_all(&self.buf).await?;
self.buf.clear();
self.writer.flush().await?;
self.writer.close().await?;
if let Some(reader) = self.reader.take() {
reader.await?;
}
Ok(())
}
}

View File

@@ -0,0 +1,81 @@
use byteorder::{BigEndian, ByteOrder};
// Reference
// https://www.postgresql.org/docs/devel/protocol-message-formats.html
// https://www.postgresql.org/docs/devel/protocol-message-types.html
#[derive(Debug)]
pub struct Terminate;
impl Terminate {
pub fn serialize(&self, buf: &mut Vec<u8>) {
buf.push(b'X');
buf.push(4);
}
}
#[derive(Debug)]
pub struct StartupMessage<'a> {
/// One or more pairs of parameter name and value strings.
/// A zero byte is required as a terminator after the last name/value pair.
/// Parameters can appear in any order. user is required, others are optional.
pub params: &'a [(&'a str, &'a str)],
}
impl<'a> StartupMessage<'a> {
pub fn serialize(&self, buf: &mut Vec<u8>) {
with_length_prefix(buf, |buf| {
// version: 3 = major, 0 = minor
buf.extend_from_slice(&0x0003_0000_i32.to_be_bytes());
for (name, value) in self.params {
buf.extend_from_slice(name.as_bytes());
buf.push(0);
buf.extend_from_slice(value.as_bytes());
buf.push(0);
}
// A zero byte is required as a terminator after the last name/value pair.
buf.push(0);
});
}
}
// Write a variable amount of data into a buffer and then
// prefix that data with the length of what was written
fn with_length_prefix<F>(buf: &mut Vec<u8>, f: F)
where
F: FnOnce(&mut Vec<u8>),
{
// Reserve space for length
let base = buf.len();
buf.extend_from_slice(&[0; 4]);
f(buf);
// Write back the length
// FIXME: Handle >= i32
let size = (buf.len() - base) as i32;
BigEndian::write_i32(&mut buf[base..], size);
}
#[cfg(test)]
mod tests {
use super::*;
// TODO: Serialize test more messages
#[test]
fn ser_startup_message() {
let msg = StartupMessage { params: &[("user", "postgres"), ("database", "postgres")] };
let mut buf = Vec::new();
msg.serialize(&mut buf);
assert_eq!(
"00000029000300007573657200706f73746772657\
300646174616261736500706f7374677265730000",
hex::encode(buf)
);
}
}

View File

@@ -0,0 +1,2 @@
pub mod client;
pub mod server;

View File

@@ -0,0 +1,150 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::{Bytes, BytesMut};
use std::io;
// Reference
// https://www.postgresql.org/docs/devel/protocol-message-formats.html
// https://www.postgresql.org/docs/devel/protocol-message-types.html
#[derive(Debug)]
#[non_exhaustive]
pub enum Message {
/// Authentication was successful.
AuthenticationOk,
/// Authentication request for a cleartext password.
AuthenticationCleartextPassword,
/// Authentication request for an MD5-encrypted password.
AuthenticationMd5Password(AuthenticationMd5Password),
/// The client must save these values if it wishes to be able
/// to issue CancelRequest messages later.
BackendKeyData(BackendKeyData),
BindComplete,
CloseComplete,
CommandComplete(CommandComplete),
DataRow(DataRow),
/// Response to an empty query string (substitutes for `CommandComplete`).
EmptyQueryResponse,
ErrorResponse(ErrorResponse),
NoData,
ParameterDescription(ParameterDescription),
ParameterStatus(ParameterStatus),
ParseComplete,
PortalSuspended,
ReadyForQuery(ReadyForQuery),
RowDescription(RowDescription),
}
impl Message {
pub fn deserialize(buf: &mut BytesMut) -> io::Result<Option<Self>> {
if buf.len() < 5 {
// No message is less than 5 bytes
return Ok(None);
}
let tag = buf[0];
if tag == 0 {
panic!("handle graceful close");
}
// FIXME: What happens if len(u32) < len(usize) ?
let len = BigEndian::read_u32(&buf[1..5]) as usize;
if buf.len() < len + 1 {
// Haven't received enough (yet)
return Ok(None);
}
let buf = buf.split_to(len + 1).freeze();
let idx = 5;
Ok(Some(match tag {
b'E' => Message::ErrorResponse(ErrorResponse { storage: buf.slice_from(idx) }),
b'S' => {
let name = read_str(buf.slice_from(idx))?;
let value = read_str(buf.slice_from(idx + name.len() + 1))?;
Message::ParameterStatus(ParameterStatus { name, value })
}
b'R' => match BigEndian::read_i32(&buf[idx..]) {
0 => Message::AuthenticationOk,
code => {
unimplemented!("unknown response code received: {:x}", code);
}
},
b'K' => Message::BackendKeyData(BackendKeyData {
process_id: BigEndian::read_i32(&buf[idx..]),
secret_key: BigEndian::read_i32(&buf[(idx + 4)..]),
}),
b'Z' => Message::ReadyForQuery(ReadyForQuery { status: buf[idx] }),
_ => unimplemented!("unknown tag received: {:x}", tag),
}))
}
}
#[derive(Debug)]
pub struct AuthenticationMd5Password {
pub(super) salt: [u8; 4],
}
#[derive(Debug)]
pub struct DataRow {
pub(super) storage: Bytes,
pub(super) len: u16,
}
#[derive(Debug)]
pub struct BackendKeyData {
pub(super) process_id: i32,
pub(super) secret_key: i32,
}
#[derive(Debug)]
pub struct CommandComplete {
pub(super) tag: Bytes,
}
#[derive(Debug)]
pub struct ErrorResponse {
pub(super) storage: Bytes,
}
#[derive(Debug)]
pub struct ParameterDescription {
pub(super) storage: Bytes,
pub(super) len: u16,
}
#[derive(Debug)]
pub struct ParameterStatus {
pub(super) name: Bytes,
pub(super) value: Bytes,
}
#[derive(Debug)]
pub struct ReadyForQuery {
pub(super) status: u8,
}
#[derive(Debug)]
pub struct RowDescription {
pub(super) storage: Bytes,
pub(super) len: u16,
}
#[inline]
fn read_str(buf: Bytes) -> io::Result<Bytes> {
Ok(buf.slice_to(memchr::memchr(0, &buf).ok_or(io::ErrorKind::UnexpectedEof)?))
}

1
rust-toolchain Normal file
View File

@@ -0,0 +1 @@
nightly-2019-06-06

4
rustfmt.toml Normal file
View File

@@ -0,0 +1,4 @@
unstable_features = true
merge_imports = true
use_small_heuristics = "Max"
edition = "2018"

View File

@@ -0,0 +1 @@
pub use mason_postgres as pg;

15
src/main.rs Normal file
View File

@@ -0,0 +1,15 @@
#![feature(async_await)]
use mason::pg::Connection;
#[runtime::main]
async fn main() -> Result<(), failure::Error> {
env_logger::try_init()?;
let mut conn = Connection::open("127.0.0.1:5432").await?;
conn.startup("postgres", "", "postgres").await?;
conn.terminate().await?;
Ok(())
}