[mariadb] Finish changes to the Text protocol

This commit is contained in:
Ryan Leckey 2019-09-09 23:40:08 -07:00
parent 4b2b08e63a
commit 22c889b472
19 changed files with 120 additions and 316 deletions

View File

@ -7,7 +7,8 @@ pub trait BufExt {
fn get_uint_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<u64>>;
fn get_str_eof(&mut self) -> io::Result<&str>;
fn get_str_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&str>>;
fn get_byte_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&[u8]>>;
fn get_bytes(&mut self, n: usize) -> io::Result<&[u8]>;
fn get_bytes_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&[u8]>>;
}
impl<'a> BufExt for &'a [u8] {
@ -39,9 +40,16 @@ impl<'a> BufExt for &'a [u8] {
.transpose()
}
fn get_byte_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&[u8]>> {
Ok(self
.get_uint_lenenc::<T>()?
.map(|len| &self[..len as usize]))
fn get_bytes(&mut self, n: usize) -> io::Result<&[u8]> {
let buf = &self[..n];
self.advance(n);
Ok(buf)
}
fn get_bytes_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&[u8]>> {
self.get_uint_lenenc::<T>()?
.map(move |len| self.get_bytes(len as usize))
.transpose()
}
}

View File

@ -1,129 +0,0 @@
use crate::mariadb::FieldType;
use bytes::Bytes;
#[derive(Debug, Default)]
pub struct ResultRow {
pub length: u32,
pub seq_no: u8,
pub columns: Vec<Option<Bytes>>,
}
impl crate::mariadb::Decode for ResultRow {
fn decode(ctx: &mut crate::mariadb::DeContext) -> Result<Self, failure::Error> {
let decoder = &mut ctx.decoder;
let length = decoder.decode_length()?;
let seq_no = decoder.decode_int_u8();
let header = decoder.decode_int_u8();
let bitmap = if let Some(columns) = ctx.columns {
let size = (columns + 9) / 8;
Ok(decoder.decode_byte_fix(size as usize))
} else {
Err(failure::err_msg(
"Columns were not provided; cannot deserialize binary result row",
))
}?;
let columns = match (&ctx.columns, &ctx.column_defs) {
(Some(columns), Some(column_defs)) => {
(0..*columns as usize)
.map(|index| {
if (1 << (index % 8)) & bitmap[index / 8] as usize == 1 {
None
} else {
match column_defs[index].field_type {
// Ordered by https://mariadb.com/kb/en/library/resultset-row/#binary-resultset-row
FieldType::MYSQL_TYPE_DOUBLE => {
Some(decoder.decode_binary_double())
}
FieldType::MYSQL_TYPE_LONGLONG => {
Some(decoder.decode_binary_bigint())
}
// Is this MYSQL_TYPE_INTEGER?
FieldType::MYSQL_TYPE_LONG => Some(decoder.decode_binary_int()),
// Is this MYSQL_TYPE_MEDIUMINTEGER?
FieldType::MYSQL_TYPE_INT24 => {
Some(decoder.decode_binary_mediumint())
}
FieldType::MYSQL_TYPE_FLOAT => Some(decoder.decode_binary_float()),
// Is this MYSQL_TYPE_SMALLINT?
FieldType::MYSQL_TYPE_SHORT => {
Some(decoder.decode_binary_smallint())
}
FieldType::MYSQL_TYPE_YEAR => Some(decoder.decode_binary_year()),
FieldType::MYSQL_TYPE_TINY => Some(decoder.decode_binary_tinyint()),
FieldType::MYSQL_TYPE_DATE => Some(decoder.decode_binary_date()),
FieldType::MYSQL_TYPE_TIMESTAMP => {
Some(decoder.decode_binary_timestamp())
}
FieldType::MYSQL_TYPE_DATETIME => {
Some(decoder.decode_binary_datetime())
}
FieldType::MYSQL_TYPE_TIME => Some(decoder.decode_binary_time()),
FieldType::MYSQL_TYPE_NEWDECIMAL => {
Some(decoder.decode_binary_decimal())
}
// This group of types are all encoded as byte<lenenc>
FieldType::MYSQL_TYPE_TINY_BLOB => {
Some(decoder.decode_byte_lenenc())
}
FieldType::MYSQL_TYPE_MEDIUM_BLOB => {
Some(decoder.decode_byte_lenenc())
}
FieldType::MYSQL_TYPE_LONG_BLOB => {
Some(decoder.decode_byte_lenenc())
}
FieldType::MYSQL_TYPE_BLOB => Some(decoder.decode_byte_lenenc()),
FieldType::MYSQL_TYPE_VARCHAR => Some(decoder.decode_byte_lenenc()),
FieldType::MYSQL_TYPE_VAR_STRING => {
Some(decoder.decode_byte_lenenc())
}
FieldType::MYSQL_TYPE_STRING => Some(decoder.decode_byte_lenenc()),
FieldType::MYSQL_TYPE_GEOMETRY => {
Some(decoder.decode_byte_lenenc())
}
// The following did not have defined binary encoding, so I guessed.
// Perhaps you cannot get these types back from the server if you're using
// prepared statements? In that case we should error out here instead of
// proceeding to decode.
FieldType::MYSQL_TYPE_DECIMAL => {
Some(decoder.decode_binary_decimal())
}
FieldType::MYSQL_TYPE_NULL => panic!("Cannot decode MysqlTypeNull"),
FieldType::MYSQL_TYPE_NEWDATE => Some(decoder.decode_binary_date()),
FieldType::MYSQL_TYPE_BIT => Some(decoder.decode_byte_fix(1)),
FieldType::MYSQL_TYPE_TIMESTAMP2 => {
Some(decoder.decode_binary_timestamp())
}
FieldType::MYSQL_TYPE_DATETIME2 => {
Some(decoder.decode_binary_datetime())
}
FieldType::MYSQL_TYPE_TIME2 => Some(decoder.decode_binary_time()),
FieldType::MYSQL_TYPE_JSON => Some(decoder.decode_byte_lenenc()),
FieldType::MYSQL_TYPE_ENUM => Some(decoder.decode_byte_lenenc()),
FieldType::MYSQL_TYPE_SET => Some(decoder.decode_byte_lenenc()),
_ => panic!("Unrecognized FieldType received from MariaDB"),
}
}
})
.collect::<Vec<Option<Bytes>>>()
}
_ => Vec::new(),
};
Ok(ResultRow {
length,
seq_no,
columns,
})
}
}

View File

@ -7,8 +7,8 @@ mod encode;
mod error_code;
mod field;
mod response;
mod text;
mod server_status;
mod text;
pub use capabilities::Capabilities;
pub use connect::{
@ -21,7 +21,4 @@ pub use response::{
ColumnCountPacket, ColumnDefinitionPacket, EofPacket, ErrPacket, OkPacket, ResultRow,
};
pub use server_status::ServerStatusFlag;
pub use text::{
ComDebug, ComInitDb,
ComPing, ComProcessKill, ComQuery, ComQuit,
};
pub use text::{ComDebug, ComInitDb, ComPing, ComProcessKill, ComQuery, ComQuit};

View File

@ -45,7 +45,7 @@ impl OkPacket {
.unwrap_or_default()
.to_owned()
.into();
session_state_info = buf.get_byte_lenenc::<LittleEndian>()?.map(Into::into);
session_state_info = buf.get_bytes_lenenc::<LittleEndian>()?.map(Into::into);
value_of_variable = buf.get_str_lenenc::<LittleEndian>()?.map(Into::into);
} else {
info = buf.get_str_eof()?.to_owned().into();

View File

@ -29,7 +29,9 @@ impl ResultRow {
debug_assert_eq!(header, 0);
// NULL-Bitmap : byte<(number_of_columns + 9) / 8>
let null = buf.get_uint::<LittleEndian>((columns.len() + 9) / 8)?;
let null_len = (columns.len() + 9) / 8;
let null = &buf[..];
buf.advance(null_len);
let buffer: Pin<Box<[u8]>> = Pin::new(buf.into());
let mut buf = &*buffer;
@ -37,20 +39,23 @@ impl ResultRow {
let mut values = Vec::with_capacity(columns.len());
for column_idx in 0..columns.len() {
if (null & (1 << column_idx)) != 0 {
if null[column_idx / 8] & (1 << (column_idx % 8)) != 0 {
values.push(None);
} else {
match columns[column_idx].field_type {
FieldType::MYSQL_TYPE_LONG => {
values.push(Some(buf[..(4 as usize)].into()));
buf.advance(4);
values.push(Some(buf.get_bytes(4)?.into()));
}
FieldType::MYSQL_TYPE_VAR_STRING => {
let len = buf.get_uint_lenenc::<LittleEndian>()?.unwrap_or_default();
values.push(Some(buf[..(len as usize)].into()));
buf.advance(len as usize);
FieldType::MYSQL_TYPE_TINY_BLOB
| FieldType::MYSQL_TYPE_MEDIUM_BLOB
| FieldType::MYSQL_TYPE_LONG_BLOB
| FieldType::MYSQL_TYPE_BLOB
| FieldType::MYSQL_TYPE_GEOMETRY
| FieldType::MYSQL_TYPE_STRING
| FieldType::MYSQL_TYPE_VARCHAR
| FieldType::MYSQL_TYPE_VAR_STRING => {
values.push(buf.get_bytes_lenenc::<LittleEndian>()?.map(Into::into));
}
type_ => {

View File

@ -1,5 +1,8 @@
use crate::{io::BufMut, mariadb::protocol::{Capabilities, Encode}};
use super::TextProtocol;
use crate::{
io::BufMut,
mariadb::protocol::{Capabilities, Encode},
};
#[derive(Debug)]
pub struct ComDebug;

View File

@ -1,5 +1,8 @@
use crate::{io::BufMut, mariadb::protocol::{Encode, Capabilities}};
use super::TextProtocol;
use crate::{
io::BufMut,
mariadb::protocol::{Capabilities, Encode},
};
pub struct ComInitDb<'a> {
pub schema_name: &'a str,

View File

@ -1,5 +1,8 @@
use crate::{io::BufMut, mariadb::{protocol::{Encode, Capabilities}}};
use super::TextProtocol;
use crate::{
io::BufMut,
mariadb::protocol::{Capabilities, Encode},
};
#[derive(Debug)]
pub struct ComPing;

View File

@ -1,5 +1,8 @@
use crate::{io::BufMut, mariadb::protocol::{Encode, Capabilities}};
use super::TextProtocol;
use crate::{
io::BufMut,
mariadb::protocol::{Capabilities, Encode},
};
use byteorder::LittleEndian;
/// Forces the server to terminate a specified connection.

View File

@ -1,6 +1,9 @@
use crate::{
io::BufMut,
mariadb::{io::{BufMutExt}, protocol::{Encode, Capabilities}},
mariadb::{
io::BufMutExt,
protocol::{Capabilities, Encode},
},
};
/// Sends the server an SQL statement to be executed immediately.
@ -20,7 +23,7 @@ mod tests {
use super::*;
#[test]
fn it_encodes_com_query() {
fn it_encodes_com_query() {
let mut buf = Vec::new();
ComQuery {

View File

@ -1,5 +1,8 @@
use crate::{io::BufMut, mariadb::protocol::{Encode, Capabilities}};
use super::TextProtocol;
use crate::{
io::BufMut,
mariadb::protocol::{Capabilities, Encode},
};
pub struct ComQuit;

View File

@ -1,10 +1,17 @@
use crate::{io::BufMut, mariadb::Encode};
use super::TextProtocol;
use crate::{
io::BufMut,
mariadb::protocol::{Capabilities, Encode},
};
pub struct ComResetConnection();
/// Resets a connection without re-authentication.
#[derive(Debug)]
pub struct ComResetConnection;
impl Encode for ComResetConnection {
fn encode(&self, buf: &mut Vec<u8>) {
buf.put_u8(super::TextProtocol::ComResetConnection as u8);
fn encode(&self, buf: &mut Vec<u8>, _: Capabilities) {
// COM_RESET_CONNECTION Header : int<1>
buf.put_u8(TextProtocol::ComResetConnection as u8);
}
}
@ -13,13 +20,11 @@ mod tests {
use super::*;
#[test]
fn it_encodes_com_reset_conn() -> std::io::Result<()> {
let mut buf = Vec::with_capacity(1024);
fn it_encodes_com_reset_conn() {
let mut buf = Vec::new();
ComResetConnection().encode(&mut buf);
ComResetConnection.encode(&mut buf, Capabilities::empty());
assert_eq!(&buf[..], b"\x01\0\0\x00\x1F");
Ok(())
assert_eq!(&buf[..], b"\x1F");
}
}

View File

@ -1,27 +1,29 @@
use crate::{io::BufMut, mariadb::Encode};
use crate::{
io::BufMut,
mariadb::protocol::{text::TextProtocol, Capabilities, Encode},
};
use byteorder::LittleEndian;
#[derive(Clone, Copy)]
#[derive(Debug, Copy, Clone)]
#[repr(u16)]
pub enum SetOptionOptions {
MySqlOptionMultiStatementsOn = 0x00,
MySqlOptionMultiStatementsOff = 0x01,
}
/// Enables or disables server option.
#[derive(Debug)]
pub struct ComSetOption {
pub option: SetOptionOptions,
}
impl Encode for ComSetOption {
fn encode(&self, buf: &mut Vec<u8>) {
buf.put_u8(super::TextProtocol::ComSetOption as u8);
buf.put_u16::<LittleEndian>(self.option.into());
}
}
fn encode(&self, buf: &mut Vec<u8>, _: Capabilities) {
// COM_SET_OPTION : int<1>
buf.put_u8(TextProtocol::ComSetOption as u8);
// Helper method to easily transform into u16
impl Into<u16> for SetOptionOptions {
fn into(self) -> u16 {
self as u16
// option : int<2>
buf.put_u16::<LittleEndian>(self.option as u16);
}
}
@ -30,16 +32,14 @@ mod tests {
use super::*;
#[test]
fn it_encodes_com_set_option() -> std::io::Result<()> {
let mut buf = Vec::with_capacity(1024);
fn it_encodes_com_set_option() {
let mut buf = Vec::new();
ComSetOption {
option: SetOptionOptions::MySqlOptionMultiStatementsOff,
}
.encode(&mut buf);
.encode(&mut buf, Capabilities::empty());
assert_eq!(&buf[..], b"\x03\0\0\x00\x1B\x01\0");
Ok(())
assert_eq!(&buf[..], b"\x1B\x01\0");
}
}

View File

@ -1,43 +0,0 @@
use crate::{io::BufMut, mariadb::Encode};
#[derive(Clone, Copy)]
pub enum ShutdownOptions {
ShutdownDefault = 0x00,
}
pub struct ComShutdown {
pub option: ShutdownOptions,
}
impl Encode for ComShutdown {
fn encode(&self, buf: &mut Vec<u8>) {
buf.put_u8(super::TextProtocol::ComShutdown as u8);
buf.put_u8(self.option as u8);
}
}
// Helper method to easily transform into u8
impl Into<u8> for ShutdownOptions {
fn into(self) -> u8 {
self as u8
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_encodes_com_shutdown() -> std::io::Result<()> {
let mut buf = Vec::with_capacity(1024);
ComShutdown {
option: ShutdownOptions::ShutdownDefault,
}
.encode(&mut buf);
assert_eq!(&buf[..], b"\x02\0\0\x00\x0A\x00");
Ok(())
}
}

View File

@ -1,10 +1,14 @@
use crate::{io::BufMut, mariadb::Encode};
use crate::{
io::BufMut,
mariadb::protocol::{text::TextProtocol, Capabilities, Encode},
};
pub struct ComSleep();
pub struct ComSleep;
impl Encode for ComSleep {
fn encode(&self, buf: &mut Vec<u8>) {
buf.put_u8(super::TextProtocol::ComSleep as u8);
fn encode(&self, buf: &mut Vec<u8>, _: Capabilities) {
// COM_SLEEP : int<1>
buf.put_u8(TextProtocol::ComSleep as u8);
}
}
@ -13,13 +17,11 @@ mod tests {
use super::*;
#[test]
fn it_encodes_com_sleep() -> std::io::Result<()> {
let mut buf = Vec::with_capacity(1024);
fn it_encodes_com_sleep() {
let mut buf = Vec::new();
ComSleep().encode(&mut buf);
ComSleep.encode(&mut buf, Capabilities::empty());
assert_eq!(&buf[..], b"\x01\0\0\x00\x00");
Ok(())
assert_eq!(&buf[..], b"\x00");
}
}

View File

@ -1,10 +1,15 @@
use crate::{io::BufMut, mariadb::Encode};
use crate::{
io::BufMut,
mariadb::protocol::{text::TextProtocol, Capabilities, Encode},
};
pub struct ComStatistics();
#[derive(Debug)]
pub struct ComStatistics;
impl Encode for ComStatistics {
fn encode(&self, buf: &mut Vec<u8>) {
buf.put_u8(super::TextProtocol::ComStatistics.into());
fn encode(&self, buf: &mut Vec<u8>, _: Capabilities) {
// COM_STATISTICS : int<1>
buf.put_u8(TextProtocol::ComStatistics as u8);
}
}
@ -13,13 +18,11 @@ mod tests {
use super::*;
#[test]
fn it_encodes_com_statistics() -> std::io::Result<()> {
let mut buf = Vec::with_capacity(1024);
fn it_encodes_com_statistics() {
let mut buf = Vec::new();
ComStatistics().encode(&mut buf);
ComStatistics.encode(&mut buf, Capabilities::empty());
assert_eq!(&buf[..], b"\x01\0\0\x00\x09");
Ok(())
assert_eq!(&buf[..], b"\x09");
}
}

View File

@ -4,12 +4,10 @@ mod com_ping;
mod com_process_kill;
mod com_query;
mod com_quit;
// mod com_reset_conn;
// mod com_set_option;
// mod com_shutdown;
// mod com_sleep;
// mod com_statistics;
// mod result_row;
mod com_reset_conn;
mod com_set_option;
mod com_sleep;
mod com_statistics;
pub use com_debug::ComDebug;
pub use com_init_db::ComInitDb;
@ -17,12 +15,10 @@ pub use com_ping::ComPing;
pub use com_process_kill::ComProcessKill;
pub use com_query::ComQuery;
pub use com_quit::ComQuit;
// pub use com_reset_conn::ComResetConnection;
// pub use com_set_option::{ComSetOption, SetOptionOptions};
// pub use com_shutdown::{ComShutdown, ShutdownOptions};
// pub use com_sleep::ComSleep;
// pub use com_statistics::ComStatistics;
// pub use result_row::ResultRow;
pub use com_reset_conn::ComResetConnection;
pub use com_set_option::{ComSetOption, SetOptionOptions};
pub use com_sleep::ComSleep;
pub use com_statistics::ComStatistics;
// This is an enum of text protocol packet tags.
// Tags are the 5th byte of the packet (1st byte of packet body)

View File

@ -1,48 +0,0 @@
use crate::mariadb::{BufExt, Capabilities, Decode};
use byteorder::LittleEndian;
use std::{io, pin::Pin};
#[derive(Default, Debug)]
pub struct ResultRow<'a> {
pub columns: Vec<&'a [u8]>,
}
impl<'a> Decode<'a> for ResultRow<'a> {
fn decode(buf: &'a [u8], _: Capabilities) -> io::Result<Self> {
// let buffer = Pin::new(buf.into());
// let mut buf: &[u8] = &*buffer;
// // FIXME: Where to put number of columns to decode?
// let columns = Vec::new();
// if let Some(num_columns) = Some(0) {
// for _ in 0..num_columns {
// columns.push(buf.get_byte_lenenc::<LittleEndian>()?);
// }
// }
Ok(ResultRow { columns: vec![] })
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::__bytes_builder;
#[test]
fn it_decodes_result_row_packet() -> io::Result<()> {
#[rustfmt::skip]
let buf = __bytes_builder!(
// int<3> length
1u8, 0u8, 0u8,
// int<1> seq_no
1u8,
// string<lenenc> column data
1u8, b"s"
);
let _message = ResultRow::decode(&buf, Capabilities::CLIENT_PROTOCOL_41)?;
Ok(())
}
}

View File

@ -1,13 +1,3 @@
pub enum SessionChangeType {
SessionTrackSystemVariables = 0,
SessionTrackSchema = 1,
SessionTrackStateChange = 2,
SessionTrackGTIDS = 3,
SessionTrackTransactionCharacteristics = 4,
SessionTrackTransactionState = 5,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StmtExecFlag(pub u8);
impl StmtExecFlag {