wip(mysql): impl Executor

This commit is contained in:
Ryan Leckey 2021-01-26 01:16:38 -08:00
parent ce2fba7b8d
commit 5836e1eb63
No known key found for this signature in database
GPG Key ID: F8AA68C235AB08C9
3 changed files with 177 additions and 0 deletions

View File

@ -0,0 +1,51 @@
use std::collections::VecDeque;
use sqlx_core::Runtime;
use super::MySqlConnection;
pub(crate) enum Command {
// expecting [OkPacket]
Simple,
Query(QueryCommand),
}
pub(crate) struct QueryCommand {
pub(crate) state: QueryState,
pub(crate) columns: u64,
}
pub(crate) enum QueryState {
// expecting [QueryResponse]
QueryResponse,
// expecting [QueryStep]
QueryStep,
// expecting [ColumnDefinition]
ColumnDefinition { index: u64 },
}
pub(crate) fn begin_query_command(commands: &mut VecDeque<Command>) -> &mut QueryCommand {
commands
.push_back(Command::Query(QueryCommand { state: QueryState::QueryResponse, columns: 0 }));
if let Some(Command::Query(query_cmd)) = commands.back_mut() {
query_cmd
} else {
// UNREACHABLE: just pushed the query command
unreachable!()
}
}
impl<Rt: Runtime> MySqlConnection<Rt> {
pub(crate) fn begin_simple_command(&mut self) {
self.commands.push_back(Command::Simple);
}
pub(crate) fn end_command(&mut self) {
self.commands.pop_front();
}
// pub(crate) fn flush_commands(&mut self) {
// // [...]
// }
}

View File

@ -0,0 +1,109 @@
#[cfg(feature = "async")]
use futures_util::{future::BoxFuture, FutureExt};
use sqlx_core::{Executor, Result, Runtime};
use super::command::{begin_query_command, QueryState};
use super::MySqlConnection;
use crate::protocol::{ColumnDefinition, Query, QueryResponse, QueryStep, Status};
use crate::MySql;
macro_rules! impl_execute {
($(@$blocking:ident)? $self:ident, $sql:ident) => {{
let Self { ref mut stream, ref mut commands, capabilities, .. } = *$self;
stream.write_packet(&Query { sql: $sql })?;
// STATE: remember that we are now exepcting a query response
let cmd = begin_query_command(commands);
#[allow(clippy::while_let_loop, unused_labels)]
'results: loop {
match read_packet!($(@$blocking)? stream).deserialize_with(capabilities)? {
QueryResponse::ResultSet { columns: num_columns } => {
#[allow(clippy::cast_possible_truncation)]
let mut columns = Vec::<ColumnDefinition>::with_capacity(num_columns as usize);
// STATE: remember how many columns are in this result set
cmd.columns = num_columns;
for index in 0..num_columns {
// STATE: remember that we are expecting the #index column definition
cmd.state = QueryState::ColumnDefinition { index };
columns.push(read_packet!($(@$blocking)? stream).deserialize()?);
}
// STATE: remember that we are now expecting a row or the end of the result set
cmd.state = QueryState::QueryStep;
'rows: loop {
match read_packet!($(@$blocking)? stream)
.deserialize_with((capabilities, columns.as_slice()))?
{
QueryStep::End(end) => {
// TODO: handle rowsaffected/matched - if any
if !end.status.contains(Status::MORE_RESULTS_EXISTS) {
// TODO: STATE: the current command is complete
break 'results;
}
}
QueryStep::Row(row) => {
// TODO: handle row
}
}
}
}
QueryResponse::Ok(ok) => {
// TODO: handle rows affected
// no rows possible to ever return
break;
}
}
}
Ok(())
}};
}
#[cfg(feature = "async")]
impl<Rt: sqlx_core::Async> MySqlConnection<Rt> {
async fn execute_async(&mut self, sql: &str) -> Result<()> {
impl_execute!(self, sql)
}
}
#[cfg(feature = "blocking")]
impl<Rt: sqlx_core::blocking::Runtime> MySqlConnection<Rt> {
fn execute_blocking(&mut self, sql: &str) -> Result<()> {
impl_execute!(@blocking self, sql)
}
}
impl<Rt: Runtime> Executor<Rt> for MySqlConnection<Rt> {
type Database = MySql;
#[cfg(feature = "async")]
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<()>>
where
Rt: sqlx_core::Async,
'e: 'x,
'q: 'x,
{
self.execute_async(sql).boxed()
}
}
#[cfg(feature = "blocking")]
impl<Rt: sqlx_core::blocking::Runtime> sqlx_core::blocking::Executor<Rt> for MySqlConnection<Rt> {
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> Result<()>
where
'e: 'x,
'q: 'x,
{
self.execute_blocking(sql)
}
}

View File

@ -7,6 +7,7 @@ use super::{MySql, MySqlConnectOptions};
#[cfg(feature = "async")]
use crate::{Async, Result};
use crate::{Close, Connect, Connection, DefaultRuntime, Runtime};
use sqlx_core::Executor;
/// A single connection (also known as a session) to a MySQL database server.
#[allow(clippy::module_name_repetitions)]
@ -37,6 +38,10 @@ impl<Rt: Async> MySqlConnection<Rt> {
self.0.ping().await
}
pub async fn execute(&mut self, sql: &str) -> Result<()> {
self.0.execute(sql).await
}
/// Explicitly close this database connection.
///
/// This method is **not required** for safe and consistent operation. However, it is
@ -91,3 +96,15 @@ impl<Rt: Runtime> Connection<Rt> for MySqlConnection<Rt> {
self.ping().boxed()
}
}
impl<Rt: Runtime> Executor<Rt> for MySqlConnection<Rt> {
#[cfg(feature = "async")]
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<()>>
where
Rt: Async,
'e: 'x,
'q: 'x,
{
self.0.execute(sql)
}
}