From bd013ae375ba19b83163ce1f51ecf4b006577a78 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Sun, 2 Aug 2020 09:45:04 -0700 Subject: [PATCH] refactor: start by moving chunks of core into a core2 module --- Cargo.lock | 10 + Cargo.toml | 1 + sqlx-core/src/database.rs | 162 ------------- sqlx-core/src/error.rs | 246 -------------------- sqlx-core/src/io/buf_stream.rs | 136 ----------- sqlx-core/src/io/decode.rs | 29 --- sqlx-core/src/io/encode.rs | 16 -- sqlx-core/src/io/mod.rs | 7 - sqlx-core/src/io/write_and_flush.rs | 45 ---- sqlx-core2/Cargo.toml | 21 ++ {sqlx-core => sqlx-core2}/src/connection.rs | 60 +---- sqlx-core2/src/database.rs | 157 +++++++++++++ sqlx-core2/src/error.rs | 79 +++++++ sqlx-core2/src/io/buf_stream.rs | 124 ++++++++++ sqlx-core2/src/io/decode.rs | 23 ++ sqlx-core2/src/io/encode.rs | 24 ++ sqlx-core2/src/io/mod.rs | 9 + sqlx-core2/src/lib.rs | 8 + sqlx-core2/src/options.rs | 24 ++ 19 files changed, 486 insertions(+), 695 deletions(-) delete mode 100644 sqlx-core/src/database.rs delete mode 100644 sqlx-core/src/error.rs delete mode 100644 sqlx-core/src/io/buf_stream.rs delete mode 100644 sqlx-core/src/io/decode.rs delete mode 100644 sqlx-core/src/io/encode.rs delete mode 100644 sqlx-core/src/io/write_and_flush.rs create mode 100644 sqlx-core2/Cargo.toml rename {sqlx-core => sqlx-core2}/src/connection.rs (51%) create mode 100644 sqlx-core2/src/database.rs create mode 100644 sqlx-core2/src/error.rs create mode 100644 sqlx-core2/src/io/buf_stream.rs create mode 100644 sqlx-core2/src/io/decode.rs create mode 100644 sqlx-core2/src/io/encode.rs create mode 100644 sqlx-core2/src/io/mod.rs create mode 100644 sqlx-core2/src/lib.rs create mode 100644 sqlx-core2/src/options.rs diff --git a/Cargo.lock b/Cargo.lock index ff85040eb..d15a3b497 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2089,6 +2089,16 @@ dependencies = [ "whoami", ] +[[package]] +name = "sqlx-core2" +version = "0.4.0-beta.2" +dependencies = [ + "bytes", + "futures-core", + "sqlx-rt", + "thiserror", +] + [[package]] name = "sqlx-example-mysql-todos" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 9969fd913..928206924 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ ".", "sqlx-core", + "sqlx-core2", "sqlx-rt", "sqlx-macros", "sqlx-test", diff --git a/sqlx-core/src/database.rs b/sqlx-core/src/database.rs deleted file mode 100644 index 0aa224214..000000000 --- a/sqlx-core/src/database.rs +++ /dev/null @@ -1,162 +0,0 @@ -//! Traits to represent a database driver. -//! -//! # Support -//! -//! ## Tier 1 -//! -//! Tier 1 support can be thought of as "guaranteed to work". Automated testing is setup to -//! ensure a high level of stability and functionality. -//! -//! | Database | Version | Driver | -//! | - | - | - | -//! | [MariaDB] | 10.1+ | [`mysql`] | -//! | [Microsoft SQL Server] | 2019 | [`mssql`] | -//! | [MySQL] | 5.6, 5.7, 8.0 | [`mysql`] | -//! | [PostgreSQL] | 9.5+ | [`postgres`] | -//! | [SQLite] | 3.20.1+ | [`sqlite`] | -//! -//! [MariaDB]: https://mariadb.com/ -//! [MySQL]: https://www.mysql.com/ -//! [Microsoft SQL Server]: https://www.microsoft.com/en-us/sql-server -//! [PostgreSQL]: https://www.postgresql.org/ -//! [SQLite]: https://www.sqlite.org/ -//! -//! [`mysql`]: ../sqlite/index.html -//! [`postgres`]: ../postgres/index.html -//! [`mssql`]: ../mssql/index.html -//! [`sqlite`]: ../sqlite/index.html -//! -//! ## Tier 2 -//! -//! Tier 2 support can be thought as "should work". No specific automated testing is done, -//! at this time, but there are efforts to ensure compatibility. Tier 2 support also includes -//! database distributions that provide protocols that closely match a database from Tier 1. -//! -//! _No databases are in tier 2 at this time._ -//! -//! # `Any` -//! -//! Selecting a database driver is, by default, a compile-time decision. SQLx is designed this way -//! to take full advantage of the performance and type safety made available by Rust. -//! -//! We recognize that you may wish to make a runtime decision to decide the database driver. The -//! [`Any`] driver is provided for that purpose. -//! -//! ## Example -//! -//! ```rust,ignore -//! // connect to SQLite -//! let conn = AnyConnection::connect("sqlite://file.db").await?; -//! -//! // connect to Postgres, no code change -//! // required, decided by the scheme of the URI -//! let conn = AnyConnection::connect("postgres://localhost/sqlx").await?; -//! ``` -//! -//! [`Any`]: ../any/index.html -//! - -use std::fmt::Debug; - -use crate::arguments::Arguments; -use crate::column::Column; -use crate::connection::Connection; -use crate::done::Done; -use crate::row::Row; -use crate::statement::Statement; -use crate::transaction::TransactionManager; -use crate::type_info::TypeInfo; -use crate::value::{Value, ValueRef}; - -/// A database driver. -/// -/// This trait encapsulates a complete set of traits that implement a driver for a -/// specific database (e.g., MySQL, PostgreSQL). -pub trait Database: - 'static - + Sized - + Send - + Debug - + for<'r> HasValueRef<'r, Database = Self> - + for<'q> HasArguments<'q, Database = Self> - + for<'q> HasStatement<'q, Database = Self> -{ - /// The concrete `Connection` implementation for this database. - type Connection: Connection; - - /// The concrete `TransactionManager` implementation for this database. - type TransactionManager: TransactionManager; - - /// The concrete `Row` implementation for this database. - type Row: Row; - - /// The concrete `Done` implementation for this database. - type Done: Done; - - /// The concrete `Column` implementation for this database. - type Column: Column; - - /// The concrete `TypeInfo` implementation for this database. - type TypeInfo: TypeInfo; - - /// The concrete type used to hold an owned copy of the not-yet-decoded value that was - /// received from the database. - type Value: Value + 'static; -} - -/// Associate [`Database`] with a [`ValueRef`](crate::value::ValueRef) of a generic lifetime. -/// -/// --- -/// -/// The upcoming Rust feature, [Generic Associated Types], should obviate -/// the need for this trait. -/// -/// [`Database`]: trait.Database.html -/// [Generic Associated Types]: https://github.com/rust-lang/rust/issues/44265 -pub trait HasValueRef<'r> { - type Database: Database; - - /// The concrete type used to hold a reference to the not-yet-decoded value that has just been - /// received from the database. - type ValueRef: ValueRef<'r, Database = Self::Database>; -} - -/// Associate [`Database`] with an [`Arguments`](crate::arguments::Arguments) of a generic lifetime. -/// -/// --- -/// -/// The upcoming Rust feature, [Generic Associated Types], should obviate -/// the need for this trait. -/// -/// [`Database`]: trait.Database.html -/// [Generic Associated Types]: https://github.com/rust-lang/rust/issues/44265 -pub trait HasArguments<'q> { - type Database: Database; - - /// The concrete `Arguments` implementation for this database. - type Arguments: Arguments<'q, Database = Self::Database>; - - /// The concrete type used as a buffer for arguments while encoding. - type ArgumentBuffer; -} - -/// Associate [`Database`] with a [`Statement`](crate::statement::Statement) of a generic lifetime. -/// -/// --- -/// -/// The upcoming Rust feature, [Generic Associated Types], should obviate -/// the need for this trait. -/// -/// [`Database`]: trait.Database.html -/// [Generic Associated Types]: https://github.com/rust-lang/rust/issues/44265 -pub trait HasStatement<'q> { - type Database: Database; - - /// The concrete `Statement` implementation for this database. - type Statement: Statement<'q, Database = Self::Database>; -} - -/// A [`Database`] that maintains a client-side cache of prepared statements. -/// -/// [`Database`]: trait.Database.html -pub trait HasStatementCache {} diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs deleted file mode 100644 index ab164cd4b..000000000 --- a/sqlx-core/src/error.rs +++ /dev/null @@ -1,246 +0,0 @@ -//! Types for working with errors produced by SQLx. - -use std::any::type_name; -use std::borrow::Cow; -use std::error::Error as StdError; -use std::fmt::Display; -use std::io; -use std::result::Result as StdResult; - -use crate::database::Database; -use crate::type_info::TypeInfo; -use crate::types::Type; - -/// A specialized `Result` type for SQLx. -pub type Result = StdResult; - -// Convenience type alias for usage within SQLx. -// Do not make this type public. -pub type BoxDynError = Box; - -/// An unexpected `NULL` was encountered during decoding. -/// -/// Returned from [`Row::get`](sqlx_core::row::Row::get) if the value from the database is `NULL`, -/// and you are not decoding into an `Option`. -#[derive(thiserror::Error, Debug)] -#[error("unexpected null; try decoding as an `Option`")] -pub struct UnexpectedNullError; - -/// Represents all the ways a method can fail within SQLx. -#[derive(Debug, thiserror::Error)] -#[non_exhaustive] -pub enum Error { - /// Error occurred while parsing a connection string. - #[error("error with configuration: {0}")] - Configuration(#[source] BoxDynError), - - /// Error returned from the database. - #[error("error returned from database: {0}")] - Database(Box), - - /// Error communicating with the database backend. - #[error("error communicating with the server: {0}")] - Io(#[from] io::Error), - - /// Error occurred while attempting to establish a TLS connection. - #[error("error occurred while attempting to establish a TLS connection: {0}")] - Tls(#[source] BoxDynError), - - /// Unexpected or invalid data encountered while communicating with the database. - /// - /// This should indicate there is a programming error in a SQLx driver or there - /// is something corrupted with the connection to the database itself. - #[error("encountered unexpected or invalid data: {0}")] - Protocol(String), - - /// No rows returned by a query that expected to return at least one row. - #[error("no rows returned by a query that expected to return at least one row")] - RowNotFound, - - /// Column index was out of bounds. - #[error("column index out of bounds: the len is {len}, but the index is {index}")] - ColumnIndexOutOfBounds { index: usize, len: usize }, - - /// No column found for the given name. - #[error("no column found for name: {0}")] - ColumnNotFound(String), - - /// Error occurred while decoding a value from a specific column. - #[error("error occurred while decoding column {index}: {source}")] - ColumnDecode { - index: String, - - #[source] - source: BoxDynError, - }, - - /// Error occurred while decoding a value. - #[error("error occurred while decoding: {0}")] - Decode(#[source] BoxDynError), - - /// A [`Pool::acquire`] timed out due to connections not becoming available or - /// because another task encountered too many errors while trying to open a new connection. - /// - /// [`Pool::acquire`]: crate::pool::Pool::acquire - #[error("pool timed out while waiting for an open connection")] - PoolTimedOut, - - /// [`Pool::close`] was called while we were waiting in [`Pool::acquire`]. - /// - /// [`Pool::acquire`]: crate::pool::Pool::acquire - /// [`Pool::close`]: crate::pool::Pool::close - #[error("attempted to acquire a connection on a closed pool")] - PoolClosed, - - #[cfg(feature = "migrate")] - #[error("{0}")] - Migrate(#[source] Box), -} - -impl Error { - pub fn into_database_error(self) -> Option> { - match self { - Error::Database(err) => Some(err), - _ => None, - } - } - - pub fn as_database_error(&self) -> Option<&(dyn DatabaseError + 'static)> { - match self { - Error::Database(err) => Some(&**err), - _ => None, - } - } - - #[allow(dead_code)] - #[inline] - pub(crate) fn protocol(err: impl Display) -> Self { - Error::Protocol(err.to_string()) - } - - #[allow(dead_code)] - #[inline] - pub(crate) fn config(err: impl StdError + Send + Sync + 'static) -> Self { - Error::Configuration(err.into()) - } - - #[allow(dead_code)] - #[inline] - pub(crate) fn tls(err: impl StdError + Send + Sync + 'static) -> Self { - Error::Tls(err.into()) - } -} - -pub(crate) fn mismatched_types>(ty: &DB::TypeInfo) -> BoxDynError { - // TODO: `#name` only produces `TINYINT` but perhaps we want to show `TINYINT(1)` - format!( - "mismatched types; Rust type `{}` (as SQL type `{}`) is not compatible with SQL type `{}`", - type_name::(), - T::type_info().name(), - ty.name() - ) - .into() -} - -/// An error that was returned from the database. -pub trait DatabaseError: 'static + Send + Sync + StdError { - /// The primary, human-readable error message. - fn message(&self) -> &str; - - /// The (SQLSTATE) code for the error. - fn code(&self) -> Option> { - None - } - - #[doc(hidden)] - fn as_error(&self) -> &(dyn StdError + Send + Sync + 'static); - - #[doc(hidden)] - fn as_error_mut(&mut self) -> &mut (dyn StdError + Send + Sync + 'static); - - #[doc(hidden)] - fn into_error(self: Box) -> Box; -} - -impl dyn DatabaseError { - /// Downcast a reference to this generic database error to a specific - /// database error type. - /// - /// # Panics - /// - /// Panics if the database error type is not `E`. This is a deliberate contrast from - /// `Error::downcast_ref` which returns `Option<&E>`. In normal usage, you should know the - /// specific error type. In other cases, use `try_downcast_ref`. - /// - pub fn downcast_ref(&self) -> &E { - self.try_downcast_ref().unwrap_or_else(|| { - panic!( - "downcast to wrong DatabaseError type; original error: {}", - self - ) - }) - } - - /// Downcast this generic database error to a specific database error type. - /// - /// # Panics - /// - /// Panics if the database error type is not `E`. This is a deliberate contrast from - /// `Error::downcast` which returns `Option`. In normal usage, you should know the - /// specific error type. In other cases, use `try_downcast`. - /// - pub fn downcast(self: Box) -> Box { - self.try_downcast().unwrap_or_else(|e| { - panic!( - "downcast to wrong DatabaseError type; original error: {}", - e - ) - }) - } - - /// Downcast a reference to this generic database error to a specific - /// database error type. - #[inline] - pub fn try_downcast_ref(&self) -> Option<&E> { - self.as_error().downcast_ref() - } - - /// Downcast this generic database error to a specific database error type. - #[inline] - pub fn try_downcast(self: Box) -> StdResult, Box> { - if self.as_error().is::() { - Ok(self.into_error().downcast().unwrap()) - } else { - Err(self) - } - } -} - -impl From for Error -where - E: DatabaseError, -{ - #[inline] - fn from(error: E) -> Self { - Error::Database(Box::new(error)) - } -} - -#[cfg(feature = "migrate")] -impl From for Error { - #[inline] - fn from(error: crate::migrate::MigrateError) -> Self { - Error::Migrate(Box::new(error)) - } -} - -// Format an error message as a `Protocol` error -macro_rules! err_protocol { - ($expr:expr) => { - $crate::error::Error::Protocol($expr.into()) - }; - - ($fmt:expr, $($arg:tt)*) => { - $crate::error::Error::Protocol(format!($fmt, $($arg)*)) - }; -} diff --git a/sqlx-core/src/io/buf_stream.rs b/sqlx-core/src/io/buf_stream.rs deleted file mode 100644 index 4646cb7a2..000000000 --- a/sqlx-core/src/io/buf_stream.rs +++ /dev/null @@ -1,136 +0,0 @@ -#![allow(dead_code)] - -use std::io; -use std::ops::{Deref, DerefMut}; - -use bytes::BytesMut; -use sqlx_rt::{AsyncRead, AsyncReadExt, AsyncWrite}; - -use crate::error::Error; -use crate::io::write_and_flush::WriteAndFlush; -use crate::io::{decode::Decode, encode::Encode}; -use std::io::Cursor; - -pub struct BufStream -where - S: AsyncRead + AsyncWrite + Unpin, -{ - stream: S, - - // writes with `write` to the underlying stream are buffered - // this can be flushed with `flush` - pub(crate) wbuf: Vec, - - // we read into the read buffer using 100% safe code - rbuf: BytesMut, -} - -impl BufStream -where - S: AsyncRead + AsyncWrite + Unpin, -{ - pub fn new(stream: S) -> Self { - Self { - stream, - wbuf: Vec::with_capacity(512), - rbuf: BytesMut::with_capacity(4096), - } - } - - pub fn write<'en, T>(&mut self, value: T) - where - T: Encode<'en, ()>, - { - self.write_with(value, ()) - } - - pub fn write_with<'en, T, C>(&mut self, value: T, context: C) - where - T: Encode<'en, C>, - { - value.encode_with(&mut self.wbuf, context); - } - - pub fn flush(&mut self) -> WriteAndFlush<'_, S> { - WriteAndFlush { - stream: &mut self.stream, - buf: Cursor::new(&mut self.wbuf), - } - } - - pub async fn read<'de, T>(&mut self, cnt: usize) -> Result - where - T: Decode<'de, ()>, - { - self.read_with(cnt, ()).await - } - - pub async fn read_with<'de, T, C>(&mut self, cnt: usize, context: C) -> Result - where - T: Decode<'de, C>, - { - T::decode_with(self.read_raw(cnt).await?.freeze(), context) - } - - pub async fn read_raw(&mut self, cnt: usize) -> Result { - read_raw_into(&mut self.stream, &mut self.rbuf, cnt).await?; - let buf = self.rbuf.split_to(cnt); - - Ok(buf) - } - - pub async fn read_raw_into(&mut self, buf: &mut BytesMut, cnt: usize) -> Result<(), Error> { - read_raw_into(&mut self.stream, buf, cnt).await - } -} - -impl Deref for BufStream -where - S: AsyncRead + AsyncWrite + Unpin, -{ - type Target = S; - - fn deref(&self) -> &Self::Target { - &self.stream - } -} - -impl DerefMut for BufStream -where - S: AsyncRead + AsyncWrite + Unpin, -{ - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.stream - } -} - -async fn read_raw_into( - stream: &mut S, - buf: &mut BytesMut, - cnt: usize, -) -> Result<(), Error> { - let offset = buf.len(); - - // zero-fills the space in the read buffer - buf.resize(offset + cnt, 0); - - let mut read = offset; - while (offset + cnt) > read { - // read in bytes from the stream into the read buffer starting - // from the offset we last read from - let n = stream.read(&mut buf[read..]).await?; - - if n == 0 { - // a zero read when we had space in the read buffer - // should be treated as an EOF - - // and an unexpected EOF means the server told us to go away - - return Err(io::Error::from(io::ErrorKind::ConnectionAborted).into()); - } - - read += n; - } - - Ok(()) -} diff --git a/sqlx-core/src/io/decode.rs b/sqlx-core/src/io/decode.rs deleted file mode 100644 index 2f397127b..000000000 --- a/sqlx-core/src/io/decode.rs +++ /dev/null @@ -1,29 +0,0 @@ -use bytes::Bytes; - -use crate::error::Error; - -pub trait Decode<'de, Context = ()> -where - Self: Sized, -{ - fn decode(buf: Bytes) -> Result - where - Self: Decode<'de, ()>, - { - Self::decode_with(buf, ()) - } - - fn decode_with(buf: Bytes, context: Context) -> Result; -} - -impl Decode<'_> for Bytes { - fn decode_with(buf: Bytes, _: ()) -> Result { - Ok(buf) - } -} - -impl Decode<'_> for () { - fn decode_with(_: Bytes, _: ()) -> Result<(), Error> { - Ok(()) - } -} diff --git a/sqlx-core/src/io/encode.rs b/sqlx-core/src/io/encode.rs deleted file mode 100644 index a417ef9eb..000000000 --- a/sqlx-core/src/io/encode.rs +++ /dev/null @@ -1,16 +0,0 @@ -pub trait Encode<'en, Context = ()> { - fn encode(&self, buf: &mut Vec) - where - Self: Encode<'en, ()>, - { - self.encode_with(buf, ()); - } - - fn encode_with(&self, buf: &mut Vec, context: Context); -} - -impl<'en, C> Encode<'en, C> for &'_ [u8] { - fn encode_with(&self, buf: &mut Vec, _: C) { - buf.extend_from_slice(self); - } -} diff --git a/sqlx-core/src/io/mod.rs b/sqlx-core/src/io/mod.rs index f99496540..c89902ce5 100644 --- a/sqlx-core/src/io/mod.rs +++ b/sqlx-core/src/io/mod.rs @@ -1,12 +1,5 @@ mod buf; mod buf_mut; -mod buf_stream; -mod decode; -mod encode; -mod write_and_flush; pub use buf::BufExt; pub use buf_mut::BufMutExt; -pub use buf_stream::BufStream; -pub use decode::Decode; -pub use encode::Encode; diff --git a/sqlx-core/src/io/write_and_flush.rs b/sqlx-core/src/io/write_and_flush.rs deleted file mode 100644 index 9e7824af8..000000000 --- a/sqlx-core/src/io/write_and_flush.rs +++ /dev/null @@ -1,45 +0,0 @@ -use crate::error::Error; -use futures_core::Future; -use futures_util::ready; -use sqlx_rt::AsyncWrite; -use std::io::{BufRead, Cursor}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -// Atomic operation that writes the full buffer to the stream, flushes the stream, and then -// clears the buffer (even if either of the two previous operations failed). -pub struct WriteAndFlush<'a, S> { - pub(super) stream: &'a mut S, - pub(super) buf: Cursor<&'a mut Vec>, -} - -impl Future for WriteAndFlush<'_, S> { - type Output = Result<(), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { - ref mut stream, - ref mut buf, - } = *self; - - loop { - let read = buf.fill_buf()?; - - if !read.is_empty() { - let written = ready!(Pin::new(&mut *stream).poll_write(cx, read)?); - buf.consume(written); - } else { - break; - } - } - - Pin::new(stream).poll_flush(cx).map_err(Error::Io) - } -} - -impl<'a, S> Drop for WriteAndFlush<'a, S> { - fn drop(&mut self) { - // clear the buffer regardless of whether the flush succeeded or not - self.buf.get_mut().clear(); - } -} diff --git a/sqlx-core2/Cargo.toml b/sqlx-core2/Cargo.toml new file mode 100644 index 000000000..2a759ed75 --- /dev/null +++ b/sqlx-core2/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "sqlx-core2" +version = "0.4.0-beta.2" +edition = "2018" +authors = [ + "Ryan Leckey ", + "Austin Bonander ", + "Chloe Ross ", + "Daniel Akhterov ", +] + +[features] +runtime-async-std = [ "sqlx-rt/runtime-async-std" ] +runtime-tokio = [ "sqlx-rt/runtime-tokio" ] +runtime-actix = [ "sqlx-rt/runtime-actix" ] + +[dependencies] +bytes = "0.5.6" +thiserror = "1.0.20" +sqlx-rt = { path = "../sqlx-rt", version = "0.1.1" } +futures-core = "0.3.5" diff --git a/sqlx-core/src/connection.rs b/sqlx-core2/src/connection.rs similarity index 51% rename from sqlx-core/src/connection.rs rename to sqlx-core2/src/connection.rs index 2fe8b7ca8..5ac8de7a9 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core2/src/connection.rs @@ -1,10 +1,11 @@ -use crate::database::{Database, HasStatementCache}; +//! Provides the [`Connection`] trait to represent a single database connection. +use crate::database::HasStatementCache; use crate::error::Error; -use crate::transaction::Transaction; +use crate::{database::Database, options::ConnectOptions}; use futures_core::future::BoxFuture; -use futures_core::Future; -use std::fmt::Debug; -use std::str::FromStr; + +// TODO: Connection#transaction() +// TODO: Connection#begin() /// Represents a single database connection. pub trait Connection: Send { @@ -22,46 +23,6 @@ pub trait Connection: Send { /// Checks if a connection to the database is still valid. fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>>; - /// Begin a new transaction or establish a savepoint within the active transaction. - /// - /// Returns a [`Transaction`] for controlling and tracking the new transaction. - fn begin(&mut self) -> BoxFuture<'_, Result, Error>> - where - Self: Sized; - - /// Execute the function inside a transaction. - /// - /// If the function returns an error, the transaction will be rolled back. If it does not - /// return an error, the transaction will be committed. - fn transaction<'c: 'f, 'f, T, E, F, Fut>(&'c mut self, f: F) -> BoxFuture<'f, Result> - where - Self: Sized, - T: Send, - F: FnOnce(&mut ::Connection) -> Fut + Send + 'f, - E: From + Send, - Fut: Future> + Send, - { - Box::pin(async move { - let mut tx = self.begin().await?; - - match f(&mut tx).await { - Ok(r) => { - // no error occurred, commit the transaction - tx.commit().await?; - - Ok(r) - } - - Err(e) => { - // an error occurred, rollback the transaction - tx.rollback().await?; - - Err(e) - } - } - }) - } - /// The number of statements currently cached in the connection. fn cached_statements_size(&self) -> usize where @@ -107,12 +68,3 @@ pub trait Connection: Send { options.connect() } } - -pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug { - type Connection: Connection + ?Sized; - - /// Establish a new database connection with the options specified by `self`. - fn connect(&self) -> BoxFuture<'_, Result> - where - Self::Connection: Sized; -} diff --git a/sqlx-core2/src/database.rs b/sqlx-core2/src/database.rs new file mode 100644 index 000000000..4672db6b6 --- /dev/null +++ b/sqlx-core2/src/database.rs @@ -0,0 +1,157 @@ +//! Provides the [`Database`] trait and other associated traits to represent a database driver. +//! +//! # Support +//! +//! ## Tier 1 +//! +//! Tier 1 support can be thought of as "guaranteed to work". Automated testing is setup to +//! ensure a high level of stability and functionality. +//! +//! | Database | Version | Driver | +//! | - | - | - | +//! | [MariaDB] | 10.1+ | [`mysql`] | +//! | [Microsoft SQL Server] | 2019 | [`mssql`] | +//! | [MySQL] | 5.6, 5.7, 8.0 | [`mysql`] | +//! | [PostgreSQL] | 9.5+ | [`postgres`] | +//! | [SQLite] | 3.20.1+ | [`sqlite`] | +//! +//! [MariaDB]: https://mariadb.com/ +//! [MySQL]: https://www.mysql.com/ +//! [Microsoft SQL Server]: https://www.microsoft.com/en-us/sql-server +//! [PostgreSQL]: https://www.postgresql.org/ +//! [SQLite]: https://www.sqlite.org/ +//! +//! [`mysql`]: ../sqlite/index.html +//! [`postgres`]: ../postgres/index.html +//! [`mssql`]: ../mssql/index.html +//! [`sqlite`]: ../sqlite/index.html +//! +//! ## Tier 2 +//! +//! Tier 2 support can be thought as "should work". No specific automated testing is done, +//! at this time, but there are efforts to ensure compatibility. Tier 2 support also includes +//! database distributions that provide protocols that closely match a database from Tier 1. +//! +//! _No databases are in tier 2 at this time._ +//! +//! # `Any` +//! +//! Selecting a database driver is, by default, a compile-time decision. SQLx is designed this way +//! to take full advantage of the performance and type safety made available by Rust. +//! +//! We recognize that you may wish to make a runtime decision to decide the database driver. The +//! [`Any`] driver is provided for that purpose. +//! +//! ## Example +//! +//! ```rust,ignore +//! // connect to SQLite +//! let conn = AnyConnection::connect("sqlite://file.db").await?; +//! +//! // connect to Postgres, no code change +//! // required, decided by the scheme of the URI +//! let conn = AnyConnection::connect("postgres://localhost/sqlx").await?; +//! ``` +//! +//! [`Any`]: ../any/index.html +//! + +// use crate::arguments::Arguments; +// use crate::column::Column; +use crate::connection::Connection; +// use crate::done::Done; +// use crate::row::Row; +// use crate::statement::Statement; +// use crate::transaction::TransactionManager; +// use crate::type_info::TypeInfo; +// use crate::value::{Value, ValueRef}; +use std::fmt::Debug; + +/// A database driver. +/// +/// This trait encapsulates a complete set of traits that implement a driver for a +/// specific database (e.g., MySQL, PostgreSQL). +pub trait Database: 'static + Sized + Send + Debug +// + for<'r> HasValueRef<'r, Database = Self> +// + for<'q> HasArguments<'q, Database = Self> +// + for<'q> HasStatement<'q, Database = Self> +{ + /// The concrete `Connection` implementation for this database. + type Connection: Connection; + + // /// The concrete `TransactionManager` implementation for this database. + // type TransactionManager: TransactionManager; + + // /// The concrete `Row` implementation for this database. + // type Row: Row; + // + // /// The concrete `Done` implementation for this database. + // type Done: Done; + // + // /// The concrete `Column` implementation for this database. + // type Column: Column; + // + // /// The concrete `TypeInfo` implementation for this database. + // type TypeInfo: TypeInfo; + + // /// The concrete type used to hold an owned copy of the not-yet-decoded value that was + // /// received from the database. + // type Value: Value + 'static; +} + +// /// Associate [`Database`] with a [`ValueRef`](crate::value::ValueRef) of a generic lifetime. +// /// +// /// --- +// /// +// /// The upcoming Rust feature, [Generic Associated Types], should obviate +// /// the need for this trait. +// /// +// /// [`Database`]: trait.Database.html +// /// [Generic Associated Types]: https://github.com/rust-lang/rust/issues/44265 +// pub trait HasValueRef<'r> { +// type Database: Database; +// +// /// The concrete type used to hold a reference to the not-yet-decoded value that has just been +// /// received from the database. +// type ValueRef: ValueRef<'r, Database = Self::Database>; +// } + +// /// Associate [`Database`] with an [`Arguments`](crate::arguments::Arguments) of a generic lifetime. +// /// +// /// --- +// /// +// /// The upcoming Rust feature, [Generic Associated Types], should obviate +// /// the need for this trait. +// /// +// /// [`Database`]: trait.Database.html +// /// [Generic Associated Types]: https://github.com/rust-lang/rust/issues/44265 +// pub trait HasArguments<'q> { +// type Database: Database; +// +// /// The concrete `Arguments` implementation for this database. +// type Arguments: Arguments<'q, Database = Self::Database>; +// +// /// The concrete type used as a buffer for arguments while encoding. +// type ArgumentBuffer; +// } +// +// /// Associate [`Database`] with a [`Statement`](crate::statement::Statement) of a generic lifetime. +// /// +// /// --- +// /// +// /// The upcoming Rust feature, [Generic Associated Types], should obviate +// /// the need for this trait. +// /// +// /// [`Database`]: trait.Database.html +// /// [Generic Associated Types]: https://github.com/rust-lang/rust/issues/44265 +// pub trait HasStatement<'q> { +// type Database: Database; +// +// /// The concrete `Statement` implementation for this database. +// type Statement: Statement<'q, Database = Self::Database>; +// } + +/// A [`Database`] that maintains a client-side cache of prepared statements. +/// +/// [`Database`]: trait.Database.html +pub trait HasStatementCache {} diff --git a/sqlx-core2/src/error.rs b/sqlx-core2/src/error.rs new file mode 100644 index 000000000..215feaee4 --- /dev/null +++ b/sqlx-core2/src/error.rs @@ -0,0 +1,79 @@ +//! Errors produced by SQLx. + +use std::error::Error as StdError; + +/// A boxed alias of [`std::error::Error`] used in the variants of [`Error`] +/// to accept unknown error types. +/// +/// Ideally, Rust would provide an error primitive such as `std::error` or a [boxed alias itself](https://github.com/rust-lang/rfcs/pull/2820). +/// +pub type BoxStdError = Box; + +// TODO: #RowNotFound +// TODO: #ColumnIndexOutOfBounds +// TODO: #ColumnNotFound +// TODO: #ColumnDecode -> #ColumnFromValue +// TODO: #Decode -> #FromValue +// TODO: #ToValue +// TODO: #PoolTimedOut +// TODO: #PoolClosed +// TODO: #Migrate + +/// Represents all the ways a method can fail within SQLx. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Error occurred while parsing a connection string or otherwise resolving configuration. + #[error("error with configuration: {0}")] + Configuration(#[source] BoxStdError), + + /// Error communicating with the database server. + #[error("error communicating with the server: {0}")] + Network(#[from] std::io::Error), + + /// Unexpected or invalid data encountered while communicating with the database. + /// + /// This should indicate there is a programming error in a SQLx driver or there + /// is something corrupted with the connection to the database itself. + #[error("encountered unexpected or invalid data: {0}")] + Protocol(#[source] BoxStdError), + + /// Invalid SQL query or arguments. + /// + /// Typically we catch these kinds of errors at compile-time with Rust's type system. An example + /// of when it can still occur is a query string that is too large. For instance, in PostgreSQL, + /// query strings have a maximum size of `i32::MAX`. + #[error("{0}")] + Query(#[source] BoxStdError), + + /// Error occurred while attempting to establish a TLS connection. + #[error("error occurred while attempting to establish a TLS connection: {0}")] + Tls(#[source] BoxStdError), +} + +#[doc(hidden)] +impl Error { + #[inline] + pub fn protocol(err: impl StdError + Send + Sync + 'static) -> Self { + Error::Protocol(err.into()) + } + + #[inline] + pub fn protocol_msg(msg: impl Into) -> Self { + Error::Protocol(msg.into().into()) + } + + #[inline] + pub fn configuration(err: impl StdError + Send + Sync + 'static) -> Self { + Error::Configuration(err.into()) + } + + #[inline] + pub fn configuration_msg(msg: impl Into) -> Self { + Error::Configuration(msg.into().into()) + } + + #[inline] + pub fn tls(err: impl StdError + Send + Sync + 'static) -> Self { + Error::Tls(err.into()) + } +} diff --git a/sqlx-core2/src/io/buf_stream.rs b/sqlx-core2/src/io/buf_stream.rs new file mode 100644 index 000000000..e3ca7455f --- /dev/null +++ b/sqlx-core2/src/io/buf_stream.rs @@ -0,0 +1,124 @@ +use crate::error::Error; +use crate::io::Encode; +use bytes::{BufMut, Bytes, BytesMut}; +use sqlx_rt::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use std::mem::MaybeUninit; +use std::ops::{Deref, DerefMut}; + +/// A buffered I/O wrapper around an async read/write stream. +pub struct BufStream { + inner: S, + + // read buffer + rbuf: BytesMut, + + // write buffer + wbuf: Vec, + + // offset into the write buffer that a previous write operation has written to + wbuf_offset: usize, +} + +impl BufStream { + pub fn with_capacity(inner: S, read: usize, write: usize) -> Self { + Self { + inner, + rbuf: BytesMut::with_capacity(read), + wbuf: Vec::with_capacity(write), + wbuf_offset: 0, + } + } + + pub fn write<'en, T: Encode<'en>>(&mut self, packet: T) -> Result<(), Error> { + // we must be sure to not call [write] directly after a dropped [flush] + debug_assert_eq!(self.wbuf_offset, 0); + + packet.encode(&mut self.wbuf) + } +} + +impl BufStream { + pub async fn read(&mut self, offset: usize, n: usize) -> Result { + self.fill_buf(offset, n).await?; + + if offset != 0 { + // drop the bytes from 0 .. offset + let _ = self.rbuf.split_to(offset); + } + + // and return the slice of `n` bytes + Ok(self.rbuf.split_to(n).freeze()) + } + + pub async fn peek(&mut self, offset: usize, n: usize) -> Result<&[u8], Error> { + self.fill_buf(offset, n).await?; + + Ok(&self.rbuf[offset..offset + n]) + } + + async fn fill_buf(&mut self, offset: usize, n: usize) -> Result<(), Error> { + // before waiting to receive data, + // flush the write buffer (if needed) + if !self.wbuf.is_empty() { + self.flush().await?; + } + + while self.rbuf.len() < (offset + n) { + // ensure that there is room in the read buffer; this does nothing if there is at + // least 128 unwritten bytes in the buffer + self.rbuf.reserve(n.max(128)); + + #[allow(unsafe_code)] + unsafe { + // get a chunk of uninitialized memory to write to + // this is UB if the Read impl of the stream reads the write buffer + let b = self.rbuf.bytes_mut(); + let b = UnsafeSend(&mut *(b as *mut [MaybeUninit] as *mut [u8])); + + // read as much as we can and return when the stream or our buffer is exhausted + let n = self.inner.read(b.0).await?; + + // [!] read more than the length of our buffer + debug_assert!(n <= b.0.len()); + + // update the `len` of the read buffer + self.rbuf.advance_mut(n); + }; + } + + Ok(()) + } + + pub async fn flush(&mut self) -> Result<(), Error> { + // write as much as we can each time and move the cursor as we write from the buffer + // if _this_ future drops, offset will have a record of how much of the wbuf has + // been written + while self.wbuf_offset < self.wbuf.len() { + self.wbuf_offset += self.inner.write(&self.wbuf[self.wbuf_offset..]).await?; + } + + // fully written buffer, move cursor back to the beginning + self.wbuf_offset = 0; + self.wbuf.clear(); + + Ok(()) + } +} + +struct UnsafeSend<'a>(&'a mut [u8]); + +// TODO? unsafe impl Send for UnsafeSend<'_> {} + +impl Deref for BufStream { + type Target = S; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for BufStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} diff --git a/sqlx-core2/src/io/decode.rs b/sqlx-core2/src/io/decode.rs new file mode 100644 index 000000000..5ec6b9c54 --- /dev/null +++ b/sqlx-core2/src/io/decode.rs @@ -0,0 +1,23 @@ +use crate::error::Error; +use bytes::Bytes; + +/// An object that can be decoded from a byte buffer. +/// Context is optional metadata that is required to decode this object. +pub trait Decode<'de, Context = ()>: Sized { + #[inline] + fn decode(buf: Bytes) -> Result + where + Self: Decode<'de, ()>, + { + Self::decode_with(buf, ()) + } + + fn decode_with(buf: Bytes, context: Context) -> Result; +} + +impl Decode<'_, C> for Bytes { + #[inline] + fn decode_with(buf: Bytes, _: C) -> Result { + Ok(buf) + } +} diff --git a/sqlx-core2/src/io/encode.rs b/sqlx-core2/src/io/encode.rs new file mode 100644 index 000000000..f4202492d --- /dev/null +++ b/sqlx-core2/src/io/encode.rs @@ -0,0 +1,24 @@ +use crate::error::Error; + +/// An object that can be encoded to a byte buffer. +/// Context is optional metadata that is required to encode this object. +pub trait Encode<'en, Context = ()> { + #[inline] + fn encode(&self, buf: &mut Vec) -> Result<(), Error> + where + Self: Encode<'en, ()>, + { + self.encode_with(buf, ()) + } + + fn encode_with(&self, buf: &mut Vec, context: Context) -> Result<(), Error>; +} + +impl Encode<'_, C> for &'_ [u8] { + #[inline] + fn encode_with(&self, buf: &mut Vec, _: C) -> Result<(), Error> { + buf.extend_from_slice(self); + + Ok(()) + } +} diff --git a/sqlx-core2/src/io/mod.rs b/sqlx-core2/src/io/mod.rs new file mode 100644 index 000000000..e19fd8001 --- /dev/null +++ b/sqlx-core2/src/io/mod.rs @@ -0,0 +1,9 @@ +//! Low-level I/O shared between database driver implementations. + +mod buf_stream; +mod decode; +mod encode; + +pub use buf_stream::BufStream; +pub use decode::Decode; +pub use encode::Encode; diff --git a/sqlx-core2/src/lib.rs b/sqlx-core2/src/lib.rs new file mode 100644 index 000000000..a5ef33525 --- /dev/null +++ b/sqlx-core2/src/lib.rs @@ -0,0 +1,8 @@ +//! Core of SQLx, the rust SQL toolkit. Not intended to be used directly. +#![deny(unsafe_code)] +#![warn(future_incompatible, rust_2018_idioms, unreachable_pub)] +pub mod connection; +pub mod database; +pub mod error; +pub mod io; +pub mod options; diff --git a/sqlx-core2/src/options.rs b/sqlx-core2/src/options.rs new file mode 100644 index 000000000..735bc2445 --- /dev/null +++ b/sqlx-core2/src/options.rs @@ -0,0 +1,24 @@ +//! Provides the [`ConnectOptions`] trait for configuring a new connection. + +use crate::connection::Connection; +use crate::error::Error; +use futures_core::future::BoxFuture; +use std::fmt::Debug; +use std::str::FromStr; + +/// Connection options for configuring a new connection. +/// +/// Can be parsed from a semi-universal connection URI format of the form: +/// +/// ```text +/// driver://user:pass@host:port/database?param1=value1¶m2=value2 +/// ``` +/// +pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug { + type Connection: Connection + ?Sized; + + /// Establish a new database connection with the options specified by `self`. + fn connect(&self) -> BoxFuture<'_, Result> + where + Self::Connection: Sized; +}