mysql: ping now follows the normal stream lifecycle

fixes test introduced by #142

Co-authored-by: Daniel Akhterov <daniel@launchbadge.com>
This commit is contained in:
Ryan Leckey
2020-03-20 20:41:33 -07:00
parent f82c57e4b2
commit f86f80d8b9
4 changed files with 45 additions and 32 deletions

View File

@@ -252,10 +252,13 @@ async fn close(mut stream: MySqlStream) -> crate::Result<()> {
}
async fn ping(stream: &mut MySqlStream) -> crate::Result<()> {
stream.wait_until_ready().await?;
stream.is_ready = false;
stream.send(ComPing, true).await?;
match stream.receive().await?[0] {
0x00 | 0xFE => Ok(()),
0x00 | 0xFE => stream.handle_ok().map(drop),
0xFF => stream.handle_err(),
@@ -323,10 +326,12 @@ impl Connect for MySqlConnection {
}
impl Connection for MySqlConnection {
#[inline]
fn close(self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(close(self.stream))
}
#[inline]
fn ping(&mut self) -> BoxFuture<crate::Result<()>> {
Box::pin(ping(&mut self.stream))
}

View File

@@ -10,33 +10,6 @@ use crate::mysql::protocol::{
use crate::mysql::{MySql, MySqlArguments, MySqlCursor, MySqlTypeInfo};
impl super::MySqlConnection {
async fn wait_until_ready(&mut self) -> crate::Result<()> {
if !self.is_ready {
loop {
let packet_id = self.stream.receive().await?[0];
match packet_id {
0xFE if self.stream.packet().len() < 0xFF_FF_FF => {
// OK or EOF packet
self.is_ready = true;
break;
}
0xFF => {
// ERR packet
self.is_ready = true;
return self.stream.handle_err();
}
_ => {
// Something else; skip
}
}
}
}
Ok(())
}
// Creates a prepared statement for the passed query string
async fn prepare(&mut self, query: &str) -> crate::Result<ComStmtPrepareOk> {
// https://dev.mysql.com/doc/dev/mysql-server/8.0.11/page_protocol_com_stmt_prepare.html
@@ -91,8 +64,8 @@ impl super::MySqlConnection {
query: &str,
arguments: Option<MySqlArguments>,
) -> crate::Result<Option<u32>> {
self.wait_until_ready().await?;
self.is_ready = false;
self.stream.wait_until_ready().await?;
self.stream.is_ready = false;
if let Some(arguments) = arguments {
let statement_id = self.get_or_prepare(query).await?;
@@ -147,6 +120,7 @@ impl super::MySqlConnection {
}
0xFF => {
self.is_ready = true;
return self.stream.handle_err();
}
@@ -160,7 +134,7 @@ impl super::MySqlConnection {
// method is not named describe to work around an intellijrust bug
// otherwise it marks someone trying to describe the connection as "method is private"
async fn do_describe(&mut self, query: &str) -> crate::Result<Describe<MySql>> {
self.wait_until_ready().await?;
self.stream.wait_until_ready().await?;
let stmt = self.prepare(query).await?;

View File

@@ -13,6 +13,10 @@ const MAX_PACKET_SIZE: u32 = 1024;
pub(crate) struct MySqlStream {
pub(super) stream: BufStream<MaybeTlsStream>,
// Is the stream ready to send commands
// Put another way, are we still expecting an EOF or OK packet to terminate
pub(super) is_ready: bool,
// Active capabilities
pub(super) capabilities: Capabilities,
@@ -56,6 +60,7 @@ impl MySqlStream {
packet_buf: Vec::with_capacity(MAX_PACKET_SIZE as usize),
packet_len: 0,
seq_no: 0,
is_ready: true,
})
}
@@ -182,10 +187,39 @@ impl MySqlStream {
}
pub(crate) fn handle_err<T>(&mut self) -> crate::Result<T> {
self.is_ready = true;
Err(MySqlError(ErrPacket::read(self.packet(), self.capabilities)?).into())
}
pub(crate) fn handle_ok(&mut self) -> crate::Result<OkPacket> {
self.is_ready = true;
OkPacket::read(self.packet())
}
pub(crate) async fn wait_until_ready(&mut self) -> crate::Result<()> {
if !self.is_ready {
loop {
let packet_id = self.receive().await?[0];
match packet_id {
0xFE if self.packet().len() < 0xFF_FF_FF => {
// OK or EOF packet
self.is_ready = true;
break;
}
0xFF => {
// ERR packet
self.is_ready = true;
return self.handle_err();
}
_ => {
// Something else; skip
}
}
}
}
Ok(())
}
}

View File

@@ -25,7 +25,7 @@ impl Type<Postgres> for [Uuid] {
impl Type<Postgres> for Vec<Uuid> {
fn type_info() -> PgTypeInfo {
<Postgres as HasSqlType<[Uuid]>>::type_info()
<[Uuid] as Type<Postgres>>::type_info()
}
}