From 9d8b567790c64ad53450921da2462004ac45e2d9 Mon Sep 17 00:00:00 2001 From: Daniel Akhterov Date: Thu, 13 Jun 2019 00:05:23 -0700 Subject: [PATCH] WIP: Mariadb initial handshake packet --- Cargo.toml | 1 + mason-mariadb/Cargo.toml | 18 +++ mason-mariadb/src/connection/establish.rs | 16 +++ mason-mariadb/src/connection/mod.rs | 83 ++++++++++++++ mason-mariadb/src/lib.rs | 5 + mason-mariadb/src/protocol/client.rs | 13 +++ mason-mariadb/src/protocol/mod.rs | 2 + mason-mariadb/src/protocol/server.rs | 133 ++++++++++++++++++++++ 8 files changed, 271 insertions(+) create mode 100644 mason-mariadb/Cargo.toml create mode 100644 mason-mariadb/src/connection/establish.rs create mode 100644 mason-mariadb/src/connection/mod.rs create mode 100644 mason-mariadb/src/lib.rs create mode 100644 mason-mariadb/src/protocol/client.rs create mode 100644 mason-mariadb/src/protocol/mod.rs create mode 100644 mason-mariadb/src/protocol/server.rs diff --git a/Cargo.toml b/Cargo.toml index 25a87a71..62038a2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ description = "Asynchronous and expressive database client in pure Rust." edition = "2018" [dependencies] +failure = "0.1" byteorder = "1.3.2" bytes = "0.4.12" futures-preview = "=0.3.0-alpha.17" diff --git a/mason-mariadb/Cargo.toml b/mason-mariadb/Cargo.toml new file mode 100644 index 00000000..e89e5140 --- /dev/null +++ b/mason-mariadb/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "mason-mariadb" +version = "0.1.0" +authors = ["Daniel Akhterov "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +mason-core = { path = "../mason-core" } +runtime = "=0.3.0-alpha.4" +futures-preview = "=0.3.0-alpha.16" +failure = "0.1" +byteorder = "1.3.1" +log = "0.4" +hex = "0.3.2" +bytes = "0.4.12" +memchr = "2.2.0" diff --git a/mason-mariadb/src/connection/establish.rs b/mason-mariadb/src/connection/establish.rs new file mode 100644 index 00000000..ea551651 --- /dev/null +++ b/mason-mariadb/src/connection/establish.rs @@ -0,0 +1,16 @@ +use super::Connection; +use crate::protocol::{ + client::{PasswordMessage, StartupMessage}, + server::Message as ServerMessage, +}; +use futures::StreamExt; +use mason_core::ConnectOptions; +use std::io; + +pub async fn establish<'a, 'b: 'a>( + conn: &'a mut Connection, + options: ConnectOptions<'b>, +) -> io::Result<()> { + // The actual connection establishing + Ok(()) +} diff --git a/mason-mariadb/src/connection/mod.rs b/mason-mariadb/src/connection/mod.rs new file mode 100644 index 00000000..c592728d --- /dev/null +++ b/mason-mariadb/src/connection/mod.rs @@ -0,0 +1,83 @@ +use crate::protocol::{ + client::{Serialize, Terminate}, + server::Message as ServerMessage, +}; +use bytes::BytesMut; +use futures::{ + channel::mpsc, + io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, + SinkExt, StreamExt, +}; +use mason_core::ConnectOptions; +use runtime::{net::TcpStream, task::JoinHandle}; +use std::io; + +mod establish; +mod query; + +pub struct Connection { + writer: WriteHalf, + incoming: mpsc::UnboundedReceiver, + + // Buffer used when serializing outgoing messages + wbuf: Vec, + + // Handle to coroutine reading messages from the stream + receiver: JoinHandle>, + + // Process ID of the Backend + process_id: i32, + + // Backend-unique key to use to send a cancel query message to the server + secret_key: i32, +} + +impl Connection { + pub async fn establish(options: ConnectOptions<'_>) -> io::Result { + let stream = TcpStream::connect((options.host, options.port)).await?; + let (reader, writer) = stream.split(); + let (tx, rx) = mpsc::unbounded(); + let receiver = runtime::spawn(receiver(reader, tx)); + let mut conn = Self { + wbuf: Vec::with_capacity(1024), + writer, + receiver, + incoming: rx, + process_id: -1, + secret_key: -1, + }; + + establish::establish(&mut conn, options).await?; + + Ok(conn) + } + + async fn send(&mut self, message: S) -> io::Result<()> + where + S: Serialize, + { + self.wbuf.clear(); + + message.serialize(&mut self.wbuf); + + self.writer.write_all(&self.wbuf).await?; + self.writer.flush().await?; + + Ok(()) + } +} + +async fn receiver( + mut reader: ReadHalf, + mut sender: mpsc::UnboundedSender, +) -> io::Result<()> { + let mut rbuf = BytesMut::with_capacity(0); + let mut len = 0; + + loop { + println!("Inside receiver loop"); + std::thread::sleep(std::time::Duration::from_millis(1000)); + } + + Ok(()) +} diff --git a/mason-mariadb/src/lib.rs b/mason-mariadb/src/lib.rs new file mode 100644 index 00000000..8a8bf704 --- /dev/null +++ b/mason-mariadb/src/lib.rs @@ -0,0 +1,5 @@ +#![feature(non_exhaustive, async_await)] +#![allow(clippy::needless_lifetimes)] + +// mod connection; +mod protocol; diff --git a/mason-mariadb/src/protocol/client.rs b/mason-mariadb/src/protocol/client.rs new file mode 100644 index 00000000..985f58c2 --- /dev/null +++ b/mason-mariadb/src/protocol/client.rs @@ -0,0 +1,13 @@ +pub trait Serialize { + fn serialize(&self, buf: &mut Vec); +} + +#[derive(Debug)] +pub struct StartupMessage<'a> { + pub host: &'a str, +} + +impl<'a> Serialize for StartupMessage<'a> { + fn serialize(&self, buf: &mut Vec) { + } +} diff --git a/mason-mariadb/src/protocol/mod.rs b/mason-mariadb/src/protocol/mod.rs new file mode 100644 index 00000000..c07f47e0 --- /dev/null +++ b/mason-mariadb/src/protocol/mod.rs @@ -0,0 +1,2 @@ +pub mod client; +pub mod server; diff --git a/mason-mariadb/src/protocol/server.rs b/mason-mariadb/src/protocol/server.rs new file mode 100644 index 00000000..9beac954 --- /dev/null +++ b/mason-mariadb/src/protocol/server.rs @@ -0,0 +1,133 @@ +use failure::Error; +use std::iter::FromIterator; +use byteorder::LittleEndian; +use byteorder::ByteOrder; + +pub trait Deserialize: Sized { + fn deserialize(buf: &mut Vec) -> Result; +} + +#[derive(Debug)] +#[non_exhaustive] +pub enum Message { + InitialHandshakePacket(InitialHandshakePacket), +} + +pub enum Capabilities { + ClientMysql = 1, + FoundRows = 2, + ConnectWithDb = 8, + Compress = 32, + LocalFiles = 128, + IgnroeSpace = 256, + ClientProtocol41 = 1 << 9, + ClientInteractive = 1 << 10, + SSL = 1 << 11, + Transactions = 1 << 12, + SecureConnection = 1 << 13, + MultiStatements = 1 << 16, + MultiResults = 1 << 17, + PsMultiResults = 1 << 18, + PluginAuth = 1 << 19, + ConnectAttrs = 1 << 20, + PluginAuthLenencClientData = 1 << 21, + ClientSessionTrack = 1 << 23, + ClientDeprecateEof = 1 << 24, + MariaDbClientProgress = 1 << 32, + MariaDbClientComMulti = 1 << 33, + MariaClientStmtBulkOperations = 1 << 34, +} + +#[derive(Default, Debug)] +pub struct InitialHandshakePacket { + pub protocol_version: u8, + pub server_version: String, + pub connection_id: u32, + pub auth_seed: String, + pub reserved: u8, + pub capabilities1: u16, + pub collation: u8, + pub status: u16, + pub plugin_data_length: u8, + pub scramble2: Option, + pub reserved2: Option, + pub auth_plugin_name: Option, +} + +impl Deserialize for InitialHandshakePacket { + fn deserialize(buf: &mut Vec) -> Result { + let mut index = 0; + let mut null_index = 0; + let protocol_version = buf[0] as u8; + index += 1; + + // Find index of null character + null_index = index; + loop { + if buf[null_index] == b'\0' { + break; + } + null_index += 1; + } + let server_version = String::from_iter(buf[index..null_index] + .iter() + .map(|b| char::from(b.clone())) + .collect::>() + .into_iter() + ); + // Script null character + index = null_index + 1; + + let connection_id = LittleEndian::read_u32(&buf); + + // Increment by index by 4 bytes since we read a u32 + index += 4; + + let auth_seed = String::from_iter(buf[index..index+8] + .iter() + .map(|b| char::from(b.clone())) + .collect::>() + .into_iter() + ); + index += 8; + + // Skip reserved byte + index += 1; + + let mut capabilities = LittleEndian::read_u16(&buf[index..]) as u32; + index += 2; + + let collation = buf[index]; + index += 1; + + let status = LittleEndian::read_u16(&buf[index..]); + index += 2; + + capabilities |= LittleEndian::read_u16(&buf[index..]) as u32; + index += 2; + + let mut plugin_data_length = None; + if capabilities as u128 & Capabilities::PluginAuth as u128> 0 { + plugin_data_length = Some(buf[index] as u8); + } + index += 1; + + // Skip filler + index += 6; + + if capabilities as u128 & Capabilities::ClientMysql as u128 == 0 { + capabilities |= LittleEndian::read_u32(&buf[index..]); + } + index += 4; + + let mut scramble2: Option = None; + let mut auth_plugin_name: Option = None; + if capabilities as u128 & Capabilities::SecureConnection as u128 > 0 { + // TODO: scramble 2nd part. Length = max(12, plugin_data_length - 9) + } else { + // TODO: auth_plugin_name null temrinated string + } + + Ok(InitialHandshakePacket::default()) + } +}