From eaeef57b96a99f0ba4a8bb7b1084b885e69aec36 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Thu, 20 Jun 2019 15:25:05 -0700 Subject: [PATCH] Experiment with lazy decoding of NoticeResponse --- Cargo.toml | 3 + mason-postgres-protocol/Cargo.toml | 5 + mason-postgres-protocol/benches/decode.rs | 29 ++++ mason-postgres-protocol/src/decode.rs | 8 +- mason-postgres-protocol/src/message.rs | 10 +- .../src/notice_response.rs | 154 +++++++++--------- .../src/ready_for_query.rs | 6 +- 7 files changed, 129 insertions(+), 86 deletions(-) create mode 100644 mason-postgres-protocol/benches/decode.rs diff --git a/Cargo.toml b/Cargo.toml index 080ccf28..c5345cef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,6 @@ mason-postgres = { path = "mason-postgres" } failure = "0.1" env_logger = "0.6.1" bytes = "0.4.12" + +[profile.bench] +lto = true diff --git a/mason-postgres-protocol/Cargo.toml b/mason-postgres-protocol/Cargo.toml index 0f2fddce..c2106403 100644 --- a/mason-postgres-protocol/Cargo.toml +++ b/mason-postgres-protocol/Cargo.toml @@ -14,3 +14,8 @@ md-5 = "0.8.0" [dev-dependencies] matches = "0.1.8" +criterion = "0.2" + +[[bench]] +name = "decode" +harness = false diff --git a/mason-postgres-protocol/benches/decode.rs b/mason-postgres-protocol/benches/decode.rs new file mode 100644 index 00000000..a89cdd18 --- /dev/null +++ b/mason-postgres-protocol/benches/decode.rs @@ -0,0 +1,29 @@ +#[macro_use] +extern crate criterion; + +use bytes::Bytes; +use criterion::{black_box, Criterion}; +use mason_postgres_protocol::{Decode, NoticeResponse}; + +fn criterion_benchmark(c: &mut Criterion) { + // NOTE: This is sans header (for direct decoding) + const NOTICE_RESPONSE: &[u8] = b"SNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0"; + + c.bench_function("decode NoticeResponse", |b| { + b.iter(|| { + let _ = NoticeResponse::decode(black_box(Bytes::from_static(NOTICE_RESPONSE))).unwrap(); + }) + }); + + c.bench_function("decode NoticeResponse and NoticeResponseFields", |b| { + b.iter(|| { + let _ = NoticeResponse::decode(black_box(Bytes::from_static(NOTICE_RESPONSE))) + .unwrap() + .fields() + .unwrap(); + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/mason-postgres-protocol/src/decode.rs b/mason-postgres-protocol/src/decode.rs index dee16bd9..969c5e41 100644 --- a/mason-postgres-protocol/src/decode.rs +++ b/mason-postgres-protocol/src/decode.rs @@ -3,15 +3,15 @@ use memchr::memchr; use std::{io, str}; pub trait Decode { - fn decode(b: Bytes) -> io::Result + fn decode(src: Bytes) -> io::Result where Self: Sized; } #[inline] -pub(crate) fn get_str<'a>(b: &'a [u8]) -> io::Result<&'a str> { - let end = memchr(b'\0', &b).ok_or(io::ErrorKind::UnexpectedEof)?; - let buf = &b[..end]; +pub(crate) fn get_str(src: &[u8]) -> io::Result<&str> { + let end = memchr(b'\0', &src).ok_or(io::ErrorKind::UnexpectedEof)?; + let buf = &src[..end]; let s = str::from_utf8(buf).map_err(|_| io::ErrorKind::InvalidData)?; Ok(s) diff --git a/mason-postgres-protocol/src/message.rs b/mason-postgres-protocol/src/message.rs index abd0c298..5cbec16c 100644 --- a/mason-postgres-protocol/src/message.rs +++ b/mason-postgres-protocol/src/message.rs @@ -26,23 +26,23 @@ impl Encode for Message { } impl Decode for Message { - fn decode(b: Bytes) -> io::Result + fn decode(src: Bytes) -> io::Result where Self: Sized, { - let mut buf = Cursor::new(&b); + let mut buf = Cursor::new(&src); let token = buf.read_u8()?; let len = buf.read_u32::()? as usize; let pos = buf.position() as usize; // `len` includes the size of the length u32 - let b = b.slice(pos, pos + len - 4); + let src = src.slice(pos, pos + len - 4); Ok(match token { // FIXME: These tokens are duplicated here and in the respective encode functions - b'N' => Message::NoticeResponse(NoticeResponse::decode(b)?), - b'Z' => Message::ReadyForQuery(ReadyForQuery::decode(b)?), + b'N' => Message::NoticeResponse(NoticeResponse::decode(src)?), + b'Z' => Message::ReadyForQuery(ReadyForQuery::decode(src)?), _ => unimplemented!("decode not implemented for token: {}", token as char), }) diff --git a/mason-postgres-protocol/src/notice_response.rs b/mason-postgres-protocol/src/notice_response.rs index bbe27414..2299552a 100644 --- a/mason-postgres-protocol/src/notice_response.rs +++ b/mason-postgres-protocol/src/notice_response.rs @@ -42,7 +42,51 @@ impl FromStr for Severity { } } -pub struct NoticeResponse { +// NOTE: `NoticeResponse` is lazily decoded on access to `.fields()` +#[derive(Clone)] +pub struct NoticeResponse(Bytes); + +impl NoticeResponse { + #[inline] + pub fn fields(self) -> io::Result { + NoticeResponseFields::decode(self.0) + } +} + +impl fmt::Debug for NoticeResponse { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Proxy format to the results of decoding the fields + self.clone().fields().fmt(f) + } +} + +impl Encode for NoticeResponse { + #[inline] + fn size_hint(&self) -> usize { + self.0.len() + 5 + } + + #[inline] + fn encode(&self, buf: &mut Vec) -> io::Result<()> { + buf.write_u8(b'Z')?; + buf.write_u32::((4 + self.0.len()) as u32)?; + buf.write_all(&self.0)?; + + Ok(()) + } +} + +impl Decode for NoticeResponse { + #[inline] + fn decode(src: Bytes) -> io::Result + where + Self: Sized, + { + Ok(Self(src)) + } +} + +pub struct NoticeResponseFields { #[used] storage: Pin, severity: Severity, @@ -65,34 +109,34 @@ pub struct NoticeResponse { } // SAFE: Raw pointers point to pinned memory inside the struct -unsafe impl Send for NoticeResponse {} -unsafe impl Sync for NoticeResponse {} +unsafe impl Send for NoticeResponseFields {} +unsafe impl Sync for NoticeResponseFields {} -impl NoticeResponse { +impl NoticeResponseFields { #[inline] pub fn severity(&self) -> Severity { self.severity } #[inline] pub fn code(&self) -> &str { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.code.as_ref() } } #[inline] pub fn message(&self) -> &str { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.message.as_ref() } } #[inline] pub fn detail(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.detail.as_ref().map(|s| s.as_ref()) } } #[inline] pub fn hint(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.hint.as_ref().map(|s| s.as_ref()) } } @@ -104,49 +148,49 @@ impl NoticeResponse { #[inline] pub fn internal_query(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.internal_query.as_ref().map(|s| s.as_ref()) } } #[inline] pub fn where_(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.where_.as_ref().map(|s| s.as_ref()) } } #[inline] pub fn schema(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.schema.as_ref().map(|s| s.as_ref()) } } #[inline] pub fn table(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.table.as_ref().map(|s| s.as_ref()) } } #[inline] pub fn column(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.column.as_ref().map(|s| s.as_ref()) } } #[inline] pub fn data_type(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.data_type.as_ref().map(|s| s.as_ref()) } } #[inline] pub fn constraint(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.constraint.as_ref().map(|s| s.as_ref()) } } #[inline] pub fn file(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.file.as_ref().map(|s| s.as_ref()) } } @@ -155,14 +199,14 @@ impl NoticeResponse { #[inline] pub fn routine(&self) -> Option<&str> { - // SAFE: Memory is pinned (`self.storage`) + // SAFE: Memory is pinned unsafe { self.routine.as_ref().map(|s| s.as_ref()) } } } -impl fmt::Debug for NoticeResponse { +impl fmt::Debug for NoticeResponseFields { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("NoticeResponse") + f.debug_struct("NoticeResponseFields") .field("severity", &self.severity) .field("code", &self.code()) .field("message", &self.message()) @@ -184,27 +228,9 @@ impl fmt::Debug for NoticeResponse { } } -// FIXME: `Encode` here is (mostly) useless as its not easy to construct a NoticeResponse. -// Need a `NoticeResponse::builder().severity(...).build()` etc. type thing - -impl Encode for NoticeResponse { - fn size_hint(&self) -> usize { self.storage.len() + 5 } - - fn encode(&self, buf: &mut Vec) -> io::Result<()> { - buf.write_u8(b'Z')?; - buf.write_u32::((4 + self.storage.len()) as u32)?; - buf.write_all(&self.storage)?; - - Ok(()) - } -} - -impl Decode for NoticeResponse { - fn decode(b: Bytes) -> io::Result - where - Self: Sized, - { - let storage = Pin::new(b); +impl Decode for NoticeResponseFields { + fn decode(src: Bytes) -> io::Result { + let storage = Pin::new(src); let mut code = None::<&str>; let mut message = None::<&str>; @@ -364,50 +390,30 @@ impl Decode for NoticeResponse { #[cfg(test)] mod tests { - use crate::{Decode, Message, Severity}; + use crate::{Message, Severity, Decode}; use bytes::Bytes; - use std::{io, thread}; + use std::io; #[test] fn it_decodes_notice_response() -> io::Result<()> { - let b = Bytes::from_static(b"N\0\0\0pSNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0"); - let message = Message::decode(b)?; + let src = Bytes::from_static(b"N\0\0\0pSNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0"); + let message = Message::decode(src)?; + + // FIXME: Is there a simpler pattern here for tests? let body = if let Message::NoticeResponse(body) = message { body } else { - unreachable!(); + panic!("unexpected {:?}", message); }; - assert_eq!(body.severity(), Severity::Notice); - assert_eq!(body.message(), "extension \"uuid-ossp\" already exists, skipping"); - assert_eq!(body.code(), "42710"); - assert_eq!(body.file(), Some("extension.c")); - assert_eq!(body.line(), Some(1656)); - assert_eq!(body.routine(), Some("CreateExtension")); + let fields = body.fields()?; - Ok(()) - } - - #[test] - fn it_decodes_notice_response_and_is_send() -> io::Result<()> { - let b = Bytes::from_static(b"N\0\0\0pSNOTICE\0VNOTICE\0C42710\0Mextension \"uuid-ossp\" already exists, skipping\0Fextension.c\0L1656\0RCreateExtension\0\0"); - let message = Message::decode(b)?; - let body = if let Message::NoticeResponse(body) = message { - body - } else { - unreachable!(); - }; - - let body = thread::spawn(move || { - assert_eq!(body.message(), "extension \"uuid-ossp\" already exists, skipping"); - - body - }) - .join() - .unwrap(); - - assert_eq!(body.code(), "42710"); - assert_eq!(body.routine(), Some("CreateExtension")); + assert_eq!(fields.severity(), Severity::Notice); + assert_eq!(fields.message(), "extension \"uuid-ossp\" already exists, skipping"); + assert_eq!(fields.code(), "42710"); + assert_eq!(fields.file(), Some("extension.c")); + assert_eq!(fields.line(), Some(1656)); + assert_eq!(fields.routine(), Some("CreateExtension")); Ok(()) } diff --git a/mason-postgres-protocol/src/ready_for_query.rs b/mason-postgres-protocol/src/ready_for_query.rs index 687383e3..e975b6cf 100644 --- a/mason-postgres-protocol/src/ready_for_query.rs +++ b/mason-postgres-protocol/src/ready_for_query.rs @@ -36,13 +36,13 @@ impl Encode for ReadyForQuery { } impl Decode for ReadyForQuery { - fn decode(b: Bytes) -> io::Result { - if b.len() != 1 { + fn decode(src: Bytes) -> io::Result { + if src.len() != 1 { return Err(io::ErrorKind::InvalidInput)?; } Ok(Self { - status: match b[0] { + status: match src[0] { // FIXME: Variant value are duplicated with declaration b'I' => TransactionStatus::Idle, b'T' => TransactionStatus::Transaction,