diff --git a/Cargo.lock b/Cargo.lock index dd53d6ac4..dc7fc1021 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,6 +40,21 @@ dependencies = [ "threadpool", ] +[[package]] +name = "addr2line" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6a2d3371669ab3ca9797670853d61402b03d0b4b9ebf33d677dfa720203072" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" + [[package]] name = "ahash" version = "0.5.3" @@ -193,6 +208,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +[[package]] +name = "backtrace" +version = "0.3.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "707b586e0e2f247cbde68cdd2c3ce69ea7b7be43e1c5b426e37c9319c4b9838e" +dependencies = [ + "addr2line", + "cfg-if 1.0.0", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base-x" version = "0.2.6" @@ -332,6 +361,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "chrono" version = "0.4.13" @@ -520,7 +555,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "crossbeam-utils", ] @@ -542,7 +577,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg 1.0.0", - "cfg-if", + "cfg-if 0.1.10", "crossbeam-utils", "lazy_static", "maybe-uninit", @@ -556,7 +591,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "crossbeam-utils", "maybe-uninit", ] @@ -568,7 +603,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" dependencies = [ "autocfg 1.0.0", - "cfg-if", + "cfg-if 0.1.10", "lazy_static", ] @@ -669,7 +704,7 @@ version = "0.8.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8ac63f94732332f44fe654443c46f6375d1939684c17b0afb6cb56b0456e171" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", ] [[package]] @@ -870,7 +905,7 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "wasi", ] @@ -881,11 +916,17 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee8025cf36f917e6a52cce185b7c7177689b838b7ec138364e50cc2277a56cf4" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "wasi", ] +[[package]] +name = "gimli" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf91faf136cb47367fa430cd46e37a788775e7fa104f8b4bcb3861dc389b724" + [[package]] name = "glob" version = "0.3.0" @@ -1098,7 +1139,7 @@ version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", ] [[package]] @@ -1154,13 +1195,23 @@ dependencies = [ "autocfg 1.0.0", ] +[[package]] +name = "miniz_oxide" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f2d26ec3309788e423cfbf68ad1800f061638098d76a83681af979dc4eda19d" +dependencies = [ + "adler", + "autocfg 1.0.0", +] + [[package]] name = "mio" version = "0.6.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "fuchsia-zircon", "fuchsia-zircon-sys", "iovec", @@ -1242,7 +1293,7 @@ version = "0.2.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ba7c918ac76704fb42afcbbb43891e72731f3dcca3bef2a19786297baf14af7" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "winapi 0.3.9", ] @@ -1317,6 +1368,12 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37fd5004feb2ce328a52b0b3d01dbf4ffff72583493900ed15f22d4111c51693" + [[package]] name = "once_cell" version = "1.4.0" @@ -1342,7 +1399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d575eff3665419f9b83678ff2815858ad9d11567e082f5ac1814baba4e2bcb4" dependencies = [ "bitflags", - "cfg-if", + "cfg-if 0.1.10", "foreign-types", "lazy_static", "libc", @@ -1407,7 +1464,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "cloudabi", "instant", "libc", @@ -1736,6 +1793,12 @@ dependencies = [ "serde", ] +[[package]] +name = "rustc-demangle" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e3bad0ee36814ca07d7968269dd4b7ec89ec2da10c4bb613928d3077083c232" + [[package]] name = "rustc_version" version = "0.2.3" @@ -1876,7 +1939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "170a36ea86c864a3f16dd2687712dd6646f7019f301e57537c7f4dc9f5916770" dependencies = [ "block-buffer", - "cfg-if", + "cfg-if 0.1.10", "cpuid-bool", "digest", "opaque-debug", @@ -1895,7 +1958,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2933378ddfeda7ea26f48c555bdad8bb446bf8a3d17832dc83e380d444cfb8c1" dependencies = [ "block-buffer", - "cfg-if", + "cfg-if 0.1.10", "cpuid-bool", "digest", "opaque-debug", @@ -1961,7 +2024,7 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "redox_syscall", "winapi 0.3.9", @@ -2171,6 +2234,8 @@ dependencies = [ "actix-threadpool", "async-native-tls", "async-std", + "backtrace", + "futures", "native-tls", "once_cell", "tokio", @@ -2183,9 +2248,13 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", + "backtrace", "dotenv", "env_logger", + "futures", + "log", "sqlx", + "sqlx-rt", "tokio", ] @@ -2339,7 +2408,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "rand", "redox_syscall", @@ -2447,7 +2516,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a51cadc5b1eec673a685ff7c33192ff7b7603d0b75446fb354939ee615acb15" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "libc", "standback", "stdweb", @@ -2669,7 +2738,7 @@ version = "0.2.65" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3edbcc9536ab7eababcc6d2374a0b7bfe13a2b6d562c5e07f370456b1a8f33d" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "wasm-bindgen-macro", ] @@ -2694,7 +2763,7 @@ version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41ad6e4e8b2b7f8c90b6e09a9b590ea15cb0d1dbe28502b5a405cd95d1981671" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "js-sys", "wasm-bindgen", "web-sys", diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 2fe8b7ca8..c6af77051 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -85,6 +85,21 @@ pub trait Connection: Send { #[doc(hidden)] fn should_flush(&self) -> bool; + #[doc(hidden)] + fn set_has_cancellation(&mut self, has_cancellation: bool); + + /// If this connection previously had a canceled execution future. If true, the connection + /// should be closed as it may be in an inconsistent state. + #[doc(hidden)] + fn has_cancellation(&self) -> bool; + + #[doc(hidden)] + #[must_use = "don't forget to call `.forget()`"] + fn cancellation_guard(&mut self) -> CancellationGuard<'_, Self> where Self: Sized { + self.set_has_cancellation(false); + CancellationGuard { conn: self, ignore: false } + } + /// Establish a new database connection. /// /// A value of `Options` is parsed from the provided connection string. This parsing @@ -116,3 +131,16 @@ pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug { where Self::Connection: Sized; } + +pub struct CancellationGuard<'a, C: Connection> { + pub conn: &'a mut C, + pub ignore: bool +} + +impl<'a, C: Connection> Drop for CancellationGuard<'a, C> { + fn drop(&mut self) { + if !self.ignore { + self.conn.set_has_cancellation(true); + } + } +} diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index 891299ef9..64862cc99 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -68,6 +68,12 @@ impl Drop for PoolConnection { if let Some(mut live) = self.live.take() { let pool = self.pool.clone(); + if live.raw.has_cancellation() { + // drop the connection as it may be in an inconsistent state + drop(live.float(&pool)); + return; + } + if live.raw.should_flush() { spawn(async move { // flush the connection (will immediately return if not needed) before diff --git a/sqlx-core/src/postgres/connection/establish.rs b/sqlx-core/src/postgres/connection/establish.rs index e3de0c6d6..706b18de8 100644 --- a/sqlx-core/src/postgres/connection/establish.rs +++ b/sqlx-core/src/postgres/connection/establish.rs @@ -142,6 +142,7 @@ impl PgConnection { cache_statement: StatementCache::new(options.statement_cache_capacity), cache_type_oid: HashMap::new(), cache_type_info: HashMap::new(), + has_cancellation: false, }) } } diff --git a/sqlx-core/src/postgres/connection/executor.rs b/sqlx-core/src/postgres/connection/executor.rs index d3997bcf0..3e5d1cb41 100644 --- a/sqlx-core/src/postgres/connection/executor.rs +++ b/sqlx-core/src/postgres/connection/executor.rs @@ -18,6 +18,7 @@ use futures_core::stream::BoxStream; use futures_core::Stream; use futures_util::{pin_mut, TryStreamExt}; use std::{borrow::Cow, sync::Arc}; +use crate::connection::Connection; async fn prepare( conn: &mut PgConnection, @@ -347,13 +348,19 @@ impl<'c> Executor<'c> for &'c mut PgConnection { let persistent = query.persistent(); Box::pin(try_stream! { - let s = self.run(sql, arguments, 0, persistent, metadata).await?; + let mut guard = self.cancellation_guard(); + + let s = guard.conn.run(sql, arguments, 0, persistent, metadata).await?; pin_mut!(s); while let Some(v) = s.try_next().await? { r#yield!(v); } + drop(s); + + guard.ignore = true; + Ok(()) }) } @@ -372,7 +379,8 @@ impl<'c> Executor<'c> for &'c mut PgConnection { let persistent = query.persistent(); Box::pin(async move { - let s = self.run(sql, arguments, 1, persistent, metadata).await?; + let mut guard = self.cancellation_guard(); + let s = guard.conn.run(sql, arguments, 1, persistent, metadata).await?; pin_mut!(s); while let Some(s) = s.try_next().await? { @@ -381,6 +389,8 @@ impl<'c> Executor<'c> for &'c mut PgConnection { } } + guard.ignore = true; + Ok(None) }) } @@ -394,9 +404,13 @@ impl<'c> Executor<'c> for &'c mut PgConnection { 'c: 'e, { Box::pin(async move { - self.wait_until_ready().await?; + let mut guard = self.cancellation_guard(); - let (_, metadata) = self.get_or_prepare(sql, parameters, true, None).await?; + guard.conn.wait_until_ready().await?; + + let (_, metadata) = guard.conn.get_or_prepare(sql, parameters, true, None).await?; + + guard.ignore = true; Ok(PgStatement { sql: Cow::Borrowed(sql), @@ -413,11 +427,15 @@ impl<'c> Executor<'c> for &'c mut PgConnection { 'c: 'e, { Box::pin(async move { - self.wait_until_ready().await?; + let mut guard = self.cancellation_guard(); - let (_, metadata) = self.get_or_prepare(sql, &[], true, None).await?; + guard.conn.wait_until_ready().await?; - let nullable = self.get_nullable_for_columns(&metadata.columns).await?; + let (_, metadata) = guard.conn.get_or_prepare(sql, &[], true, None).await?; + + let nullable = guard.conn.get_nullable_for_columns(&metadata.columns).await?; + + guard.ignore = true; Ok(Describe { columns: metadata.columns.clone(), diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index b8b75ea3e..2cc910099 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -60,6 +60,8 @@ pub struct PgConnection { // current transaction status transaction_status: TransactionStatus, pub(crate) transaction_depth: usize, + + pub(crate) has_cancellation: bool, } impl PgConnection { @@ -174,4 +176,12 @@ impl Connection for PgConnection { fn should_flush(&self) -> bool { !self.stream.wbuf.is_empty() } + + fn set_has_cancellation(&mut self, has_cancellation: bool) { + self.has_cancellation = has_cancellation; + } + + fn has_cancellation(&self) -> bool { + self.has_cancellation + } }