diff --git a/sqlx-mysql/src/connection/command.rs b/sqlx-mysql/src/connection/command.rs new file mode 100644 index 00000000..7537e0cb --- /dev/null +++ b/sqlx-mysql/src/connection/command.rs @@ -0,0 +1,51 @@ +use std::collections::VecDeque; + +use sqlx_core::Runtime; + +use super::MySqlConnection; + +pub(crate) enum Command { + // expecting [OkPacket] + Simple, + Query(QueryCommand), +} + +pub(crate) struct QueryCommand { + pub(crate) state: QueryState, + pub(crate) columns: u64, +} + +pub(crate) enum QueryState { + // expecting [QueryResponse] + QueryResponse, + // expecting [QueryStep] + QueryStep, + // expecting [ColumnDefinition] + ColumnDefinition { index: u64 }, +} + +pub(crate) fn begin_query_command(commands: &mut VecDeque) -> &mut QueryCommand { + commands + .push_back(Command::Query(QueryCommand { state: QueryState::QueryResponse, columns: 0 })); + + if let Some(Command::Query(query_cmd)) = commands.back_mut() { + query_cmd + } else { + // UNREACHABLE: just pushed the query command + unreachable!() + } +} + +impl MySqlConnection { + pub(crate) fn begin_simple_command(&mut self) { + self.commands.push_back(Command::Simple); + } + + pub(crate) fn end_command(&mut self) { + self.commands.pop_front(); + } + + // pub(crate) fn flush_commands(&mut self) { + // // [...] + // } +} diff --git a/sqlx-mysql/src/connection/executor.rs b/sqlx-mysql/src/connection/executor.rs new file mode 100644 index 00000000..ec2444f0 --- /dev/null +++ b/sqlx-mysql/src/connection/executor.rs @@ -0,0 +1,109 @@ +#[cfg(feature = "async")] +use futures_util::{future::BoxFuture, FutureExt}; +use sqlx_core::{Executor, Result, Runtime}; + +use super::command::{begin_query_command, QueryState}; +use super::MySqlConnection; +use crate::protocol::{ColumnDefinition, Query, QueryResponse, QueryStep, Status}; +use crate::MySql; + +macro_rules! impl_execute { + ($(@$blocking:ident)? $self:ident, $sql:ident) => {{ + let Self { ref mut stream, ref mut commands, capabilities, .. } = *$self; + + stream.write_packet(&Query { sql: $sql })?; + + // STATE: remember that we are now exepcting a query response + let cmd = begin_query_command(commands); + + #[allow(clippy::while_let_loop, unused_labels)] + 'results: loop { + match read_packet!($(@$blocking)? stream).deserialize_with(capabilities)? { + QueryResponse::ResultSet { columns: num_columns } => { + #[allow(clippy::cast_possible_truncation)] + let mut columns = Vec::::with_capacity(num_columns as usize); + + // STATE: remember how many columns are in this result set + cmd.columns = num_columns; + + for index in 0..num_columns { + // STATE: remember that we are expecting the #index column definition + cmd.state = QueryState::ColumnDefinition { index }; + + columns.push(read_packet!($(@$blocking)? stream).deserialize()?); + } + + // STATE: remember that we are now expecting a row or the end of the result set + cmd.state = QueryState::QueryStep; + + 'rows: loop { + match read_packet!($(@$blocking)? stream) + .deserialize_with((capabilities, columns.as_slice()))? + { + QueryStep::End(end) => { + // TODO: handle rowsaffected/matched - if any + + if !end.status.contains(Status::MORE_RESULTS_EXISTS) { + // TODO: STATE: the current command is complete + + break 'results; + } + } + + QueryStep::Row(row) => { + // TODO: handle row + } + } + } + } + + QueryResponse::Ok(ok) => { + // TODO: handle rows affected + // no rows possible to ever return + break; + } + } + } + + Ok(()) + }}; +} + +#[cfg(feature = "async")] +impl MySqlConnection { + async fn execute_async(&mut self, sql: &str) -> Result<()> { + impl_execute!(self, sql) + } +} + +#[cfg(feature = "blocking")] +impl MySqlConnection { + fn execute_blocking(&mut self, sql: &str) -> Result<()> { + impl_execute!(@blocking self, sql) + } +} + +impl Executor for MySqlConnection { + type Database = MySql; + + #[cfg(feature = "async")] + fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<()>> + where + Rt: sqlx_core::Async, + 'e: 'x, + 'q: 'x, + { + self.execute_async(sql).boxed() + } +} + +#[cfg(feature = "blocking")] +impl sqlx_core::blocking::Executor for MySqlConnection { + fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> Result<()> + where + 'e: 'x, + 'q: 'x, + { + self.execute_blocking(sql) + } +} diff --git a/sqlx/src/mysql/connection.rs b/sqlx/src/mysql/connection.rs index bd839b7b..b873bbee 100644 --- a/sqlx/src/mysql/connection.rs +++ b/sqlx/src/mysql/connection.rs @@ -7,6 +7,7 @@ use super::{MySql, MySqlConnectOptions}; #[cfg(feature = "async")] use crate::{Async, Result}; use crate::{Close, Connect, Connection, DefaultRuntime, Runtime}; +use sqlx_core::Executor; /// A single connection (also known as a session) to a MySQL database server. #[allow(clippy::module_name_repetitions)] @@ -37,6 +38,10 @@ impl MySqlConnection { self.0.ping().await } + pub async fn execute(&mut self, sql: &str) -> Result<()> { + self.0.execute(sql).await + } + /// Explicitly close this database connection. /// /// This method is **not required** for safe and consistent operation. However, it is @@ -91,3 +96,15 @@ impl Connection for MySqlConnection { self.ping().boxed() } } + +impl Executor for MySqlConnection { + #[cfg(feature = "async")] + fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<()>> + where + Rt: Async, + 'e: 'x, + 'q: 'x, + { + self.0.execute(sql) + } +}