From f67490d790a46b4a907fc067643cf16d236b5172 Mon Sep 17 00:00:00 2001 From: Daniel Akhterov Date: Fri, 14 Jun 2019 17:05:19 -0700 Subject: [PATCH] Add OkPacket --- mason-mariadb/src/connection/establish.rs | 10 +++- mason-mariadb/src/connection/mod.rs | 14 ++--- mason-mariadb/src/lib.rs | 2 +- mason-mariadb/src/protocol/server.rs | 65 ++++++++++++++++++++++- 4 files changed, 78 insertions(+), 13 deletions(-) diff --git a/mason-mariadb/src/connection/establish.rs b/mason-mariadb/src/connection/establish.rs index ea551651..779fb066 100644 --- a/mason-mariadb/src/connection/establish.rs +++ b/mason-mariadb/src/connection/establish.rs @@ -1,6 +1,5 @@ use super::Connection; use crate::protocol::{ - client::{PasswordMessage, StartupMessage}, server::Message as ServerMessage, }; use futures::StreamExt; @@ -12,5 +11,14 @@ pub async fn establish<'a, 'b: 'a>( options: ConnectOptions<'b>, ) -> io::Result<()> { // The actual connection establishing + + // if let Some(message) = conn.incoming.next().await { + // if let Some(ServerMessage::InitialHandshakePacket(message)) = message { + + // } else { + // unimplemented!("received {:?} unimplemented message", message); + // } + // } + Ok(()) } diff --git a/mason-mariadb/src/connection/mod.rs b/mason-mariadb/src/connection/mod.rs index c592728d..ac5de5b6 100644 --- a/mason-mariadb/src/connection/mod.rs +++ b/mason-mariadb/src/connection/mod.rs @@ -1,5 +1,5 @@ use crate::protocol::{ - client::{Serialize, Terminate}, + client::Serialize, server::Message as ServerMessage, }; use bytes::BytesMut; @@ -13,7 +13,7 @@ use runtime::{net::TcpStream, task::JoinHandle}; use std::io; mod establish; -mod query; +// mod query; pub struct Connection { writer: WriteHalf, @@ -25,11 +25,8 @@ pub struct Connection { // Handle to coroutine reading messages from the stream receiver: JoinHandle>, - // Process ID of the Backend - process_id: i32, - - // Backend-unique key to use to send a cancel query message to the server - secret_key: i32, + // MariaDB Connection ID + connection_id: i32, } impl Connection { @@ -43,8 +40,7 @@ impl Connection { writer, receiver, incoming: rx, - process_id: -1, - secret_key: -1, + connection_id: -1, }; establish::establish(&mut conn, options).await?; diff --git a/mason-mariadb/src/lib.rs b/mason-mariadb/src/lib.rs index 2e760c7f..e983c4db 100644 --- a/mason-mariadb/src/lib.rs +++ b/mason-mariadb/src/lib.rs @@ -4,5 +4,5 @@ #[macro_use] extern crate bitflags; -// mod connection; +mod connection; mod protocol; diff --git a/mason-mariadb/src/protocol/server.rs b/mason-mariadb/src/protocol/server.rs index d4e075ee..2810eae6 100644 --- a/mason-mariadb/src/protocol/server.rs +++ b/mason-mariadb/src/protocol/server.rs @@ -2,7 +2,7 @@ use byteorder::{ByteOrder, LittleEndian}; use failure::{Error, err_msg}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; pub trait Deserialize: Sized { fn deserialize(buf: &mut Vec) -> Result; @@ -12,9 +12,10 @@ pub trait Deserialize: Sized { #[non_exhaustive] pub enum Message { InitialHandshakePacket(InitialHandshakePacket), + OkPacket(OkPacket), + // ErrPacket(ErrPacket), } - bitflags! { pub struct Capabilities: u128 { const CLIENT_MYSQL = 1; @@ -64,6 +65,25 @@ pub struct InitialHandshakePacket { pub auth_plugin_name: Option, } +#[derive(Default, Debug)] +pub struct OkPacket { + pub affected_rows: Option, + pub last_insert_id: Option, + pub server_status: u16, + pub warning_count: u16, + pub info: Bytes, + pub session_state_info: Option, + pub value: Option, +} + +impl Message { + pub fn deserialize(buf: &mut BytesMut) -> Result, Error> { + let length = buf[0] + buf[1] << 8 + buf[2] << 16; + let sequence_number = buf[3]; + Ok(None) + } +} + impl Deserialize for InitialHandshakePacket { fn deserialize(buf: &mut Vec) -> Result { let mut index = 0; @@ -150,3 +170,44 @@ impl Deserialize for InitialHandshakePacket { }) } } + +fn deserialize_int_lenenc(buf: &Vec, index: &usize) -> (Option, usize) { + match buf[*index] { + 0xFB => (None, *index + 1), + 0xFC => (Some(LittleEndian::read_u16(&buf[*index + 1..]) as usize), *index + 2), + 0xFD => (Some((buf[*index + 1] + buf[*index + 2] << 8 + buf[*index + 3] << 16) as usize), *index + 3), + 0xFE => (Some(LittleEndian::read_u64(&buf[*index..]) as usize), *index + 8), + 0xFF => panic!("int unprocessable first byte 0xFF"), + _ => (Some(buf[*index] as usize), *index + 1), + } +} + +fn deserialize_int_2(buf: &Vec, index: &usize) -> (u16, usize) { + (LittleEndian::read_u16(&buf[*index..]), index + 2) +} + +impl Deserialize for OkPacket { + fn deserialize(buf: &mut Vec) -> Result { + let mut index = 1; + let (affected_rows, index) = deserialize_int_lenenc(&buf, &index); + let (last_insert_id, index) = deserialize_int_lenenc(&buf, &index); + let (server_status, index) = deserialize_int_2(&buf, &index); + let (warning_count, index) = deserialize_int_2(&buf, &index); + + // Assuming CLIENT_SESSION_TRACK is unsupported + let session_state_info = None; + let value = None; + + let info = Bytes::from(&buf[index..]); + + Ok(OkPacket { + affected_rows, + last_insert_id, + server_status, + warning_count, + info, + session_state_info, + value, + }) + } +}