Add OkPacket

This commit is contained in:
Daniel Akhterov 2019-06-14 17:05:19 -07:00
parent 9f0d41ea77
commit f67490d790
4 changed files with 78 additions and 13 deletions

View File

@ -1,6 +1,5 @@
use super::Connection;
use crate::protocol::{
client::{PasswordMessage, StartupMessage},
server::Message as ServerMessage,
};
use futures::StreamExt;
@ -12,5 +11,14 @@ pub async fn establish<'a, 'b: 'a>(
options: ConnectOptions<'b>,
) -> io::Result<()> {
// The actual connection establishing
// if let Some(message) = conn.incoming.next().await {
// if let Some(ServerMessage::InitialHandshakePacket(message)) = message {
// } else {
// unimplemented!("received {:?} unimplemented message", message);
// }
// }
Ok(())
}

View File

@ -1,5 +1,5 @@
use crate::protocol::{
client::{Serialize, Terminate},
client::Serialize,
server::Message as ServerMessage,
};
use bytes::BytesMut;
@ -13,7 +13,7 @@ use runtime::{net::TcpStream, task::JoinHandle};
use std::io;
mod establish;
mod query;
// mod query;
pub struct Connection {
writer: WriteHalf<TcpStream>,
@ -25,11 +25,8 @@ pub struct Connection {
// Handle to coroutine reading messages from the stream
receiver: JoinHandle<io::Result<()>>,
// Process ID of the Backend
process_id: i32,
// Backend-unique key to use to send a cancel query message to the server
secret_key: i32,
// MariaDB Connection ID
connection_id: i32,
}
impl Connection {
@ -43,8 +40,7 @@ impl Connection {
writer,
receiver,
incoming: rx,
process_id: -1,
secret_key: -1,
connection_id: -1,
};
establish::establish(&mut conn, options).await?;

View File

@ -4,5 +4,5 @@
#[macro_use]
extern crate bitflags;
// mod connection;
mod connection;
mod protocol;

View File

@ -2,7 +2,7 @@
use byteorder::{ByteOrder, LittleEndian};
use failure::{Error, err_msg};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
pub trait Deserialize: Sized {
fn deserialize(buf: &mut Vec<u8>) -> Result<Self, Error>;
@ -12,9 +12,10 @@ pub trait Deserialize: Sized {
#[non_exhaustive]
pub enum Message {
InitialHandshakePacket(InitialHandshakePacket),
OkPacket(OkPacket),
// ErrPacket(ErrPacket),
}
bitflags! {
pub struct Capabilities: u128 {
const CLIENT_MYSQL = 1;
@ -64,6 +65,25 @@ pub struct InitialHandshakePacket {
pub auth_plugin_name: Option<Bytes>,
}
#[derive(Default, Debug)]
pub struct OkPacket {
pub affected_rows: Option<usize>,
pub last_insert_id: Option<usize>,
pub server_status: u16,
pub warning_count: u16,
pub info: Bytes,
pub session_state_info: Option<Bytes>,
pub value: Option<Bytes>,
}
impl Message {
pub fn deserialize(buf: &mut BytesMut) -> Result<Option<Self>, Error> {
let length = buf[0] + buf[1] << 8 + buf[2] << 16;
let sequence_number = buf[3];
Ok(None)
}
}
impl Deserialize for InitialHandshakePacket {
fn deserialize(buf: &mut Vec<u8>) -> Result<Self, Error> {
let mut index = 0;
@ -150,3 +170,44 @@ impl Deserialize for InitialHandshakePacket {
})
}
}
fn deserialize_int_lenenc(buf: &Vec<u8>, index: &usize) -> (Option<usize>, usize) {
match buf[*index] {
0xFB => (None, *index + 1),
0xFC => (Some(LittleEndian::read_u16(&buf[*index + 1..]) as usize), *index + 2),
0xFD => (Some((buf[*index + 1] + buf[*index + 2] << 8 + buf[*index + 3] << 16) as usize), *index + 3),
0xFE => (Some(LittleEndian::read_u64(&buf[*index..]) as usize), *index + 8),
0xFF => panic!("int<lenenc> unprocessable first byte 0xFF"),
_ => (Some(buf[*index] as usize), *index + 1),
}
}
fn deserialize_int_2(buf: &Vec<u8>, index: &usize) -> (u16, usize) {
(LittleEndian::read_u16(&buf[*index..]), index + 2)
}
impl Deserialize for OkPacket {
fn deserialize(buf: &mut Vec<u8>) -> Result<Self, Error> {
let mut index = 1;
let (affected_rows, index) = deserialize_int_lenenc(&buf, &index);
let (last_insert_id, index) = deserialize_int_lenenc(&buf, &index);
let (server_status, index) = deserialize_int_2(&buf, &index);
let (warning_count, index) = deserialize_int_2(&buf, &index);
// Assuming CLIENT_SESSION_TRACK is unsupported
let session_state_info = None;
let value = None;
let info = Bytes::from(&buf[index..]);
Ok(OkPacket {
affected_rows,
last_insert_id,
server_status,
warning_count,
info,
session_state_info,
value,
})
}
}