mirror of
https://github.com/launchbadge/sqlx.git
synced 2026-03-06 05:42:38 +00:00
WIP fix: add guards for cancellation in executor methods
This commit is contained in:
parent
568256b654
commit
2ee1393469
107
Cargo.lock
generated
107
Cargo.lock
generated
@ -40,6 +40,21 @@ dependencies = [
|
||||
"threadpool",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b6a2d3371669ab3ca9797670853d61402b03d0b4b9ebf33d677dfa720203072"
|
||||
dependencies = [
|
||||
"gimli",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "adler"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e"
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.5.3"
|
||||
@ -193,6 +208,20 @@ version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.53"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "707b586e0e2f247cbde68cdd2c3ce69ea7b7be43e1c5b426e37c9319c4b9838e"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cfg-if 1.0.0",
|
||||
"libc",
|
||||
"miniz_oxide",
|
||||
"object",
|
||||
"rustc-demangle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "base-x"
|
||||
version = "0.2.6"
|
||||
@ -332,6 +361,12 @@ version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.13"
|
||||
@ -520,7 +555,7 @@ version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
@ -542,7 +577,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0",
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-utils",
|
||||
"lazy_static",
|
||||
"maybe-uninit",
|
||||
@ -556,7 +591,7 @@ version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-utils",
|
||||
"maybe-uninit",
|
||||
]
|
||||
@ -568,7 +603,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0",
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
@ -669,7 +704,7 @@ version = "0.8.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8ac63f94732332f44fe654443c46f6375d1939684c17b0afb6cb56b0456e171"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -870,7 +905,7 @@ version = "0.1.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"libc",
|
||||
"wasi",
|
||||
]
|
||||
@ -881,11 +916,17 @@ version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee8025cf36f917e6a52cce185b7c7177689b838b7ec138364e50cc2277a56cf4"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"libc",
|
||||
"wasi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aaf91faf136cb47367fa430cd46e37a788775e7fa104f8b4bcb3861dc389b724"
|
||||
|
||||
[[package]]
|
||||
name = "glob"
|
||||
version = "0.3.0"
|
||||
@ -1098,7 +1139,7 @@ version = "0.4.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1154,13 +1195,23 @@ dependencies = [
|
||||
"autocfg 1.0.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0f2d26ec3309788e423cfbf68ad1800f061638098d76a83681af979dc4eda19d"
|
||||
dependencies = [
|
||||
"adler",
|
||||
"autocfg 1.0.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.6.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"fuchsia-zircon",
|
||||
"fuchsia-zircon-sys",
|
||||
"iovec",
|
||||
@ -1242,7 +1293,7 @@ version = "0.2.34"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2ba7c918ac76704fb42afcbbb43891e72731f3dcca3bef2a19786297baf14af7"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"libc",
|
||||
"winapi 0.3.9",
|
||||
]
|
||||
@ -1317,6 +1368,12 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.21.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37fd5004feb2ce328a52b0b3d01dbf4ffff72583493900ed15f22d4111c51693"
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.4.0"
|
||||
@ -1342,7 +1399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d575eff3665419f9b83678ff2815858ad9d11567e082f5ac1814baba4e2bcb4"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"foreign-types",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
@ -1407,7 +1464,7 @@ version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"cloudabi",
|
||||
"instant",
|
||||
"libc",
|
||||
@ -1736,6 +1793,12 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc-demangle"
|
||||
version = "0.1.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e3bad0ee36814ca07d7968269dd4b7ec89ec2da10c4bb613928d3077083c232"
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
version = "0.2.3"
|
||||
@ -1876,7 +1939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "170a36ea86c864a3f16dd2687712dd6646f7019f301e57537c7f4dc9f5916770"
|
||||
dependencies = [
|
||||
"block-buffer",
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"cpuid-bool",
|
||||
"digest",
|
||||
"opaque-debug",
|
||||
@ -1895,7 +1958,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2933378ddfeda7ea26f48c555bdad8bb446bf8a3d17832dc83e380d444cfb8c1"
|
||||
dependencies = [
|
||||
"block-buffer",
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"cpuid-bool",
|
||||
"digest",
|
||||
"opaque-debug",
|
||||
@ -1961,7 +2024,7 @@ version = "0.3.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"winapi 0.3.9",
|
||||
@ -2171,6 +2234,8 @@ dependencies = [
|
||||
"actix-threadpool",
|
||||
"async-native-tls",
|
||||
"async-std",
|
||||
"backtrace",
|
||||
"futures",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"tokio",
|
||||
@ -2183,9 +2248,13 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-std",
|
||||
"backtrace",
|
||||
"dotenv",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"log",
|
||||
"sqlx",
|
||||
"sqlx-rt",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@ -2339,7 +2408,7 @@ version = "3.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"libc",
|
||||
"rand",
|
||||
"redox_syscall",
|
||||
@ -2447,7 +2516,7 @@ version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3a51cadc5b1eec673a685ff7c33192ff7b7603d0b75446fb354939ee615acb15"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"libc",
|
||||
"standback",
|
||||
"stdweb",
|
||||
@ -2669,7 +2738,7 @@ version = "0.2.65"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f3edbcc9536ab7eababcc6d2374a0b7bfe13a2b6d562c5e07f370456b1a8f33d"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"wasm-bindgen-macro",
|
||||
]
|
||||
|
||||
@ -2694,7 +2763,7 @@ version = "0.4.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41ad6e4e8b2b7f8c90b6e09a9b590ea15cb0d1dbe28502b5a405cd95d1981671"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cfg-if 0.1.10",
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
"web-sys",
|
||||
|
||||
@ -85,6 +85,21 @@ pub trait Connection: Send {
|
||||
#[doc(hidden)]
|
||||
fn should_flush(&self) -> bool;
|
||||
|
||||
#[doc(hidden)]
|
||||
fn set_has_cancellation(&mut self, has_cancellation: bool);
|
||||
|
||||
/// If this connection previously had a canceled execution future. If true, the connection
|
||||
/// should be closed as it may be in an inconsistent state.
|
||||
#[doc(hidden)]
|
||||
fn has_cancellation(&self) -> bool;
|
||||
|
||||
#[doc(hidden)]
|
||||
#[must_use = "don't forget to call `.forget()`"]
|
||||
fn cancellation_guard(&mut self) -> CancellationGuard<'_, Self> where Self: Sized {
|
||||
self.set_has_cancellation(false);
|
||||
CancellationGuard { conn: self, ignore: false }
|
||||
}
|
||||
|
||||
/// Establish a new database connection.
|
||||
///
|
||||
/// A value of `Options` is parsed from the provided connection string. This parsing
|
||||
@ -116,3 +131,16 @@ pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug {
|
||||
where
|
||||
Self::Connection: Sized;
|
||||
}
|
||||
|
||||
pub struct CancellationGuard<'a, C: Connection> {
|
||||
pub conn: &'a mut C,
|
||||
pub ignore: bool
|
||||
}
|
||||
|
||||
impl<'a, C: Connection> Drop for CancellationGuard<'a, C> {
|
||||
fn drop(&mut self) {
|
||||
if !self.ignore {
|
||||
self.conn.set_has_cancellation(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,6 +68,12 @@ impl<DB: Database> Drop for PoolConnection<DB> {
|
||||
if let Some(mut live) = self.live.take() {
|
||||
let pool = self.pool.clone();
|
||||
|
||||
if live.raw.has_cancellation() {
|
||||
// drop the connection as it may be in an inconsistent state
|
||||
drop(live.float(&pool));
|
||||
return;
|
||||
}
|
||||
|
||||
if live.raw.should_flush() {
|
||||
spawn(async move {
|
||||
// flush the connection (will immediately return if not needed) before
|
||||
|
||||
@ -142,6 +142,7 @@ impl PgConnection {
|
||||
cache_statement: StatementCache::new(options.statement_cache_capacity),
|
||||
cache_type_oid: HashMap::new(),
|
||||
cache_type_info: HashMap::new(),
|
||||
has_cancellation: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ use futures_core::stream::BoxStream;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{pin_mut, TryStreamExt};
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
use crate::connection::Connection;
|
||||
|
||||
async fn prepare(
|
||||
conn: &mut PgConnection,
|
||||
@ -347,13 +348,19 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
|
||||
let persistent = query.persistent();
|
||||
|
||||
Box::pin(try_stream! {
|
||||
let s = self.run(sql, arguments, 0, persistent, metadata).await?;
|
||||
let mut guard = self.cancellation_guard();
|
||||
|
||||
let s = guard.conn.run(sql, arguments, 0, persistent, metadata).await?;
|
||||
pin_mut!(s);
|
||||
|
||||
while let Some(v) = s.try_next().await? {
|
||||
r#yield!(v);
|
||||
}
|
||||
|
||||
drop(s);
|
||||
|
||||
guard.ignore = true;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
@ -372,7 +379,8 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
|
||||
let persistent = query.persistent();
|
||||
|
||||
Box::pin(async move {
|
||||
let s = self.run(sql, arguments, 1, persistent, metadata).await?;
|
||||
let mut guard = self.cancellation_guard();
|
||||
let s = guard.conn.run(sql, arguments, 1, persistent, metadata).await?;
|
||||
pin_mut!(s);
|
||||
|
||||
while let Some(s) = s.try_next().await? {
|
||||
@ -381,6 +389,8 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
|
||||
}
|
||||
}
|
||||
|
||||
guard.ignore = true;
|
||||
|
||||
Ok(None)
|
||||
})
|
||||
}
|
||||
@ -394,9 +404,13 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
|
||||
'c: 'e,
|
||||
{
|
||||
Box::pin(async move {
|
||||
self.wait_until_ready().await?;
|
||||
let mut guard = self.cancellation_guard();
|
||||
|
||||
let (_, metadata) = self.get_or_prepare(sql, parameters, true, None).await?;
|
||||
guard.conn.wait_until_ready().await?;
|
||||
|
||||
let (_, metadata) = guard.conn.get_or_prepare(sql, parameters, true, None).await?;
|
||||
|
||||
guard.ignore = true;
|
||||
|
||||
Ok(PgStatement {
|
||||
sql: Cow::Borrowed(sql),
|
||||
@ -413,11 +427,15 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
|
||||
'c: 'e,
|
||||
{
|
||||
Box::pin(async move {
|
||||
self.wait_until_ready().await?;
|
||||
let mut guard = self.cancellation_guard();
|
||||
|
||||
let (_, metadata) = self.get_or_prepare(sql, &[], true, None).await?;
|
||||
guard.conn.wait_until_ready().await?;
|
||||
|
||||
let nullable = self.get_nullable_for_columns(&metadata.columns).await?;
|
||||
let (_, metadata) = guard.conn.get_or_prepare(sql, &[], true, None).await?;
|
||||
|
||||
let nullable = guard.conn.get_nullable_for_columns(&metadata.columns).await?;
|
||||
|
||||
guard.ignore = true;
|
||||
|
||||
Ok(Describe {
|
||||
columns: metadata.columns.clone(),
|
||||
|
||||
@ -60,6 +60,8 @@ pub struct PgConnection {
|
||||
// current transaction status
|
||||
transaction_status: TransactionStatus,
|
||||
pub(crate) transaction_depth: usize,
|
||||
|
||||
pub(crate) has_cancellation: bool,
|
||||
}
|
||||
|
||||
impl PgConnection {
|
||||
@ -174,4 +176,12 @@ impl Connection for PgConnection {
|
||||
fn should_flush(&self) -> bool {
|
||||
!self.stream.wbuf.is_empty()
|
||||
}
|
||||
|
||||
fn set_has_cancellation(&mut self, has_cancellation: bool) {
|
||||
self.has_cancellation = has_cancellation;
|
||||
}
|
||||
|
||||
fn has_cancellation(&self) -> bool {
|
||||
self.has_cancellation
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user