mason -> sqlx

This commit is contained in:
Ryan Leckey
2019-06-22 20:54:43 -07:00
parent 2c7475ef0b
commit 22f71df7c7
25 changed files with 21 additions and 21 deletions

View File

@@ -0,0 +1,27 @@
[package]
name = "sqlx-postgres-protocol"
version = "0.0.0"
authors = ["Ryan Leckey <leckey.ryan@gmail.com>"]
license = "MIT OR Apache-2.0"
description = "Provides standalone encoding and decoding of the PostgreSQL v3 wire protocol."
edition = "2018"
[dependencies]
byteorder = "1.3.1"
bytes = "0.4.12"
memchr = "2.2.0"
md-5 = "0.8.0"
itoa = "0.4.4"
hex = "0.3.2"
[dev-dependencies]
matches = "0.1.8"
criterion = "0.2"
[[bench]]
name = "decode"
harness = false
[[bench]]
name = "encode"
harness = false

View File

@@ -0,0 +1,20 @@
#[macro_use]
extern crate criterion;
use bytes::Bytes;
use criterion::{black_box, Criterion};
use sqlx_postgres_protocol::{Decode, Response};
fn criterion_benchmark(c: &mut Criterion) {
// NOTE: This is sans header (for direct decoding)
const NOTICE_RESPONSE: &[u8] = b"SNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0";
c.bench_function("decode Response", |b| {
b.iter(|| {
let _ = Response::decode(black_box(Bytes::from_static(NOTICE_RESPONSE))).unwrap();
})
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -0,0 +1,67 @@
#[macro_use]
extern crate criterion;
use criterion::Criterion;
use sqlx_postgres_protocol::{Encode, PasswordMessage, StartupMessage, Response, Severity};
fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("encode Response(Builder)", |b| {
let mut dst = Vec::new();
b.iter(|| {
dst.truncate(0);
Response::builder()
.severity(Severity::Notice)
.code("42710")
.message("extension \"uuid-ossp\" already exists, skipping")
.file("extension.c")
.line(1656)
.routine("CreateExtension")
.encode(&mut dst)
.unwrap();
})
});
c.bench_function("encode Password(Cleartext)", |b| {
let mut dst = Vec::new();
b.iter(|| {
dst.truncate(0);
PasswordMessage::cleartext("8e323AMF9YSE9zftFnuhQcvhz7Vf342W4cWU")
.encode(&mut dst)
.unwrap();
})
});
c.bench_function("encode StartupMessage", |b| {
let mut dst = Vec::new();
b.iter(|| {
dst.truncate(0);
StartupMessage::builder()
.param("user", "postgres")
.param("database", "postgres")
.build()
.encode(&mut dst)
.unwrap();
})
});
c.bench_function("encode Password(MD5)", |b| {
let mut dst = Vec::new();
b.iter(|| {
dst.truncate(0);
PasswordMessage::md5(
"8e323AMF9YSE9zftFnuhQcvhz7Vf342W4cWU",
"postgres",
&[10, 41, 20, 150],
)
.encode(&mut dst)
.unwrap();
})
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -0,0 +1,38 @@
use bytes::Bytes;
#[derive(Debug)]
pub enum Authentication {
/// Authentication was successful.
Ok,
/// Kerberos V5 authentication is required.
KerberosV5,
/// A clear-text password is required.
CleartextPassword,
/// An MD5-encrypted password is required.
Md5Password { salt: [u8; 4] },
/// An SCM credentials message is required.
ScmCredential,
/// GSSAPI authentication is required.
Gss,
/// SSPI authentication is required.
Sspi,
/// This message contains GSSAPI or SSPI data.
GssContinue { data: Bytes },
/// SASL authentication is required.
// FIXME: authentication mechanisms
Sasl,
/// This message contains a SASL challenge.
SaslContinue { data: Bytes },
/// SASL authentication has completed.
SaslFinal { data: Bytes },
}

View File

@@ -0,0 +1,10 @@
use bytes::Bytes;
#[derive(Debug)]
pub struct BackendKeyData {
/// The process ID of this backend.
pub process_id: u32,
/// The secret key of this backend.
pub secret_key: u32,
}

View File

@@ -0,0 +1,18 @@
use bytes::Bytes;
use memchr::memchr;
use std::{io, str};
pub trait Decode {
fn decode(src: Bytes) -> io::Result<Self>
where
Self: Sized;
}
#[inline]
pub(crate) fn get_str(src: &[u8]) -> io::Result<&str> {
let end = memchr(b'\0', &src).ok_or(io::ErrorKind::UnexpectedEof)?;
let buf = &src[..end];
let s = str::from_utf8(buf).map_err(|_| io::ErrorKind::InvalidData)?;
Ok(s)
}

View File

@@ -0,0 +1,9 @@
use std::io;
pub trait Encode {
// TODO: Remove
fn size_hint(&self) -> usize { 0 }
// FIXME: Use BytesMut and not Vec<u8> (also remove the error type here)
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()>;
}

View File

@@ -0,0 +1,21 @@
//! https://www.postgresql.org/docs/devel/protocol.html
mod authentication;
mod backend_key_data;
mod decode;
mod encode;
mod message;
mod password_message;
mod ready_for_query;
mod response;
mod startup_message;
pub use self::{
decode::Decode,
encode::Encode,
message::Message,
password_message::PasswordMessage,
ready_for_query::{ReadyForQuery, TransactionStatus},
response::{Response, ResponseBuilder, Severity},
startup_message::StartupMessage,
};

View File

@@ -0,0 +1,49 @@
use crate::{Decode, Encode, ReadyForQuery, Response};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::Bytes;
use std::io::{self, Cursor};
#[derive(Debug)]
pub enum Message {
ReadyForQuery(ReadyForQuery),
Response(Response),
}
impl Encode for Message {
fn size_hint(&self) -> usize {
match self {
Message::ReadyForQuery(body) => body.size_hint(),
Message::Response(body) => body.size_hint(),
}
}
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
match self {
Message::ReadyForQuery(body) => body.encode(buf),
Message::Response(body) => body.encode(buf),
}
}
}
impl Decode for Message {
fn decode(src: Bytes) -> io::Result<Self>
where
Self: Sized,
{
let mut buf = Cursor::new(&src);
let token = buf.read_u8()?;
let len = buf.read_u32::<BigEndian>()? as usize;
let pos = buf.position() as usize;
// `len` includes the size of the length u32
let src = src.slice(pos, pos + len - 4);
Ok(match token {
b'N' | b'E' => Message::Response(Response::decode(src)?),
b'Z' => Message::ReadyForQuery(ReadyForQuery::decode(src)?),
_ => unimplemented!("decode not implemented for token: {}", token as char),
})
}
}

View File

@@ -0,0 +1,69 @@
use crate::{Decode, Encode};
use bytes::Bytes;
use md5::{Digest, Md5};
use std::io;
#[derive(Debug)]
pub struct PasswordMessage {
password: Bytes,
}
impl PasswordMessage {
/// Create a `PasswordMessage` with an unecrypted password.
pub fn cleartext(password: &str) -> Self {
Self { password: Bytes::from(password) }
}
/// Create a `PasswordMessage` by hasing the password, user, and salt together using MD5.
pub fn md5(password: &str, user: &str, salt: &[u8; 4]) -> Self {
let mut hasher = Md5::new();
hasher.input(password);
hasher.input(user);
let credentials = hex::encode(hasher.result_reset());
hasher.input(credentials);
hasher.input(salt);
let salted = hex::encode(hasher.result());
let mut password = Vec::with_capacity(3 + salted.len());
password.extend_from_slice(b"md5");
password.extend_from_slice(salted.as_bytes());
Self { password: Bytes::from(password) }
}
/// The password (encrypted, if requested).
pub fn password(&self) -> &[u8] {
&self.password
}
}
impl Decode for PasswordMessage {
fn decode(src: Bytes) -> io::Result<Self>
where
Self: Sized,
{
// There is only one field, the password, and it's not like we can
// decrypt it if it was encrypted
Ok(PasswordMessage { password: src })
}
}
impl Encode for PasswordMessage {
fn size_hint(&self) -> usize {
self.password.len() + 5
}
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
buf.push(b'p');
buf.extend_from_slice(&(self.password.len() + 4).to_be_bytes());
buf.extend_from_slice(&self.password);
Ok(())
}
}
// TODO: Encode and Decode tests

View File

@@ -0,0 +1,89 @@
use crate::{Decode, Encode};
use byteorder::{WriteBytesExt, BE};
use bytes::Bytes;
use std::io;
#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(u8)]
pub enum TransactionStatus {
/// Not in a transaction block.
Idle = b'I',
/// In a transaction block.
Transaction = b'T',
/// In a _failed_ transaction block. Queries will be rejected until block is ended.
Error = b'E',
}
/// `ReadyForQuery` is sent whenever the backend is ready for a new query cycle.
#[derive(Debug)]
pub struct ReadyForQuery {
pub status: TransactionStatus,
}
impl Encode for ReadyForQuery {
#[inline]
fn size_hint(&self) -> usize {
6
}
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
buf.write_u8(b'Z')?;
buf.write_u32::<BE>(5)?;
buf.write_u8(self.status as u8)?;
Ok(())
}
}
impl Decode for ReadyForQuery {
fn decode(src: Bytes) -> io::Result<Self> {
if src.len() != 1 {
return Err(io::ErrorKind::InvalidInput)?;
}
Ok(Self {
status: match src[0] {
// FIXME: Variant value are duplicated with declaration
b'I' => TransactionStatus::Idle,
b'T' => TransactionStatus::Transaction,
b'E' => TransactionStatus::Error,
_ => unreachable!(),
},
})
}
}
#[cfg(test)]
mod tests {
use super::{ReadyForQuery, TransactionStatus};
use crate::{Decode, Encode};
use bytes::Bytes;
use std::io;
const READY_FOR_QUERY: &[u8] = b"E";
#[test]
fn it_encodes_ready_for_query() -> io::Result<()> {
let message = ReadyForQuery { status: TransactionStatus::Error };
let mut dst = Vec::with_capacity(message.size_hint());
message.encode(&mut dst)?;
assert_eq!(&dst[5..], READY_FOR_QUERY);
Ok(())
}
#[test]
fn it_decodes_ready_for_query() -> io::Result<()> {
let src = Bytes::from_static(READY_FOR_QUERY);
let message = ReadyForQuery::decode(src)?;
assert_eq!(message.status, TransactionStatus::Error);
Ok(())
}
}

View File

@@ -0,0 +1,730 @@
use crate::{decode::get_str, Decode, Encode};
use byteorder::{BigEndian, WriteBytesExt};
use bytes::Bytes;
use std::{
fmt,
io::{self, Write},
ops::Range,
pin::Pin,
ptr::NonNull,
str::{self, FromStr},
};
#[derive(Debug, PartialEq, PartialOrd, Copy, Clone)]
pub enum Severity {
Panic,
Fatal,
Error,
Warning,
Notice,
Debug,
Info,
Log,
}
impl Severity {
pub fn is_error(&self) -> bool {
match self {
Severity::Panic | Severity::Fatal | Severity::Error => true,
_ => false,
}
}
pub fn is_notice(&self) -> bool {
match self {
Severity::Warning
| Severity::Notice
| Severity::Debug
| Severity::Info
| Severity::Log => true,
_ => false,
}
}
pub fn to_str(&self) -> &'static str {
match self {
Severity::Panic => "PANIC",
Severity::Fatal => "FATAL",
Severity::Error => "ERROR",
Severity::Warning => "WARNING",
Severity::Notice => "NOTICE",
Severity::Debug => "DEBUG",
Severity::Info => "INFO",
Severity::Log => "LOG",
}
}
}
impl FromStr for Severity {
type Err = io::Error;
fn from_str(s: &str) -> io::Result<Self> {
Ok(match s {
"PANIC" => Severity::Panic,
"FATAL" => Severity::Fatal,
"ERROR" => Severity::Error,
"WARNING" => Severity::Warning,
"NOTICE" => Severity::Notice,
"DEBUG" => Severity::Debug,
"INFO" => Severity::Info,
"LOG" => Severity::Log,
_ => {
return Err(io::ErrorKind::InvalidData)?;
}
})
}
}
#[derive(Clone)]
pub struct Response {
#[used]
storage: Pin<Bytes>,
severity: Severity,
code: NonNull<str>,
message: NonNull<str>,
detail: Option<NonNull<str>>,
hint: Option<NonNull<str>>,
position: Option<usize>,
internal_position: Option<usize>,
internal_query: Option<NonNull<str>>,
where_: Option<NonNull<str>>,
schema: Option<NonNull<str>>,
table: Option<NonNull<str>>,
column: Option<NonNull<str>>,
data_type: Option<NonNull<str>>,
constraint: Option<NonNull<str>>,
file: Option<NonNull<str>>,
line: Option<usize>,
routine: Option<NonNull<str>>,
}
// SAFE: Raw pointers point to pinned memory inside the struct
unsafe impl Send for Response {}
unsafe impl Sync for Response {}
impl Response {
#[inline]
pub fn builder() -> ResponseBuilder {
ResponseBuilder::new()
}
#[inline]
pub fn severity(&self) -> Severity {
self.severity
}
#[inline]
pub fn code(&self) -> &str {
// SAFE: Memory is pinned
unsafe { self.code.as_ref() }
}
#[inline]
pub fn message(&self) -> &str {
// SAFE: Memory is pinned
unsafe { self.message.as_ref() }
}
#[inline]
pub fn detail(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.detail.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn hint(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.hint.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn position(&self) -> Option<usize> {
self.position
}
#[inline]
pub fn internal_position(&self) -> Option<usize> {
self.internal_position
}
#[inline]
pub fn internal_query(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.internal_query.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn where_(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.where_.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn schema(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.schema.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn table(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.table.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn column(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.column.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn data_type(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.data_type.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn constraint(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.constraint.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn file(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.file.as_ref().map(|s| s.as_ref()) }
}
#[inline]
pub fn line(&self) -> Option<usize> {
self.line
}
#[inline]
pub fn routine(&self) -> Option<&str> {
// SAFE: Memory is pinned
unsafe { self.routine.as_ref().map(|s| s.as_ref()) }
}
}
impl fmt::Debug for Response {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Response")
.field("severity", &self.severity)
.field("code", &self.code())
.field("message", &self.message())
.field("detail", &self.detail())
.field("hint", &self.hint())
.field("position", &self.position())
.field("internal_position", &self.internal_position())
.field("internal_query", &self.internal_query())
.field("where_", &self.where_())
.field("schema", &self.schema())
.field("table", &self.table())
.field("column", &self.column())
.field("data_type", &self.data_type())
.field("constraint", &self.constraint())
.field("file", &self.file())
.field("line", &self.line())
.field("routine", &self.routine())
.finish()
}
}
impl Encode for Response {
#[inline]
fn size_hint(&self) -> usize {
self.storage.len() + 5
}
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
if self.severity.is_error() {
buf.push(b'E');
} else {
buf.push(b'N');
}
buf.write_u32::<BigEndian>((4 + self.storage.len()) as u32)?;
buf.extend_from_slice(&self.storage);
Ok(())
}
}
impl Decode for Response {
fn decode(src: Bytes) -> io::Result<Self> {
let storage = Pin::new(src);
let mut code = None::<&str>;
let mut message = None::<&str>;
let mut severity = None::<&str>;
let mut severity_non_local = None::<Severity>;
let mut detail = None::<&str>;
let mut hint = None::<&str>;
let mut position = None::<usize>;
let mut internal_position = None::<usize>;
let mut internal_query = None::<&str>;
let mut where_ = None::<&str>;
let mut schema = None::<&str>;
let mut table = None::<&str>;
let mut column = None::<&str>;
let mut data_type = None::<&str>;
let mut constraint = None::<&str>;
let mut file = None::<&str>;
let mut line = None::<usize>;
let mut routine = None::<&str>;
let mut idx = 0;
loop {
let field_type = storage[idx];
idx += 1;
if field_type == 0 {
break;
}
let field_value = get_str(&storage[idx..])?;
idx += field_value.len() + 1;
match field_type {
b'S' => {
severity = Some(field_value);
}
b'V' => {
severity_non_local = Some(field_value.parse()?);
}
b'C' => {
code = Some(field_value);
}
b'M' => {
message = Some(field_value);
}
b'D' => {
detail = Some(field_value);
}
b'H' => {
hint = Some(field_value);
}
b'P' => {
position = Some(field_value.parse().map_err(|_| io::ErrorKind::InvalidData)?);
}
b'p' => {
internal_position =
Some(field_value.parse().map_err(|_| io::ErrorKind::InvalidData)?);
}
b'q' => {
internal_query = Some(field_value);
}
b'w' => {
where_ = Some(field_value);
}
b's' => {
schema = Some(field_value);
}
b't' => {
table = Some(field_value);
}
b'c' => {
column = Some(field_value);
}
b'd' => {
data_type = Some(field_value);
}
b'n' => {
constraint = Some(field_value);
}
b'F' => {
file = Some(field_value);
}
b'L' => {
line = Some(field_value.parse().map_err(|_| io::ErrorKind::InvalidData)?);
}
b'R' => {
routine = Some(field_value);
}
_ => {
unimplemented!(
"response message field {:?} not implemented",
field_type as char
);
}
}
}
let severity = severity_non_local
.or_else(move || severity?.parse().ok())
.expect("`severity` required by protocol");
let code = NonNull::from(code.expect("`code` required by protocol"));
let message = NonNull::from(message.expect("`message` required by protocol"));
let detail = detail.map(NonNull::from);
let hint = hint.map(NonNull::from);
let internal_query = internal_query.map(NonNull::from);
let where_ = where_.map(NonNull::from);
let schema = schema.map(NonNull::from);
let table = table.map(NonNull::from);
let column = column.map(NonNull::from);
let data_type = data_type.map(NonNull::from);
let constraint = constraint.map(NonNull::from);
let file = file.map(NonNull::from);
let routine = routine.map(NonNull::from);
Ok(Self {
storage,
severity,
code,
message,
detail,
hint,
internal_query,
where_,
schema,
table,
column,
data_type,
constraint,
file,
routine,
line,
position,
internal_position,
})
}
}
pub struct ResponseBuilder {
storage: Vec<u8>,
severity: Option<Severity>,
code: Option<Range<usize>>,
message: Option<Range<usize>>,
detail: Option<Range<usize>>,
hint: Option<Range<usize>>,
position: Option<usize>,
internal_position: Option<usize>,
internal_query: Option<Range<usize>>,
where_: Option<Range<usize>>,
schema: Option<Range<usize>>,
table: Option<Range<usize>>,
column: Option<Range<usize>>,
data_type: Option<Range<usize>>,
constraint: Option<Range<usize>>,
file: Option<Range<usize>>,
line: Option<usize>,
routine: Option<Range<usize>>,
}
impl Default for ResponseBuilder {
fn default() -> Self {
Self {
storage: Vec::with_capacity(256),
severity: None,
message: None,
code: None,
detail: None,
hint: None,
position: None,
internal_position: None,
internal_query: None,
where_: None,
schema: None,
table: None,
column: None,
data_type: None,
constraint: None,
file: None,
line: None,
routine: None,
}
}
}
fn put_str(buf: &mut Vec<u8>, tag: u8, value: &str) -> Range<usize> {
buf.push(tag);
let beg = buf.len();
buf.extend_from_slice(value.as_bytes());
let end = buf.len();
buf.push(0);
beg..end
}
impl ResponseBuilder {
#[inline]
pub fn new() -> ResponseBuilder {
Self::default()
}
#[inline]
pub fn severity(mut self, severity: Severity) -> Self {
let sev = severity.to_str();
let _ = put_str(&mut self.storage, b'S', sev);
let _ = put_str(&mut self.storage, b'V', sev);
self.severity = Some(severity);
self
}
#[inline]
pub fn message(mut self, message: &str) -> Self {
self.message = Some(put_str(&mut self.storage, b'M', message));
self
}
#[inline]
pub fn code(mut self, code: &str) -> Self {
self.code = Some(put_str(&mut self.storage, b'C', code));
self
}
#[inline]
pub fn detail(mut self, detail: &str) -> Self {
self.detail = Some(put_str(&mut self.storage, b'D', detail));
self
}
#[inline]
pub fn hint(mut self, hint: &str) -> Self {
self.hint = Some(put_str(&mut self.storage, b'H', hint));
self
}
#[inline]
pub fn position(mut self, position: usize) -> Self {
self.storage.push(b'P');
// PANIC: Write to Vec<u8> is infallible
itoa::write(&mut self.storage, position).unwrap();
self.storage.push(0);
self.position = Some(position);
self
}
#[inline]
pub fn internal_position(mut self, position: usize) -> Self {
self.storage.push(b'p');
// PANIC: Write to Vec<u8> is infallible
itoa::write(&mut self.storage, position).unwrap();
self.storage.push(0);
self.internal_position = Some(position);
self
}
#[inline]
pub fn internal_query(mut self, query: &str) -> Self {
self.internal_query = Some(put_str(&mut self.storage, b'q', query));
self
}
#[inline]
pub fn where_(mut self, where_: &str) -> Self {
self.where_ = Some(put_str(&mut self.storage, b'w', where_));
self
}
#[inline]
pub fn schema(mut self, schema: &str) -> Self {
self.schema = Some(put_str(&mut self.storage, b's', schema));
self
}
#[inline]
pub fn table(mut self, table: &str) -> Self {
self.table = Some(put_str(&mut self.storage, b't', table));
self
}
#[inline]
pub fn column(mut self, column: &str) -> Self {
self.column = Some(put_str(&mut self.storage, b'c', column));
self
}
#[inline]
pub fn data_type(mut self, data_type: &str) -> Self {
self.data_type = Some(put_str(&mut self.storage, b'd', data_type));
self
}
#[inline]
pub fn constraint(mut self, constraint: &str) -> Self {
self.constraint = Some(put_str(&mut self.storage, b'n', constraint));
self
}
#[inline]
pub fn file(mut self, file: &str) -> Self {
self.file = Some(put_str(&mut self.storage, b'F', file));
self
}
#[inline]
pub fn line(mut self, line: usize) -> Self {
self.storage.push(b'L');
// PANIC: Write to Vec<u8> is infallible
itoa::write(&mut self.storage, line).unwrap();
self.storage.push(0);
self.line = Some(line);
self
}
#[inline]
pub fn routine(mut self, routine: &str) -> Self {
self.routine = Some(put_str(&mut self.storage, b'R', routine));
self
}
pub fn build(mut self) -> Response {
// Add a \0 terminator
self.storage.push(0);
// Freeze the storage and Pin so we can self-reference it
let storage = Pin::new(Bytes::from(self.storage));
let make_str_ref = |val: Option<Range<usize>>| unsafe {
val.map(|r| NonNull::from(str::from_utf8_unchecked(&storage[r])))
};
let code = make_str_ref(self.code);
let message = make_str_ref(self.message);
let detail = make_str_ref(self.detail);
let hint = make_str_ref(self.hint);
let internal_query = make_str_ref(self.internal_query);
let where_ = make_str_ref(self.where_);
let schema = make_str_ref(self.schema);
let table = make_str_ref(self.table);
let column = make_str_ref(self.column);
let data_type = make_str_ref(self.data_type);
let constraint = make_str_ref(self.constraint);
let file = make_str_ref(self.file);
let routine = make_str_ref(self.routine);
Response {
storage,
// FIXME: Default and don't panic here
severity: self.severity.expect("`severity` required by protocol"),
code: code.expect("`code` required by protocol"),
message: message.expect("`message` required by protocol"),
detail,
hint,
internal_query,
where_,
schema,
table,
column,
data_type,
constraint,
file,
routine,
line: self.line,
position: self.position,
internal_position: self.internal_position,
}
}
}
impl Encode for ResponseBuilder {
#[inline]
fn size_hint(&self) -> usize {
self.storage.len() + 6
}
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
if self.severity.as_ref().map_or(false, |s| s.is_error()) {
buf.push(b'E');
} else {
buf.push(b'N');
}
buf.write_u32::<BigEndian>((5 + self.storage.len()) as u32)?;
buf.extend_from_slice(&self.storage);
buf.push(0);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{Response, Severity};
use crate::{Decode, Encode};
use bytes::Bytes;
use std::io;
const RESPONSE: &[u8] = b"SNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, \
skipping\0Fextension.c\0L1656\0RCreateExtension\0\0";
#[test]
fn it_encodes_response() -> io::Result<()> {
let message = Response::builder()
.severity(Severity::Notice)
.code("42710")
.message("extension \"uuid-ossp\" already exists, skipping")
.file("extension.c")
.line(1656)
.routine("CreateExtension")
.build();
let mut dst = Vec::with_capacity(message.size_hint());
message.encode(&mut dst)?;
assert_eq!(&dst[5..], RESPONSE);
Ok(())
}
#[test]
fn it_encodes_response_builder() -> io::Result<()> {
let message = Response::builder()
.severity(Severity::Notice)
.code("42710")
.message("extension \"uuid-ossp\" already exists, skipping")
.file("extension.c")
.line(1656)
.routine("CreateExtension");
let mut dst = Vec::with_capacity(message.size_hint());
message.encode(&mut dst)?;
assert_eq!(&dst[5..], RESPONSE);
Ok(())
}
#[test]
fn it_decodes_response() -> io::Result<()> {
let src = Bytes::from_static(RESPONSE);
let message = Response::decode(src)?;
assert_eq!(message.severity(), Severity::Notice);
assert_eq!(message.message(), "extension \"uuid-ossp\" already exists, skipping");
assert_eq!(message.code(), "42710");
assert_eq!(message.file(), Some("extension.c"));
assert_eq!(message.line(), Some(1656));
assert_eq!(message.routine(), Some("CreateExtension"));
Ok(())
}
}

View File

@@ -0,0 +1,108 @@
use crate::{Decode, Encode};
use bytes::{BufMut, Bytes, BytesMut};
use std::io;
#[derive(Debug)]
pub struct StartupMessage {
// (major, minor)
version: (u16, u16),
params: Bytes,
}
impl StartupMessage {
pub fn builder() -> StartupMessageBuilder {
StartupMessageBuilder::new()
}
pub fn version(&self) -> (u16, u16) {
self.version
}
pub fn params(&self) -> StartupMessageParams<'_> {
StartupMessageParams(&*self.params)
}
}
impl Encode for StartupMessage {
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
let len = self.params.len() + 9;
buf.reserve(len);
buf.put_u32_be(len as u32);
buf.put_u16_be(self.version.0);
buf.put_u16_be(self.version.1);
buf.put(&self.params);
Ok(())
}
}
pub struct StartupMessageParams<'a>(&'a [u8]);
// impl Iterator for StartupMessageParams {
// }
pub struct StartupMessageBuilder {
// (major, minor)
version: (u16, u16),
params: BytesMut,
}
impl Default for StartupMessageBuilder {
fn default() -> Self {
StartupMessageBuilder { version: (3, 0), params: BytesMut::with_capacity(128) }
}
}
impl StartupMessageBuilder {
pub fn new() -> Self {
StartupMessageBuilder::default()
}
/// Set the protocol version number. Defaults to `3` and `0`.
pub fn version(mut self, major: u16, minor: u16) -> Self {
self.version = (major, minor);
self
}
pub fn param(mut self, name: &str, value: &str) -> Self {
self.params.reserve(name.len() + value.len() + 2);
self.params.put(name.as_bytes());
self.params.put_u8(0);
self.params.put(value.as_bytes());
self.params.put_u8(0);
self
}
pub fn build(mut self) -> StartupMessage {
self.params.reserve(1);
self.params.put_u8(0);
StartupMessage { version: self.version, params: self.params.freeze() }
}
}
#[cfg(test)]
mod tests {
use super::StartupMessage;
use crate::Encode;
use std::io;
const STARTUP_MESSAGE: &[u8] = b"\0\0\0*\0\x03\0\0user\0postgres\0database\0postgres\0\0";
#[test]
fn it_encodes_startup_message() -> io::Result<()> {
let message = StartupMessage::builder()
.param("user", "postgres")
.param("database", "postgres")
.build();
let mut buf = Vec::new();
message.encode(&mut buf)?;
assert_eq!(&*buf, STARTUP_MESSAGE);
Ok(())
}
}