fix Pool to remove possibility of "leaking" connections (#84)

* fix `Pool` to reduce possibility of "leaking" connections

now uses RAII guards to control `SharedPool::size`

* add smoke test for `Pool` to both Postgres and MySQL tests

add `Pool::is_closed()`

* fix documentation re: pool

* refactor pool implementation to not use futures oneshot channels

https://github.com/launchbadge/sqlx/pull/84#issuecomment-580476223

* run cargo fmt

* Pool: remove superfluous guard struct, document some internal methods
This commit is contained in:
Austin Bonander 2020-01-31 23:33:42 -08:00 committed by GitHub
parent 745c5c3957
commit eff7c9e125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 800 additions and 314 deletions

288
Cargo.lock generated
View File

@ -8,6 +8,14 @@ dependencies = [
"memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "anyhow"
version = "1.0.26"
@ -34,7 +42,7 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -68,8 +76,8 @@ dependencies = [
"mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"once_cell 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"once_cell 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -88,9 +96,9 @@ name = "async-stream-impl"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -255,6 +263,20 @@ dependencies = [
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "clap"
version = "2.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "cloudabi"
version = "0.0.3"
@ -515,9 +537,9 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -611,6 +633,14 @@ dependencies = [
"tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "heck"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"unicode-segmentation 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "hermit-abi"
version = "0.1.6"
@ -658,7 +688,7 @@ dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -964,7 +994,7 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.3.0"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -1026,6 +1056,30 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "paw"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"paw-attributes 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"paw-raw 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "paw-attributes"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "paw-raw"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "percent-encoding"
version = "1.0.1"
@ -1038,7 +1092,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "pin-project-lite"
version = "0.1.2"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -1056,14 +1110,38 @@ name = "ppv-lite86"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "proc-macro-error"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro-error-attr 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rustversion 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "proc-macro-error-attr"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rustversion 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
"syn-mid 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1073,7 +1151,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "proc-macro2"
version = "1.0.7"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1089,7 +1167,7 @@ name = "quote"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1200,6 +1278,16 @@ dependencies = [
"semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rustversion"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ryu"
version = "1.0.2"
@ -1264,9 +1352,9 @@ name = "serde_derive"
version = "1.0.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1370,6 +1458,15 @@ dependencies = [
"trybuild 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sqlx"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"sqlx-core 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"sqlx-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sqlx-core"
version = "0.2.4"
@ -1403,6 +1500,32 @@ dependencies = [
"uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sqlx-core"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"async-native-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"async-std 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"async-stream 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"md-5 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"sha-1 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sqlx-example-realworld-postgres"
version = "0.1.0"
@ -1428,14 +1551,29 @@ dependencies = [
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sqlx-core 0.2.4",
"syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sqlx-macros"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"async-std 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dotenv 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sqlx-core 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "string"
version = "0.2.1"
@ -1444,6 +1582,33 @@ dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "structopt"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"structopt-derive 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "structopt-derive"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro-error 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "subtle"
version = "1.0.0"
@ -1451,14 +1616,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "syn"
version = "1.0.13"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "syn-mid"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tempfile"
version = "3.1.0"
@ -1480,6 +1655,14 @@ dependencies = [
"winapi-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "thiserror"
version = "1.0.9"
@ -1493,9 +1676,9 @@ name = "thiserror-impl"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1518,7 +1701,7 @@ dependencies = [
"http-service-hyper 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"mime 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"route-recognizer 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1535,6 +1718,18 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "todos-postgres"
version = "0.1.0"
dependencies = [
"anyhow 1.0.26 (registry+https://github.com/rust-lang/crates.io-index)",
"async-std 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"paw 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sqlx 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"structopt 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio"
version = "0.1.22"
@ -1568,7 +1763,7 @@ dependencies = [
"mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1619,7 +1814,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1736,6 +1931,16 @@ dependencies = [
"smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "unicode-segmentation"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "unicode-width"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "unicode-xid"
version = "0.2.0"
@ -1766,6 +1971,11 @@ name = "vcpkg"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "vec_map"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "version_check"
version = "0.1.5"
@ -1834,6 +2044,7 @@ dependencies = [
[metadata]
"checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d"
"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
"checksum anyhow 1.0.26 (registry+https://github.com/rust-lang/crates.io-index)" = "7825f6833612eb2414095684fcf6c635becf3ce97fe48cf6421321e93bfbd53c"
"checksum arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d7b8a9123b8027467bce0099fe556c628a53c8d83df0507084c31e9ba2e39aff"
"checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee"
@ -1864,6 +2075,7 @@ dependencies = [
"checksum cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "95e28fa049fda1c330bcf9d723be7663a899c4679724b34c81e9f5a326aab8cd"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "31850b4a4d6bae316f7a09e691c944c28299298837edc0a03f755618c23cbc01"
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum constant_time_eq 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
"checksum core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "25b9e03f145fd4f2bf705e07b900cd41fc636598fe5dc452fd0db1441c3f496d"
@ -1908,6 +2120,7 @@ dependencies = [
"checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
"checksum glob 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
"checksum h2 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
"checksum hermit-abi 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eff2656d88f158ce120947499e971d743c05dbcbed62e5bd2f38f1698bbc3772"
"checksum hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5dcb5e64cda4c23119ab41ba960d1e170a774c8e4b9d9e6a9bc18aabf5e59695"
"checksum http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "d6ccf5ede3a895d8856620237b2f02972c1bbc78d2965ad7fe8838d4a0ed41f0"
@ -1945,22 +2158,27 @@ dependencies = [
"checksum num-integer 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba"
"checksum num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096"
"checksum num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6"
"checksum once_cell 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f5941ec2d5ee5916c709580d71553b81a633df245bcc73c04dcbd62152ceefc4"
"checksum once_cell 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
"checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
"checksum openssl 0.10.26 (registry+https://github.com/rust-lang/crates.io-index)" = "3a3cc5799d98e1088141b8e01ff760112bbd9f19d850c124500566ca6901a585"
"checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
"checksum openssl-sys 0.9.53 (registry+https://github.com/rust-lang/crates.io-index)" = "465d16ae7fc0e313318f7de5cecf57b2fbe7511fd213978b457e1c96ff46736f"
"checksum parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252"
"checksum parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b876b1b9e7ac6e1a74a6da34d25c42e17e8862aa409cbbbdcfc8d86c6f3bc62b"
"checksum paw 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "09c0fc9b564dbc3dc2ed7c92c0c144f4de340aa94514ce2b446065417c4084e9"
"checksum paw-attributes 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0f35583365be5d148e959284f42526841917b7bfa09e2d1a7ad5dde2cf0eaa39"
"checksum paw-raw 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7f0b59668fe80c5afe998f0c0bf93322bf2cd66cafeeb80581f291716f3467f2"
"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
"checksum pin-project-lite 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e8822eb8bb72452f038ebf6048efa02c3fe22bf83f76519c9583e47fc194a422"
"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae"
"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587"
"checksum pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
"checksum ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b"
"checksum proc-macro-error 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "1b79a464461615532fcc8a6ed8296fa66cc12350c18460ab3f4594a6cee0fcb6"
"checksum proc-macro-error-attr 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "23832e5eae6bac56bbac190500eef1aaede63776b5cd131eaa4ee7fe120cd892"
"checksum proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)" = "ecd45702f76d6d3c75a80564378ae228a85f0b59d2f3ed43c91b4a69eb2ebfc5"
"checksum proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e"
"checksum proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "0319972dcae462681daf4da1adeeaa066e3ebd29c69be96c6abb1259d2ee2bcc"
"checksum proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3acb317c6ff86a4e579dfa00fc5e6cca91ecbb4e7eb2df0468805b674eb88548"
"checksum quick-error 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
"checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe"
"checksum rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
@ -1976,6 +2194,7 @@ dependencies = [
"checksum rust-argon2 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "416f5109bdd413cec4f04c029297838e7604c993f8d1483b1d438f23bdc3eb35"
"checksum rustc-demangle 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783"
"checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
"checksum rustversion 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b3bba175698996010c4f6dce5e7f173b6eb781fce25d2cfc45e27091ce0b79f6"
"checksum ryu 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bfa8506c1de11c9c4e4c38863ccbe02a305c8188e85a05a784c9e11e1c3910c8"
"checksum schannel 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "87f550b06b6cba9c8b8be3ee73f391990116bf527450d2556e9b9ce263b9a021"
"checksum scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d"
@ -1995,11 +2214,19 @@ dependencies = [
"checksum smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44e59e0c9fa00817912ae6e4e6e3c4fe04455e75699d06eedc7d85917ed8e8f4"
"checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85"
"checksum spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
"checksum sqlx 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ee32338e37ff890318e86f12411d0ade56cc71fefc8f32aa74b1c8e76f490023"
"checksum sqlx-core 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b74431ebf01eda617fb326471f712213a21473a0ecd34710e13f640092382a17"
"checksum sqlx-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3c63eb99eda1a38e509926b1f9406951cf89f27ce5554f24f1c38c8853d82b62"
"checksum string 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d24114bfcceb867ca7f71a0d3fe45d45619ec47a6fbfa98cb14e14250bfa5d6d"
"checksum strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
"checksum structopt 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "df136b42d76b1fbea72e2ab3057343977b04b4a2e00836c3c7c0673829572713"
"checksum structopt-derive 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fd50a87d2f7b8958055f3e73a963d78feaccca3836767a9069844e34b5b03c0a"
"checksum subtle 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee"
"checksum syn 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1e4ff033220a41d1a57d8125eab57bf5263783dfdcc18688b1dacc6ce9651ef8"
"checksum syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "af6f3550d8dff9ef7dc34d384ac6f107e5d31c8f57d9f28e0081503f547ac8f5"
"checksum syn-mid 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9fd3937748a7eccff61ba5b90af1a20dbf610858923a9192ea0ecb0cb77db1d0"
"checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
"checksum termcolor 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f"
"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
"checksum thiserror 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6f357d1814b33bc2dc221243f8424104bfe72dbe911d5b71b3816a2dff1c977e"
"checksum thiserror-impl 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2e25d25307eb8436894f727aba8f65d07adf02e5b35a13cebed48bd282bfef"
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
@ -2023,11 +2250,14 @@ dependencies = [
"checksum typenum 1.11.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6d2783fe2d6b8c1101136184eb41be8b1ad379e4657050b8aaff0c79ee7575f9"
"checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5"
"checksum unicode-normalization 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b561e267b2326bb4cebfc0ef9e68355c7abe6c6f522aeac2f5bf95d56c59bdcf"
"checksum unicode-segmentation 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0"
"checksum unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479"
"checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
"checksum untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "55cd1f4b4e96b46aeb8d4855db4a7a9bd96eeeb5c6a1ab54593328761642ce2f"
"checksum url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb"
"checksum uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
"checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168"
"checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
"checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"
"checksum want 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6395efa4784b027708f7451087e647ec73cc74f5d9bc2e418404248d679a230"
"checksum wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)" = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"

200
sqlx-core/src/pool/conn.rs Normal file
View File

@ -0,0 +1,200 @@
use crate::{Connect, Connection};
use futures_core::future::BoxFuture;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Instant;
use super::inner::{DecrementSizeGuard, SharedPool};
/// A connection checked out from [`Pool`][crate::Pool].
///
/// Will be returned to the pool on-drop.
pub struct PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
live: Option<Live<C>>,
pool: Arc<SharedPool<C>>,
}
pub(super) struct Live<C> {
raw: C,
pub(super) created: Instant,
}
pub(super) struct Idle<C> {
live: Live<C>,
pub(super) since: Instant,
}
/// RAII wrapper for connections being handled by functions that may drop them
pub(super) struct Floating<'p, C> {
inner: C,
guard: DecrementSizeGuard<'p>,
}
const DEREF_ERR: &str = "(bug) connection already released to pool";
impl<C> Deref for PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
type Target = C;
fn deref(&self) -> &Self::Target {
&self.live.as_ref().expect(DEREF_ERR).raw
}
}
impl<C> DerefMut for PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live.as_mut().expect(DEREF_ERR).raw
}
}
impl<C> Connection for PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
/// Detach the connection from the pool and close it nicely.
fn close(mut self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(async move {
let live = self.live.take().expect("PoolConnection double-dropped");
live.float(&self.pool).into_idle().close().await
})
}
}
/// Returns the connection to the [`Pool`][crate::Pool] it was checked-out from.
impl<C> Drop for PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
fn drop(&mut self) {
if let Some(live) = self.live.take() {
self.pool.release(live.float(&self.pool));
}
}
}
impl<C> Live<C> {
pub fn float(self, pool: &SharedPool<C>) -> Floating<Self> {
Floating {
inner: self,
guard: DecrementSizeGuard::new(pool),
}
}
pub fn into_idle(self) -> Idle<C> {
Idle {
live: self,
since: Instant::now(),
}
}
}
impl<C> Deref for Idle<C> {
type Target = Live<C>;
fn deref(&self) -> &Self::Target {
&self.live
}
}
impl<C> DerefMut for Idle<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live
}
}
impl<'s, C> Floating<'s, C> {
pub fn into_leakable(self) -> C {
self.guard.cancel();
self.inner
}
}
impl<'s, C> Floating<'s, Live<C>> {
pub fn new_live(conn: C, guard: DecrementSizeGuard<'s>) -> Self {
Self {
inner: Live {
raw: conn,
created: Instant::now(),
},
guard,
}
}
pub fn attach(self, pool: &Arc<SharedPool<C>>) -> PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
let Floating { inner, guard } = self;
debug_assert!(
guard.same_pool(pool),
"BUG: attaching connection to different pool"
);
guard.cancel();
PoolConnection {
live: Some(inner),
pool: Arc::clone(pool),
}
}
pub fn into_idle(self) -> Floating<'s, Idle<C>> {
Floating {
inner: self.inner.into_idle(),
guard: self.guard,
}
}
}
impl<'s, C> Floating<'s, Idle<C>> {
pub fn from_idle(idle: Idle<C>, pool: &'s SharedPool<C>) -> Self {
Self {
inner: idle,
guard: DecrementSizeGuard::new(pool),
}
}
pub async fn ping(&mut self) -> crate::Result<()>
where
C: Connection,
{
self.live.raw.ping().await
}
pub fn into_live(self) -> Floating<'s, Live<C>> {
Floating {
inner: self.inner.live,
guard: self.guard,
}
}
pub async fn close(self) -> crate::Result<()>
where
C: Connection,
{
// `guard` is dropped as intended
self.inner.live.raw.close().await
}
}
impl<C> Deref for Floating<'_, C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<C> DerefMut for Floating<'_, C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

View File

@ -1,62 +1,37 @@
use std::cmp;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Instant;
use crossbeam_queue::{ArrayQueue, SegQueue};
use futures_channel::oneshot::{channel, Sender};
use futures_core::task::{Poll, Waker};
use futures_util::future;
use super::{Idle, Live, Options};
use crate::runtime::{sleep, spawn, timeout, yield_now};
use crate::pool::deadline_as_timeout;
use crate::runtime::{sleep, spawn, timeout};
use crate::{
connection::{Connect, Connection},
error::Error,
};
use super::conn::{Floating, Idle, Live};
use super::Options;
pub(super) struct SharedPool<C> {
url: String,
idle: ArrayQueue<Idle<C>>,
waiters: SegQueue<Sender<Live<C>>>,
size: AtomicU32,
idle_conns: ArrayQueue<Idle<C>>,
waiters: SegQueue<Waker>,
pub(super) size: AtomicU32,
is_closed: AtomicBool,
options: Options,
}
impl<C> SharedPool<C>
where
C: Connection + Connect<Connection = C>,
C: Connection,
{
pub(super) async fn new_arc(url: &str, options: Options) -> crate::Result<Arc<Self>> {
let pool = Arc::new(Self {
url: url.to_owned(),
idle: ArrayQueue::new(options.max_size as usize),
waiters: SegQueue::new(),
size: AtomicU32::new(0),
is_closed: AtomicBool::new(false),
options,
});
// If a minimum size was configured for the pool,
// establish N connections
// TODO: Should we do this in the background?
for _ in 0..pool.options.min_size {
let live = pool
.eventually_connect(Instant::now() + pool.options.connect_timeout)
.await?;
// Ignore error here, we are capping this loop by min_size which we
// already should make sure is less than max_size
let _ = pool.idle.push(Idle {
live,
since: Instant::now(),
});
}
spawn_reaper(&pool);
Ok(pool)
}
pub fn options(&self) -> &Options {
&self.options
}
@ -71,7 +46,7 @@ where
pub(super) fn num_idle(&self) -> usize {
// NOTE: This is very expensive
self.waiters.len()
self.idle_conns.len()
}
pub(super) fn is_closed(&self) -> bool {
@ -80,163 +55,170 @@ where
pub(super) async fn close(&self) {
self.is_closed.store(true, Ordering::Release);
while self.size.load(Ordering::Acquire) > 0 {
// don't block on the receiver because we own one Sender so it should never return
// `None`; a `select!()` would also work but that produces more complicated code
// and a timeout isn't necessarily appropriate
while let Ok(idle) = self.idle.pop() {
idle.close().await;
self.size.fetch_sub(1, Ordering::AcqRel);
}
yield_now().await
while let Ok(_) = self.idle_conns.pop() {}
while let Ok(waker) = self.waiters.pop() {
waker.wake();
}
}
#[inline]
pub(super) fn try_acquire(&self) -> Option<Live<C>> {
pub(super) fn try_acquire(&self) -> Option<Floating<Live<C>>> {
Some(self.pop_idle()?.into_live())
}
fn pop_idle(&self) -> Option<Floating<Idle<C>>> {
if self.is_closed.load(Ordering::Acquire) {
return None;
}
Some(self.idle.pop().ok()?.live)
Some(Floating::from_idle(self.idle_conns.pop().ok()?, self))
}
pub(super) fn release(&self, mut live: Live<C>) {
// Try waiters in (FIFO) order until one is still waiting ..
while let Ok(waiter) = self.waiters.pop() {
live = match waiter.send(live) {
// successfully released
Ok(()) => return,
pub(super) fn release(&self, floating: Floating<Live<C>>) {
self.idle_conns
.push(floating.into_idle().into_leakable())
.expect("BUG: connection queue overflow in release()");
if let Ok(waker) = self.waiters.pop() {
waker.wake();
}
}
Err(live) => live,
};
/// Try to atomically increment the pool size for a new connection.
///
/// Returns `None` if we are at max_size.
fn try_increment_size(&self) -> Option<DecrementSizeGuard> {
let mut size = self.size();
while size < self.options.max_size {
let new_size = self.size.compare_and_swap(size, size + 1, Ordering::AcqRel);
if new_size == size {
return Some(DecrementSizeGuard::new(self));
}
size = new_size;
}
// .. if there were no waiters still waiting, just push the connection
// back to the idle queue
let _ = self.idle.push(Idle {
live,
since: Instant::now(),
});
None
}
pub(super) async fn acquire(&self) -> crate::Result<Live<C>> {
/// Wait for a connection, if either `size` drops below `max_size` so we can
/// open a new connection, or if an idle connection is returned to the pool.
///
/// Returns an error if `deadline` elapses before we are woken.
async fn wait_for_conn(&self, deadline: Instant) -> crate::Result<()> {
let mut waker_pushed = false;
timeout(
deadline_as_timeout(deadline)?,
// `poll_fn` gets us easy access to a `Waker` that we can push to our queue
future::poll_fn(|ctx| -> Poll<()> {
if !waker_pushed {
// only push the waker once
self.waiters.push(ctx.waker().to_owned());
waker_pushed = true;
Poll::Pending
} else {
Poll::Ready(())
}
}),
)
.await
.map_err(|_| crate::Error::PoolTimedOut(None))
}
}
impl<C> SharedPool<C>
where
C: Connection + Connect<Connection = C>,
{
pub(super) async fn new_arc(url: &str, options: Options) -> crate::Result<Arc<Self>> {
let mut pool = Self {
url: url.to_owned(),
idle_conns: ArrayQueue::new(options.max_size as usize),
waiters: SegQueue::new(),
size: AtomicU32::new(0),
is_closed: AtomicBool::new(false),
options,
};
pool.init_min_connections().await?;
let pool = Arc::new(pool);
spawn_reaper(&pool);
Ok(pool)
}
pub(super) async fn acquire<'s>(&'s self) -> crate::Result<Floating<'s, Live<C>>> {
let start = Instant::now();
let deadline = start + self.options.connect_timeout;
// Unless the pool has been closed ...
while !self.is_closed.load(Ordering::Acquire) {
while !self.is_closed() {
// Attempt to immediately acquire a connection. This will return Some
// if there is an idle connection in our channel.
if let Some(idle) = self.idle.pop().ok() {
if let Some(live) = check_live(idle.live, &self.options).await {
if let Ok(conn) = self.idle_conns.pop() {
let conn = Floating::from_idle(conn, self);
if let Some(live) = check_conn(conn, &self.options).await {
return Ok(live);
}
}
let size = self.size.load(Ordering::Acquire);
if size >= self.options.max_size {
// Too many open connections
// Wait until one is available
let (tx, rx) = channel();
self.waiters.push(tx);
// get the time between the deadline and now and use that as our timeout
let until = deadline
.checked_duration_since(Instant::now())
.ok_or(Error::PoolTimedOut(None))?;
// don't sleep forever
let live = match timeout(until, rx).await {
// A connection was returned to the pool
Ok(Ok(live)) => live,
// Pool dropped without dropping waiter
Ok(Err(_)) => unreachable!(),
// Timed out waiting for a connection
// Error is not forwarded as its useless context
Err(_) => {
return Err(Error::PoolTimedOut(None));
}
};
// If pool was closed while waiting for a connection,
// release the connection
if self.is_closed.load(Ordering::Acquire) {
live.close().await;
self.size.fetch_sub(1, Ordering::AcqRel);
return Err(Error::PoolClosed);
if let Some(guard) = self.try_increment_size() {
// pool has slots available; open a new connection
match self.connect(deadline, guard).await {
Ok(Some(conn)) => return Ok(conn),
// [size] is internally decremented on _retry_ and _error_
Ok(None) => continue,
Err(e) => return Err(e),
}
match check_live(live, &self.options).await {
Some(live) => return Ok(live),
// Need to re-connect
None => {}
}
} else if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) != size {
// size was incremented while we compared it just above
continue;
}
// pool has slots available; open a new connection
match self.connect(deadline).await {
Ok(Some(conn)) => return Ok(conn),
// [size] is internally decremented on _retry_ and _error_
Ok(None) => continue,
Err(e) => return Err(e),
}
// Wait for a connection to become available (or we are allowed to open a new one)
// Returns an error if `deadline` passes
self.wait_for_conn(deadline).await?;
}
Err(Error::PoolClosed)
}
async fn eventually_connect(&self, deadline: Instant) -> crate::Result<Live<C>> {
loop {
// [connect] will raise an error when past deadline
// [connect] returns None if its okay to retry
if let Some(conn) = self.connect(deadline).await? {
return Ok(conn);
// takes `&mut self` so this can only be called during init
async fn init_min_connections(&mut self) -> crate::Result<()> {
for _ in 0..self.options.min_size {
let deadline = Instant::now() + self.options.connect_timeout;
// this guard will prevent us from exceeding `max_size`
while let Some(guard) = self.try_increment_size() {
// [connect] will raise an error when past deadline
// [connect] returns None if its okay to retry
if let Some(conn) = self.connect(deadline, guard).await? {
self.idle_conns
.push(conn.into_idle().into_leakable())
.expect("BUG: connection queue overflow in init_min_connections");
}
}
}
Ok(())
}
async fn connect(&self, deadline: Instant) -> crate::Result<Option<Live<C>>> {
// FIXME: Code between `-` is duplicate with [acquire]
// ---------------------------------
// get the time between the deadline and now and use that as our timeout
let until = deadline
.checked_duration_since(Instant::now())
.ok_or(Error::PoolTimedOut(None))?;
// If pool was closed while waiting for a connection,
// release the connection
if self.is_closed.load(Ordering::Acquire) {
self.size.fetch_sub(1, Ordering::AcqRel); // ?
async fn connect<'s>(
&'s self,
deadline: Instant,
guard: DecrementSizeGuard<'s>,
) -> crate::Result<Option<Floating<'s, Live<C>>>> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
// ---------------------------------
let timeout = super::deadline_as_timeout(deadline)?;
// result here is `Result<Result<C, Error>, TimeoutError>`
match timeout(until, C::connect(&self.url)).await {
match crate::runtime::timeout(timeout, C::connect(&self.url)).await {
// successfully established connection
Ok(Ok(raw)) => {
Ok(Some(Live {
raw,
// remember when it was created so we can expire it
// if there is a [max_lifetime] set
created: Instant::now(),
}))
}
Ok(Ok(raw)) => Ok(Some(Floating::new_live(raw, guard))),
// IO error while connecting, this should definitely be logged
// and we should attempt to retry
@ -251,32 +233,11 @@ where
Ok(Err(e)) => Err(e),
// timed out
Err(e) => {
self.size.fetch_sub(1, Ordering::AcqRel); // ?
Err(Error::PoolTimedOut(Some(Box::new(e))))
}
Err(e) => Err(Error::PoolTimedOut(Some(Box::new(e)))),
}
}
}
impl<C> Idle<C>
where
C: Connection,
{
async fn close(self) {
self.live.close().await;
}
}
impl<C> Live<C>
where
C: Connection,
{
async fn close(self) {
let _ = self.raw.close().await;
}
}
// NOTE: Function names here are bizzare. Helpful help would be appreciated.
fn is_beyond_lifetime<C>(live: &Live<C>, options: &Options) -> bool {
@ -293,37 +254,35 @@ fn is_beyond_idle<C>(idle: &Idle<C>, options: &Options) -> bool {
.map_or(false, |timeout| idle.since.elapsed() > timeout)
}
async fn check_live<C>(mut live: Live<C>, options: &Options) -> Option<Live<C>>
async fn check_conn<'s, C>(
mut conn: Floating<'s, Idle<C>>,
options: &Options,
) -> Option<Floating<'s, Live<C>>>
where
C: Connection,
{
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_lifetime(&live, options) {
if is_beyond_lifetime(&conn, options) {
// we're closing the connection either way
// close the connection but don't really care about the result
let _ = live.close().await;
let _ = conn.close().await;
return None;
} else if options.test_on_acquire {
// TODO: Check on acquire should be a configuration setting
// Check that the connection is still live
match live.raw.ping().await {
// Connection still seems to respond
Ok(_) => return Some(live),
if let Err(e) = conn.ping().await {
// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
Err(e) => log::info!("ping on idle connection returned error: {}", e),
log::info!("ping on idle connection returned error: {}", e);
// connection is broken so don't try to close nicely
return None;
}
// make sure the idle connection is gone explicitly before we open one
// this will close the resources for the stream on our side
drop(live);
} else {
// No need to re-connect
return Some(live);
}
None
// No need to re-connect; connection is alive or we don't care
Some(conn.into_live())
}
/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections
@ -344,31 +303,68 @@ where
spawn(async move {
while !pool.is_closed.load(Ordering::Acquire) {
// reap at most the current size minus the minimum idle
let max_reaped = pool
.size
.load(Ordering::Acquire)
.saturating_sub(pool.options.min_size);
let max_reaped = pool.size().saturating_sub(pool.options.min_size);
// collect connections to reap
let (reap, keep) = (0..max_reaped)
// only connections waiting in the queue
.filter_map(|_| pool.idle.pop().ok())
.filter_map(|_| pool.pop_idle())
.partition::<Vec<_>, _>(|conn| {
is_beyond_idle(conn, &pool.options)
|| is_beyond_lifetime(&conn.live, &pool.options)
is_beyond_idle(conn, &pool.options) || is_beyond_lifetime(conn, &pool.options)
});
for conn in keep {
// return these connections to the pool first
pool.idle.push(conn).expect("unreachable: pool overflowed");
pool.idle_conns
.push(conn.into_leakable())
.expect("BUG: connection queue overflow in spawn_reaper");
}
for conn in reap {
conn.close().await;
pool.size.fetch_sub(1, Ordering::AcqRel);
let _ = conn.close().await;
}
sleep(period).await;
}
});
}
/// RAII guard returned by `Pool::try_increment_size()` and others.
///
/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
/// (where the pool thinks it has more connections than it does).
pub(in crate::pool) struct DecrementSizeGuard<'a> {
size: &'a AtomicU32,
waiters: &'a SegQueue<Waker>,
dropped: bool,
}
impl<'a> DecrementSizeGuard<'a> {
pub fn new<C>(pool: &'a SharedPool<C>) -> Self {
Self {
size: &pool.size,
waiters: &pool.waiters,
dropped: false,
}
}
/// Return `true` if the internal references point to the same fields in `SharedPool`.
pub fn same_pool<C>(&self, pool: &'a SharedPool<C>) -> bool {
ptr::eq(self.size, &pool.size) && ptr::eq(self.waiters, &pool.waiters)
}
pub fn cancel(self) {
mem::forget(self);
}
}
impl Drop for DecrementSizeGuard<'_> {
fn drop(&mut self) {
assert!(!self.dropped, "double-dropped!");
self.dropped = true;
self.size.fetch_sub(1, Ordering::SeqCst);
if let Ok(waker) = self.waiters.pop() {
waker.wake();
}
}
}

View File

@ -1,20 +1,20 @@
//! **Pool** for SQLx database connections.
use std::{
fmt, mem,
ops::{Deref, DerefMut},
fmt,
sync::Arc,
time::{Duration, Instant},
};
use futures_core::future::BoxFuture;
use crate::connection::{Connect, Connection};
use crate::transaction::Transaction;
use self::inner::SharedPool;
use self::options::Options;
pub use self::conn::PoolConnection;
mod conn;
mod executor;
mod inner;
mod options;
@ -24,24 +24,6 @@ pub use self::options::Builder;
/// A pool of database connections.
pub struct Pool<C>(Arc<SharedPool<C>>);
pub struct PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
live: Option<Live<C>>,
pool: Arc<SharedPool<C>>,
}
struct Live<C> {
raw: C,
created: Instant,
}
struct Idle<C> {
live: Live<C>,
since: Instant,
}
impl<C> Pool<C>
where
C: Connection + Connect<Connection = C>,
@ -51,8 +33,8 @@ where
/// The connection URL syntax is documented on the connection type for the respective
/// database you're connecting to:
///
/// * MySQL/MariaDB: [crate::MySqlConnection]
/// * PostgreSQL: [crate::PgConnection]
/// * MySQL/MariaDB: [crate::mysql::MySqlConnection]
/// * PostgreSQL: [crate::postgres::PgConnection]
pub async fn new(url: &str) -> crate::Result<Self> {
Self::builder().build(url).await
}
@ -72,20 +54,14 @@ where
///
/// Waits for at most the configured connection timeout before returning an error.
pub async fn acquire(&self) -> crate::Result<PoolConnection<C>> {
self.0.acquire().await.map(|conn| PoolConnection {
live: Some(conn),
pool: Arc::clone(&self.0),
})
self.0.acquire().await.map(|conn| conn.attach(&self.0))
}
/// Attempts to retrieve a connection from the pool if there is one available.
///
/// Returns `None` immediately if there are no idle connections available in the pool.
pub fn try_acquire(&self) -> Option<PoolConnection<C>> {
self.0.try_acquire().map(|conn| PoolConnection {
live: Some(conn),
pool: Arc::clone(&self.0),
})
self.0.try_acquire().map(|conn| conn.attach(&self.0))
}
/// Retrieves a new connection and immediately begins a new transaction.
@ -101,6 +77,11 @@ where
self.0.close().await;
}
/// Returns `true` if [`.close()`][Pool::close] has been called on the pool, `false` otherwise.
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
/// Returns the number of connections currently being managed by the pool.
pub fn size(&self) -> u32 {
self.0.size()
@ -159,56 +140,22 @@ where
}
}
const DEREF_ERR: &str = "(bug) connection already released to pool";
impl<C> Deref for PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
type Target = C;
fn deref(&self) -> &Self::Target {
&self.live.as_ref().expect(DEREF_ERR).raw
}
/// get the time between the deadline and now and use that as our timeout
///
/// returns `Error::PoolTimedOut` if the deadline is in the past
fn deadline_as_timeout(deadline: Instant) -> crate::Result<Duration> {
deadline
.checked_duration_since(Instant::now())
.ok_or(crate::Error::PoolTimedOut(None))
}
impl<C> DerefMut for PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live.as_mut().expect(DEREF_ERR).raw
}
}
impl<C> Connection for PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
fn close(mut self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(async move {
if let Some(live) = self.live.take() {
let raw = live.raw;
// Explicitly close the connection
raw.close().await?;
}
// Forget ourself so it does not go back to the pool
mem::forget(self);
Ok(())
})
}
}
impl<C> Drop for PoolConnection<C>
where
C: Connection + Connect<Connection = C>,
{
fn drop(&mut self) {
if let Some(live) = self.live.take() {
self.pool.release(live);
}
#[test]
fn assert_pool_traits() {
fn assert_send_sync<T: Send + Sync>() {}
fn assert_clone<T: Clone>() {}
fn assert_pool<C: Connection + Connect<Connection = C>>() {
assert_send_sync::<Pool<C>>();
assert_clone::<Pool<C>>();
}
}

View File

@ -18,9 +18,9 @@ use crate::postgres::PgError;
use crate::url::Url;
use crate::Result;
/// An asynchronous connection to a [Postgres] database.
/// An asynchronous connection to a [Postgres][super::Postgres] database.
///
/// The connection string expected by [Connection::open] should be a PostgreSQL connection
/// The connection string expected by [Connect::connect] should be a PostgreSQL connection
/// string, as documented at
/// <https://www.postgresql.org/docs/12/libpq-connect.html#LIBPQ-CONNSTRING>
///

View File

@ -16,5 +16,5 @@ mod protocol;
mod row;
mod types;
/// An alias for [`Pool`], specialized for **Postgres**.
/// An alias for [`Pool`][crate::Pool], specialized for **Postgres**.
pub type PgPool = super::Pool<PgConnection>;

View File

@ -1,5 +1,6 @@
use futures::TryStreamExt;
use sqlx::{Connection as _, Executor as _, MySqlConnection, MySqlPool, Row as _};
use std::time::Duration;
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
@ -87,6 +88,61 @@ async fn pool_immediately_fails_with_db_error() -> anyhow::Result<()> {
Ok(())
}
// run with `cargo test --features mysql -- --ignored --nocapture pool_smoke_test`
#[ignore]
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
async fn pool_smoke_test() -> anyhow::Result<()> {
use sqlx_core::runtime::{sleep, spawn, timeout};
eprintln!("starting pool");
let pool = MySqlPool::builder()
.connect_timeout(Duration::from_secs(5))
.min_size(5)
.max_size(10)
.build(&dotenv::var("DATABASE_URL")?)
.await?;
// spin up more tasks than connections available, and ensure we don't deadlock
for i in 0..20 {
let pool = pool.clone();
spawn(async move {
loop {
if let Err(e) = sqlx::query("select 1 + 1").fetch_one(&mut &pool).await {
eprintln!("pool task {} dying due to {}", i, e);
break;
}
}
});
}
for _ in 0..5 {
let pool = pool.clone();
spawn(async move {
while !pool.is_closed() {
// drop acquire() futures in a hot loop
// https://github.com/launchbadge/sqlx/issues/83
drop(pool.acquire());
}
});
}
eprintln!("sleeping for 30 seconds");
sleep(Duration::from_secs(30)).await;
assert_eq!(pool.size(), 10);
eprintln!("closing pool");
timeout(Duration::from_secs(30), pool.close()).await?;
eprintln!("pool closed successfully");
Ok(())
}
fn url() -> anyhow::Result<String> {
Ok(dotenv::var("DATABASE_URL")?)
}

View File

@ -1,5 +1,7 @@
use futures::TryStreamExt;
use sqlx::{postgres::PgConnection, Connection as _, Executor as _, Row as _};
use sqlx_core::postgres::PgPool;
use std::time::Duration;
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
@ -68,6 +70,61 @@ async fn it_remains_stable_issue_30() -> anyhow::Result<()> {
Ok(())
}
// run with `cargo test --features postgres -- --ignored --nocapture pool_smoke_test`
#[ignore]
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
async fn pool_smoke_test() -> anyhow::Result<()> {
use sqlx_core::runtime::{sleep, spawn, timeout};
eprintln!("starting pool");
let pool = PgPool::builder()
.connect_timeout(Duration::from_secs(5))
.min_size(5)
.max_size(10)
.build(&dotenv::var("DATABASE_URL")?)
.await?;
// spin up more tasks than connections available, and ensure we don't deadlock
for i in 0..20 {
let pool = pool.clone();
spawn(async move {
loop {
if let Err(e) = sqlx::query("select 1 + 1").fetch_one(&mut &pool).await {
eprintln!("pool task {} dying due to {}", i, e);
break;
}
}
});
}
for _ in 0..5 {
let pool = pool.clone();
spawn(async move {
while !pool.is_closed() {
// drop acquire() futures in a hot loop
// https://github.com/launchbadge/sqlx/issues/83
drop(pool.acquire());
}
});
}
eprintln!("sleeping for 30 seconds");
sleep(Duration::from_secs(30)).await;
assert_eq!(pool.size(), 10);
eprintln!("closing pool");
timeout(Duration::from_secs(30), pool.close()).await?;
eprintln!("pool closed successfully");
Ok(())
}
async fn connect() -> anyhow::Result<PgConnection> {
let _ = dotenv::dotenv();
let _ = env_logger::try_init();