diff --git a/Cargo.toml b/Cargo.toml index 2cbb947b..bcaf1842 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,10 +28,10 @@ uuid = [ "sqlx-core/uuid", "sqlx-macros/uuid" ] [dependencies] sqlx-core = { version = "0.1.0-pre", path = "sqlx-core" } sqlx-macros = { version = "0.1.0-pre", path = "sqlx-macros", optional = true } -proc-macro-hack = { version = "0.5", optional = true } +proc-macro-hack = { version = "0.5.11", optional = true } [dev-dependencies] -async-std = { version = "1.1.0", features = [ "attributes" ] } +async-std = { version = "1.2.0", features = [ "attributes" ] } matches = "0.1.8" criterion = "0.3.0" diff --git a/examples/realworld/Cargo.toml b/examples/realworld/Cargo.toml index aac921fb..d6cebd73 100644 --- a/examples/realworld/Cargo.toml +++ b/examples/realworld/Cargo.toml @@ -5,10 +5,10 @@ edition = "2018" workspace = "../.." [dependencies] -anyhow = "1.0.22" +anyhow = "1.0.25" dotenv = "0.15.0" -async-std = "1.1.0" +async-std = "1.2.0" tide = "0.4.0" sqlx = { path = "../..", features = [ "postgres" ] } -serde = { version = "1", features = [ "derive"] } +serde = { version = "1.0.103", features = [ "derive"] } futures = "0.3.1" diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index fd82dea9..04696242 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -16,11 +16,10 @@ postgres = [] mariadb = [] [dependencies] -async-std = { version = "1.1.0", features = ["attributes", "unstable"] } +async-std = { version = "1.2.0", features = ["attributes", "unstable"] } async-stream = "0.2.0" bitflags = "1.2.1" byteorder = { version = "1.3.2", default-features = false } -bytes = "0.4.12" futures-channel = "0.3.1" futures-core = "0.3.1" futures-util = "0.3.1" @@ -32,3 +31,4 @@ uuid = { version = "0.8.1", optional = true } [dev-dependencies] matches = "0.1.8" +bytes = "0.5.2" diff --git a/sqlx-core/src/io/buf_stream.rs b/sqlx-core/src/io/buf_stream.rs index 6c278de3..0915d762 100644 --- a/sqlx-core/src/io/buf_stream.rs +++ b/sqlx-core/src/io/buf_stream.rs @@ -2,8 +2,8 @@ use async_std::io::{ prelude::{ReadExt, WriteExt}, Read, Write, }; -use bytes::{BufMut, BytesMut}; use std::io; +use bitflags::_core::mem::MaybeUninit; pub struct BufStream { pub(crate) stream: S, @@ -15,19 +15,23 @@ pub struct BufStream { wbuf: Vec, // Buffer used when reading incoming messages - rbuf: BytesMut, + rbuf: Vec, + rbuf_rindex: usize, + rbuf_windex: usize, } impl BufStream -where - S: Read + Write + Unpin, + where + S: Read + Write + Unpin, { pub fn new(stream: S) -> Self { Self { stream, stream_eof: false, wbuf: Vec::with_capacity(1024), - rbuf: BytesMut::with_capacity(8 * 1024), + rbuf: vec![0; 8 * 1024], + rbuf_rindex: 0, + rbuf_windex: 0, } } @@ -48,7 +52,7 @@ where #[inline] pub fn consume(&mut self, cnt: usize) { - self.rbuf.advance(cnt); + self.rbuf_rindex += cnt; } pub async fn peek(&mut self, cnt: usize) -> io::Result> { @@ -61,25 +65,23 @@ where // If we have enough bytes in our read buffer, // return immediately - if self.rbuf.len() >= cnt { - return Ok(Some(&self.rbuf[..cnt])); + if self.rbuf_windex >= (self.rbuf_rindex + cnt) { + return Ok(Some(&self.rbuf[self.rbuf_rindex..(self.rbuf_rindex + cnt)])); } - if self.rbuf.capacity() < cnt { - // Ask for exactly how much we need with a lower bound of 32-bytes - let needed = (cnt - self.rbuf.capacity()).max(32); - self.rbuf.reserve(needed); + // If we are out of space to write to in the read buffer, + // we reset the indexes + if self.rbuf.len() < (self.rbuf_windex + cnt) { + // TODO: This assumes that all data is consumed when we need to re-allocate + debug_assert_eq!(self.rbuf_rindex, self.rbuf_windex); + + self.rbuf_rindex = 0; + self.rbuf_windex = 0; } - // SAFE: Read data in directly to buffer without zero-initializing the data. - // Postgres is a self-describing format and the TCP frames encode - // length headers. We will never attempt to decode more than we - // received. - let n = self.stream.read(unsafe { self.rbuf.bytes_mut() }).await?; + let n = self.stream.read(&mut self.rbuf[self.rbuf_windex..]).await?; - // SAFE: After we read in N bytes, we can tell the buffer that it actually - // has that many bytes MORE for the decode routines to look at - unsafe { self.rbuf.advance_mut(n) } + self.rbuf_windex += n; if n == 0 { self.stream_eof = true; diff --git a/sqlx-core/src/postgres/connection.rs b/sqlx-core/src/postgres/connection.rs index 4ded3855..4072ee88 100644 --- a/sqlx-core/src/postgres/connection.rs +++ b/sqlx-core/src/postgres/connection.rs @@ -75,10 +75,12 @@ impl Postgres { self.stream.flush().await?; while let Some(message) = self.receive().await? { + println!("recv!?"); match message { Message::Authentication(auth) => { match *auth { protocol::Authentication::Ok => { + println!("no auth?"); // Do nothing. No password is needed to continue. } @@ -126,6 +128,8 @@ impl Postgres { } } + println!("done"); + Ok(()) } @@ -221,10 +225,6 @@ impl Postgres { // Wait and return the next message to be received from Postgres. pub(super) async fn receive(&mut self) -> crate::Result> { - // Before we start the receive loop - // Flush any pending data from the send buffer - self.stream.flush().await?; - loop { // Read the message header (id + len) let mut header = ret_if_none!(self.stream.peek(5).await?); diff --git a/sqlx-macros/Cargo.toml b/sqlx-macros/Cargo.toml index a46010d2..2100b924 100644 --- a/sqlx-macros/Cargo.toml +++ b/sqlx-macros/Cargo.toml @@ -8,13 +8,13 @@ edition = "2018" proc-macro = true [dependencies] -async-std = "1.1.0" +async-std = "1.2.0" dotenv = "0.15.0" futures = "0.3.1" -proc-macro-hack = "0.5" +proc-macro-hack = "0.5.11" proc-macro2 = "1.0.6" sqlx = { version = "0.1.0-pre", path = "../sqlx-core", package = "sqlx-core" } -syn = "1.0.8" +syn = "1.0.11" quote = "1.0.2" url = "2.1.0"