Fix warnings

This commit is contained in:
Daniel Akhterov 2019-06-30 14:57:59 -07:00 committed by Daniel Akhterov
parent b787fa8475
commit a50c2805ba
9 changed files with 51 additions and 37 deletions

View File

@ -17,3 +17,12 @@ hex = "0.3.2"
bytes = "0.4.12"
memchr = "2.2.0"
bitflags = "1.1.0"
[dev-dependencies]
criterion = "0.2.11"
[[bench]]
name = "bench"
harness = false

View File

@ -0,0 +1,21 @@
#[macro_use]
extern crate criterion;
use criterion::{Criterion, black_box};
fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("establish connection", |b| {
b.iter(|| {
let conn = Connection::establish(ConnectOptions {
host: "127.0.0.1",
port: 3306,
user: Some("root"),
database: None,
password: None,
});
})
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@ -1,17 +1,15 @@
use super::Connection;
use crate::protocol::{
client::{ComPing, ComQuit, HandshakeResponsePacket, Serialize},
client::HandshakeResponsePacket,
server::{Capabilities, Deserialize, InitialHandshakePacket, Message as ServerMessage},
};
use bytes::Bytes;
use failure::{err_msg, Error};
use futures::StreamExt;
use mason_core::ConnectOptions;
use std::io;
pub async fn establish<'a, 'b: 'a>(
conn: &'a mut Connection,
options: ConnectOptions<'b>,
_options: ConnectOptions<'b>,
) -> Result<(), Error> {
let init_packet = InitialHandshakePacket::deserialize(&conn.stream.next_bytes().await?)?;
@ -37,7 +35,7 @@ pub async fn establish<'a, 'b: 'a>(
Some(ServerMessage::ErrPacket(message)) => Err(err_msg(format!("{:?}", message))),
Some(message) => {
panic!("Did not receive OkPacket nor ErrPacket");
panic!("Did not receive OkPacket nor ErrPacket. Received: {:?}", message);
}
None => {
@ -49,7 +47,7 @@ pub async fn establish<'a, 'b: 'a>(
#[cfg(test)]
mod test {
use super::*;
use failure::{err_msg, Error};
use failure::Error;
#[runtime::test]
async fn it_connects() -> Result<(), Error> {

View File

@ -2,22 +2,19 @@ use crate::protocol::{
client::{ComPing, ComQuit, Serialize},
serialize::serialize_length,
server::{
Capabilities, Deserialize, InitialHandshakePacket, Message as ServerMessage,
Capabilities, Deserialize, Message as ServerMessage,
ServerStatusFlag, OkPacket
},
};
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use failure::{err_msg, Error};
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use failure::Error;
use futures::{
io::{AsyncRead, AsyncWriteExt},
prelude::*,
task::{Context, Poll},
Stream,
};
use mason_core::ConnectOptions;
use runtime::{net::TcpStream, task::JoinHandle};
use std::io;
use runtime::net::TcpStream;
mod establish;
// mod query;

View File

@ -1,5 +1,7 @@
#![feature(non_exhaustive, async_await)]
#![allow(clippy::needless_lifetimes)]
// TODO: Remove this once API has matured
#![allow(dead_code)]
#[macro_use]
extern crate bitflags;

View File

@ -9,7 +9,6 @@
use super::server::Capabilities;
use crate::protocol::serialize::*;
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use bytes::{Bytes, BytesMut};
use failure::Error;

View File

@ -105,7 +105,6 @@ pub fn deserialize_string_eof(buf: &Bytes, index: &mut usize) -> Bytes {
#[inline]
pub fn deserialize_string_null(buf: &Bytes, index: &mut usize) -> Result<Bytes, Error> {
if let Some(null_index) = memchr::memchr(0, &buf[*index..]) {
let new_index = null_index + *index;
let value = Bytes::from(&buf[*index..*index + null_index]);
*index = *index + null_index + 1;
Ok(value)

View File

@ -1,6 +1,5 @@
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use failure::{err_msg, Error};
const U24_MAX: usize = 0xFF_FF_FF;
@ -24,12 +23,12 @@ pub fn serialize_length(buf: &mut BytesMut) {
#[inline]
pub fn serialize_int_8(buf: &mut BytesMut, value: u64) {
buf.put_u64::<LittleEndian>(value);
buf.put_u64_le(value);
}
#[inline]
pub fn serialize_int_4(buf: &mut BytesMut, value: u32) {
buf.put_u32::<LittleEndian>(value);
buf.put_u32_le(value);
}
#[inline]
@ -40,7 +39,7 @@ pub fn serialize_int_3(buf: &mut BytesMut, value: u32) {
#[inline]
pub fn serialize_int_2(buf: &mut BytesMut, value: u16) {
buf.put_u16::<LittleEndian>(value);
buf.put_u16_le(value);
}
#[inline]
@ -60,7 +59,7 @@ pub fn serialize_int_lenenc(buf: &mut BytesMut, value: Option<&usize>) {
} else if *value > std::u8::MAX as usize && *value <= std::u16::MAX as usize {
buf.put_u8(0xFC);
serialize_int_2(buf, *value as u16);
} else if *value >= 0 && *value <= std::u8::MAX as usize {
} else if *value <= std::u8::MAX as usize {
buf.put_u8(0xFA);
serialize_int_1(buf, *value as u8);
} else {
@ -130,7 +129,6 @@ pub fn serialize_byte_eof(buf: &mut BytesMut, bytes: &Bytes) {
#[cfg(test)]
mod tests {
use super::*;
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
// [X] serialize_int_lenenc_u64
// [X] serialize_int_lenenc_u32

View File

@ -3,7 +3,6 @@
use crate::protocol::deserialize::*;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use core::num::FpCategory::Infinite;
use failure::{err_msg, Error};
pub trait Deserialize: Sized {
@ -123,6 +122,7 @@ pub struct InitialHandshakePacket {
#[derive(Default, Debug)]
pub struct OkPacket {
pub length: u32,
pub seq_no: u8,
pub affected_rows: Option<usize>,
pub last_insert_id: Option<usize>,
@ -135,6 +135,7 @@ pub struct OkPacket {
#[derive(Default, Debug)]
pub struct ErrPacket {
pub length: u32,
pub seq_no: u8,
pub error_code: u16,
pub stage: Option<u8>,
@ -152,14 +153,13 @@ impl Message {
return Ok(None);
}
let mut index = 0_usize;
let length = LittleEndian::read_u24(&buf[0..]) as usize;
if buf.len() < length + 4 {
return Ok(None);
}
let buf = buf.split_to(length + 4).freeze();
let serial_number = [3];
let _seq_no = [3];
let tag = buf[4];
Ok(Some(match tag {
@ -168,17 +168,6 @@ impl Message {
_ => unimplemented!(),
}))
}
pub fn init(buf: &mut BytesMut) -> Result<Option<Self>, Error> {
let length = LittleEndian::read_u24(&buf[0..]) as usize;
if buf.len() < length + 4 {
return Ok(None);
}
match InitialHandshakePacket::deserialize(&buf.split_to(length + 4).freeze()) {
Ok(v) => Ok(Some(Message::InitialHandshakePacket(v))),
Err(_) => Ok(None),
}
}
}
impl Deserialize for InitialHandshakePacket {
@ -288,6 +277,7 @@ impl Deserialize for OkPacket {
let info = Bytes::from(&buf[index..]);
Ok(OkPacket {
length,
seq_no,
affected_rows,
last_insert_id,
@ -340,6 +330,7 @@ impl Deserialize for ErrPacket {
}
Ok(ErrPacket {
length,
seq_no,
error_code,
stage,