WIP: Mariadb initial handshake packet

This commit is contained in:
Daniel Akhterov 2019-06-13 00:05:23 -07:00 committed by Daniel Akhterov
parent 2b7f54f1a0
commit 9d8b567790
8 changed files with 271 additions and 0 deletions

View File

@ -7,6 +7,7 @@ description = "Asynchronous and expressive database client in pure Rust."
edition = "2018"
[dependencies]
failure = "0.1"
byteorder = "1.3.2"
bytes = "0.4.12"
futures-preview = "=0.3.0-alpha.17"

18
mason-mariadb/Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "mason-mariadb"
version = "0.1.0"
authors = ["Daniel Akhterov <akhterovd@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
mason-core = { path = "../mason-core" }
runtime = "=0.3.0-alpha.4"
futures-preview = "=0.3.0-alpha.16"
failure = "0.1"
byteorder = "1.3.1"
log = "0.4"
hex = "0.3.2"
bytes = "0.4.12"
memchr = "2.2.0"

View File

@ -0,0 +1,16 @@
use super::Connection;
use crate::protocol::{
client::{PasswordMessage, StartupMessage},
server::Message as ServerMessage,
};
use futures::StreamExt;
use mason_core::ConnectOptions;
use std::io;
pub async fn establish<'a, 'b: 'a>(
conn: &'a mut Connection,
options: ConnectOptions<'b>,
) -> io::Result<()> {
// The actual connection establishing
Ok(())
}

View File

@ -0,0 +1,83 @@
use crate::protocol::{
client::{Serialize, Terminate},
server::Message as ServerMessage,
};
use bytes::BytesMut;
use futures::{
channel::mpsc,
io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
SinkExt, StreamExt,
};
use mason_core::ConnectOptions;
use runtime::{net::TcpStream, task::JoinHandle};
use std::io;
mod establish;
mod query;
pub struct Connection {
writer: WriteHalf<TcpStream>,
incoming: mpsc::UnboundedReceiver<ServerMessage>,
// Buffer used when serializing outgoing messages
wbuf: Vec<u8>,
// Handle to coroutine reading messages from the stream
receiver: JoinHandle<io::Result<()>>,
// Process ID of the Backend
process_id: i32,
// Backend-unique key to use to send a cancel query message to the server
secret_key: i32,
}
impl Connection {
pub async fn establish(options: ConnectOptions<'_>) -> io::Result<Self> {
let stream = TcpStream::connect((options.host, options.port)).await?;
let (reader, writer) = stream.split();
let (tx, rx) = mpsc::unbounded();
let receiver = runtime::spawn(receiver(reader, tx));
let mut conn = Self {
wbuf: Vec::with_capacity(1024),
writer,
receiver,
incoming: rx,
process_id: -1,
secret_key: -1,
};
establish::establish(&mut conn, options).await?;
Ok(conn)
}
async fn send<S>(&mut self, message: S) -> io::Result<()>
where
S: Serialize,
{
self.wbuf.clear();
message.serialize(&mut self.wbuf);
self.writer.write_all(&self.wbuf).await?;
self.writer.flush().await?;
Ok(())
}
}
async fn receiver(
mut reader: ReadHalf<TcpStream>,
mut sender: mpsc::UnboundedSender<ServerMessage>,
) -> io::Result<()> {
let mut rbuf = BytesMut::with_capacity(0);
let mut len = 0;
loop {
println!("Inside receiver loop");
std::thread::sleep(std::time::Duration::from_millis(1000));
}
Ok(())
}

5
mason-mariadb/src/lib.rs Normal file
View File

@ -0,0 +1,5 @@
#![feature(non_exhaustive, async_await)]
#![allow(clippy::needless_lifetimes)]
// mod connection;
mod protocol;

View File

@ -0,0 +1,13 @@
pub trait Serialize {
fn serialize(&self, buf: &mut Vec<u8>);
}
#[derive(Debug)]
pub struct StartupMessage<'a> {
pub host: &'a str,
}
impl<'a> Serialize for StartupMessage<'a> {
fn serialize(&self, buf: &mut Vec<u8>) {
}
}

View File

@ -0,0 +1,2 @@
pub mod client;
pub mod server;

View File

@ -0,0 +1,133 @@
use failure::Error;
use std::iter::FromIterator;
use byteorder::LittleEndian;
use byteorder::ByteOrder;
pub trait Deserialize: Sized {
fn deserialize(buf: &mut Vec<u8>) -> Result<Self, Error>;
}
#[derive(Debug)]
#[non_exhaustive]
pub enum Message {
InitialHandshakePacket(InitialHandshakePacket),
}
pub enum Capabilities {
ClientMysql = 1,
FoundRows = 2,
ConnectWithDb = 8,
Compress = 32,
LocalFiles = 128,
IgnroeSpace = 256,
ClientProtocol41 = 1 << 9,
ClientInteractive = 1 << 10,
SSL = 1 << 11,
Transactions = 1 << 12,
SecureConnection = 1 << 13,
MultiStatements = 1 << 16,
MultiResults = 1 << 17,
PsMultiResults = 1 << 18,
PluginAuth = 1 << 19,
ConnectAttrs = 1 << 20,
PluginAuthLenencClientData = 1 << 21,
ClientSessionTrack = 1 << 23,
ClientDeprecateEof = 1 << 24,
MariaDbClientProgress = 1 << 32,
MariaDbClientComMulti = 1 << 33,
MariaClientStmtBulkOperations = 1 << 34,
}
#[derive(Default, Debug)]
pub struct InitialHandshakePacket {
pub protocol_version: u8,
pub server_version: String,
pub connection_id: u32,
pub auth_seed: String,
pub reserved: u8,
pub capabilities1: u16,
pub collation: u8,
pub status: u16,
pub plugin_data_length: u8,
pub scramble2: Option<String>,
pub reserved2: Option<u8>,
pub auth_plugin_name: Option<String>,
}
impl Deserialize for InitialHandshakePacket {
fn deserialize(buf: &mut Vec<u8>) -> Result<Self, Error> {
let mut index = 0;
let mut null_index = 0;
let protocol_version = buf[0] as u8;
index += 1;
// Find index of null character
null_index = index;
loop {
if buf[null_index] == b'\0' {
break;
}
null_index += 1;
}
let server_version = String::from_iter(buf[index..null_index]
.iter()
.map(|b| char::from(b.clone()))
.collect::<Vec<char>>()
.into_iter()
);
// Script null character
index = null_index + 1;
let connection_id = LittleEndian::read_u32(&buf);
// Increment by index by 4 bytes since we read a u32
index += 4;
let auth_seed = String::from_iter(buf[index..index+8]
.iter()
.map(|b| char::from(b.clone()))
.collect::<Vec<char>>()
.into_iter()
);
index += 8;
// Skip reserved byte
index += 1;
let mut capabilities = LittleEndian::read_u16(&buf[index..]) as u32;
index += 2;
let collation = buf[index];
index += 1;
let status = LittleEndian::read_u16(&buf[index..]);
index += 2;
capabilities |= LittleEndian::read_u16(&buf[index..]) as u32;
index += 2;
let mut plugin_data_length = None;
if capabilities as u128 & Capabilities::PluginAuth as u128> 0 {
plugin_data_length = Some(buf[index] as u8);
}
index += 1;
// Skip filler
index += 6;
if capabilities as u128 & Capabilities::ClientMysql as u128 == 0 {
capabilities |= LittleEndian::read_u32(&buf[index..]);
}
index += 4;
let mut scramble2: Option<String> = None;
let mut auth_plugin_name: Option<String> = None;
if capabilities as u128 & Capabilities::SecureConnection as u128 > 0 {
// TODO: scramble 2nd part. Length = max(12, plugin_data_length - 9)
} else {
// TODO: auth_plugin_name null temrinated string
}
Ok(InitialHandshakePacket::default())
}
}