refactor(mysql): merge and cleanup recv_columns!

This commit is contained in:
Ryan Leckey 2021-01-29 23:48:40 -08:00
parent 93dc33adc0
commit c5bd3988d9
No known key found for this signature in database
GPG Key ID: F8AA68C235AB08C9
2 changed files with 25 additions and 67 deletions

View File

@ -5,9 +5,14 @@ use crate::protocol::ColumnDefinition;
use crate::stream::MySqlStream;
macro_rules! impl_recv_columns {
($(@$blocking:ident)? $num_columns:ident, $stream:ident, $cmd:ident) => {{
($(@$blocking:ident)? $store:expr, $num_columns:ident, $stream:ident, $cmd:ident) => {{
#[allow(clippy::cast_possible_truncation)]
let mut columns = Vec::<ColumnDefinition>::with_capacity($num_columns as usize);
let mut columns = if $store {
Vec::<ColumnDefinition>::with_capacity($num_columns as usize)
} else {
// we are going to drop column definitions, do not allocate
Vec::new()
};
// STATE: remember how many columns are in this result set
$cmd.columns = $num_columns;
@ -16,7 +21,14 @@ macro_rules! impl_recv_columns {
// STATE: remember that we are expecting the #index column definition
$cmd.state = QueryState::ColumnDefinition { index };
columns.push(read_packet!($(@$blocking)? $stream).deserialize()?);
// read in definition and only deserialize if we are saving
// the column definitions
let packet = read_packet!($(@$blocking)? $stream);
if $store {
columns.push(packet.deserialize()?);
}
}
// STATE: remember that we are now expecting a row or the end
@ -26,93 +38,40 @@ macro_rules! impl_recv_columns {
}};
}
macro_rules! impl_recv_and_drop_columns {
($(@$blocking:ident)? $num_columns:ident, $stream:ident, $cmd:ident) => {{
// STATE: remember how many columns are in this result set
$cmd.columns = $num_columns;
for index in 0..$num_columns {
// STATE: remember that we are expecting the #index column definition
$cmd.state = QueryState::ColumnDefinition { index };
// read and immediately drop the column definition packet
// this method is only invoked when we don't care about query results
let _ = read_packet!($(@$blocking)? $stream);
}
// STATE: remember that we are now expecting a row or the end
$cmd.state = QueryState::QueryStep;
Ok(())
}};
}
impl<Rt: Runtime> MySqlStream<Rt> {
#[cfg(feature = "async")]
pub(super) async fn recv_columns_async(
&mut self,
store: bool,
columns: u64,
cmd: &mut QueryCommand,
) -> Result<Vec<ColumnDefinition>>
where
Rt: sqlx_core::Async,
{
impl_recv_columns!(columns, self, cmd)
}
#[cfg(feature = "async")]
pub(super) async fn recv_and_drop_columns_async(
&mut self,
columns: u64,
cmd: &mut QueryCommand,
) -> Result<()>
where
Rt: sqlx_core::Async,
{
impl_recv_and_drop_columns!(columns, self, cmd)
impl_recv_columns!(store, columns, self, cmd)
}
#[cfg(feature = "blocking")]
pub(crate) fn recv_columns_blocking(
&mut self,
store: bool,
columns: u64,
cmd: &mut QueryCommand,
) -> Result<Vec<ColumnDefinition>>
where
Rt: sqlx_core::blocking::Runtime,
{
impl_recv_columns!(@blocking columns, self, cmd)
}
#[cfg(feature = "blocking")]
pub(crate) fn recv_and_drop_columns_blocking(
&mut self,
columns: u64,
cmd: &mut QueryCommand,
) -> Result<()>
where
Rt: sqlx_core::blocking::Runtime,
{
impl_recv_and_drop_columns!(@blocking columns, self, cmd)
impl_recv_columns!(@blocking store, columns, self, cmd)
}
}
macro_rules! recv_columns {
(@blocking $columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_columns_blocking($columns, $cmd)?
(@blocking $store:expr, $columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_columns_blocking($store, $columns, $cmd)?
};
($columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_columns_async($columns, $cmd).await?
};
}
macro_rules! recv_and_drop_columns {
(@blocking $columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_and_drop_columns_blocking($columns, $cmd)?
};
($columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_and_drop_columns_async($columns, $cmd).await?
($store:expr, $columns:ident, $stream:ident, $cmd:ident) => {
$stream.recv_columns_async($store, $columns, $cmd).await?
};
}

View File

@ -26,9 +26,8 @@ macro_rules! impl_execute {
match read_packet!($(@$blocking)? stream).deserialize_with(capabilities)? {
QueryResponse::Ok(ok) => break 'result ok,
QueryResponse::ResultSet { columns } => {
// acknowledge but discard any columns
// execute returns no rows
recv_and_drop_columns!($(@$blocking)? columns, stream, cmd);
// acknowledge but discard any columns as execute returns no rows
recv_columns!($(@$blocking)? /* store = */ false, columns, stream, cmd);
'rows: loop {
match read_packet!($(@$blocking)? stream).deserialize_with((capabilities, &[][..]))? {