diff --git a/.gitignore b/.gitignore
index 11759fd1..ca1cc0c5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,3 @@ Cargo.lock
# Environment
.env
-
-# rustfmt backup files
-**/*.rs.bk
diff --git a/Cargo.toml b/Cargo.toml
index 619f5f80..611c5692 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,13 +3,16 @@ members = [
".",
"sqlx-core",
"sqlx-macros",
- "examples/realworld"
+ "examples/realworld-postgres"
]
[package]
name = "sqlx"
version = "0.1.1-pre"
license = "MIT OR Apache-2.0"
+readme = "README.md"
+repository = "https://github.com/launchbadge/sqlx"
+documentation = "https://docs.rs/sqlx"
description = "The Rust SQL Toolkit."
edition = "2018"
authors = [
@@ -19,11 +22,14 @@ authors = [
[features]
default = [ "macros" ]
-unstable = [ "sqlx-core/unstable" ]
+macros = [ "sqlx-macros", "proc-macro-hack" ]
+
+# database
postgres = [ "sqlx-core/postgres", "sqlx-macros/postgres" ]
mysql = [ "sqlx-core/mysql", "sqlx-macros/mysql" ]
-macros = [ "sqlx-macros", "proc-macro-hack" ]
-chrono = ["sqlx-core/chrono", "sqlx-macros/chrono"]
+
+# types
+chrono = [ "sqlx-core/chrono", "sqlx-macros/chrono" ]
uuid = [ "sqlx-core/uuid", "sqlx-macros/uuid" ]
[dependencies]
@@ -32,15 +38,23 @@ sqlx-macros = { version = "0.1.0-pre", path = "sqlx-macros", optional = true }
proc-macro-hack = { version = "0.5.11", optional = true }
[dev-dependencies]
+anyhow = "1.0.25"
+futures = "0.3.1"
async-std = { version = "1.2.0", features = [ "attributes" ] }
dotenv = "0.15.0"
-matches = "0.1.8"
-criterion = "0.3.0"
[[test]]
name = "macros"
required-features = [ "postgres", "uuid", "macros" ]
+[[test]]
+name = "mysql"
+required-features = [ "mysql" ]
+
+[[test]]
+name = "postgres"
+required-features = [ "postgres" ]
+
[[test]]
name = "postgres-types"
required-features = [ "postgres" ]
@@ -48,8 +62,3 @@ required-features = [ "postgres" ]
[[test]]
name = "mysql-types"
required-features = [ "mysql" ]
-
-[[bench]]
-name = "postgres-protocol"
-required-features = [ "postgres", "unstable" ]
-harness = false
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
new file mode 100644
index 00000000..a488a491
--- /dev/null
+++ b/LICENSE-APACHE
@@ -0,0 +1,201 @@
+Apache License
+Version 2.0, January 2004
+http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+"License" shall mean the terms and conditions for use, reproduction,
+and distribution as defined by Sections 1 through 9 of this document.
+
+"Licensor" shall mean the copyright owner or entity authorized by
+the copyright owner that is granting the License.
+
+"Legal Entity" shall mean the union of the acting entity and all
+other entities that control, are controlled by, or are under common
+control with that entity. For the purposes of this definition,
+"control" means (i) the power, direct or indirect, to cause the
+direction or management of such entity, whether by contract or
+otherwise, or (ii) ownership of fifty percent (50%) or more of the
+outstanding shares, or (iii) beneficial ownership of such entity.
+
+"You" (or "Your") shall mean an individual or Legal Entity
+exercising permissions granted by this License.
+
+"Source" form shall mean the preferred form for making modifications,
+including but not limited to software source code, documentation
+source, and configuration files.
+
+"Object" form shall mean any form resulting from mechanical
+transformation or translation of a Source form, including but
+not limited to compiled object code, generated documentation,
+and conversions to other media types.
+
+"Work" shall mean the work of authorship, whether in Source or
+Object form, made available under the License, as indicated by a
+copyright notice that is included in or attached to the work
+(an example is provided in the Appendix below).
+
+"Derivative Works" shall mean any work, whether in Source or Object
+form, that is based on (or derived from) the Work and for which the
+editorial revisions, annotations, elaborations, or other modifications
+represent, as a whole, an original work of authorship. For the purposes
+of this License, Derivative Works shall not include works that remain
+separable from, or merely link (or bind by name) to the interfaces of,
+the Work and Derivative Works thereof.
+
+"Contribution" shall mean any work of authorship, including
+the original version of the Work and any modifications or additions
+to that Work or Derivative Works thereof, that is intentionally
+submitted to Licensor for inclusion in the Work by the copyright owner
+or by an individual or Legal Entity authorized to submit on behalf of
+the copyright owner. For the purposes of this definition, "submitted"
+means any form of electronic, verbal, or written communication sent
+to the Licensor or its representatives, including but not limited to
+communication on electronic mailing lists, source code control systems,
+and issue tracking systems that are managed by, or on behalf of, the
+Licensor for the purpose of discussing and improving the Work, but
+excluding communication that is conspicuously marked or otherwise
+designated in writing by the copyright owner as "Not a Contribution."
+
+"Contributor" shall mean Licensor and any individual or Legal Entity
+on behalf of whom a Contribution has been received by Licensor and
+subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+this License, each Contributor hereby grants to You a perpetual,
+worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+copyright license to reproduce, prepare Derivative Works of,
+publicly display, publicly perform, sublicense, and distribute the
+Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+this License, each Contributor hereby grants to You a perpetual,
+worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+(except as stated in this section) patent license to make, have made,
+use, offer to sell, sell, import, and otherwise transfer the Work,
+where such license applies only to those patent claims licensable
+by such Contributor that are necessarily infringed by their
+Contribution(s) alone or by combination of their Contribution(s)
+with the Work to which such Contribution(s) was submitted. If You
+institute patent litigation against any entity (including a
+cross-claim or counterclaim in a lawsuit) alleging that the Work
+or a Contribution incorporated within the Work constitutes direct
+or contributory patent infringement, then any patent licenses
+granted to You under this License for that Work shall terminate
+as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+Work or Derivative Works thereof in any medium, with or without
+modifications, and in Source or Object form, provided that You
+meet the following conditions:
+
+(a) You must give any other recipients of the Work or
+Derivative Works a copy of this License; and
+
+(b) You must cause any modified files to carry prominent notices
+stating that You changed the files; and
+
+(c) You must retain, in the Source form of any Derivative Works
+that You distribute, all copyright, patent, trademark, and
+attribution notices from the Source form of the Work,
+excluding those notices that do not pertain to any part of
+the Derivative Works; and
+
+(d) If the Work includes a "NOTICE" text file as part of its
+distribution, then any Derivative Works that You distribute must
+include a readable copy of the attribution notices contained
+within such NOTICE file, excluding those notices that do not
+pertain to any part of the Derivative Works, in at least one
+of the following places: within a NOTICE text file distributed
+as part of the Derivative Works; within the Source form or
+documentation, if provided along with the Derivative Works; or,
+within a display generated by the Derivative Works, if and
+wherever such third-party notices normally appear. The contents
+of the NOTICE file are for informational purposes only and
+do not modify the License. You may add Your own attribution
+notices within Derivative Works that You distribute, alongside
+or as an addendum to the NOTICE text from the Work, provided
+that such additional attribution notices cannot be construed
+as modifying the License.
+
+You may add Your own copyright statement to Your modifications and
+may provide additional or different license terms and conditions
+for use, reproduction, or distribution of Your modifications, or
+for any such Derivative Works as a whole, provided Your use,
+reproduction, and distribution of the Work otherwise complies with
+the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+any Contribution intentionally submitted for inclusion in the Work
+by You to the Licensor shall be under the terms and conditions of
+this License, without any additional terms or conditions.
+Notwithstanding the above, nothing herein shall supersede or modify
+the terms of any separate license agreement you may have executed
+with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+names, trademarks, service marks, or product names of the Licensor,
+except as required for reasonable and customary use in describing the
+origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+agreed to in writing, Licensor provides the Work (and each
+Contributor provides its Contributions) on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied, including, without limitation, any warranties or conditions
+of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+PARTICULAR PURPOSE. You are solely responsible for determining the
+appropriateness of using or redistributing the Work and assume any
+risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+whether in tort (including negligence), contract, or otherwise,
+unless required by applicable law (such as deliberate and grossly
+negligent acts) or agreed to in writing, shall any Contributor be
+liable to You for damages, including any direct, indirect, special,
+incidental, or consequential damages of any character arising as a
+result of this License or out of the use or inability to use the
+Work (including but not limited to damages for loss of goodwill,
+work stoppage, computer failure or malfunction, or any and all
+other commercial damages or losses), even if such Contributor
+has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+the Work or Derivative Works thereof, You may choose to offer,
+and charge a fee for, acceptance of support, warranty, indemnity,
+or other liability obligations and/or rights consistent with this
+License. However, in accepting such obligations, You may act only
+on Your own behalf and on Your sole responsibility, not on behalf
+of any other Contributor, and only if You agree to indemnify,
+defend, and hold each Contributor harmless for any liability
+incurred by, or claims asserted against, such Contributor by reason
+of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+To apply the Apache License to your work, attach the following
+boilerplate notice, with the fields enclosed by brackets "[]"
+replaced with your own identifying information. (Don't include
+the brackets!) The text should be enclosed in the appropriate
+comment syntax for the file format. We also recommend that a
+file or class name and description of purpose be included on the
+same "printed page" as the copyright notice for easier
+identification within third-party archives.
+
+Copyright 2019 LaunchBadge, LLC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file
diff --git a/LICENSE-MIT b/LICENSE-MIT
new file mode 100644
index 00000000..13735bd1
--- /dev/null
+++ b/LICENSE-MIT
@@ -0,0 +1,25 @@
+Copyright (c) 2019 LaunchBadge, LLC
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
index d988e3c3..3262c1e4 100644
--- a/README.md
+++ b/README.md
@@ -1,16 +1,47 @@
-# SQLx
+# SQLx
+
+🧰 The Rust SQL Toolkit
+
-The Rust SQL Toolkit.
-
- * **Asynchronous**. Handle thousands of database connections from a single thread.
-
- * **Fast**. _TO BE WRITTEN_
-
- * **Native**. SQLx is a pure Rust† toolkit for SQL. Where possible, drivers are written from scratch, in Rust, utilizing the modern ecosystem for asynchronous network services development.
-
- * **Agnostic**. SQLx is agnostic over the database engine and can operate against a variety of database backends with the backend chosen **at compile-time** through generic constraints **or at runtime** with a slight performance loss (due to dynamic dispatch).
+
-† The SQLite driver (which does not yet exist) will use the libsqlite3 C library as SQLite is an embedded database (the only way we could be pure Rust for SQLite is by porting _all_ of SQLite to Rust).
+
+SQLx is a modern SQL client built from the ground up for Rust, in Rust.
+
+ * **Asynchronous**.
+
+ * **Native**. SQLx is a pure Rust toolkit for SQL. Where possible, drivers are written from scratch, in Rust, utilizing the modern ecosystem for asynchronous network services development.
+
 * **Type-safe**. SQLx is built upon the novel idea of preparing SQL statements before or during compilation to provide strong type safety while not getting in your way with a custom DSL.
+
+## Safety
+
+This crate uses `#![deny(unsafe_code)]` to ensure everything is implemented in 100% Safe Rust.
## License
diff --git a/benches/postgres-protocol.rs b/benches/postgres-protocol.rs
deleted file mode 100644
index 552372d7..00000000
--- a/benches/postgres-protocol.rs
+++ /dev/null
@@ -1,37 +0,0 @@
-use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use sqlx::postgres::protocol::{Bind, DataRow, Decode, Encode, RowDescription};
-
-fn bench(c: &mut Criterion) {
- c.bench_function("decode_data_row", |b| {
- b.iter(|| {
- let _ = DataRow::decode(&black_box(b"\0\x03\0\0\0\x011\0\0\0\x012\0\0\0\x013")[..]);
- });
- });
-
- c.bench_function( "decode_row_description",|b| {
- b.iter(|| {
- let _ = RowDescription::decode(&black_box(b"\0\x02user_id\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0number_of_pages\0\0\0\0\0\0\0\0\0\x05\0\0\0\0\0\0\0\0\0")[..]);
- });
- });
-
- c.bench_function("encode_bind", |b| {
- let mut buf = Vec::new();
-
- b.iter(|| {
- black_box(Bind {
- portal: "__sqlx_portal_5121",
- statement: "__sqlx_statement_5121",
- formats: &[1],
- values_len: 2,
- values: &[(-1_i8) as _, 0, 0, 0, 1, 0, 0, 0, 25],
- result_formats: &[1],
- })
- .encode(&mut buf);
-
- buf.clear();
- });
- });
-}
-
-criterion_group!(benches, bench);
-criterion_main!(benches);
diff --git a/examples/realworld/Cargo.toml b/examples/realworld-postgres/Cargo.toml
similarity index 66%
rename from examples/realworld/Cargo.toml
rename to examples/realworld-postgres/Cargo.toml
index d6cebd73..73d9dec4 100644
--- a/examples/realworld/Cargo.toml
+++ b/examples/realworld-postgres/Cargo.toml
@@ -7,8 +7,8 @@ workspace = "../.."
[dependencies]
anyhow = "1.0.25"
dotenv = "0.15.0"
-async-std = "1.2.0"
+async-std = { version = "1.2.0", features = [ "attributes" ] }
tide = "0.4.0"
sqlx = { path = "../..", features = [ "postgres" ] }
-serde = { version = "1.0.103", features = [ "derive"] }
+serde = { version = "1.0.103", features = [ "derive" ] }
futures = "0.3.1"
diff --git a/examples/realworld/schema/1_users.sql b/examples/realworld-postgres/schema.sql
similarity index 100%
rename from examples/realworld/schema/1_users.sql
rename to examples/realworld-postgres/schema.sql
diff --git a/examples/realworld/setup.sh b/examples/realworld-postgres/setup.sh
old mode 100755
new mode 100644
similarity index 62%
rename from examples/realworld/setup.sh
rename to examples/realworld-postgres/setup.sh
index 7d6096a8..68581759
--- a/examples/realworld/setup.sh
+++ b/examples/realworld-postgres/setup.sh
@@ -3,5 +3,5 @@
# Get current directory (of this script)
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-# Run SQL files in schema/ directory
-psql -d "$DATABASE_URL" -f $DIR/schema/*.sql
+# Run schema file
+psql -d "$DATABASE_URL" -f "$DIR/schema.sql"
diff --git a/examples/realworld/src/main.rs b/examples/realworld-postgres/src/main.rs
similarity index 99%
rename from examples/realworld/src/main.rs
rename to examples/realworld-postgres/src/main.rs
index 270cdaff..cd4dcbb3 100644
--- a/examples/realworld/src/main.rs
+++ b/examples/realworld-postgres/src/main.rs
@@ -52,4 +52,4 @@ async fn register(mut req: Request>) -> Response {
Response::new(200)
.body_json(&RegisterResponseBody { id: user_id })
.unwrap()
-}
+}
\ No newline at end of file
diff --git a/rustfmt.toml b/rustfmt.toml
deleted file mode 100644
index 4afdfbd8..00000000
--- a/rustfmt.toml
+++ /dev/null
@@ -1,2 +0,0 @@
-unstable_features = true
-merge_imports = true
diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml
index a810fdcf..e003fdeb 100644
--- a/sqlx-core/Cargo.toml
+++ b/sqlx-core/Cargo.toml
@@ -2,7 +2,6 @@
name = "sqlx-core"
version = "0.1.0-pre"
license = "MIT OR Apache-2.0"
-description = "The Rust SQL Toolkit."
edition = "2018"
authors = [
"Ryan Leckey ",
@@ -16,21 +15,18 @@ postgres = []
mysql = []
[dependencies]
+async-stream = { version = "0.2.0", default-features = false }
async-std = { version = "1.2.0", default-features = false, features = [ "unstable" ] }
-async-stream = "0.2.0"
-bitflags = "1.2.1"
-byteorder = { version = "1.3.2", default-features = false }
-chrono = { version = "0.4", optional = true }
-futures-channel = "0.3.1"
-futures-core = "0.3.1"
-futures-util = "0.3.1"
-log = "0.4.8"
-md-5 = "0.8.0"
-memchr = "2.2.1"
-url = "2.1.0"
-uuid = { version = "0.8.1", optional = true }
+bitflags = { version = "1.2.1", default-features = false }
+futures-core = { version = "0.3.1", default-features = false }
+futures-util = { version = "0.3.1", default-features = false }
+log = { version = "0.4", default-features = false }
+url = { version = "2.1.0", default-features = false }
+byteorder = { version = "1.3.2", default-features = false }
+memchr = { version = "2.2.1", default-features = false }
+md-5 = { version = "0.8.0", default-features = false }
+uuid = { version = "0.8.1", default-features = false, optional = true }
+chrono = { version = "0.4.10", default-features = false, features = [ "clock" ], optional = true }
[dev-dependencies]
matches = "0.1.8"
-bytes = "0.5.2"
-async-std = { version = "1.2.0", default-features = false, features = [ "attributes" ] }
diff --git a/sqlx-core/src/arguments.rs b/sqlx-core/src/arguments.rs
new file mode 100644
index 00000000..b756bbd1
--- /dev/null
+++ b/sqlx-core/src/arguments.rs
@@ -0,0 +1,160 @@
+//! Traits for passing arguments to SQL queries.
+
+use crate::database::Database;
+use crate::encode::Encode;
+use crate::types::HasSqlType;
+
+/// A tuple of arguments to be sent to the database.
+pub trait Arguments: Send + Sized + Default + 'static {
+ type Database: Database + ?Sized;
+
+ /// Returns `true` if there are no values.
+ #[inline]
+ fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Returns the number of values.
+ fn len(&self) -> usize;
+
+ /// Returns the size of the arguments, in bytes.
+ fn size(&self) -> usize;
+
+ /// Reserves the capacity for at least `len` more values (of `size` bytes) to
+ /// be added to the arguments without a reallocation.
+ fn reserve(&mut self, len: usize, size: usize);
+
+ /// Add the value to the end of the arguments.
+ fn add<T>(&mut self, value: T)
+ where
+ Self::Database: HasSqlType<T>,
+ T: Encode<Self::Database>;
+}
+
+pub trait IntoArguments<DB>
+where
+ DB: Database,
+{
+ fn into_arguments(self) -> DB::Arguments;
+}
+
+impl<DB> IntoArguments<DB> for DB::Arguments
+where
+ DB: Database,
+{
+ #[inline]
+ fn into_arguments(self) -> DB::Arguments {
+ self
+ }
+}
+
+#[allow(unused)]
+macro_rules! impl_into_arguments {
+ ($B:ident: $( ($idx:tt) -> $T:ident );+;) => {
+ impl<$($T,)+> crate::arguments::IntoArguments<$B> for ($($T,)+)
+ where
+ $($B: crate::types::HasSqlType<$T>,)+
+ $($T: crate::encode::Encode<$B>,)+
+ {
+ fn into_arguments(self) -> <$B as crate::database::Database>::Arguments {
+ use crate::arguments::Arguments;
+
+ let mut arguments = <$B as crate::database::Database>::Arguments::default();
+
+ let binds = 0 $(+ { $idx; 1 } )+;
+ let bytes = 0 $(+ crate::encode::Encode::size_hint(&self.$idx))+;
+
+ arguments.reserve(binds, bytes);
+
+ $(crate::arguments::Arguments::add(&mut arguments, self.$idx);)+
+
+ arguments
+ }
+ }
+ };
+}
+
+#[allow(unused)]
+macro_rules! impl_into_arguments_for_database {
+ ($B:ident) => {
+ impl crate::arguments::IntoArguments<$B> for ()
+ {
+ #[inline]
+ fn into_arguments(self) -> <$B as crate::database::Database>::Arguments {
+ Default::default()
+ }
+ }
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ );
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ (1) -> T2;
+ );
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ (1) -> T2;
+ (2) -> T3;
+ );
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ (1) -> T2;
+ (2) -> T3;
+ (3) -> T4;
+ );
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ (1) -> T2;
+ (2) -> T3;
+ (3) -> T4;
+ (4) -> T5;
+ );
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ (1) -> T2;
+ (2) -> T3;
+ (3) -> T4;
+ (4) -> T5;
+ (5) -> T6;
+ );
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ (1) -> T2;
+ (2) -> T3;
+ (3) -> T4;
+ (4) -> T5;
+ (5) -> T6;
+ (6) -> T7;
+ );
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ (1) -> T2;
+ (2) -> T3;
+ (3) -> T4;
+ (4) -> T5;
+ (5) -> T6;
+ (6) -> T7;
+ (7) -> T8;
+ );
+
+ impl_into_arguments!($B:
+ (0) -> T1;
+ (1) -> T2;
+ (2) -> T3;
+ (3) -> T4;
+ (4) -> T5;
+ (5) -> T6;
+ (6) -> T7;
+ (7) -> T8;
+ (8) -> T9;
+ );
+ }
+}
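
The `0 $(+ { $idx; 1 })+` expression in `impl_into_arguments!` counts the tuple fields at expansion time so `reserve` can be called once with the exact number of binds. A standalone sketch of the same trick (the `count_and_size!` macro below is illustrative only, not part of this change):

```rust
// Illustration of the counting pattern used by `impl_into_arguments!`:
// each `{ $idx; 1 }` block evaluates to `1`, so summing them yields the arity.
macro_rules! count_and_size {
    ($value:expr => $(($idx:tt))+) => {{
        let binds = 0 $(+ { $idx; 1 })+;
        let bytes = 0 $(+ std::mem::size_of_val(&$value.$idx))+;
        (binds, bytes)
    }};
}

fn main() {
    let args = (1_i32, "hello", 3.5_f64);
    let (binds, bytes) = count_and_size!(args => (0)(1)(2));
    assert_eq!(binds, 3);
    println!("{} binds, ~{} bytes", binds, bytes);
}
```
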
diff --git a/sqlx-core/src/backend.rs b/sqlx-core/src/backend.rs
deleted file mode 100644
index d83d7251..00000000
--- a/sqlx-core/src/backend.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-use crate::{
- describe::Describe, executor::Executor, params::QueryParameters, row::Row,
- types::HasTypeMetadata,
-};
-use futures_core::future::BoxFuture;
-
-/// A database backend.
-///
-/// Represents a connection to the database and further provides auxillary but
-/// important related traits as associated types.
-///
-/// This trait is not intended to be used directly.
-pub trait Backend: HasTypeMetadata + Send + Sync + Sized + 'static {
- type Connection: crate::Connection;
-
- /// The concrete `QueryParameters` implementation for this backend.
- type QueryParameters: QueryParameters;
-
- /// The concrete `Row` implementation for this backend.
- type Row: Row;
-
- /// The identifier for tables; in Postgres this is an `oid` while
- /// in MySQL/MariaDB this is the qualified name of the table.
- type TableIdent;
-
- /// Establish a new connection to the database server.
- fn connect(url: &str) -> BoxFuture<'static, crate::Result>;
-}
diff --git a/sqlx-core/src/cache.rs b/sqlx-core/src/cache.rs
index f7420819..0920df99 100644
--- a/sqlx-core/src/cache.rs
+++ b/sqlx-core/src/cache.rs
@@ -1,6 +1,6 @@
-use std::collections::hash_map::{HashMap, Entry};
-use std::cmp::Ordering;
-use futures_core::Future;
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::sync::Arc;
// TODO: figure out a cache eviction strategy
// we currently naively cache all prepared statements which could live-leak memory
@@ -11,44 +11,39 @@ use futures_core::Future;
/// Per-connection prepared statement cache.
 pub struct StatementCache<Id> {
- statements: HashMap<String, Id>
+ statements: HashMap<String, Id>,
+ columns: HashMap<Id, Arc<HashMap<Box<str>, usize>>>,
 }
-impl<Id> StatementCache<Id> {
+impl<Id> StatementCache<Id>
+where
+ Id: Eq + Hash,
+{
pub fn new() -> Self {
StatementCache {
statements: HashMap::with_capacity(10),
+ columns: HashMap::with_capacity(10),
}
}
- #[cfg(feature = "mysql")]
- pub async fn get_or_compute<'a, E, Fut>(&'a mut self, query: &str, compute: impl FnOnce() -> Fut)
- -> Result<&'a Id, E>
- where
- Fut: Future>
- {
- match self.statements.entry(query.to_string()) {
- Entry::Occupied(occupied) => Ok(occupied.into_mut()),
- Entry::Vacant(vacant) => {
- Ok(vacant.insert(compute().await?))
- }
- }
+ pub fn has_columns(&self, id: Id) -> bool {
+ self.columns.contains_key(&id)
}
- // for Postgres so it can return the synthetic statement name instead of formatting twice
- #[cfg(feature = "postgres")]
- pub async fn map_or_compute(&mut self, query: &str, map: impl FnOnce(&Id) -> R, compute: impl FnOnce() -> Fut)
- -> Result
- where
- Fut: Future> {
+ pub fn get(&self, query: &str) -> Option<&Id> {
+ self.statements.get(query)
+ }
- match self.statements.entry(query.to_string()) {
- Entry::Occupied(occupied) => Ok(map(occupied.get())),
- Entry::Vacant(vacant) => {
- let (id, ret) = compute().await?;
- vacant.insert(id);
- Ok(ret)
- }
- }
+ // It is a logical error to call this without first calling [put_columns]
+ pub fn get_columns(&self, id: Id) -> Arc<HashMap<Box<str>, usize>> {
+ Arc::clone(&self.columns[&id])
+ }
+
+ pub fn put(&mut self, query: String, id: Id) {
+ self.statements.insert(query, id);
+ }
+
+ pub fn put_columns(&mut self, id: Id, columns: HashMap<Box<str>, usize>) {
+ self.columns.insert(id, Arc::new(columns));
}
}
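
A sketch of how a driver is expected to drive this cache from its connection type (crate-internal; the `prepare` closure and the `u32` statement id stand in for the driver's real prepare round-trip):

```rust
use std::collections::HashMap;

use crate::cache::StatementCache; // crate-internal path; the module is not public

// Look up (or create) the prepared-statement id for `query` on this connection.
fn statement_id_for(
    cache: &mut StatementCache<u32>,
    query: &str,
    prepare: impl FnOnce(&str) -> u32,
) -> u32 {
    if let Some(&id) = cache.get(query) {
        return id; // already prepared on this connection
    }

    let id = prepare(query);
    cache.put(query.to_owned(), id);

    // The column name -> ordinal map is recorded separately, typically the first
    // time a result set for this statement is described.
    if !cache.has_columns(id) {
        let mut columns: HashMap<Box<str>, usize> = HashMap::new();
        columns.insert("id".into(), 0);
        cache.put_columns(id, columns);
    }

    id
}
```
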
diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs
index 78d1ea7a..266791ca 100644
--- a/sqlx-core/src/connection.rs
+++ b/sqlx-core/src/connection.rs
@@ -1,7 +1,25 @@
+use crate::executor::Executor;
+use crate::url::Url;
use futures_core::future::BoxFuture;
-use crate::Executor;
+use futures_util::TryFutureExt;
+use std::convert::TryInto;
-pub trait Connection: Executor + Sized {
- /// Gracefully close the connection.
+/// Represents a single database connection rather than a pool of database connections.
+///
+/// Prefer running queries from [Pool] unless there is a specific need for a single, continuous
+/// connection.
+pub trait Connection: Executor + Send + 'static {
+ /// Establish a new database connection.
+ fn open<T>(url: T) -> BoxFuture<'static, crate::Result<Self>>
+ where
+ T: TryInto<Url, Error = crate::Error>,
+ Self: Sized;
+
+ /// Close this database connection.
fn close(self) -> BoxFuture<'static, crate::Result<()>>;
+
+ /// Verifies a connection to the database is still alive.
+ fn ping(&mut self) -> BoxFuture<crate::Result<()>> {
+ Box::pin(self.execute("SELECT 1", Default::default()).map_ok(|_| ()))
+ }
}
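
A rough usage sketch of this trait with the MySQL connection defined later in this diff. The re-export paths and the `&str -> Url` conversion are assumptions here; the point is the shape of `open`, `ping`, and the `Executor` super-trait:

```rust
use sqlx_core::mysql::MySqlConnection; // assumed re-export of the type from this diff
use sqlx_core::{Connection, Executor};

#[async_std::main] // async-std's "attributes" feature is already a dev-dependency
async fn main() -> sqlx_core::Result<()> {
    // `open` resolves once the handshake with the server has completed.
    let mut conn = MySqlConnection::open("mysql://root@localhost/sqlx").await?;

    // The default `ping` is just `execute("SELECT 1", ..)` mapped to `()`.
    conn.ping().await?;

    // Raw, unprepared commands go through `Executor::send`.
    conn.send("CREATE TABLE IF NOT EXISTS users ( id BIGINT PRIMARY KEY )")
        .await?;

    conn.close().await
}
```
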
diff --git a/sqlx-core/src/database.rs b/sqlx-core/src/database.rs
new file mode 100644
index 00000000..878db648
--- /dev/null
+++ b/sqlx-core/src/database.rs
@@ -0,0 +1,19 @@
+use crate::arguments::Arguments;
+use crate::connection::Connection;
+use crate::row::Row;
+use crate::types::HasTypeMetadata;
+
+/// A database driver.
+///
+/// This trait encapsulates a complete driver implementation to a specific
+/// database (e.g., MySQL, Postgres).
+pub trait Database: HasTypeMetadata + 'static {
+ /// The concrete `Connection` implementation for this database.
+ type Connection: Connection;
+
+ /// The concrete `Arguments` implementation for this database.
+ type Arguments: Arguments;
+
+ /// The concrete `Row` implementation for this database.
+ type Row: Row;
+}
diff --git a/sqlx-core/src/decode.rs b/sqlx-core/src/decode.rs
index a9efaf19..a46e3aa4 100644
--- a/sqlx-core/src/decode.rs
+++ b/sqlx-core/src/decode.rs
@@ -1,18 +1,86 @@
-//! Types and traits related to deserializing values from the database.
-use crate::{backend::Backend, types::HasSqlType};
+//! Types and traits for decoding values from the database.
-// TODO: Allow decode to return an error (that can be unified)
+use std::error::Error as StdError;
+use std::fmt::{self, Display};
-pub trait Decode {
- fn decode(raw: Option<&[u8]>) -> Self;
+use crate::database::Database;
+use crate::types::HasSqlType;
+
+pub enum DecodeError {
+ /// An unexpected `NULL` was encountered while decoding.
+ UnexpectedNull,
+
+ Message(Box<str>),
+
+ Other(Box<dyn StdError + Send + Sync>),
+}
+
+/// Decode a single value from the database.
+pub trait Decode<DB>: Sized
+where
+ DB: Database + ?Sized,
+{
+ fn decode(raw: &[u8]) -> Result<Self, DecodeError>;
+
+ /// Creates a new value of this type from a `NULL` SQL value.
+ ///
+ /// The default implementation returns [DecodeError::UnexpectedNull].
+ fn decode_null() -> Result<Self, DecodeError> {
+ return Err(DecodeError::UnexpectedNull);
+ }
+
+ fn decode_nullable(raw: Option<&[u8]>) -> Result<Self, DecodeError> {
+ if let Some(raw) = raw {
+ Self::decode(raw)
+ } else {
+ Self::decode_null()
+ }
+ }
}
 impl<DB, T> Decode<DB> for Option<T>
 where
- DB: Backend + HasSqlType<T>,
+ DB: Database + HasSqlType<T>,
 T: Decode<DB>,
{
- fn decode(raw: Option<&[u8]>) -> Self {
- Some(T::decode(Some(raw?)))
+ fn decode(buf: &[u8]) -> Result<Self, DecodeError> {
+ T::decode(buf).map(Some)
+ }
+
+ fn decode_null() -> Result<Self, DecodeError> {
+ Ok(None)
+ }
+}
+
+impl fmt::Debug for DecodeError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.write_str("DecodeError(")?;
+
+ match self {
+ DecodeError::UnexpectedNull => write!(f, "unexpected null for non-null column")?,
+ DecodeError::Message(err) => write!(f, "{}", err)?,
+ DecodeError::Other(err) => write!(f, "{:?}", err)?,
+ }
+
+ f.write_str(")")
+ }
+}
+
+impl fmt::Display for DecodeError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ DecodeError::UnexpectedNull => f.write_str("unexpected null for non-null column"),
+ DecodeError::Message(err) => write!(f, "{}", err),
+ DecodeError::Other(err) => write!(f, "{}", err),
+ }
+ }
+}
+
+impl<E> From<E> for DecodeError
+where
+ E: StdError + Send + Sync + 'static,
+{
+ fn from(err: E) -> DecodeError {
+ DecodeError::Other(Box::new(err))
}
}
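
A minimal sketch of implementing `Decode` for an application type against one of the drivers (`MySql`, with the `mysql` feature enabled, is only a stand-in for any type implementing `Database`; the wrapper type and its wire format are made up):

```rust
use sqlx_core::decode::{Decode, DecodeError};
use sqlx_core::mysql::MySql;

// A made-up column wrapper that expects a 4-byte big-endian integer.
struct Magnitude(i32);

impl Decode<MySql> for Magnitude {
    fn decode(raw: &[u8]) -> Result<Self, DecodeError> {
        if raw.len() != 4 {
            // Any `StdError + Send + Sync` converts into `DecodeError::Other`
            // through the blanket `From` impl above.
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "expected exactly 4 bytes",
            )
            .into());
        }

        let mut be = [0u8; 4];
        be.copy_from_slice(raw);
        Ok(Magnitude(i32::from_be_bytes(be)))
    }

    // `decode_null` keeps its default and reports `DecodeError::UnexpectedNull`;
    // for types the database knows about (`HasSqlType`), the blanket `Option<T>`
    // impl above maps a SQL NULL to `None` instead.
}
```
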
diff --git a/sqlx-core/src/describe.rs b/sqlx-core/src/describe.rs
index 92b5eb10..dd8ea043 100644
--- a/sqlx-core/src/describe.rs
+++ b/sqlx-core/src/describe.rs
@@ -1,46 +1,59 @@
-use crate::Backend;
+//! Types for returning SQL type information about queries.
use crate::types::HasTypeMetadata;
+use crate::Database;
+use std::fmt::{self, Debug};
-use std::fmt;
+/// The return type of [Executor::describe].
+pub struct Describe<DB>
+where
+ DB: Database + ?Sized,
+{
+ /// The expected types for the parameters of the query.
+ pub param_types: Box<[<DB as HasTypeMetadata>::TypeId]>,
-/// The result of running prepare + describe for the given backend.
-pub struct Describe {
- /// The expected type IDs of bind parameters.
- pub param_types: Vec<::TypeId>,
- ///
- pub result_fields: Vec>,
- pub(crate) _backcompat: (),
+ /// The type and table information, if any, for the results of the query.
+ pub result_columns: Box<[Column<DB>]>,
+
+ // TODO: Remove and use #[non_exhaustive] when we can
+ pub(crate) _non_exhaustive: (),
}
-impl fmt::Debug for Describe
+impl<DB> Debug for Describe<DB>
where
- ::TypeId: fmt::Debug,
- ResultField: fmt::Debug,
+ DB: Database,
+ <DB as HasTypeMetadata>::TypeId: Debug,
+ Column<DB>: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Describe")
.field("param_types", &self.param_types)
- .field("result_fields", &self.result_fields)
+ .field("result_columns", &self.result_columns)
.finish()
}
}
-pub struct ResultField {
- pub name: Option,
- pub table_id: Option<::TableIdent>,
- /// The type ID of this result column.
+/// A single column of a result set.
+pub struct Column<DB>
+where
+ DB: Database + ?Sized,
+{
+ pub name: Option<Box<str>>,
+ pub table_id: Option<<DB as HasTypeMetadata>::TableId>,
 pub type_id: <DB as HasTypeMetadata>::TypeId,
- pub(crate) _backcompat: (),
+
+ // TODO: Remove and use #[non_exhaustive] when we can
+ pub(crate) _non_exhaustive: (),
}
-impl fmt::Debug for ResultField
+impl<DB> Debug for Column<DB>
where
- ::TableIdent: fmt::Debug,
- ::TypeId: fmt::Debug,
+ DB: Database + ?Sized,
+ <DB as HasTypeMetadata>::TableId: Debug,
+ <DB as HasTypeMetadata>::TypeId: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("ResultField")
+ f.debug_struct("Column")
.field("name", &self.name)
.field("table_id", &self.table_id)
.field("type_id", &self.type_id)
diff --git a/sqlx-core/src/encode.rs b/sqlx-core/src/encode.rs
index 0367f8c5..b3ab8cad 100644
--- a/sqlx-core/src/encode.rs
+++ b/sqlx-core/src/encode.rs
@@ -1,72 +1,82 @@
-//! Types and traits related to serializing values for the database.
-use crate::{backend::Backend, types::HasSqlType};
+//! Types and traits for encoding values to the database.
+use crate::database::Database;
+use crate::types::HasSqlType;
use std::mem;
-/// Annotates the result of [Encode] to differentiate between an empty value and a null value.
+/// The return type of [Encode::encode].
pub enum IsNull {
- /// The value was null (and no data was written to the buffer).
+ /// The value is null; no data was written.
Yes,
- /// The value was not null.
+ /// The value is not null.
///
- /// This does not necessarily mean that any data was written to the buffer.
+ /// This does not mean that data was written.
No,
}
-/// Serializes a single value to be sent to the database.
-///
-/// The data must be written to the buffer in the expected format
-/// for the given backend.
-///
-/// When possible, implementations of this trait should prefer using an
-/// existing implementation, rather than writing to `buf` directly.
-pub trait Encode {
- /// Writes the value of `self` into `buf` as the expected format
- /// for the given backend.
- ///
- /// The return value indicates if this value should be represented as `NULL`.
- /// If this is the case, implementations **must not** write anything to `out`.
- fn encode(&self, buf: &mut Vec) -> IsNull;
+/// Encode a single value to be sent to the database.
+pub trait Encode<DB>
+where
+ DB: Database + ?Sized,
+{
+ /// Writes the value of `self` into `buf` in the expected format for the database.
+ fn encode(&self, buf: &mut Vec<u8>);
+
+ fn encode_nullable(&self, buf: &mut Vec<u8>) -> IsNull {
+ self.encode(buf);
+
+ IsNull::No
+ }
- /// Calculate the number of bytes this type will use when encoded.
fn size_hint(&self) -> usize {
mem::size_of_val(self)
}
}
-/// [Encode] is implemented for `Option` where `T` implements `Encode`. An `Option`
-/// represents a nullable SQL value.
-impl Encode for Option
-where
- DB: Backend + HasSqlType,
- T: Encode,
-{
- #[inline]
- fn encode(&self, buf: &mut Vec) -> IsNull {
- if let Some(self_) = self {
- self_.encode(buf)
- } else {
- IsNull::Yes
- }
- }
-
- fn size_hint(&self) -> usize {
- if self.is_some() { mem::size_of::() } else { 0 }
- }
-}
-
 impl<DB, T> Encode<DB> for &'_ T
 where
- DB: Backend + HasSqlType<T>,
+ DB: Database + HasSqlType<T>,
 T: Encode<DB>,
{
- #[inline]
- fn encode(&self, buf: &mut Vec) -> IsNull {
+ fn encode(&self, buf: &mut Vec<u8>) {
(*self).encode(buf)
}
+ fn encode_nullable(&self, buf: &mut Vec<u8>) -> IsNull {
+ (*self).encode_nullable(buf)
+ }
+
fn size_hint(&self) -> usize {
(*self).size_hint()
}
}
+
+impl<DB, T> Encode<DB> for Option<T>
+where
+ DB: Database + HasSqlType<T>,
+ T: Encode<DB>,
+{
+ fn encode(&self, buf: &mut Vec<u8>) {
+ // Forward to [encode_nullable] and ignore the result
+ let _ = self.encode_nullable(buf);
+ }
+
+ fn encode_nullable(&self, buf: &mut Vec<u8>) -> IsNull {
+ if let Some(self_) = self {
+ self_.encode(buf);
+
+ IsNull::No
+ } else {
+ IsNull::Yes
+ }
+ }
+
+ fn size_hint(&self) -> usize {
+ if self.is_some() {
+ mem::size_of::<T>()
+ } else {
+ 0
+ }
+ }
+}
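
The mirror image for `Encode`, using the same made-up wrapper type: `encode` writes the non-NULL representation, while the `Option<T>` impl above is what turns a `None` into `IsNull::Yes` without touching the buffer (again, `MySql` is only a stand-in for a `Database` implementation):

```rust
use sqlx_core::encode::{Encode, IsNull};
use sqlx_core::mysql::MySql;

struct Magnitude(i32);

impl Encode<MySql> for Magnitude {
    fn encode(&self, buf: &mut Vec<u8>) {
        // Append the 4-byte big-endian representation to the argument buffer.
        buf.extend_from_slice(&self.0.to_be_bytes());
    }

    fn size_hint(&self) -> usize {
        4 // lets `Arguments::reserve` size the buffer up front
    }
}

fn main() {
    let mut buf = Vec::new();

    // Drivers call `encode_nullable`; the default forwards to `encode` and
    // reports `IsNull::No`.
    let is_null = <Magnitude as Encode<MySql>>::encode_nullable(&Magnitude(1_000), &mut buf);

    assert!(matches!(is_null, IsNull::No));
    assert_eq!(&buf[..], &1_000_i32.to_be_bytes()[..]);
}
```
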
diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs
index 08c59e53..8a458c94 100644
--- a/sqlx-core/src/error.rs
+++ b/sqlx-core/src/error.rs
@@ -1,49 +1,51 @@
-use std::{
- error::Error as StdError,
- fmt::{self, Debug, Display},
- io,
-};
+//! Error and Result types.
-use async_std::future::TimeoutError;
+use crate::decode::DecodeError;
+use std::error::Error as StdError;
+use std::fmt::{self, Debug, Display};
+use std::io;
-/// A convenient Result instantiation appropriate for SQLx.
-pub type Result = std::result::Result;
+/// A specialized `Result` type for SQLx.
+pub type Result<T> = std::result::Result<T, Error>;
/// A generic error that represents all the ways a method can fail inside of SQLx.
#[derive(Debug)]
pub enum Error {
- /// Error communicating with the database backend.
- ///
- /// Some reasons for this to be caused:
- ///
- /// - [io::ErrorKind::ConnectionRefused] - Database backend is most likely behind a firewall.
- ///
- /// - [io::ErrorKind::ConnectionReset] - Database backend dropped the client connection (perhaps from an administrator action).
+ /// Error communicating with the database.
Io(io::Error),
- /// An error was returned by the database backend.
- Database(Box),
+ /// Connection URL was malformed.
+ UrlParse(url::ParseError),
- /// No rows were returned by a query expected to return at least one row.
+ /// An error was returned by the database.
+ Database(Box<dyn DatabaseError>),
+
+ /// No rows were returned by a query that expected to return at least one row.
NotFound,
- /// More than one row was returned by a query expected to return exactly one row.
+ /// More than one row was returned by a query that expected to return exactly one row.
FoundMoreThanOne,
- /// Unexpected or invalid data was encountered. This would indicate that we received data that we were not
- /// expecting or it was in a format we did not understand. This generally means either there is a programming error in a SQLx driver or
- /// something with the connection or the database backend itself is corrupted.
+ /// Column was not found in Row during [Row::try_get].
+ ColumnNotFound(Box<str>),
+
+ /// Unexpected or invalid data was encountered. This would indicate that we received
+ /// data that we were not expecting or it was in a format we did not understand. This
+ /// generally means either there is a programming error in a SQLx driver or
+ /// something with the connection or the database itself is corrupted.
///
/// Context is provided by the included error message.
 Protocol(Box<str>),
- /// A `Pool::acquire()` timed out due to connections not becoming available or
+ /// A [Pool::acquire] timed out due to connections not becoming available or
/// because another task encountered too many errors while trying to open a new connection.
- TimedOut,
+ PoolTimedOut,
- /// `Pool::close()` was called while we were waiting in `Pool::acquire()`.
+ /// [Pool::close] was called while we were waiting in [Pool::acquire].
PoolClosed,
+ Decode(DecodeError),
+
// TODO: Remove and replace with `#[non_exhaustive]` when possible
#[doc(hidden)]
__Nonexhaustive,
@@ -54,6 +56,10 @@ impl StdError for Error {
match self {
Error::Io(error) => Some(error),
+ Error::UrlParse(error) => Some(error),
+
+ Error::Decode(DecodeError::Other(error)) => Some(&**error),
+
_ => None,
}
}
@@ -64,17 +70,25 @@ impl Display for Error {
match self {
Error::Io(error) => write!(f, "{}", error),
+ Error::UrlParse(error) => write!(f, "{}", error),
+
+ Error::Decode(error) => write!(f, "{}", error),
+
Error::Database(error) => Display::fmt(error, f),
Error::NotFound => f.write_str("found no rows when we expected at least one"),
+ Error::ColumnNotFound(ref name) => {
+ write!(f, "no column found with the name {:?}", name)
+ }
+
Error::FoundMoreThanOne => {
f.write_str("found more than one row when we expected exactly one")
}
Error::Protocol(ref err) => f.write_str(err),
- Error::TimedOut => f.write_str("timed out while waiting for an open connection"),
+ Error::PoolTimedOut => f.write_str("timed out while waiting for an open connection"),
Error::PoolClosed => f.write_str("attempted to acquire a connection on a closed pool"),
@@ -90,9 +104,24 @@ impl From<io::Error> for Error {
}
}
-impl From<TimeoutError> for Error {
- fn from(_: TimeoutError) -> Self {
- Error::TimedOut
+impl From<io::ErrorKind> for Error {
+ #[inline]
+ fn from(err: io::ErrorKind) -> Self {
+ Error::Io(err.into())
+ }
+}
+
+impl From<DecodeError> for Error {
+ #[inline]
+ fn from(err: DecodeError) -> Self {
+ Error::Decode(err)
+ }
+}
+
+impl From<url::ParseError> for Error {
+ #[inline]
+ fn from(err: url::ParseError) -> Self {
+ Error::UrlParse(err)
}
}
@@ -113,9 +142,20 @@ where
}
}
-/// An error that was returned by the database backend.
+/// An error that was returned by the database.
pub trait DatabaseError: Display + Debug + Send + Sync {
+ /// The primary, human-readable error message.
fn message(&self) -> &str;
+
+ fn details(&self) -> Option<&str>;
+
+ fn hint(&self) -> Option<&str>;
+
+ fn table_name(&self) -> Option<&str>;
+
+ fn column_name(&self) -> Option<&str>;
+
+ fn constraint_name(&self) -> Option<&str>;
}
/// Used by the `protocol_error!()` macro for a lazily evaluated conversion to
@@ -124,6 +164,7 @@ pub(crate) struct ProtocolError<'a> {
pub args: fmt::Arguments<'a>,
}
+#[cfg(any(feature = "mysql", feature = "postgres"))]
macro_rules! protocol_err (
($($args:tt)*) => {
$crate::error::ProtocolError { args: format_args!($($args)*) }
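
A small sketch of consuming this error type from application code; the status-code mapping is invented purely to show which variants carry what:

```rust
use sqlx_core::Error;

// Map a SQLx error to an HTTP-ish status code (illustrative only).
fn status_for(err: &Error) -> u16 {
    match err {
        // Errors reported by the server carry the driver's `DatabaseError`.
        Error::Database(db_err) => {
            eprintln!("database error: {}", db_err.message());
            500
        }

        Error::NotFound => 404,
        Error::PoolTimedOut => 503,

        // The enum is effectively non-exhaustive (see `__Nonexhaustive` above),
        // so a catch-all arm is always required.
        _ => 500,
    }
}
```
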
diff --git a/sqlx-core/src/executor.rs b/sqlx-core/src/executor.rs
index 3e137432..0f50dac3 100644
--- a/sqlx-core/src/executor.rs
+++ b/sqlx-core/src/executor.rs
@@ -1,81 +1,60 @@
-use crate::{
- backend::Backend,
- describe::Describe,
- error::Error,
- params::{IntoQueryParameters, QueryParameters},
- row::FromRow,
-};
-use futures_core::{future::BoxFuture, stream::BoxStream};
-use futures_util::{TryFutureExt, TryStreamExt};
+use crate::database::Database;
+use crate::describe::Describe;
+use futures_core::future::BoxFuture;
+use futures_core::stream::BoxStream;
+use futures_util::TryStreamExt;
-pub trait Executor: Send {
- type Backend: Backend;
+/// Encapsulates query execution on the database.
+///
+/// Implemented by [Pool], [Connection], and [Transaction].
+pub trait Executor {
+ type Database: Database + ?Sized;
- /// Verifies a connection to the database is still alive.
- fn ping<'e>(&'e mut self) -> BoxFuture<'e, crate::Result<()>> {
- Box::pin(
- self.execute(
- "SELECT 1",
- Default::default(),
- )
- .map_ok(|_| ()),
- )
- }
+ /// Send a raw SQL command to the database.
+ ///
+ /// This is intended for queries that cannot or should not be prepared (ex. `BEGIN`).
+ ///
+ /// Does not support fetching results.
+ fn send<'e, 'q: 'e>(&'e mut self, command: &'q str) -> BoxFuture<'e, crate::Result<()>>;
+ /// Execute the query, returning the number of rows affected.
fn execute<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
- params: ::QueryParameters,
+ args: <Self::Database as Database>::Arguments,
 ) -> BoxFuture<'e, crate::Result<u64>>;
- fn fetch<'e, 'q: 'e, T: 'e>(
+ /// Executes the query and returns a [Stream] of [Row].
+ fn fetch<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
- params: ::QueryParameters,
- ) -> BoxStream<'e, crate::Result>
- where
- T: FromRow + Send + Unpin;
+ args: <Self::Database as Database>::Arguments,
+ ) -> BoxStream<'e, crate::Result<<Self::Database as Database>::Row>>;
- fn fetch_all<'e, 'q: 'e, T: 'e>(
+ /// Executes the query and returns up to one resulting record.
+ /// * `Error::FoundMoreThanOne` will be returned if the query produced more than 1 row.
+ fn fetch_optional<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
- params: ::QueryParameters,
- ) -> BoxFuture<'e, crate::Result>>
- where
- T: FromRow + Send + Unpin,
- {
- Box::pin(self.fetch(query, params).try_collect())
+ args: <Self::Database as Database>::Arguments,
+ ) -> BoxFuture<'e, crate::Result<Option<<Self::Database as Database>::Row>>> {
+ let mut s = self.fetch(query, args);
+ Box::pin(async move { s.try_next().await })
}
- fn fetch_optional<'e, 'q: 'e, T: 'e>(
+ /// Execute the query and return exactly one resulting record, failing with [Error::NotFound] if no rows are returned.
+ fn fetch_one<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
- params: ::QueryParameters,
- ) -> BoxFuture<'e, crate::Result>>
- where
- T: FromRow + Send;
-
- fn fetch_one<'e, 'q: 'e, T: 'e>(
- &'e mut self,
- query: &'q str,
- params: ::QueryParameters,
- ) -> BoxFuture<'e, crate::Result>
- where
- T: FromRow + Send,
- {
- let fut = self.fetch_optional(query, params);
- Box::pin(async move { fut.await?.ok_or(Error::NotFound) })
+ args: <Self::Database as Database>::Arguments,
+ ) -> BoxFuture<'e, crate::Result<<Self::Database as Database>::Row>> {
+ let mut s = self.fetch(query, args);
+ Box::pin(async move { s.try_next().await?.ok_or(crate::Error::NotFound) })
}
- /// Analyze the SQL statement and report the inferred bind parameter types and returned
- /// columns.
+ /// Analyze the SQL query and report the inferred bind parameter types and returned columns.
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
- ) -> BoxFuture<'e, crate::Result>>;
-
- /// Send a semicolon-delimited series of arbitrary SQL commands to the server.
- ///
- /// Does not support fetching results.
- fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>>;
+ ) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>>;
}
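
Because every signature goes through the `Database` associated types, helpers can stay generic over whatever ends up implementing `Executor` (connection, pool, or transaction). A sketch, assuming the crate paths shown:

```rust
use futures::stream::TryStreamExt;
use sqlx_core::Executor;

// Count the rows produced by `query`, for any executor and driver.
async fn count_rows<E: Executor>(exec: &mut E, query: &str) -> sqlx_core::Result<usize> {
    // `Default::default()` is an empty `Arguments` value (the trait requires `Default`).
    let mut rows = exec.fetch(query, Default::default());

    let mut n = 0;
    while let Some(_row) = rows.try_next().await? {
        n += 1;
    }

    Ok(n)
}
```
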
diff --git a/sqlx-core/src/io/buf.rs b/sqlx-core/src/io/buf.rs
index 424f7c07..e80fe446 100644
--- a/sqlx-core/src/io/buf.rs
+++ b/sqlx-core/src/io/buf.rs
@@ -5,6 +5,8 @@ use std::{io, slice, str};
pub trait Buf {
fn advance(&mut self, cnt: usize);
+ fn get_uint<T: ByteOrder>(&mut self, n: usize) -> io::Result<u64>;
+
 fn get_u8(&mut self) -> io::Result<u8>;
 fn get_u16<T: ByteOrder>(&mut self) -> io::Result<u16>;
@@ -22,6 +24,8 @@ pub trait Buf {
fn get_str(&mut self, len: usize) -> io::Result<&str>;
fn get_str_nul(&mut self) -> io::Result<&str>;
+
+ fn get_bytes(&mut self, len: usize) -> io::Result<&[u8]>;
}
impl<'a> Buf for &'a [u8] {
@@ -29,9 +33,15 @@ impl<'a> Buf for &'a [u8] {
*self = &self[cnt..];
}
+ fn get_uint<T: ByteOrder>(&mut self, n: usize) -> io::Result<u64> {
+ let val = T::read_uint(*self, n);
+ self.advance(n);
+
+ Ok(val)
+ }
+
 fn get_u8(&mut self) -> io::Result<u8> {
let val = self[0];
-
self.advance(1);
Ok(val)
@@ -51,16 +61,16 @@ impl<'a> Buf for &'a [u8] {
Ok(val)
}
- fn get_i32(&mut self) -> io::Result {
- let val = T::read_i32(*self);
- self.advance(4);
+ fn get_u24<T: ByteOrder>(&mut self) -> io::Result<u32> {
+ let val = T::read_u24(*self);
+ self.advance(3);
Ok(val)
}
- fn get_u24(&mut self) -> io::Result {
- let val = T::read_u24(*self);
- self.advance(3);
+ fn get_i32<T: ByteOrder>(&mut self) -> io::Result<i32> {
+ let val = T::read_i32(*self);
+ self.advance(4);
Ok(val)
}
@@ -80,15 +90,8 @@ impl<'a> Buf for &'a [u8] {
}
fn get_str(&mut self, len: usize) -> io::Result<&str> {
- let buf = &self[..len];
-
- self.advance(len);
-
- if cfg!(debug_asserts) {
- str::from_utf8(buf).map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
- } else {
- Ok(unsafe { str::from_utf8_unchecked(buf) })
- }
+ str::from_utf8(self.get_bytes(len)?)
+ .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
}
fn get_str_nul(&mut self) -> io::Result<&str> {
@@ -97,6 +100,13 @@ impl<'a> Buf for &'a [u8] {
Ok(s)
}
+
+ fn get_bytes(&mut self, len: usize) -> io::Result<&[u8]> {
+ let buf = &self[..len];
+ self.advance(len);
+
+ Ok(buf)
+ }
}
pub trait ToBuf {
@@ -104,9 +114,13 @@ pub trait ToBuf {
}
impl ToBuf for [u8] {
- fn to_buf(&self) -> &[u8] { self }
+ fn to_buf(&self) -> &[u8] {
+ self
+ }
}
impl ToBuf for u8 {
- fn to_buf(&self) -> &[u8] { slice::from_ref(self) }
+ fn to_buf(&self) -> &[u8] {
+ slice::from_ref(self)
+ }
}
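
A crate-internal sketch of reading a hand-rolled packet with the `Buf` extension methods above (the byte layout is invented; `get_u24` matches the 3-byte length prefix used by the MySQL packet header):

```rust
use byteorder::LittleEndian;

use crate::io::Buf; // crate-internal: the `io` module is not exported

fn parse(mut packet: &[u8]) -> std::io::Result<()> {
    let tag = packet.get_u8()?; // single byte, no byte order involved
    let len = packet.get_u24::<LittleEndian>()?; // 3-byte little-endian length

    let payload = packet.get_bytes(len as usize)?; // raw bytes, added in this change
    println!("tag={} len={} payload={:?}", tag, len, payload);

    let name = packet.get_str_nul()?; // NUL-terminated string
    println!("name={}", name);

    Ok(())
}
```
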
diff --git a/sqlx-core/src/io/buf_mut.rs b/sqlx-core/src/io/buf_mut.rs
index 486e7c89..8ebe8891 100644
--- a/sqlx-core/src/io/buf_mut.rs
+++ b/sqlx-core/src/io/buf_mut.rs
@@ -18,6 +18,10 @@ pub trait BufMut {
fn put_u64(&mut self, val: u64);
+ fn put_bytes(&mut self, val: &[u8]);
+
+ fn put_str(&mut self, val: &str);
+
fn put_str_nul(&mut self, val: &str);
}
 impl BufMut for Vec<u8> {
self.push(val);
}
- fn put_i16<T: ByteOrder>(&mut self, val: i16) {
- let mut buf = [0; 2];
- T::write_i16(&mut buf, val);
- self.extend_from_slice(&buf);
- }
-
 fn put_u16<T: ByteOrder>(&mut self, val: u16) {
 let mut buf = [0; 2];
 T::write_u16(&mut buf, val);
 self.extend_from_slice(&buf);
 }
+ fn put_i16<T: ByteOrder>(&mut self, val: i16) {
+ let mut buf = [0; 2];
+ T::write_i16(&mut buf, val);
+ self.extend_from_slice(&buf);
+ }
+
 fn put_u24<T: ByteOrder>(&mut self, val: u32) {
let mut buf = [0; 3];
T::write_u24(&mut buf, val);
@@ -66,8 +70,16 @@ impl BufMut for Vec {
self.extend_from_slice(&buf);
}
- fn put_str_nul(&mut self, val: &str) {
+ fn put_bytes(&mut self, val: &[u8]) {
+ self.extend_from_slice(val);
+ }
+
+ fn put_str(&mut self, val: &str) {
self.extend_from_slice(val.as_bytes());
+ }
+
+ fn put_str_nul(&mut self, val: &str) {
+ self.put_str(val);
self.push(0);
}
}
diff --git a/sqlx-core/src/io/buf_stream.rs b/sqlx-core/src/io/buf_stream.rs
index 04e74407..f5607925 100644
--- a/sqlx-core/src/io/buf_stream.rs
+++ b/sqlx-core/src/io/buf_stream.rs
@@ -2,7 +2,6 @@ use async_std::io::{
prelude::{ReadExt, WriteExt},
Read, Write,
};
-use std::mem::MaybeUninit;
use std::io;
 pub struct BufStream<S> {
@@ -66,7 +65,9 @@ where
// If we have enough bytes in our read buffer,
// return immediately
if self.rbuf_windex >= (self.rbuf_rindex + cnt) {
- return Ok(Some(&self.rbuf[self.rbuf_rindex..(self.rbuf_rindex + cnt)]));
+ let buf = &self.rbuf[self.rbuf_rindex..(self.rbuf_rindex + cnt)];
+
+ return Ok(Some(buf));
}
// If we are out of space to write to in the read buffer,
diff --git a/sqlx-core/src/io/mod.rs b/sqlx-core/src/io/mod.rs
index 922a1af0..2995d857 100644
--- a/sqlx-core/src/io/mod.rs
+++ b/sqlx-core/src/io/mod.rs
@@ -5,4 +5,24 @@ mod buf;
mod buf_mut;
mod byte_str;
-pub use self::{buf::{Buf, ToBuf}, buf_mut::BufMut, buf_stream::BufStream, byte_str::ByteStr};
+pub use self::{
+ buf::{Buf, ToBuf},
+ buf_mut::BufMut,
+ buf_stream::BufStream,
+ byte_str::ByteStr,
+};
+
+#[cfg(test)]
+#[doc(hidden)]
+macro_rules! bytes (
+ ($($b: expr), *) => {{
+ use $crate::io::ToBuf;
+
+ let mut buf = Vec::new();
+ $(
+ buf.extend_from_slice($b.to_buf());
+ )*
+
+ buf
+ }}
+);
diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs
index 9691d3c3..6d28af68 100644
--- a/sqlx-core/src/lib.rs
+++ b/sqlx-core/src/lib.rs
@@ -1,73 +1,60 @@
#![recursion_limit = "256"]
-#![allow(unused_imports)]
-
-#[macro_use]
-mod macros;
+#![deny(unsafe_code)]
#[macro_use]
pub mod error;
-#[cfg(any(feature = "postgres", feature = "mysql"))]
+#[cfg(any(feature = "mysql", feature = "postgres"))]
#[macro_use]
mod io;
-mod backend;
-pub mod decode;
-
-#[cfg(any(feature = "postgres", feature = "mysql"))]
-mod url;
-
-#[macro_use]
-mod row;
-
-mod connection;
-mod executor;
-mod pool;
-
-#[macro_use]
-pub mod params;
-
-pub mod encode;
-mod query;
-pub mod types;
-
-mod describe;
-
+#[cfg(any(feature = "mysql", feature = "postgres"))]
mod cache;
-#[doc(inline)]
-pub use self::{
- backend::Backend,
- connection::Connection,
- decode::Decode,
- encode::Encode,
- error::{Error, Result},
- executor::Executor,
- pool::Pool,
- query::{query, Query},
- row::{FromRow, Row},
- types::HasSqlType,
-};
+mod connection;
+mod database;
+mod executor;
+mod query;
+mod url;
-#[doc(hidden)]
-pub use types::HasTypeMetadata;
+pub mod arguments;
+pub mod decode;
+pub mod describe;
+pub mod encode;
+pub mod pool;
+pub mod types;
-#[doc(hidden)]
-pub use describe::{Describe, ResultField};
+#[macro_use]
+pub mod row;
#[cfg(feature = "mysql")]
pub mod mysql;
+#[cfg(feature = "postgres")]
+pub mod postgres;
+
+pub use database::Database;
+
+#[doc(inline)]
+pub use error::{Error, Result};
+
+pub use connection::Connection;
+pub use executor::Executor;
+pub use query::{query, Query};
+
+#[doc(inline)]
+pub use pool::Pool;
+
+#[doc(inline)]
+pub use row::{FromRow, Row};
+
#[cfg(feature = "mysql")]
#[doc(inline)]
pub use mysql::MySql;
-#[cfg(feature = "postgres")]
-pub mod postgres;
-
#[cfg(feature = "postgres")]
#[doc(inline)]
-pub use self::postgres::Postgres;
+pub use postgres::Postgres;
use std::marker::PhantomData;
diff --git a/sqlx-core/src/macros.rs b/sqlx-core/src/macros.rs
deleted file mode 100644
index e13567c7..00000000
--- a/sqlx-core/src/macros.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-#[cfg(test)]
-#[doc(hidden)]
-#[macro_export]
-macro_rules! __bytes_builder (
- ($($b: expr), *) => {{
- use $crate::io::ToBuf;
-
- let mut buf = Vec::new();
- $(
- buf.extend_from_slice($b.to_buf());
- )*
- buf
- }}
-);
diff --git a/sqlx-core/src/mysql/arguments.rs b/sqlx-core/src/mysql/arguments.rs
new file mode 100644
index 00000000..3cb1bd42
--- /dev/null
+++ b/sqlx-core/src/mysql/arguments.rs
@@ -0,0 +1,51 @@
+use crate::arguments::Arguments;
+use crate::encode::{Encode, IsNull};
+use crate::mysql::types::MySqlTypeMetadata;
+use crate::mysql::MySql;
+use crate::types::HasSqlType;
+
+#[derive(Default)]
+pub struct MySqlArguments {
+ pub(crate) param_types: Vec<MySqlTypeMetadata>,
+ pub(crate) params: Vec<u8>,
+ pub(crate) null_bitmap: Vec<u8>,
+}
+
+impl Arguments for MySqlArguments {
+ type Database = MySql;
+
+ fn len(&self) -> usize {
+ self.param_types.len()
+ }
+
+ fn size(&self) -> usize {
+ self.params.len()
+ }
+
+ fn reserve(&mut self, len: usize, size: usize) {
+ self.param_types.reserve(len);
+ self.params.reserve(size);
+
+ // ensure we have enough size in the bitmap to hold at least `len` extra bits
+ // the second `& 7` gives us 0 spare bits when param_types.len() is a multiple of 8
+ let spare_bits = (8 - (self.param_types.len() & 7)) & 7;
+ // ensure that if there are no spare bits left, `len = 1` reserves another byte
+ self.null_bitmap.reserve((len + 7 - spare_bits) / 8);
+ }
+
+ fn add<T>(&mut self, value: T)
+ where
+ Self::Database: HasSqlType<T>,
+ T: Encode<Self::Database>,
+ {
+ let metadata = <Self::Database as HasSqlType<T>>::metadata();
+ let index = self.param_types.len();
+
+ self.param_types.push(metadata);
+ self.null_bitmap.resize((index / 8) + 1, 0);
+
+ if let IsNull::Yes = value.encode_nullable(&mut self.params) {
+ self.null_bitmap[index / 8] |= (1 << index % 8) as u8;
+ }
+ }
+}
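
The NULL bitmap above packs one flag bit per parameter: parameter `i` lives in byte `i / 8`, bit `i % 8`. A standalone sketch of that bookkeeping, independent of the types in this file:

```rust
// Set the NULL flag for parameter `index` in a MySQL-style NULL bitmap.
fn set_null(bitmap: &mut Vec<u8>, index: usize) {
    // Grow the bitmap so the byte holding this parameter's bit exists.
    if bitmap.len() < index / 8 + 1 {
        bitmap.resize(index / 8 + 1, 0);
    }

    bitmap[index / 8] |= 1 << (index % 8);
}

fn main() {
    let mut bitmap = Vec::new();

    // Parameters 0, 3 and 10 are NULL.
    for &i in &[0usize, 3, 10] {
        set_null(&mut bitmap, i);
    }

    // Parameter 10 ends up in byte 1, bit 2.
    assert_eq!(bitmap, vec![0b0000_1001, 0b0000_0100]);
}
```
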
diff --git a/sqlx-core/src/mysql/backend.rs b/sqlx-core/src/mysql/backend.rs
deleted file mode 100644
index db325074..00000000
--- a/sqlx-core/src/mysql/backend.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-use futures_core::{future::BoxFuture, stream::BoxStream};
-
-use crate::{
- backend::Backend,
- describe::{Describe, ResultField},
- mysql::{protocol::ResultRow, query::MySqlDbParameters},
- url::Url,
-};
-
-use super::{Connection, RawConnection};
-use super::MySql;
-use crate::cache::StatementCache;
-
-impl Backend for MySql {
- type Connection = Connection;
- type QueryParameters = MySqlDbParameters;
- type Row = ResultRow;
- type TableIdent = String;
-
- fn connect(url: &str) -> BoxFuture<'static, crate::Result> {
- let url = Url::parse(url);
-
- Box::pin(async move {
- let url = url?;
- Ok(Connection {
- conn: RawConnection::open(url).await?,
- cache: StatementCache::new(),
- })
- })
- }
-}
-
-impl_from_row_for_backend!(MySql, ResultRow);
-impl_into_query_parameters_for_backend!(MySql);
diff --git a/sqlx-core/src/mysql/connection.rs b/sqlx-core/src/mysql/connection.rs
index f7c35e1b..3bb6c0c5 100644
--- a/sqlx-core/src/mysql/connection.rs
+++ b/sqlx-core/src/mysql/connection.rs
@@ -1,121 +1,41 @@
-use std::{
- io,
- net::{IpAddr, SocketAddr},
-};
-use std::net::Shutdown;
+use std::convert::TryInto;
+use std::io;
-use async_std::net::TcpStream;
+use async_std::net::{Shutdown, TcpStream};
use byteorder::{ByteOrder, LittleEndian};
-use futures_util::AsyncWriteExt;
+use futures_core::future::BoxFuture;
-use crate::{Describe, Error, io::{Buf, BufMut, BufStream}, mysql::{
- protocol::{
- Capabilities, ColumnCountPacket, ColumnDefinitionPacket, ComPing, ComQuit,
- ComSetOption, ComStmtExecute,
- ComStmtPrepare, ComStmtPrepareOk, Encode, EofPacket, ErrPacket, OkPacket,
- ResultRow, SetOptionOptions, StmtExecFlag,
- },
- query::MySqlDbParameters,
-}, Result, ResultField, url::Url};
-use crate::mysql::MySql;
-use crate::mysql::protocol::ComQuery;
+use crate::cache::StatementCache;
+use crate::connection::Connection;
+use crate::io::{Buf, BufMut, BufStream, ByteStr};
+use crate::mysql::error::MySqlError;
+use crate::mysql::protocol::{
+ Capabilities, Decode, Encode, EofPacket, ErrPacket, Handshake, HandshakeResponse, OkPacket,
+};
+use crate::url::Url;
-use super::establish;
+pub struct MySqlConnection {
+ pub(super) stream: BufStream<TcpStream>,
-pub type StatementId = u32;
+ pub(super) capabilities: Capabilities,
+
+ pub(super) statement_cache: StatementCache<u32>,
+
+ rbuf: Vec<u8>,
-pub struct Connection {
- pub(crate) stream: BufStream<TcpStream>,
- pub(crate) rbuf: Vec<u8>,
- pub(crate) capabilities: Capabilities,
next_seq_no: u8,
+
+ pub(super) ready: bool,
}
-impl Connection {
- pub async fn open(url: Url) -> Result<Self> {
- // TODO: Handle errors
- let host = url.host();
- let port = url.port(3306);
-
- // TODO: handle errors
- let host: IpAddr = host.parse().unwrap();
- let addr: SocketAddr = (host, port).into();
-
- let stream = TcpStream::connect(&addr).await?;
-
- let mut conn = Self {
- stream: BufStream::new(stream),
- rbuf: Vec::with_capacity(8 * 1024),
- capabilities: Capabilities::empty(),
- next_seq_no: 0,
- };
-
- establish::establish(&mut conn, &url).await?;
-
- Ok(conn)
- }
-
- pub async fn close(mut self) -> Result<()> {
- // Send the quit command
-
- self.start_sequence();
- self.write(ComQuit);
-
- self.stream.flush().await?;
- self.stream.stream.shutdown(Shutdown::Both)?;
-
- Ok(())
- }
-
- pub async fn ping(&mut self) -> Result<()> {
- // Send the ping command and wait for (and drop) an OK packet
-
- self.start_sequence();
- self.write(ComPing);
-
- self.stream.flush().await?;
-
- let _ = self.receive_ok_or_err().await?;
-
- Ok(())
- }
-
- pub(crate) async fn receive(&mut self) -> Result<&[u8]> {
- Ok(self
- .try_receive()
- .await?
- .ok_or(Error::Io(io::ErrorKind::UnexpectedEof.into()))?)
- }
-
- async fn try_receive(&mut self) -> Result<Option<&[u8]>> {
- // Read the packet header which contains the length and the sequence number
- // https://mariadb.com/kb/en/library/0-packet/#standard-packet
- let mut header = ret_if_none!(self.stream.peek(4).await?);
- let len = header.get_u24::<LittleEndian>()? as usize;
- self.next_seq_no = header.get_u8()? + 1;
- self.stream.consume(4);
-
- // Read the packet body and copy it into our internal buf
- // We must have a separate buffer around the stream as we can't operate directly
- // on bytes returned from the stream. We have compression, split, etc. to
- // unpack.
- let body = ret_if_none!(self.stream.peek(len).await?);
- self.rbuf.clear();
- self.rbuf.extend_from_slice(body);
- self.stream.consume(len);
-
- Ok(Some(&self.rbuf[..len]))
- }
-
- pub(super) fn start_sequence(&mut self) {
- // At the start of a command sequence we reset our understanding
- // of [next_seq_no]. In a sequence our initial command must be 0, followed
- // by the server response that is 1, then our response to that response (if any),
- // would be 2
+impl MySqlConnection {
+ pub(super) fn begin_command_phase(&mut self) {
+ // At the start of the *command phase*, the sequence ID sent from the client
+ // must be 0
self.next_seq_no = 0;
}
- pub(crate) fn write<T: Encode>(&mut self, packet: T) {
+ pub(super) fn write(&mut self, packet: impl Encode + std::fmt::Debug) {
let buf = self.stream.buffer_mut();
// Allocate room for the header that we write after the packet;
@@ -137,19 +57,16 @@ impl Connection {
// Take the last sequence number received, if any, and increment by 1
// If there was no sequence number, we only increment if we split packets
header[3] = self.next_seq_no;
- self.next_seq_no += 1;
+ self.next_seq_no = self.next_seq_no.wrapping_add(1);
}
- // Decode an OK packet or bubble an ERR packet as an error
- // to terminate immediately
- pub(crate) async fn receive_ok_or_err(&mut self) -> Result<OkPacket> {
- let capabilities = self.capabilities;
- let buf = self.receive().await?;
- Ok(match buf[0] {
- 0xfe | 0x00 => OkPacket::decode(buf, capabilities)?,
+ async fn receive_ok(&mut self) -> crate::Result<OkPacket> {
+ let packet = self.receive().await?;
+ Ok(match packet[0] {
+ 0xfe | 0x00 => OkPacket::decode(packet)?,
0xff => {
- return ErrPacket::decode(buf)?.expect_error();
+ return Err(MySqlError(ErrPacket::decode(packet)?).into());
}
id => {
@@ -163,185 +80,124 @@ impl Connection {
})
}
- async fn check_eof(&mut self) -> Result<()> {
+ pub(super) async fn receive_eof(&mut self) -> crate::Result<()> {
// When (legacy) EOFs are enabled, the fixed number column definitions are further
// terminated by an EOF packet
- if !self
- .capabilities
- .contains(Capabilities::CLIENT_DEPRECATE_EOF)
- {
+ if !self.capabilities.contains(Capabilities::DEPRECATE_EOF) {
let _eof = EofPacket::decode(self.receive().await?)?;
}
Ok(())
}
- async fn send_prepare<'c>(
- &'c mut self,
- statement: &'c str,
- ) -> Result<ComStmtPrepareOk> {
- self.stream.flush().await?;
-
- self.start_sequence();
- self.write(ComStmtPrepare { statement });
-
- self.stream.flush().await?;
-
- // COM_STMT_PREPARE returns COM_STMT_PREPARE_OK (0x00) or ERR (0xFF)
- let packet = self.receive().await?;
-
- if packet[0] == 0xFF {
- return ErrPacket::decode(packet)?.expect_error();
- }
-
- let ok = ComStmtPrepareOk::decode(packet)?;
-
- Ok(ok)
+ pub(super) async fn receive(&mut self) -> crate::Result<&[u8]> {
+ Ok(self
+ .try_receive()
+ .await?
+ .ok_or(io::ErrorKind::UnexpectedEof)?)
}
- // MySQL/Mysql responds with statement metadata for every PREPARE command
- // sometimes we care, sometimes we don't
- pub(super) async fn prepare_ignore_describe(&mut self, statement: &str) -> Result<StatementId> {
- let ok = self.send_prepare(statement).await?;
+ pub(super) async fn try_receive(&mut self) -> crate::Result<Option<&[u8]>> {
+ self.rbuf.clear();
- if ok.params > 0 {
- // Input parameters
- for _ in 0..ok.params {
- // TODO: Maybe do something with this data ?
- let _column = ColumnDefinitionPacket::decode(self.receive().await?)?;
- }
+ // Read the packet header which contains the length and the sequence number
+ // https://dev.mysql.com/doc/dev/mysql-server/8.0.12/page_protocol_basic_packets.html
+ // https://mariadb.com/kb/en/library/0-packet/#standard-packet
+ let mut header = ret_if_none!(self.stream.peek(4).await?);
+ let payload_len = header.get_uint::<LittleEndian>(3)? as usize;
+ self.next_seq_no = header.get_u8()?.wrapping_add(1);
+ self.stream.consume(4);
- self.check_eof().await?;
- }
+ // Read the packet body and copy it into our internal buf
+ // We must have a separate buffer around the stream as we can't operate directly
+ // on bytes returned from the stream. We have various kinds of payload manipulation
+ // that must be handled before decoding.
+ let mut payload = ret_if_none!(self.stream.peek(payload_len).await?);
+ self.rbuf.extend_from_slice(payload);
+ self.stream.consume(payload_len);
- if ok.columns > 0 {
- // Output parameters
- for _ in 0..ok.columns {
- // TODO: Maybe do something with this data ?
- let _column = ColumnDefinitionPacket::decode(self.receive().await?)?;
- }
+ // TODO: Implement packet compression
+ // TODO: Implement packet joining
- self.check_eof().await?;
- }
-
- Ok(ok.statement_id)
+ Ok(Some(&self.rbuf[..payload_len]))
}
+}
- pub(super) async fn prepare_describe(&mut self, statement: &str) -> Result<Describe<MySql>> {
- let ok = self.send_prepare(statement).await?;
+impl MySqlConnection {
+ // TODO: Authentication ?!
+ async fn open(url: crate::Result<Url>) -> crate::Result<Self> {
+ let url = url?;
+ let stream = TcpStream::connect((url.host(), url.port(3306))).await?;
- let mut param_types = Vec::with_capacity(ok.params as usize);
- let mut result_fields= Vec::with_capacity(ok.columns as usize);
+ let mut self_ = Self {
+ stream: BufStream::new(stream),
+ capabilities: Capabilities::empty(),
+ rbuf: Vec::with_capacity(8192),
+ next_seq_no: 0,
+ statement_cache: StatementCache::new(),
+ ready: true,
+ };
- // Input parameters
- for _ in 0..ok.params {
- let param = ColumnDefinitionPacket::decode(self.receive().await?)?;
- param_types.push(param.field_type.0);
- }
+ // https://dev.mysql.com/doc/dev/mysql-server/8.0.12/page_protocol_connection_phase.html
+ // https://mariadb.com/kb/en/connection/
- self.check_eof().await?;
+ // First, we receive the Handshake
- // Output parameters
- for _ in 0..ok.columns {
- let column = ColumnDefinitionPacket::decode(self.receive().await?)?;
- result_fields.push(ResultField {
- name: column.column_alias.or(column.column),
- table_id: column.table_alias.or(column.table),
- type_id: column.field_type.0,
- _backcompat: ()
- });
- }
+ let handshake_packet = self_.receive().await?;
+ let handshake = Handshake::decode(handshake_packet)?;
- self.check_eof().await?;
+ // TODO: Capabilities::SECURE_CONNECTION
+ // TODO: Capabilities::CONNECT_ATTRS
+ // TODO: Capabilities::PLUGIN_AUTH
+ // TODO: Capabilities::PLUGIN_AUTH_LENENC_CLIENT_DATA
+ // TODO: Capabilities::TRANSACTIONS
+ // TODO: Capabilities::CLIENT_DEPRECATE_EOF
+ // TODO: Capabilities::COMPRESS
+ // TODO: Capabilities::ZSTD_COMPRESSION_ALGORITHM
+ let client_capabilities = Capabilities::PROTOCOL_41
+ | Capabilities::IGNORE_SPACE
+ | Capabilities::FOUND_ROWS
+ | Capabilities::CONNECT_WITH_DB;
- Ok(Describe {
- param_types,
- result_fields,
- _backcompat: (),
- })
- }
+ // Fails if [Capabilities::PROTOCOL_41] is not in [server_capabilities]
+ self_.capabilities =
+ (client_capabilities & handshake.server_capabilities) | Capabilities::PROTOCOL_41;
- pub(super) async fn result_column_defs(&mut self) -> Result<Vec<ColumnDefinitionPacket>> {
- let packet = self.receive().await?;
+ // Next we send the response
- // A Resultset starts with a [ColumnCountPacket] which is a single field that encodes
- // how many columns we can expect when fetching rows from this statement
-
- if packet[0] == 255 {
- ErrPacket::decode(packet)?.expect_error()?;
- }
-
- let column_count: u64 = ColumnCountPacket::decode(packet)?.columns;
-
- // Next we have a [ColumnDefinitionPacket] which verbosely explains each minute
- // detail about the column in question including table, aliasing, and type
- // TODO: This information was *already* returned by PREPARE .., is there a way to suppress generation
- let mut columns = vec![];
- for _ in 0..column_count {
- let column = ColumnDefinitionPacket::decode(self.receive().await?)?;
- columns.push(column);
- }
-
- self.check_eof().await?;
-
- Ok(columns)
- }
-
- pub(super) async fn send_execute(
- &mut self,
- statement_id: u32,
- params: MySqlDbParameters,
- ) -> Result<()> {
- // TODO: EXECUTE(READ_ONLY) => FETCH instead of EXECUTE(NO)
-
- // SEND ================
- self.start_sequence();
- self.write(ComStmtExecute {
- statement_id,
- params: ¶ms.params,
- null: ¶ms.null_bitmap,
- flags: StmtExecFlag::NO_CURSOR,
- param_types: ¶ms.param_types,
+ self_.write(HandshakeResponse {
+ client_collation: 192, // utf8_unicode_ci
+ max_packet_size: 1024,
+ username: url.username().unwrap_or("root"),
+ // TODO: Remove the panic!
+ database: url.database().expect("required database"),
});
- self.stream.flush().await?;
- // =====================
- Ok(())
+ self_.stream.flush().await?;
+
+ let _ok = self_.receive_ok().await?;
+
+ Ok(self_)
}
- async fn expect_eof_or_err(&mut self) -> crate::Result<()> {
- let packet = self.receive().await?;
-
- match packet[0] {
- 0xFE => { EofPacket::decode(packet)?; },
- 0xFF => { ErrPacket::decode(packet)?.expect_error()?; },
- _ => return Err(protocol_err!("expected EOF or ERR, got {:02X}", packet[0]).into()),
- }
-
- Ok(())
- }
-
- pub(super) async fn send_raw(
- &mut self,
- commands: &str
- ) -> Result<()> {
+ async fn close(mut self) -> crate::Result<()> {
self.stream.flush().await?;
- self.start_sequence();
- // enable multi-statement only for this query
- self.write(ComSetOption { option: SetOptionOptions::MySqlOptionMultiStatementsOn });
- self.write(ComQuery { sql_statement: commands });
- self.write(ComSetOption { option: SetOptionOptions::MySqlOptionMultiStatementsOff });
- self.stream.flush().await?;
-
- self.expect_eof_or_err().await?;
-
- let packet = self.receive().await?;
-
- if packet[0] == 0xFF { return ErrPacket::decode(packet)?.expect_error() }
- // otherwise ignore packet
-
- self.expect_eof_or_err().await?;
+ self.stream.stream.shutdown(Shutdown::Both)?;
Ok(())
}
}
+
+impl Connection for MySqlConnection {
+ fn open<T>(url: T) -> BoxFuture<'static, crate::Result<Self>>
+ where
+ T: TryInto<Url, Error = crate::Error>,
+ Self: Sized,
+ {
+ Box::pin(MySqlConnection::open(url.try_into()))
+ }
+
+ fn close(self) -> BoxFuture<'static, crate::Result<()>> {
+ Box::pin(self.close())
+ }
+}
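For reference, the 4-byte header that `try_receive` peeks at is a 3-byte little-endian payload length followed by a 1-byte sequence number. A minimal standalone sketch of that framing (not sqlx's API):

```rust
// Parse the MySQL packet header: 3-byte little-endian payload length, then the
// sequence number that the next packet in the exchange must increment.
fn parse_packet_header(header: [u8; 4]) -> (usize, u8) {
    let payload_len =
        header[0] as usize | (header[1] as usize) << 8 | (header[2] as usize) << 16;
    (payload_len, header[3])
}

fn main() {
    // a 5-byte payload with sequence number 1
    assert_eq!(parse_packet_header([0x05, 0x00, 0x00, 0x01]), (5, 1));
}
```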
diff --git a/sqlx-core/src/mysql/database.rs b/sqlx-core/src/mysql/database.rs
new file mode 100644
index 00000000..ddd753ba
--- /dev/null
+++ b/sqlx-core/src/mysql/database.rs
@@ -0,0 +1,12 @@
+use crate::Database;
+
+/// **MySQL** database driver.
+pub struct MySql;
+
+impl Database for MySql {
+ type Connection = super::MySqlConnection;
+
+ type Arguments = super::MySqlArguments;
+
+ type Row = super::MySqlRow;
+}
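Since `Database` only bundles associated types, code elsewhere can stay generic over the driver. A hypothetical sketch; the `sqlx_core::Database` path and the `Default` bound are assumptions for illustration:

```rust
use sqlx_core::Database;

// Build an empty argument buffer for any driver; for MySQL this resolves to
// `MySqlArguments` through the associated type above.
fn empty_arguments<DB>() -> DB::Arguments
where
    DB: Database,
    DB::Arguments: Default,
{
    DB::Arguments::default()
}
```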
diff --git a/sqlx-core/src/mysql/error.rs b/sqlx-core/src/mysql/error.rs
index af7c5bca..0c5a00dc 100644
--- a/sqlx-core/src/mysql/error.rs
+++ b/sqlx-core/src/mysql/error.rs
@@ -1,25 +1,53 @@
-use crate::{error::DatabaseError, mysql::protocol::ErrorCode};
+use std::fmt::{self, Debug, Display};
-use std::fmt;
+use crate::error::DatabaseError;
+use crate::mysql::protocol::ErrPacket;
-#[derive(Debug)]
-pub struct Error {
- pub code: ErrorCode,
- pub message: Box<str>,
-}
+pub struct MySqlError(pub(super) ErrPacket);
-impl DatabaseError for Error {
+impl DatabaseError for MySqlError {
fn message(&self) -> &str {
- &self.message
+ &*self.0.error_message
+ }
+
+ fn details(&self) -> Option<&str> {
+ None
+ }
+
+ fn hint(&self) -> Option<&str> {
+ None
+ }
+
+ fn table_name(&self) -> Option<&str> {
+ None
+ }
+
+ fn column_name(&self) -> Option<&str> {
+ None
+ }
+
+ fn constraint_name(&self) -> Option<&str> {
+ None
}
}
-impl fmt::Display for Error {
+// TODO: De-duplicate these two impls with Postgres (macro?)
+
+impl Debug for MySqlError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(
- f,
- "Mysql returned an error: {}; {}",
- self.code, self.message
- )
+ f.debug_struct("DatabaseError")
+ .field("message", &self.message())
+ .field("details", &self.details())
+ .field("hint", &self.hint())
+ .field("table_name", &self.table_name())
+ .field("column_name", &self.column_name())
+ .field("constraint_name", &self.constraint_name())
+ .finish()
+ }
+}
+
+impl Display for MySqlError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad(self.message())
}
}
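Everything except `message()` is optional in the `DatabaseError` trait, which is why the MySQL impl above returns `None` across the board. A hypothetical helper (the `sqlx_core::error::DatabaseError` path and the function name are assumptions) that works against any driver's error:

```rust
use sqlx_core::error::DatabaseError;

// Format whatever detail a driver provides, falling back to just the message
// when (as for MySQL here) the optional accessors return `None`.
fn summarize(err: &dyn DatabaseError) -> String {
    let mut out = err.message().to_owned();

    if let Some(hint) = err.hint() {
        out.push_str("\nhint: ");
        out.push_str(hint);
    }

    out
}
```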
diff --git a/sqlx-core/src/mysql/establish.rs b/sqlx-core/src/mysql/establish.rs
deleted file mode 100644
index 1d911172..00000000
--- a/sqlx-core/src/mysql/establish.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use crate::{
- mysql::{
- connection::Connection,
- protocol::{Capabilities, HandshakeResponsePacket, InitialHandshakePacket},
- },
- url::Url,
- Result,
-};
-
-pub(crate) async fn establish(conn: &mut Connection, url: &Url) -> Result<()> {
- let initial = InitialHandshakePacket::decode(conn.receive().await?)?;
-
- // TODO: Capabilities::SECURE_CONNECTION
- // TODO: Capabilities::CONNECT_ATTRS
- // TODO: Capabilities::PLUGIN_AUTH
- // TODO: Capabilities::PLUGIN_AUTH_LENENC_CLIENT_DATA
- // TODO: Capabilities::TRANSACTIONS
- // TODO: Capabilities::CLIENT_DEPRECATE_EOF
- // TODO?: Capabilities::CLIENT_SESSION_TRACK
- let capabilities = Capabilities::CLIENT_PROTOCOL_41 | Capabilities::CONNECT_WITH_DB;
-
- let response = HandshakeResponsePacket {
- // TODO: Find a good value for [max_packet_size]
- capabilities,
- max_packet_size: 1024,
- client_collation: 192, // utf8_unicode_ci
- username: url.username(),
- database: &url.database(),
- auth_data: None,
- auth_plugin_name: None,
- connection_attrs: &[],
- };
-
- // The AND between our supported capabilities and the servers' is
- // what we can use so remember it on the connection
- conn.capabilities = capabilities & initial.capabilities;
-
- conn.write(response);
- conn.stream.flush().await?;
-
- let _ = conn.receive_ok_or_err().await?;
-
- // TODO: If CONNECT_WITH_DB is not supported we need to send an InitDb command just after establish
-
- Ok(())
-}
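Both the removed `establish` and the new connect path intersect the client's requested capabilities with what the server advertised, always keeping `PROTOCOL_41`. A standalone sketch using plain bit constants (values from the MySQL protocol documentation) instead of sqlx's `Capabilities` bitflags:

```rust
const PROTOCOL_41: u32 = 0x0000_0200;
const FOUND_ROWS: u32 = 0x0000_0002;
const CONNECT_WITH_DB: u32 = 0x0000_0008;
const DEPRECATE_EOF: u32 = 0x0100_0000;

// Only flags supported by *both* sides survive; PROTOCOL_41 is forced on since
// the rest of the driver assumes the 4.1 protocol.
fn negotiate(client: u32, server: u32) -> u32 {
    (client & server) | PROTOCOL_41
}

fn main() {
    let client = PROTOCOL_41 | FOUND_ROWS | CONNECT_WITH_DB;
    let server = PROTOCOL_41 | FOUND_ROWS | DEPRECATE_EOF;

    assert_eq!(negotiate(client, server), PROTOCOL_41 | FOUND_ROWS);
}
```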
diff --git a/sqlx-core/src/mysql/executor.rs b/sqlx-core/src/mysql/executor.rs
index 7c7e9070..46fbb124 100644
--- a/sqlx-core/src/mysql/executor.rs
+++ b/sqlx-core/src/mysql/executor.rs
@@ -1,159 +1,375 @@
-use super::{MySql, Connection};
-use crate::{backend::Backend, describe::{Describe, ResultField}, executor::Executor, mysql::{
- protocol::{
- Capabilities, ColumnCountPacket, ColumnDefinitionPacket, ComStmtExecute, EofPacket,
- ErrPacket, OkPacket, ResultRow, StmtExecFlag,
- },
- query::MySqlDbParameters,
-}, params::{IntoQueryParameters, QueryParameters}, row::FromRow, url::Url, Error};
-use futures_core::{future::BoxFuture, stream::BoxStream, Future};
-use std::pin::Pin;
+use std::collections::HashMap;
+use std::sync::Arc;
-impl Connection {
- async fn prepare_cached(&mut self, query: &str) -> crate::Result<StatementId> {
- let conn = &mut self.conn;
- Ok(*(self.cache.get_or_compute(query, || conn.prepare_ignore_describe(query)).await?))
+use futures_core::future::BoxFuture;
+use futures_core::stream::BoxStream;
+
+use crate::describe::{Column, Describe};
+use crate::executor::Executor;
+use crate::mysql::error::MySqlError;
+use crate::mysql::protocol::{
+ Capabilities, ColumnCount, ColumnDefinition, ComQuery, ComSetOption, ComStmtExecute,
+ ComStmtPrepare, ComStmtPrepareOk, Cursor, Decode, EofPacket, ErrPacket, OkPacket, Row,
+ SetOption, Type,
+};
+use crate::mysql::{MySql, MySqlArguments, MySqlConnection, MySqlRow};
+
+enum Step {
+ Command(u64),
+ Row(Row),
+}
+
+enum OkOrResultSet {
+ Ok(OkPacket),
+ ResultSet(ColumnCount),
+}
+
+impl MySqlConnection {
+ async fn ignore_columns(&mut self, count: usize) -> crate::Result<()> {
+ for _ in 0..count {
+ let _column = ColumnDefinition::decode(self.receive().await?)?;
+ }
+
+ if count > 0 {
+ self.receive_eof().await?;
+ }
+
+ Ok(())
+ }
+
+ async fn receive_ok_or_column_count(&mut self) -> crate::Result<OkOrResultSet> {
+ let packet = self.receive().await?;
+
+ match packet[0] {
+ 0xfe if packet.len() < 0xffffff => {
+ let ok = OkPacket::decode(packet)?;
+ self.ready = true;
+
+ Ok(OkOrResultSet::Ok(ok))
+ }
+
+ 0x00 => {
+ let ok = OkPacket::decode(packet)?;
+ self.ready = true;
+
+ Ok(OkOrResultSet::Ok(ok))
+ }
+
+ 0xff => {
+ let err = ErrPacket::decode(packet)?;
+ self.ready = true;
+
+ Err(MySqlError(err).into())
+ }
+
+ _ => {
+ let cc = ColumnCount::decode(packet)?;
+
+ Ok(OkOrResultSet::ResultSet(cc))
+ }
+ }
+ }
+
+ async fn receive_column_types(&mut self, count: usize) -> crate::Result<Box<[Type]>> {
+ let mut columns: Vec<Type> = Vec::with_capacity(count);
+
+ for _ in 0..count {
+ let packet = self.receive().await?;
+ let column: ColumnDefinition = ColumnDefinition::decode(packet)?;
+
+ columns.push(column.r#type);
+ }
+
+ if count > 0 {
+ self.receive_eof().await?;
+ }
+
+ Ok(columns.into_boxed_slice())
+ }
+
+ async fn wait_for_ready(&mut self) -> crate::Result<()> {
+ if !self.ready {
+ while let Some(_step) = self.step(&[], true).await? {
+ // Drain steps until we hit the end
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn prepare(&mut self, query: &str) -> crate::Result<ComStmtPrepareOk> {
+ // Start by sending a COM_STMT_PREPARE
+ self.begin_command_phase();
+ self.write(ComStmtPrepare { query });
+ self.stream.flush().await?;
+
+ // https://dev.mysql.com/doc/dev/mysql-server/8.0.12/page_protocol_com_stmt_prepare.html
+
+ // First we should receive a COM_STMT_PREPARE_OK
+ let packet = self.receive().await?;
+
+ if packet[0] == 0xff {
+ // Oops, there was an error in the prepare command
+ return Err(MySqlError(ErrPacket::decode(packet)?).into());
+ }
+
+ ComStmtPrepareOk::decode(packet)
+ }
+
+ async fn prepare_with_cache(&mut self, query: &str) -> crate::Result<u32> {
+ if let Some(&id) = self.statement_cache.get(query) {
+ Ok(id)
+ } else {
+ let prepare_ok = self.prepare(query).await?;
+
+ // Remember our statement ID, so we don't have to prepare this query again next time
+ self.statement_cache
+ .put(query.to_owned(), prepare_ok.statement_id);
+
+ // Ignore input parameters
+ self.ignore_columns(prepare_ok.params as usize).await?;
+
+ // Collect output parameter names
+ let mut columns = HashMap::with_capacity(prepare_ok.columns as usize);
+ let mut index = 0_usize;
+ for _ in 0..prepare_ok.columns {
+ let column = ColumnDefinition::decode(self.receive().await?)?;
+
+ if let Some(name) = column.column_alias.or(column.column) {
+ columns.insert(name, index);
+ }
+
+ index += 1;
+ }
+
+ if prepare_ok.columns > 0 {
+ self.receive_eof().await?;
+ }
+
+ // Remember our column map in the statement cache
+ self.statement_cache
+ .put_columns(prepare_ok.statement_id, columns);
+
+ Ok(prepare_ok.statement_id)
+ }
+ }
+
+ // [COM_STMT_EXECUTE]
+ async fn execute_statement(&mut self, id: u32, args: MySqlArguments) -> crate::Result<()> {
+ self.begin_command_phase();
+ self.ready = false;
+
+ self.write(ComStmtExecute {
+ cursor: Cursor::NO_CURSOR,
+ statement_id: id,
+ params: &args.params,
+ null_bitmap: &args.null_bitmap,
+ param_types: &args.param_types,
+ });
+
+ self.stream.flush().await?;
+
+ Ok(())
+ }
+
+ async fn step(&mut self, columns: &[Type], binary: bool) -> crate::Result<Option<Step>> {
+ let capabilities = self.capabilities;
+ let packet = ret_if_none!(self.try_receive().await?);
+
+ match packet[0] {
+ 0xfe if packet.len() < 0xffffff => {
+ // Resultset row can begin with 0xfe byte (when using text protocol
+ // with a field length > 0xffffff)
+
+ if !capabilities.contains(Capabilities::DEPRECATE_EOF) {
+ let _eof = EofPacket::decode(packet)?;
+ self.ready = true;
+
+ return Ok(None);
+ } else {
+ let ok = OkPacket::decode(packet)?;
+ self.ready = true;
+
+ return Ok(Some(Step::Command(ok.affected_rows)));
+ }
+ }
+
+ 0xff => {
+ let err = ErrPacket::decode(packet)?;
+ self.ready = true;
+
+ return Err(MySqlError(err).into());
+ }
+
+ _ => {
+ return Ok(Some(Step::Row(Row::decode(packet, columns, binary)?)));
+ }
+ }
}
}
-impl Executor for Connection {
- type Backend = MySql;
+impl MySqlConnection {
+ async fn send(&mut self, query: &str) -> crate::Result<()> {
+ self.wait_for_ready().await?;
- fn ping(&mut self) -> BoxFuture<crate::Result<()>> {
- Box::pin(self.conn.ping())
+ self.begin_command_phase();
+ self.ready = false;
+
+ // Send the query using the simple (text) protocol: COM_QUERY
+ self.write(ComQuery { query });
+
+ self.stream.flush().await?;
+
+ // COM_QUERY can terminate before the result set with an ERR or OK packet
+ let num_columns = match self.receive_ok_or_column_count().await? {
+ OkOrResultSet::Ok(_) => {
+ return Ok(());
+ }
+
+ OkOrResultSet::ResultSet(cc) => cc.columns as usize,
+ };
+
+ let columns = self.receive_column_types(num_columns as usize).await?;
+
+ while let Some(_step) = self.step(&columns, false).await? {
+ // Drop all responses
+ }
+
+ Ok(())
+ }
+
+ async fn execute(&mut self, query: &str, args: MySqlArguments) -> crate::Result<u64> {
+ self.wait_for_ready().await?;
+
+ let statement_id = self.prepare_with_cache(query).await?;
+
+ self.execute_statement(statement_id, args).await?;
+
+ // COM_STMT_EXECUTE can terminate before the result set with an ERR or OK packet
+ let num_columns = match self.receive_ok_or_column_count().await? {
+ OkOrResultSet::Ok(ok) => {
+ return Ok(ok.affected_rows);
+ }
+
+ OkOrResultSet::ResultSet(cc) => cc.columns as usize,
+ };
+
+ self.ignore_columns(num_columns).await?;
+
+ let mut res = 0;
+
+ while let Some(step) = self.step(&[], true).await? {
+ if let Step::Command(affected) = step {
+ res = affected;
+ }
+ }
+
+ Ok(res)
+ }
+
+ async fn describe(&mut self, query: &str) -> crate::Result<Describe<MySql>> {
+ self.wait_for_ready().await?;
+
+ let prepare_ok = self.prepare(query).await?;
+
+ let mut param_types = Vec::with_capacity(prepare_ok.params as usize);
+ let mut result_columns = Vec::with_capacity(prepare_ok.columns as usize);
+
+ for _ in 0..prepare_ok.params {
+ let param = ColumnDefinition::decode(self.receive().await?)?;
+ param_types.push(param.r#type.0);
+ }
+
+ if prepare_ok.params > 0 {
+ self.receive_eof().await?;
+ }
+
+ for _ in 0..prepare_ok.columns {
+ let column = ColumnDefinition::decode(self.receive().await?)?;
+ result_columns.push(Column::<MySql> {
+ name: column.column_alias.or(column.column),
+
+ table_id: column.table_alias.or(column.table),
+
+ type_id: column.r#type.0,
+
+ _non_exhaustive: (),
+ });
+ }
+
+ if prepare_ok.columns > 0 {
+ self.receive_eof().await?;
+ }
+
+ Ok(Describe {
+ param_types: param_types.into_boxed_slice(),
+ result_columns: result_columns.into_boxed_slice(),
+
+ _non_exhaustive: (),
+ })
+ }
+
+ fn fetch<'e, 'q: 'e>(
+ &'e mut self,
+ query: &'q str,
+ args: MySqlArguments,
+ ) -> BoxStream<'e, crate::Result<MySqlRow>> {
+ Box::pin(async_stream::try_stream! {
+ self.wait_for_ready().await?;
+
+ let statement_id = self.prepare_with_cache(query).await?;
+
+ let columns = self.statement_cache.get_columns(statement_id);
+
+ self.execute_statement(statement_id, args).await?;
+
+ // COM_STMT_EXECUTE can terminate before the result set with an ERR or OK packet
+ let num_columns = match self.receive_ok_or_column_count().await? {
+ OkOrResultSet::Ok(_) => {
+ return;
+ }
+
+ OkOrResultSet::ResultSet(cc) => {
+ cc.columns as usize
+ }
+ };
+
+ let column_types = self.receive_column_types(num_columns).await?;
+
+ while let Some(Step::Row(row)) = self.step(&column_types, true).await? {
+ yield MySqlRow { row, columns: Arc::clone(&columns) };
+ }
+ })
+ }
+}
+
+impl Executor for MySqlConnection {
+ type Database = super::MySql;
+
+ fn send<'e, 'q: 'e>(&'e mut self, query: &'q str) -> BoxFuture<'e, crate::Result<()>> {
+ Box::pin(self.send(query))
}
fn execute<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
- params: MySqlDbParameters,
+ args: MySqlArguments,
) -> BoxFuture<'e, crate::Result<u64>> {
- Box::pin(async move {
- let statement_id = self.prepare_cached(query).await?;
- self.conn.send_execute(statement_id, params).await?;
-
- let columns = self.conn.result_column_defs().await?;
- let capabilities = self.conn.capabilities;
-
- // For each row in the result set we will receive a ResultRow packet.
- // We may receive an [OkPacket], [EofPacket], or [ErrPacket] (depending on if EOFs are enabled) to finalize the iteration.
- let mut rows = 0u64;
- loop {
- let packet = self.conn.receive().await?;
- if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
- // NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
- // but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
- if !capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
- let _eof = EofPacket::decode(packet)?;
- } else {
- let _ok = OkPacket::decode(packet, capabilities)?;
- }
-
- break;
- } else if packet[0] == 0xFF {
- let err = ErrPacket::decode(packet)?;
- panic!("received db err = {:?}", err);
- } else {
- // Ignore result rows; exec only returns number of affected rows;
- let _ = ResultRow::decode(packet, &columns)?;
-
- // For every row we decode we increment counter
- rows = rows + 1;
- }
- }
-
- Ok(rows)
- })
+ Box::pin(self.execute(query, args))
}
- fn fetch<'e, 'q: 'e, T: 'e>(
+ fn fetch<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
- params: MySqlDbParameters,
- ) -> BoxStream<'e, crate::Result<T>>
- where
- T: FromRow + Send + Unpin,
- {
- Box::pin(async_stream::try_stream! {
- let prepare = self.prepare_cached(query).await?;
- self.conn.send_execute(prepare, params).await?;
-
- let columns = self.conn.result_column_defs().await?;
- let capabilities = self.conn.capabilities;
-
- loop {
- let packet = self.conn.receive().await?;
- if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
- // NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
- // but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
- if !capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
- let _eof = EofPacket::decode(packet)?;
- } else {
- let _ok = OkPacket::decode(packet, capabilities)?;
- }
-
- break;
- } else if packet[0] == 0xFF {
- let _err = ErrPacket::decode(packet)?;
- panic!("ErrPacket received");
- } else {
- let row = ResultRow::decode(packet, &columns)?;
- yield FromRow::from_row(row);
- }
- }
- })
- }
-
- fn fetch_optional<'e, 'q: 'e, T: 'e>(
- &'e mut self,
- query: &'q str,
- params: MySqlDbParameters,
- ) -> BoxFuture<'e, crate::Result<Option<T>>>
- where
- T: FromRow + Send,
- {
- Box::pin(async move {
- let statement_id = self.prepare_cached(query).await?;
- self.conn.send_execute(statement_id, params).await?;
-
- let columns = self.conn.result_column_defs().await?;
- let capabilities = self.conn.capabilities;
-
- let mut row = None;
-
- loop {
- let packet = self.conn.receive().await?;
-
- if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
- // NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
- // but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
- if !capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
- let _eof = EofPacket::decode(packet)?;
- } else {
- let _ok = OkPacket::decode(packet, capabilities)?;
- }
-
- break;
- } else if packet[0] == 0xFF {
- let _err = ErrPacket::decode(packet)?;
- panic!("Received error packet: {:?}", _err);
- } else {
- row = Some(FromRow::from_row(ResultRow::decode(packet, &columns)?));
- }
- }
-
- Ok(row)
- })
+ args: MySqlArguments,
+ ) -> BoxStream<'e, crate::Result<MySqlRow>> {
+ self.fetch(query, args)
}
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
- ) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>> {
- Box::pin(self.conn.prepare_describe(query))
- }
-
- fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
- Box::pin(self.conn.send_raw(commands))
+ ) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
+ Box::pin(self.describe(query))
}
}
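The new executor methods all hinge on the same first-byte dispatch performed by `receive_ok_or_column_count` and `step`. A simplified standalone sketch (the types are stand-ins, not sqlx's):

```rust
enum Response {
    Ok,
    Err,
    ResultSet { columns: u64 },
}

// After COM_QUERY / COM_STMT_EXECUTE the server replies with OK (0x00, or a
// short 0xfe packet), ERR (0xff), or the column count that opens a result set.
fn classify(packet: &[u8]) -> Response {
    match packet[0] {
        0x00 => Response::Ok,
        0xfe if packet.len() < 0xff_ffff => Response::Ok,
        0xff => Response::Err,
        // single-byte case only; the real code reads a length-encoded integer
        byte => Response::ResultSet { columns: byte as u64 },
    }
}
```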
diff --git a/sqlx-core/src/mysql/io/buf_ext.rs b/sqlx-core/src/mysql/io/buf_ext.rs
index 55786d4b..fc01624b 100644
--- a/sqlx-core/src/mysql/io/buf_ext.rs
+++ b/sqlx-core/src/mysql/io/buf_ext.rs
@@ -1,52 +1,35 @@
-use crate::io::Buf;
-use byteorder::ByteOrder;
use std::io;
+use byteorder::ByteOrder;
+
+use crate::io::Buf;
+
pub trait BufExt {
- fn get_uint<T: ByteOrder>(&mut self, n: usize) -> io::Result<u64>;
fn get_uint_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<u64>>;
- fn get_str_eof(&mut self) -> io::Result<&str>;
+
fn get_str_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&str>>;
- fn get_bytes(&mut self, n: usize) -> io::Result<&[u8]>;
+
fn get_bytes_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&[u8]>>;
}
-impl<'a> BufExt for &'a [u8] {
- fn get_uint<T: ByteOrder>(&mut self, n: usize) -> io::Result<u64> {
- let val = T::read_uint(*self, n);
- self.advance(n);
-
- Ok(val)
- }
-
+impl BufExt for &'_ [u8] {
fn get_uint_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<u64>> {
Ok(match self.get_u8()? {
0xFB => None,
0xFC => Some(u64::from(self.get_u16::<T>()?)),
0xFD => Some(u64::from(self.get_u24::<T>()?)),
0xFE => Some(self.get_u64::<T>()?),
- // ? 0xFF => panic!("int unprocessable first byte 0xFF"),
+
value => Some(u64::from(value)),
})
}
- fn get_str_eof(&mut self) -> io::Result<&str> {
- self.get_str(self.len())
- }
-
fn get_str_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&str>> {
self.get_uint_lenenc::<T>()?
.map(move |len| self.get_str(len as usize))
.transpose()
}
- fn get_bytes(&mut self, n: usize) -> io::Result<&[u8]> {
- let buf = &self[..n];
- self.advance(n);
-
- Ok(buf)
- }
-
fn get_bytes_lenenc<T: ByteOrder>(&mut self) -> io::Result<Option<&[u8]>> {
self.get_uint_lenenc::<T>()?
.map(move |len| self.get_bytes(len as usize))
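The trimmed-down `BufExt` now centers on MySQL's length-encoded integers. A standalone sketch of the decoding rules, returning the value plus the number of bytes consumed (illustrative only, not sqlx's API):

```rust
// 0xFB encodes NULL, 0xFC/0xFD/0xFE prefix 2-, 3-, and 8-byte little-endian
// integers, and any other leading byte is the value itself.
fn decode_lenenc(buf: &[u8]) -> (Option<u64>, usize) {
    match buf[0] {
        0xfb => (None, 1),
        0xfc => (Some(u64::from(u16::from_le_bytes([buf[1], buf[2]]))), 3),
        0xfd => (
            Some(u64::from(buf[1]) | u64::from(buf[2]) << 8 | u64::from(buf[3]) << 16),
            4,
        ),
        0xfe => (
            Some(u64::from_le_bytes([
                buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8],
            ])),
            9,
        ),
        byte => (Some(u64::from(byte)), 1),
    }
}

fn main() {
    assert_eq!(decode_lenenc(&[0x2a]), (Some(42), 1));
    assert_eq!(decode_lenenc(&[0xfc, 0x10, 0x27]), (Some(10_000), 3));
}
```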
diff --git a/sqlx-core/src/mysql/io/buf_mut_ext.rs b/sqlx-core/src/mysql/io/buf_mut_ext.rs
index 4c1e2f34..332c6f8c 100644
--- a/sqlx-core/src/mysql/io/buf_mut_ext.rs
+++ b/sqlx-core/src/mysql/io/buf_mut_ext.rs
@@ -1,16 +1,14 @@
-use crate::io::BufMut;
-use byteorder::ByteOrder;
use std::{u16, u32, u64, u8};
+use byteorder::ByteOrder;
+
+use crate::io::BufMut;
+
pub trait BufMutExt {
fn put_uint_lenenc<T: ByteOrder, U: Into<Option<u64>>>(&mut self, val: U);
fn put_str_lenenc<T: ByteOrder>(&mut self, val: &str);
- fn put_str(&mut self, val: &str);
-
- fn put_bytes(&mut self, val: &[u8]);
-
fn put_bytes_lenenc<T: ByteOrder>(&mut self, val: &[u8]);
}
@@ -49,23 +47,11 @@ impl BufMutExt for Vec {
}
}
- #[inline]
- fn put_str(&mut self, val: &str) {
- self.put_bytes(val.as_bytes());
- }
-
- #[inline]
fn put_str_lenenc<T: ByteOrder>(&mut self, val: &str) {
self.put_uint_lenenc::<T, u64>(val.len() as u64);
self.extend_from_slice(val.as_bytes());
}
- #[inline]
- fn put_bytes(&mut self, val: &[u8]) {
- self.extend_from_slice(val);
- }
-
- #[inline]
fn put_bytes_lenenc<T: ByteOrder>(&mut self, val: &[u8]) {
self.put_uint_lenenc::<T, u64>(val.len() as u64);
self.extend_from_slice(val);
@@ -74,28 +60,9 @@ impl BufMutExt for Vec {
#[cfg(test)]
mod tests {
- use super::BufMutExt;
- use crate::io::BufMut;
+ use super::{BufMut, BufMutExt};
use byteorder::LittleEndian;
- // [X] it_encodes_int_lenenc_u64
- // [X] it_encodes_int_lenenc_u32
- // [X] it_encodes_int_lenenc_u24
- // [X] it_encodes_int_lenenc_u16
- // [X] it_encodes_int_lenenc_u8
- // [X] it_encodes_int_u64
- // [X] it_encodes_int_u32
- // [X] it_encodes_int_u24
- // [X] it_encodes_int_u16
- // [X] it_encodes_int_u8
- // [X] it_encodes_string_lenenc
- // [X] it_encodes_string_fix
- // [X] it_encodes_string_null
- // [X] it_encodes_string_eof
- // [X] it_encodes_byte_lenenc
- // [X] it_encodes_byte_fix
- // [X] it_encodes_byte_eof
-
#[test]
fn it_encodes_int_lenenc_none() {
let mut buf = Vec::with_capacity(1024);
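And the encoding side handled by `put_uint_lenenc`, again as a standalone sketch rather than sqlx's API:

```rust
// Values below 0xFB fit in a single byte; larger values get a 0xFC/0xFD/0xFE
// marker followed by a 2-, 3-, or 8-byte little-endian integer.
fn encode_lenenc(buf: &mut Vec<u8>, value: u64) {
    if value < 0xfb {
        buf.push(value as u8);
    } else if value <= 0xffff {
        buf.push(0xfc);
        buf.extend_from_slice(&(value as u16).to_le_bytes());
    } else if value <= 0xff_ffff {
        buf.push(0xfd);
        buf.extend_from_slice(&(value as u32).to_le_bytes()[..3]);
    } else {
        buf.push(0xfe);
        buf.extend_from_slice(&value.to_le_bytes());
    }
}

fn main() {
    let mut buf = Vec::new();
    encode_lenenc(&mut buf, 10_000);
    assert_eq!(buf, vec![0xfc, 0x10, 0x27]);
}
```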
diff --git a/sqlx-core/src/mysql/io/mod.rs b/sqlx-core/src/mysql/io/mod.rs
index 11f11f39..a8867b15 100644
--- a/sqlx-core/src/mysql/io/mod.rs
+++ b/sqlx-core/src/mysql/io/mod.rs
@@ -1,5 +1,5 @@
-pub mod buf_ext;
-pub mod buf_mut_ext;
+mod buf_ext;
+mod buf_mut_ext;
pub use buf_ext::BufExt;
pub use buf_mut_ext::BufMutExt;
diff --git a/sqlx-core/src/mysql/mod.rs b/sqlx-core/src/mysql/mod.rs
index 98f83f4c..3b8b00da 100644
--- a/sqlx-core/src/mysql/mod.rs
+++ b/sqlx-core/src/mysql/mod.rs
@@ -1,36 +1,17 @@
-mod backend;
+//! **MySQL** database and connection types.
+
+mod arguments;
mod connection;
+mod database;
mod error;
-mod establish;
mod executor;
mod io;
mod protocol;
-mod query;
mod row;
-pub mod types;
+mod types;
-use self::connection::Connection as RawConnection;
-use crate::cache::StatementCache;
-use futures_core::future::BoxFuture;
-use crate::Backend;
-
-/// Backend for MySQL.
-pub enum MySql {}
-
-impl MySql {
- /// An alias for [Backend::connect()](../trait.Backend.html#method.connect)
- pub async fn connect(url: &str) -> crate::Result<Connection> {
- <MySql as Backend>::connect(url).await
- }
-}
-
-pub struct Connection {
- conn: RawConnection,
- cache: StatementCache