refactor(mysql): break up execute() into executor/columns and executor/execute

This commit is contained in:
Ryan Leckey 2021-01-29 23:38:32 -08:00
parent d04dc77b4f
commit 93dc33adc0
No known key found for this signature in database
GPG Key ID: F8AA68C235AB08C9
4 changed files with 265 additions and 83 deletions

View File

@ -2,92 +2,19 @@
use futures_util::{future::BoxFuture, FutureExt};
use sqlx_core::{Executor, Result, Runtime};
use super::command::{begin_query_command, QueryState};
use super::MySqlConnection;
use crate::protocol::{ColumnDefinition, Query, QueryResponse, QueryStep, Status};
use crate::MySql;
use crate::{MySql, MySqlConnection, MySqlQueryResult, MySqlRow};
macro_rules! impl_execute {
($(@$blocking:ident)? $self:ident, $sql:ident) => {{
let Self { ref mut stream, ref mut commands, capabilities, .. } = *$self;
#[macro_use]
mod columns;
stream.write_packet(&Query { sql: $sql })?;
// STATE: remember that we are now exepcting a query response
let cmd = begin_query_command(commands);
#[allow(clippy::while_let_loop, unused_labels)]
'results: loop {
match read_packet!($(@$blocking)? stream).deserialize_with(capabilities)? {
QueryResponse::ResultSet { columns: num_columns } => {
#[allow(clippy::cast_possible_truncation)]
let mut columns = Vec::<ColumnDefinition>::with_capacity(num_columns as usize);
// STATE: remember how many columns are in this result set
cmd.columns = num_columns;
for index in 0..num_columns {
// STATE: remember that we are expecting the #index column definition
cmd.state = QueryState::ColumnDefinition { index };
columns.push(read_packet!($(@$blocking)? stream).deserialize()?);
}
// STATE: remember that we are now expecting a row or the end of the result set
cmd.state = QueryState::QueryStep;
'rows: loop {
match read_packet!($(@$blocking)? stream)
.deserialize_with((capabilities, columns.as_slice()))?
{
QueryStep::End(end) => {
// TODO: handle rowsaffected/matched - if any
if !end.status.contains(Status::MORE_RESULTS_EXISTS) {
// TODO: STATE: the current command is complete
break 'results;
}
}
QueryStep::Row(row) => {
// TODO: handle row
}
}
}
}
QueryResponse::Ok(ok) => {
// TODO: handle rows affected
// no rows possible to ever return
break;
}
}
}
Ok(())
}};
}
#[cfg(feature = "async")]
impl<Rt: sqlx_core::Async> MySqlConnection<Rt> {
async fn execute_async(&mut self, sql: &str) -> Result<()> {
impl_execute!(self, sql)
}
}
#[cfg(feature = "blocking")]
impl<Rt: sqlx_core::blocking::Runtime> MySqlConnection<Rt> {
fn execute_blocking(&mut self, sql: &str) -> Result<()> {
impl_execute!(@blocking self, sql)
}
}
#[macro_use]
mod execute;
impl<Rt: Runtime> Executor<Rt> for MySqlConnection<Rt> {
type Database = MySql;
#[cfg(feature = "async")]
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<()>>
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<MySqlQueryResult>>
where
Rt: sqlx_core::Async,
'e: 'x,
@ -95,11 +22,41 @@ impl<Rt: Runtime> Executor<Rt> for MySqlConnection<Rt> {
{
self.execute_async(sql).boxed()
}
fn fetch_all<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<Vec<MySqlRow>>>
where
Rt: sqlx_core::Async,
'e: 'x,
'q: 'x,
{
todo!()
}
fn fetch_optional<'x, 'e, 'q>(
&'e mut self,
sql: &'q str,
) -> BoxFuture<'x, Result<Option<MySqlRow>>>
where
Rt: sqlx_core::Async,
'e: 'x,
'q: 'x,
{
todo!()
}
fn fetch_one<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<MySqlRow>>
where
Rt: sqlx_core::Async,
'e: 'x,
'q: 'x,
{
todo!()
}
}
#[cfg(feature = "blocking")]
impl<Rt: sqlx_core::blocking::Runtime> sqlx_core::blocking::Executor<Rt> for MySqlConnection<Rt> {
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> Result<()>
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> Result<MySqlQueryResult>
where
'e: 'x,
'q: 'x,

View File

@ -0,0 +1,118 @@
use sqlx_core::{Result, Runtime};
use crate::connection::command::{QueryCommand, QueryState};
use crate::protocol::ColumnDefinition;
use crate::stream::MySqlStream;
macro_rules! impl_recv_columns {
($(@$blocking:ident)? $num_columns:ident, $stream:ident, $cmd:ident) => {{
#[allow(clippy::cast_possible_truncation)]
let mut columns = Vec::<ColumnDefinition>::with_capacity($num_columns as usize);
// STATE: remember how many columns are in this result set
$cmd.columns = $num_columns;
for index in 0..$num_columns {
// STATE: remember that we are expecting the #index column definition
$cmd.state = QueryState::ColumnDefinition { index };
columns.push(read_packet!($(@$blocking)? $stream).deserialize()?);
}
// STATE: remember that we are now expecting a row or the end
$cmd.state = QueryState::QueryStep;
Ok(columns)
}};
}
macro_rules! impl_recv_and_drop_columns {
($(@$blocking:ident)? $num_columns:ident, $stream:ident, $cmd:ident) => {{
// STATE: remember how many columns are in this result set
$cmd.columns = $num_columns;
for index in 0..$num_columns {
// STATE: remember that we are expecting the #index column definition
$cmd.state = QueryState::ColumnDefinition { index };
// read and immediately drop the column definition packet
// this method is only invoked when we don't care about query results
let _ = read_packet!($(@$blocking)? $stream);
}
// STATE: remember that we are now expecting a row or the end
$cmd.state = QueryState::QueryStep;
Ok(())
}};
}
impl<Rt: Runtime> MySqlStream<Rt> {
#[cfg(feature = "async")]
pub(super) async fn recv_columns_async(
&mut self,
columns: u64,
cmd: &mut QueryCommand,
) -> Result<Vec<ColumnDefinition>>
where
Rt: sqlx_core::Async,
{
impl_recv_columns!(columns, self, cmd)
}
#[cfg(feature = "async")]
pub(super) async fn recv_and_drop_columns_async(
&mut self,
columns: u64,
cmd: &mut QueryCommand,
) -> Result<()>
where
Rt: sqlx_core::Async,
{
impl_recv_and_drop_columns!(columns, self, cmd)
}
#[cfg(feature = "blocking")]
pub(crate) fn recv_columns_blocking(
&mut self,
columns: u64,
cmd: &mut QueryCommand,
) -> Result<Vec<ColumnDefinition>>
where
Rt: sqlx_core::blocking::Runtime,
{
impl_recv_columns!(@blocking columns, self, cmd)
}
#[cfg(feature = "blocking")]
pub(crate) fn recv_and_drop_columns_blocking(
&mut self,
columns: u64,
cmd: &mut QueryCommand,
) -> Result<()>
where
Rt: sqlx_core::blocking::Runtime,
{
impl_recv_and_drop_columns!(@blocking columns, self, cmd)
}
}
macro_rules! recv_columns {
(@blocking $columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_columns_blocking($columns, $cmd)?
};
($columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_columns_async($columns, $cmd).await?
};
}
macro_rules! recv_and_drop_columns {
(@blocking $columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_and_drop_columns_blocking($columns, $cmd)?
};
($columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_and_drop_columns_async($columns, $cmd).await?
};
}

View File

@ -0,0 +1,76 @@
use sqlx_core::Result;
use crate::connection::command::{begin_query_command, QueryState};
use crate::protocol::{Query, QueryResponse, QueryStep, Status};
use crate::{MySqlConnection, MySqlQueryResult};
macro_rules! impl_execute {
($(@$blocking:ident)? $self:ident, $sql:ident) => {{
let Self { ref mut stream, ref mut commands, capabilities, .. } = *$self;
// send the server a text-based query that will be executed immediately
// replies with ERR, OK, or a result set
stream.write_packet(&Query { sql: $sql })?;
// STATE: remember that we are now exepcting a query response
let cmd = begin_query_command(commands);
// default an empty query result
// execute collects all discovered query results and SUMs
// their values together
let mut result = MySqlQueryResult::default();
#[allow(clippy::while_let_loop, unused_labels)]
'results: loop {
let ok = 'result: loop {
match read_packet!($(@$blocking)? stream).deserialize_with(capabilities)? {
QueryResponse::Ok(ok) => break 'result ok,
QueryResponse::ResultSet { columns } => {
// acknowledge but discard any columns
// execute returns no rows
recv_and_drop_columns!($(@$blocking)? columns, stream, cmd);
'rows: loop {
match read_packet!($(@$blocking)? stream).deserialize_with((capabilities, &[][..]))? {
// execute ignores any rows returned
// but we do increment affected rows
QueryStep::Row(_row) => result.0.affected_rows += 1,
QueryStep::End(ok) => break 'result ok,
}
}
}
}
};
// fold this into the total result for the SQL
result.extend(Some(ok.into()));
if !result.0.status.contains(Status::MORE_RESULTS_EXISTS) {
// no more results, time to finally call it quits
break;
}
// STATE: expecting a response from another statement
cmd.state = QueryState::QueryResponse;
}
// STATE: the current command is complete
$self.end_command();
Ok(result)
}};
}
#[cfg(feature = "async")]
impl<Rt: sqlx_core::Async> MySqlConnection<Rt> {
pub(super) async fn execute_async(&mut self, sql: &str) -> Result<MySqlQueryResult> {
impl_execute!(self, sql)
}
}
#[cfg(feature = "blocking")]
impl<Rt: sqlx_core::blocking::Runtime> MySqlConnection<Rt> {
pub(super) fn execute_blocking(&mut self, sql: &str) -> Result<MySqlQueryResult> {
impl_execute!(@blocking self, sql)
}
}

View File

@ -2,12 +2,13 @@ use std::fmt::{self, Debug, Formatter};
#[cfg(feature = "async")]
use futures_util::future::{BoxFuture, FutureExt};
use sqlx_core::Executor;
use sqlx_mysql::{MySqlQueryResult, MySqlRow};
use super::{MySql, MySqlConnectOptions};
#[cfg(feature = "async")]
use crate::{Async, Result};
use crate::{Close, Connect, Connection, DefaultRuntime, Runtime};
use sqlx_core::Executor;
/// A single connection (also known as a session) to a MySQL database server.
#[allow(clippy::module_name_repetitions)]
@ -38,7 +39,7 @@ impl<Rt: Async> MySqlConnection<Rt> {
self.0.ping().await
}
pub async fn execute(&mut self, sql: &str) -> Result<()> {
pub async fn execute(&mut self, sql: &str) -> Result<MySqlQueryResult> {
self.0.execute(sql).await
}
@ -101,7 +102,7 @@ impl<Rt: Runtime> Executor<Rt> for MySqlConnection<Rt> {
type Database = MySql;
#[cfg(feature = "async")]
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<()>>
fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<MySqlQueryResult>>
where
Rt: Async,
'e: 'x,
@ -109,4 +110,34 @@ impl<Rt: Runtime> Executor<Rt> for MySqlConnection<Rt> {
{
self.0.execute(sql)
}
fn fetch_all<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<Vec<MySqlRow>>>
where
Rt: Async,
'e: 'x,
'q: 'x,
{
todo!()
}
fn fetch_optional<'x, 'e, 'q>(
&'e mut self,
sql: &'q str,
) -> BoxFuture<'x, Result<Option<MySqlRow>>>
where
Rt: Async,
'e: 'x,
'q: 'x,
{
todo!()
}
fn fetch_one<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<MySqlRow>>
where
Rt: Async,
'e: 'x,
'q: 'x,
{
todo!()
}
}