mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-12-29 21:00:54 +00:00
Experiment with lazy decoding of NoticeResponse
This commit is contained in:
parent
a624578726
commit
eaeef57b96
@ -21,3 +21,6 @@ mason-postgres = { path = "mason-postgres" }
|
||||
failure = "0.1"
|
||||
env_logger = "0.6.1"
|
||||
bytes = "0.4.12"
|
||||
|
||||
[profile.bench]
|
||||
lto = true
|
||||
|
||||
@ -14,3 +14,8 @@ md-5 = "0.8.0"
|
||||
|
||||
[dev-dependencies]
|
||||
matches = "0.1.8"
|
||||
criterion = "0.2"
|
||||
|
||||
[[bench]]
|
||||
name = "decode"
|
||||
harness = false
|
||||
|
||||
29
mason-postgres-protocol/benches/decode.rs
Normal file
29
mason-postgres-protocol/benches/decode.rs
Normal file
@ -0,0 +1,29 @@
|
||||
#[macro_use]
|
||||
extern crate criterion;
|
||||
|
||||
use bytes::Bytes;
|
||||
use criterion::{black_box, Criterion};
|
||||
use mason_postgres_protocol::{Decode, NoticeResponse};
|
||||
|
||||
fn criterion_benchmark(c: &mut Criterion) {
|
||||
// NOTE: This is sans header (for direct decoding)
|
||||
const NOTICE_RESPONSE: &[u8] = b"SNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0";
|
||||
|
||||
c.bench_function("decode NoticeResponse", |b| {
|
||||
b.iter(|| {
|
||||
let _ = NoticeResponse::decode(black_box(Bytes::from_static(NOTICE_RESPONSE))).unwrap();
|
||||
})
|
||||
});
|
||||
|
||||
c.bench_function("decode NoticeResponse and NoticeResponseFields", |b| {
|
||||
b.iter(|| {
|
||||
let _ = NoticeResponse::decode(black_box(Bytes::from_static(NOTICE_RESPONSE)))
|
||||
.unwrap()
|
||||
.fields()
|
||||
.unwrap();
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
criterion_main!(benches);
|
||||
@ -3,15 +3,15 @@ use memchr::memchr;
|
||||
use std::{io, str};
|
||||
|
||||
pub trait Decode {
|
||||
fn decode(b: Bytes) -> io::Result<Self>
|
||||
fn decode(src: Bytes) -> io::Result<Self>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn get_str<'a>(b: &'a [u8]) -> io::Result<&'a str> {
|
||||
let end = memchr(b'\0', &b).ok_or(io::ErrorKind::UnexpectedEof)?;
|
||||
let buf = &b[..end];
|
||||
pub(crate) fn get_str(src: &[u8]) -> io::Result<&str> {
|
||||
let end = memchr(b'\0', &src).ok_or(io::ErrorKind::UnexpectedEof)?;
|
||||
let buf = &src[..end];
|
||||
let s = str::from_utf8(buf).map_err(|_| io::ErrorKind::InvalidData)?;
|
||||
|
||||
Ok(s)
|
||||
|
||||
@ -26,23 +26,23 @@ impl Encode for Message {
|
||||
}
|
||||
|
||||
impl Decode for Message {
|
||||
fn decode(b: Bytes) -> io::Result<Self>
|
||||
fn decode(src: Bytes) -> io::Result<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
let mut buf = Cursor::new(&b);
|
||||
let mut buf = Cursor::new(&src);
|
||||
|
||||
let token = buf.read_u8()?;
|
||||
let len = buf.read_u32::<BigEndian>()? as usize;
|
||||
let pos = buf.position() as usize;
|
||||
|
||||
// `len` includes the size of the length u32
|
||||
let b = b.slice(pos, pos + len - 4);
|
||||
let src = src.slice(pos, pos + len - 4);
|
||||
|
||||
Ok(match token {
|
||||
// FIXME: These tokens are duplicated here and in the respective encode functions
|
||||
b'N' => Message::NoticeResponse(NoticeResponse::decode(b)?),
|
||||
b'Z' => Message::ReadyForQuery(ReadyForQuery::decode(b)?),
|
||||
b'N' => Message::NoticeResponse(NoticeResponse::decode(src)?),
|
||||
b'Z' => Message::ReadyForQuery(ReadyForQuery::decode(src)?),
|
||||
|
||||
_ => unimplemented!("decode not implemented for token: {}", token as char),
|
||||
})
|
||||
|
||||
@ -42,7 +42,51 @@ impl FromStr for Severity {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NoticeResponse {
|
||||
// NOTE: `NoticeResponse` is lazily decoded on access to `.fields()`
|
||||
#[derive(Clone)]
|
||||
pub struct NoticeResponse(Bytes);
|
||||
|
||||
impl NoticeResponse {
|
||||
#[inline]
|
||||
pub fn fields(self) -> io::Result<NoticeResponseFields> {
|
||||
NoticeResponseFields::decode(self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for NoticeResponse {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
// Proxy format to the results of decoding the fields
|
||||
self.clone().fields().fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for NoticeResponse {
|
||||
#[inline]
|
||||
fn size_hint(&self) -> usize {
|
||||
self.0.len() + 5
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.write_u8(b'Z')?;
|
||||
buf.write_u32::<BigEndian>((4 + self.0.len()) as u32)?;
|
||||
buf.write_all(&self.0)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Decode for NoticeResponse {
|
||||
#[inline]
|
||||
fn decode(src: Bytes) -> io::Result<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Ok(Self(src))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NoticeResponseFields {
|
||||
#[used]
|
||||
storage: Pin<Bytes>,
|
||||
severity: Severity,
|
||||
@ -65,34 +109,34 @@ pub struct NoticeResponse {
|
||||
}
|
||||
|
||||
// SAFE: Raw pointers point to pinned memory inside the struct
|
||||
unsafe impl Send for NoticeResponse {}
|
||||
unsafe impl Sync for NoticeResponse {}
|
||||
unsafe impl Send for NoticeResponseFields {}
|
||||
unsafe impl Sync for NoticeResponseFields {}
|
||||
|
||||
impl NoticeResponse {
|
||||
impl NoticeResponseFields {
|
||||
#[inline]
|
||||
pub fn severity(&self) -> Severity { self.severity }
|
||||
|
||||
#[inline]
|
||||
pub fn code(&self) -> &str {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.code.as_ref() }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn message(&self) -> &str {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.message.as_ref() }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn detail(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.detail.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn hint(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.hint.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
@ -104,49 +148,49 @@ impl NoticeResponse {
|
||||
|
||||
#[inline]
|
||||
pub fn internal_query(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.internal_query.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn where_(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.where_.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn schema(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.schema.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn table(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.table.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn column(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.column.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn data_type(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.data_type.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn constraint(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.constraint.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn file(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.file.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
|
||||
@ -155,14 +199,14 @@ impl NoticeResponse {
|
||||
|
||||
#[inline]
|
||||
pub fn routine(&self) -> Option<&str> {
|
||||
// SAFE: Memory is pinned (`self.storage`)
|
||||
// SAFE: Memory is pinned
|
||||
unsafe { self.routine.as_ref().map(|s| s.as_ref()) }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for NoticeResponse {
|
||||
impl fmt::Debug for NoticeResponseFields {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("NoticeResponse")
|
||||
f.debug_struct("NoticeResponseFields")
|
||||
.field("severity", &self.severity)
|
||||
.field("code", &self.code())
|
||||
.field("message", &self.message())
|
||||
@ -184,27 +228,9 @@ impl fmt::Debug for NoticeResponse {
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: `Encode` here is (mostly) useless as its not easy to construct a NoticeResponse.
|
||||
// Need a `NoticeResponse::builder().severity(...).build()` etc. type thing
|
||||
|
||||
impl Encode for NoticeResponse {
|
||||
fn size_hint(&self) -> usize { self.storage.len() + 5 }
|
||||
|
||||
fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
buf.write_u8(b'Z')?;
|
||||
buf.write_u32::<BigEndian>((4 + self.storage.len()) as u32)?;
|
||||
buf.write_all(&self.storage)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Decode for NoticeResponse {
|
||||
fn decode(b: Bytes) -> io::Result<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
let storage = Pin::new(b);
|
||||
impl Decode for NoticeResponseFields {
|
||||
fn decode(src: Bytes) -> io::Result<Self> {
|
||||
let storage = Pin::new(src);
|
||||
|
||||
let mut code = None::<&str>;
|
||||
let mut message = None::<&str>;
|
||||
@ -364,50 +390,30 @@ impl Decode for NoticeResponse {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{Decode, Message, Severity};
|
||||
use crate::{Message, Severity, Decode};
|
||||
use bytes::Bytes;
|
||||
use std::{io, thread};
|
||||
use std::io;
|
||||
|
||||
#[test]
|
||||
fn it_decodes_notice_response() -> io::Result<()> {
|
||||
let b = Bytes::from_static(b"N\0\0\0pSNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0");
|
||||
let message = Message::decode(b)?;
|
||||
let src = Bytes::from_static(b"N\0\0\0pSNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0");
|
||||
let message = Message::decode(src)?;
|
||||
|
||||
// FIXME: Is there a simpler pattern here for tests?
|
||||
let body = if let Message::NoticeResponse(body) = message {
|
||||
body
|
||||
} else {
|
||||
unreachable!();
|
||||
panic!("unexpected {:?}", message);
|
||||
};
|
||||
|
||||
assert_eq!(body.severity(), Severity::Notice);
|
||||
assert_eq!(body.message(), "extension \"uuid-ossp\" already exists, skipping");
|
||||
assert_eq!(body.code(), "42710");
|
||||
assert_eq!(body.file(), Some("extension.c"));
|
||||
assert_eq!(body.line(), Some(1656));
|
||||
assert_eq!(body.routine(), Some("CreateExtension"));
|
||||
let fields = body.fields()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn it_decodes_notice_response_and_is_send() -> io::Result<()> {
|
||||
let b = Bytes::from_static(b"N\0\0\0pSNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0");
|
||||
let message = Message::decode(b)?;
|
||||
let body = if let Message::NoticeResponse(body) = message {
|
||||
body
|
||||
} else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
let body = thread::spawn(move || {
|
||||
assert_eq!(body.message(), "extension \"uuid-ossp\" already exists, skipping");
|
||||
|
||||
body
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(body.code(), "42710");
|
||||
assert_eq!(body.routine(), Some("CreateExtension"));
|
||||
assert_eq!(fields.severity(), Severity::Notice);
|
||||
assert_eq!(fields.message(), "extension \"uuid-ossp\" already exists, skipping");
|
||||
assert_eq!(fields.code(), "42710");
|
||||
assert_eq!(fields.file(), Some("extension.c"));
|
||||
assert_eq!(fields.line(), Some(1656));
|
||||
assert_eq!(fields.routine(), Some("CreateExtension"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -36,13 +36,13 @@ impl Encode for ReadyForQuery {
|
||||
}
|
||||
|
||||
impl Decode for ReadyForQuery {
|
||||
fn decode(b: Bytes) -> io::Result<Self> {
|
||||
if b.len() != 1 {
|
||||
fn decode(src: Bytes) -> io::Result<Self> {
|
||||
if src.len() != 1 {
|
||||
return Err(io::ErrorKind::InvalidInput)?;
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
status: match b[0] {
|
||||
status: match src[0] {
|
||||
// FIXME: Variant value are duplicated with declaration
|
||||
b'I' => TransactionStatus::Idle,
|
||||
b'T' => TransactionStatus::Transaction,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user