diff --git a/sqlx-mysql/src/connection/executor.rs b/sqlx-mysql/src/connection/executor.rs index ec2444f0..4a1f23df 100644 --- a/sqlx-mysql/src/connection/executor.rs +++ b/sqlx-mysql/src/connection/executor.rs @@ -2,92 +2,19 @@ use futures_util::{future::BoxFuture, FutureExt}; use sqlx_core::{Executor, Result, Runtime}; -use super::command::{begin_query_command, QueryState}; -use super::MySqlConnection; -use crate::protocol::{ColumnDefinition, Query, QueryResponse, QueryStep, Status}; -use crate::MySql; +use crate::{MySql, MySqlConnection, MySqlQueryResult, MySqlRow}; -macro_rules! impl_execute { - ($(@$blocking:ident)? $self:ident, $sql:ident) => {{ - let Self { ref mut stream, ref mut commands, capabilities, .. } = *$self; +#[macro_use] +mod columns; - stream.write_packet(&Query { sql: $sql })?; - - // STATE: remember that we are now exepcting a query response - let cmd = begin_query_command(commands); - - #[allow(clippy::while_let_loop, unused_labels)] - 'results: loop { - match read_packet!($(@$blocking)? stream).deserialize_with(capabilities)? { - QueryResponse::ResultSet { columns: num_columns } => { - #[allow(clippy::cast_possible_truncation)] - let mut columns = Vec::::with_capacity(num_columns as usize); - - // STATE: remember how many columns are in this result set - cmd.columns = num_columns; - - for index in 0..num_columns { - // STATE: remember that we are expecting the #index column definition - cmd.state = QueryState::ColumnDefinition { index }; - - columns.push(read_packet!($(@$blocking)? stream).deserialize()?); - } - - // STATE: remember that we are now expecting a row or the end of the result set - cmd.state = QueryState::QueryStep; - - 'rows: loop { - match read_packet!($(@$blocking)? stream) - .deserialize_with((capabilities, columns.as_slice()))? - { - QueryStep::End(end) => { - // TODO: handle rowsaffected/matched - if any - - if !end.status.contains(Status::MORE_RESULTS_EXISTS) { - // TODO: STATE: the current command is complete - - break 'results; - } - } - - QueryStep::Row(row) => { - // TODO: handle row - } - } - } - } - - QueryResponse::Ok(ok) => { - // TODO: handle rows affected - // no rows possible to ever return - break; - } - } - } - - Ok(()) - }}; -} - -#[cfg(feature = "async")] -impl MySqlConnection { - async fn execute_async(&mut self, sql: &str) -> Result<()> { - impl_execute!(self, sql) - } -} - -#[cfg(feature = "blocking")] -impl MySqlConnection { - fn execute_blocking(&mut self, sql: &str) -> Result<()> { - impl_execute!(@blocking self, sql) - } -} +#[macro_use] +mod execute; impl Executor for MySqlConnection { type Database = MySql; #[cfg(feature = "async")] - fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<()>> + fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result> where Rt: sqlx_core::Async, 'e: 'x, @@ -95,11 +22,41 @@ impl Executor for MySqlConnection { { self.execute_async(sql).boxed() } + + fn fetch_all<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result>> + where + Rt: sqlx_core::Async, + 'e: 'x, + 'q: 'x, + { + todo!() + } + + fn fetch_optional<'x, 'e, 'q>( + &'e mut self, + sql: &'q str, + ) -> BoxFuture<'x, Result>> + where + Rt: sqlx_core::Async, + 'e: 'x, + 'q: 'x, + { + todo!() + } + + fn fetch_one<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result> + where + Rt: sqlx_core::Async, + 'e: 'x, + 'q: 'x, + { + todo!() + } } #[cfg(feature = "blocking")] impl sqlx_core::blocking::Executor for MySqlConnection { - fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> Result<()> + fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> Result where 'e: 'x, 'q: 'x, diff --git a/sqlx-mysql/src/connection/executor/columns.rs b/sqlx-mysql/src/connection/executor/columns.rs new file mode 100644 index 00000000..e25007dc --- /dev/null +++ b/sqlx-mysql/src/connection/executor/columns.rs @@ -0,0 +1,118 @@ +use sqlx_core::{Result, Runtime}; + +use crate::connection::command::{QueryCommand, QueryState}; +use crate::protocol::ColumnDefinition; +use crate::stream::MySqlStream; + +macro_rules! impl_recv_columns { + ($(@$blocking:ident)? $num_columns:ident, $stream:ident, $cmd:ident) => {{ + #[allow(clippy::cast_possible_truncation)] + let mut columns = Vec::::with_capacity($num_columns as usize); + + // STATE: remember how many columns are in this result set + $cmd.columns = $num_columns; + + for index in 0..$num_columns { + // STATE: remember that we are expecting the #index column definition + $cmd.state = QueryState::ColumnDefinition { index }; + + columns.push(read_packet!($(@$blocking)? $stream).deserialize()?); + } + + // STATE: remember that we are now expecting a row or the end + $cmd.state = QueryState::QueryStep; + + Ok(columns) + }}; +} + +macro_rules! impl_recv_and_drop_columns { + ($(@$blocking:ident)? $num_columns:ident, $stream:ident, $cmd:ident) => {{ + // STATE: remember how many columns are in this result set + $cmd.columns = $num_columns; + + for index in 0..$num_columns { + // STATE: remember that we are expecting the #index column definition + $cmd.state = QueryState::ColumnDefinition { index }; + + // read and immediately drop the column definition packet + // this method is only invoked when we don't care about query results + let _ = read_packet!($(@$blocking)? $stream); + } + + // STATE: remember that we are now expecting a row or the end + $cmd.state = QueryState::QueryStep; + + Ok(()) + }}; +} + +impl MySqlStream { + #[cfg(feature = "async")] + pub(super) async fn recv_columns_async( + &mut self, + columns: u64, + cmd: &mut QueryCommand, + ) -> Result> + where + Rt: sqlx_core::Async, + { + impl_recv_columns!(columns, self, cmd) + } + + #[cfg(feature = "async")] + pub(super) async fn recv_and_drop_columns_async( + &mut self, + columns: u64, + cmd: &mut QueryCommand, + ) -> Result<()> + where + Rt: sqlx_core::Async, + { + impl_recv_and_drop_columns!(columns, self, cmd) + } + + #[cfg(feature = "blocking")] + pub(crate) fn recv_columns_blocking( + &mut self, + columns: u64, + cmd: &mut QueryCommand, + ) -> Result> + where + Rt: sqlx_core::blocking::Runtime, + { + impl_recv_columns!(@blocking columns, self, cmd) + } + + #[cfg(feature = "blocking")] + pub(crate) fn recv_and_drop_columns_blocking( + &mut self, + columns: u64, + cmd: &mut QueryCommand, + ) -> Result<()> + where + Rt: sqlx_core::blocking::Runtime, + { + impl_recv_and_drop_columns!(@blocking columns, self, cmd) + } +} + +macro_rules! recv_columns { + (@blocking $columns:ident, $stream:ident, $cmd:ident) => { + $stream.recv_columns_blocking($columns, $cmd)? + }; + + ($columns:ident, $stream:ident, $cmd:ident) => { + $stream.recv_columns_async($columns, $cmd).await? + }; +} + +macro_rules! recv_and_drop_columns { + (@blocking $columns:ident, $stream:ident, $cmd:ident) => { + $stream.recv_and_drop_columns_blocking($columns, $cmd)? + }; + + ($columns:ident, $stream:ident, $cmd:ident) => { + $stream.recv_and_drop_columns_async($columns, $cmd).await? + }; +} diff --git a/sqlx-mysql/src/connection/executor/execute.rs b/sqlx-mysql/src/connection/executor/execute.rs new file mode 100644 index 00000000..9233f9d7 --- /dev/null +++ b/sqlx-mysql/src/connection/executor/execute.rs @@ -0,0 +1,76 @@ +use sqlx_core::Result; + +use crate::connection::command::{begin_query_command, QueryState}; +use crate::protocol::{Query, QueryResponse, QueryStep, Status}; +use crate::{MySqlConnection, MySqlQueryResult}; + +macro_rules! impl_execute { + ($(@$blocking:ident)? $self:ident, $sql:ident) => {{ + let Self { ref mut stream, ref mut commands, capabilities, .. } = *$self; + + // send the server a text-based query that will be executed immediately + // replies with ERR, OK, or a result set + stream.write_packet(&Query { sql: $sql })?; + + // STATE: remember that we are now exepcting a query response + let cmd = begin_query_command(commands); + + // default an empty query result + // execute collects all discovered query results and SUMs + // their values together + let mut result = MySqlQueryResult::default(); + + #[allow(clippy::while_let_loop, unused_labels)] + 'results: loop { + let ok = 'result: loop { + match read_packet!($(@$blocking)? stream).deserialize_with(capabilities)? { + QueryResponse::Ok(ok) => break 'result ok, + QueryResponse::ResultSet { columns } => { + // acknowledge but discard any columns + // execute returns no rows + recv_and_drop_columns!($(@$blocking)? columns, stream, cmd); + + 'rows: loop { + match read_packet!($(@$blocking)? stream).deserialize_with((capabilities, &[][..]))? { + // execute ignores any rows returned + // but we do increment affected rows + QueryStep::Row(_row) => result.0.affected_rows += 1, + QueryStep::End(ok) => break 'result ok, + } + } + } + } + }; + + // fold this into the total result for the SQL + result.extend(Some(ok.into())); + + if !result.0.status.contains(Status::MORE_RESULTS_EXISTS) { + // no more results, time to finally call it quits + break; + } + + // STATE: expecting a response from another statement + cmd.state = QueryState::QueryResponse; + } + + // STATE: the current command is complete + $self.end_command(); + + Ok(result) + }}; +} + +#[cfg(feature = "async")] +impl MySqlConnection { + pub(super) async fn execute_async(&mut self, sql: &str) -> Result { + impl_execute!(self, sql) + } +} + +#[cfg(feature = "blocking")] +impl MySqlConnection { + pub(super) fn execute_blocking(&mut self, sql: &str) -> Result { + impl_execute!(@blocking self, sql) + } +} diff --git a/sqlx/src/mysql/connection.rs b/sqlx/src/mysql/connection.rs index 7c0f5c88..69f34baa 100644 --- a/sqlx/src/mysql/connection.rs +++ b/sqlx/src/mysql/connection.rs @@ -2,12 +2,13 @@ use std::fmt::{self, Debug, Formatter}; #[cfg(feature = "async")] use futures_util::future::{BoxFuture, FutureExt}; +use sqlx_core::Executor; +use sqlx_mysql::{MySqlQueryResult, MySqlRow}; use super::{MySql, MySqlConnectOptions}; #[cfg(feature = "async")] use crate::{Async, Result}; use crate::{Close, Connect, Connection, DefaultRuntime, Runtime}; -use sqlx_core::Executor; /// A single connection (also known as a session) to a MySQL database server. #[allow(clippy::module_name_repetitions)] @@ -38,7 +39,7 @@ impl MySqlConnection { self.0.ping().await } - pub async fn execute(&mut self, sql: &str) -> Result<()> { + pub async fn execute(&mut self, sql: &str) -> Result { self.0.execute(sql).await } @@ -101,7 +102,7 @@ impl Executor for MySqlConnection { type Database = MySql; #[cfg(feature = "async")] - fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result<()>> + fn execute<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result> where Rt: Async, 'e: 'x, @@ -109,4 +110,34 @@ impl Executor for MySqlConnection { { self.0.execute(sql) } + + fn fetch_all<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result>> + where + Rt: Async, + 'e: 'x, + 'q: 'x, + { + todo!() + } + + fn fetch_optional<'x, 'e, 'q>( + &'e mut self, + sql: &'q str, + ) -> BoxFuture<'x, Result>> + where + Rt: Async, + 'e: 'x, + 'q: 'x, + { + todo!() + } + + fn fetch_one<'x, 'e, 'q>(&'e mut self, sql: &'q str) -> BoxFuture<'x, Result> + where + Rt: Async, + 'e: 'x, + 'q: 'x, + { + todo!() + } }