diff --git a/sqlx-core/src/executor.rs b/sqlx-core/src/executor.rs index 5b918acf..3e137432 100644 --- a/sqlx-core/src/executor.rs +++ b/sqlx-core/src/executor.rs @@ -73,4 +73,9 @@ pub trait Executor: Send { &'e mut self, query: &'q str, ) -> BoxFuture<'e, crate::Result>>; + + /// Send a semicolon-delimited series of arbitrary SQL commands to the server. + /// + /// Does not support fetching results. + fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>>; } diff --git a/sqlx-core/src/mysql/connection.rs b/sqlx-core/src/mysql/connection.rs index 1bac3b10..d7f1ccd7 100644 --- a/sqlx-core/src/mysql/connection.rs +++ b/sqlx-core/src/mysql/connection.rs @@ -19,6 +19,7 @@ use crate::{Describe, Error, io::{Buf, BufMut, BufStream}, mysql::{ use super::establish; use crate::mysql::MySql; +use crate::mysql::protocol::ComQuery; pub type StatementId = u32; @@ -306,4 +307,40 @@ impl Connection { Ok(()) } + + async fn expect_eof_or_err(&mut self) -> crate::Result<()> { + let packet = self.receive().await?; + + match buf[0] { + 0xFE => EofPacket::decode(packet)?, + 0xFF => ErrPacket::decode(packet)?.expect_error()?, + _ => return Err(protocol_err!("expected EOF or ERR, got {:02X}", buf[0]).into()), + } + + Ok(()) + } + + pub(super) async fn send_raw( + &mut self, + commands: &str + ) -> Result<()> { + self.stream.flush().await?; + self.start_sequence(); + // enable multi-statement only for this query + self.write(ComSetOption { option: SetOptionOptions::MySqlOptionMultiStatementsOn }); + self.write(ComQuery { sql_statement: commands }); + self.write(ComSetOption { option: SetOptionOptions::MySqlOptionMultiStatementsOff }); + self.stream.flush().await?; + + self.expect_eof_or_err().await?; + + let packet = self.receive().await?; + + if packet[0] == 0xFF { return ErrPacket::decode(packet)?.expect_error() } + /// otherwise ignore packet + + self.expect_eof_or_err().await?; + + Ok(()) + } } diff --git a/sqlx-core/src/mysql/executor.rs b/sqlx-core/src/mysql/executor.rs index 619908f8..a480c9fe 100644 --- a/sqlx-core/src/mysql/executor.rs +++ b/sqlx-core/src/mysql/executor.rs @@ -152,4 +152,8 @@ impl Executor for Connection { ) -> BoxFuture<'e, crate::Result>> { Box::pin(self.conn.prepare_describe(query)) } + + fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> { + unimplemented!() + } } diff --git a/sqlx-core/src/pool/executor.rs b/sqlx-core/src/pool/executor.rs index 6b503898..af71173d 100644 --- a/sqlx-core/src/pool/executor.rs +++ b/sqlx-core/src/pool/executor.rs @@ -1,9 +1,7 @@ -use crate::{ - backend::Backend, describe::Describe, executor::Executor, params::IntoQueryParameters, - pool::Pool, row::FromRow, -}; -use futures_core::{future::BoxFuture, stream::BoxStream}; +use crate::{backend::Backend, describe::Describe, executor::Executor, params::IntoQueryParameters, pool::Pool, row::FromRow, Error}; +use futures_core::{future::BoxFuture, stream::BoxStream, Future}; use futures_util::StreamExt; +use bitflags::_core::pin::Pin; impl Executor for Pool where @@ -107,4 +105,8 @@ where ) -> BoxFuture<'e, crate::Result>> { Box::pin(async move { self.acquire().await?.describe(query).await }) } + + fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> { + Box::pin(async move { self.acquire().await?.batch_exec(commands).await }) + } } diff --git a/sqlx-core/src/postgres/connection.rs b/sqlx-core/src/postgres/connection.rs index 5ff4fe67..701f1187 100644 --- a/sqlx-core/src/postgres/connection.rs +++ b/sqlx-core/src/postgres/connection.rs @@ -180,6 +180,11 @@ impl Connection { protocol::Execute { portal, limit }.encode(self.stream.buffer_mut()); } + pub(super) async fn send(&mut self, commands: &str) -> crate::Result<()> { + protocol::Query(commands).encode(self.stream.buffer_mut()); + self.sync().await + } + pub(super) async fn sync(&mut self) -> crate::Result<()> { protocol::Sync.encode(self.stream.buffer_mut()); diff --git a/sqlx-core/src/postgres/executor.rs b/sqlx-core/src/postgres/executor.rs index 377685dd..801bbdf4 100644 --- a/sqlx-core/src/postgres/executor.rs +++ b/sqlx-core/src/postgres/executor.rs @@ -1,14 +1,8 @@ use super::{connection::Step, Connection, Postgres}; -use crate::{ - backend::Backend, - describe::{Describe, ResultField}, - executor::Executor, - params::{IntoQueryParameters, QueryParameters}, - row::FromRow, - url::Url, -}; -use futures_core::{future::BoxFuture, stream::BoxStream}; +use crate::{backend::Backend, describe::{Describe, ResultField}, executor::Executor, params::{IntoQueryParameters, QueryParameters}, row::FromRow, url::Url, Error}; +use futures_core::{future::BoxFuture, stream::BoxStream, Future}; use crate::postgres::query::PostgresQueryParameters; +use bitflags::_core::pin::Pin; impl Connection { async fn prepare_cached(&mut self, query: &str, params: &PostgresQueryParameters) -> crate::Result { @@ -159,4 +153,8 @@ impl Executor for Connection { }) }) } + + fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> { + Box::pin(self.conn.send(commands)) + } }