From 17e88ac1a471625e1b1d558887800864effe9bd7 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Wed, 1 Jul 2020 02:38:33 -0700 Subject: [PATCH] perf: implement pool benchmark, make fairness an option --- Cargo.lock | 193 +++++++++++++++++++++++++++++++++- Cargo.toml | 1 + sqlx-bench/Cargo.toml | 25 +++++ sqlx-bench/benches/pg_pool.rs | 79 ++++++++++++++ sqlx-core/src/pool/inner.rs | 14 ++- sqlx-core/src/pool/options.rs | 7 ++ sqlx-macros/Cargo.toml | 10 +- sqlx-macros/src/lib.rs | 1 - sqlx-macros/src/query/mod.rs | 2 +- sqlx-macros/src/runtime.rs | 23 ---- sqlx-rt/Cargo.toml | 5 +- sqlx-rt/src/lib.rs | 53 ++++++++++ 12 files changed, 375 insertions(+), 38 deletions(-) create mode 100644 sqlx-bench/Cargo.toml create mode 100644 sqlx-bench/benches/pg_pool.rs delete mode 100644 sqlx-macros/src/runtime.rs diff --git a/Cargo.lock b/Cargo.lock index 94f0552c..836d7536 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -311,6 +311,18 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "bstr" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31accafdb70df7871592c058eca3985b71104e15ac32f64706022c58867da931" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + [[package]] name = "bumpalo" version = "3.4.0" @@ -367,6 +379,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "cast" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0" +dependencies = [ + "rustc_version", +] + [[package]] name = "cc" version = "1.0.57" @@ -506,6 +527,42 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d375c433320f6c5057ae04a04376eef4d04ce2801448cf8863a78da99107be4" +[[package]] +name = "criterion" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70daa7ceec6cf143990669a04c7df13391d55fb27bd4079d252fca774ba244d8" +dependencies = [ + "atty", + "cast", + "clap", + "criterion-plot", + "csv", + "itertools", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_cbor", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e022feadec601fba1649cfa83586381a4ad31c6bf3a9ab7d408118b05dd9889d" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam-channel" version = "0.4.2" @@ -584,6 +641,28 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279" +dependencies = [ + "bstr", + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + [[package]] name = "data-encoding" version = "2.2.1" @@ -944,6 +1023,12 @@ dependencies = [ "tokio-io", ] +[[package]] +name = "half" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d36fab90f82edc3c747f9d438e06cf0a491055896f2a279638bb5beed6c40177" + [[package]] name = "hashbrown" version = "0.8.0" @@ -1510,6 +1595,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +[[package]] +name = "oorandom" +version = "11.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a170cebd8021a008ea92e4db85a72f80b35df514ec664b296fdcbb654eac0b2c" + [[package]] name = "opaque-debug" version = "0.2.3" @@ -1788,6 +1879,18 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" +[[package]] +name = "plotters" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d1685fbe7beba33de0330629da9d955ac75bd54f33d7b79f9a895590124f6bb" +dependencies = [ + "js-sys", + "num-traits", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "ppv-lite86" version = "0.2.8" @@ -1907,6 +2010,31 @@ dependencies = [ "rand_core", ] +[[package]] +name = "rayon" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f02856753d04e03e26929f820d0a0a337ebe71f849801eea335d464b349080" +dependencies = [ + "autocfg 1.0.0", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280" +dependencies = [ + "crossbeam-deque", + "crossbeam-queue", + "crossbeam-utils 0.7.2", + "lazy_static", + "num_cpus", +] + [[package]] name = "redox_syscall" version = "0.1.56" @@ -1925,6 +2053,15 @@ dependencies = [ "thread_local", ] +[[package]] +name = "regex-automata" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" +dependencies = [ + "byteorder", +] + [[package]] name = "regex-syntax" version = "0.6.18" @@ -2015,6 +2152,15 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.19" @@ -2097,6 +2243,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_cbor" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.114" @@ -2295,6 +2451,17 @@ dependencies = [ "trybuild", ] +[[package]] +name = "sqlx-bench" +version = "0.1.0" +dependencies = [ + "criterion", + "dotenv", + "once_cell", + "sqlx", + "sqlx-rt", +] + [[package]] name = "sqlx-cli" version = "0.1.0-pre" @@ -2447,20 +2614,18 @@ dependencies = [ name = "sqlx-macros" version = "0.4.0-pre" dependencies = [ - "async-std", "dotenv", "futures 0.3.5", "heck", "hex", - "once_cell", "proc-macro2", "quote", "serde", "serde_json", "sha2 0.8.2", "sqlx-core", + "sqlx-rt", "syn", - "tokio 0.2.21", "url 2.1.1", ] @@ -2473,6 +2638,7 @@ dependencies = [ "async-native-tls", "async-std", "native-tls", + "once_cell", "tokio 0.2.21", "tokio-native-tls", ] @@ -2794,6 +2960,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tinytemplate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d3dc76004a03cec1c5932bca4cdc2e39aaa798e3f82363dd94f9adf6098c12f" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "0.3.3" @@ -3106,6 +3282,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9571542c2ce85ce642e6b58b3364da2fb53526360dfb7c211add4f5c23105ff7" +[[package]] +name = "walkdir" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "777182bc735b6424e1a57516d35ed72cb8019d85c8c9bf536dccb3445c1a2f7d" +dependencies = [ + "same-file", + "winapi 0.3.9", + "winapi-util", +] + [[package]] name = "want" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index b623f0e0..0db5d9c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "sqlx-macros", "sqlx-test", "sqlx-cli", + "sqlx-bench", "examples/mysql/todos", "examples/postgres/listen", "examples/postgres/todos", diff --git a/sqlx-bench/Cargo.toml b/sqlx-bench/Cargo.toml new file mode 100644 index 00000000..1e6a690c --- /dev/null +++ b/sqlx-bench/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "sqlx-bench" +version = "0.1.0" +authors = ["Austin Bonander "] +edition = "2018" +publish = false + +[features] +runtime-actix = ["sqlx/runtime-actix", "sqlx-rt/runtime-actix"] +runtime-async-std = ["sqlx/runtime-async-std", "sqlx-rt/runtime-async-std"] +runtime-tokio = ["sqlx/runtime-tokio", "sqlx-rt/runtime-tokio"] + +postgres = ["sqlx/postgres"] + +[dependencies] +criterion = "0.3.3" +dotenv = "0.15.0" +once_cell = "1.4" +sqlx = { version = "0.4.0-pre", path = "../", default-features = false } +sqlx-rt = { version = "0.1.0-pre", path = "../sqlx-rt", default-features = false } + +[[bench]] +name = "pg_pool" +harness = false +required-features = ["postgres"] diff --git a/sqlx-bench/benches/pg_pool.rs b/sqlx-bench/benches/pg_pool.rs new file mode 100644 index 00000000..4cc7edb0 --- /dev/null +++ b/sqlx-bench/benches/pg_pool.rs @@ -0,0 +1,79 @@ +use criterion::{criterion_group, criterion_main, Bencher, Criterion}; +use sqlx::PgPool; + +use std::time::{Duration, Instant}; + +fn bench_pgpool_acquire(c: &mut Criterion) { + let mut group = c.benchmark_group("bench_pgpool_acquire"); + + for &concurrent in [5u32, 10, 50, 100, 500, 1000, 5000 /*, 10_000, 50_000*/].iter() { + for &fair in [false, true].iter() { + let fairness = if fair { "(fair)" } else { "(unfair)" }; + + group.bench_with_input( + format!("{} concurrent {}", concurrent, fairness), + &(concurrent, fair), + |b, &(concurrent, fair)| do_bench_acquire(b, concurrent, fair), + ); + } + } + + group.finish(); +} + +fn do_bench_acquire(b: &mut Bencher, concurrent: u32, fair: bool) { + let pool = sqlx_rt::block_on( + PgPool::builder() + // we don't want timeouts because we want to see how the pool degrades + .connect_timeout(Duration::from_secs(3600)) + // force the pool to start full + .min_size(50) + .max_size(50) + // we're not benchmarking `ping()` + .test_on_acquire(false) + .fair(fair) + .build( + &dotenv::var("DATABASE_URL").expect("DATABASE_URL must be set to run benchmarks"), + ), + ) + .expect("failed to open PgPool"); + + for _ in 0..concurrent { + let pool = pool.clone(); + sqlx_rt::enter_runtime(|| { + sqlx_rt::spawn(async move { + while !pool.is_closed() { + let conn = match pool.acquire().await { + Ok(conn) => conn, + Err(sqlx::Error::PoolClosed) => break, + Err(e) => panic!("failed to acquire concurrent connection: {}", e), + }; + + // pretend we're using the connection + sqlx_rt::sleep(Duration::from_micros(500)).await; + drop(criterion::black_box(conn)); + } + }) + }); + } + + b.iter_custom(|iters| { + sqlx_rt::block_on(async { + // take the start time inside the future to make sure we only count once it's running + let start = Instant::now(); + for _ in 0..iters { + criterion::black_box( + pool.acquire() + .await + .expect("failed to acquire connection for benchmark"), + ); + } + start.elapsed() + }) + }); + + sqlx_rt::block_on(pool.close()); +} + +criterion_group!(pg_pool, bench_pgpool_acquire); +criterion_main!(pg_pool); diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 3e342109..80afd897 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -47,10 +47,20 @@ impl SharedPool { pub(super) async fn close(&self) { self.is_closed.store(true, Ordering::Release); - while self.idle_conns.pop().is_ok() {} while let Ok(waker) = self.waiters.pop() { waker.wake(); } + + // ensure we wait until the pool is actually closed + while self.size() > 0 { + let _ = self + .idle_conns + .pop() + .map(|idle| Floating::from_idle(idle, self)); + + // yield to avoid starving the executor + sqlx_rt::yield_now().await; + } } #[inline] @@ -158,7 +168,7 @@ impl SharedPool { pub(super) async fn acquire<'s>(&'s self) -> Result>, Error> { let start = Instant::now(); let deadline = start + self.options.connect_timeout; - let mut waited = false; + let mut waited = !self.options.fair; // Unless the pool has been closed ... while !self.is_closed() { diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 1935c63e..8653d76d 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -34,6 +34,7 @@ impl Builder { idle_timeout: None, // If true, test the health of a connection on acquire test_on_acquire: true, + fair: false, }, } } @@ -106,6 +107,11 @@ impl Builder { self } + pub fn fair(mut self, fair: bool) -> Self { + self.options.fair = fair; + self + } + /// Consumes the builder, returning a new, initialized connection pool with the given /// connection string. /// @@ -154,4 +160,5 @@ pub(crate) struct Options { pub max_lifetime: Option, pub idle_timeout: Option, pub test_on_acquire: bool, + pub fair: bool, } diff --git a/sqlx-macros/Cargo.toml b/sqlx-macros/Cargo.toml index 163cf981..9bab0661 100644 --- a/sqlx-macros/Cargo.toml +++ b/sqlx-macros/Cargo.toml @@ -19,9 +19,9 @@ proc-macro = true default = [ "runtime-async-std" ] # runtimes -runtime-async-std = [ "sqlx-core/runtime-async-std", "async-std" ] -runtime-tokio = [ "sqlx-core/runtime-tokio", "tokio", "once_cell" ] -runtime-actix = [ "sqlx-core/runtime-actix", "tokio", "once_cell" ] +runtime-async-std = [ "sqlx-core/runtime-async-std", "sqlx-rt/runtime-async-std" ] +runtime-tokio = [ "sqlx-core/runtime-tokio", "sqlx-rt/runtime-tokio" ] +runtime-actix = [ "sqlx-core/runtime-actix", "sqlx-rt/runtime-actix" ] # offline building support offline = ["sqlx-core/offline", "serde", "serde_json", "hex", "sha2"] @@ -41,18 +41,16 @@ uuid = [ "sqlx-core/uuid" ] json = [ "sqlx-core/json", "serde_json" ] [dependencies] -async-std = { version = "1.6.0", default-features = false, optional = true } -tokio = { version = "0.2.21", default-features = false, features = [ "rt-threaded" ], optional = true } dotenv = { version = "0.15.0", default-features = false } futures = { version = "0.3.4", default-features = false, features = [ "executor" ] } hex = { version = "0.4.2", optional = true } heck = "0.3.1" proc-macro2 = { version = "1.0.9", default-features = false } sqlx-core = { version = "0.4.0-pre", default-features = false, path = "../sqlx-core" } +sqlx-rt = { version = "0.1.0-pre", default-features = false, path = "../sqlx-rt" } serde = { version = "1.0.111", optional = true } serde_json = { version = "1.0.30", features = [ "preserve_order" ], optional = true } sha2 = { version = "0.8.2", optional = true } syn = { version = "1.0.30", default-features = false, features = [ "full" ] } quote = { version = "1.0.6", default-features = false } url = { version = "2.1.1", default-features = false } -once_cell = { version = "1.3", features = ["std"], optional = true } diff --git a/sqlx-macros/src/lib.rs b/sqlx-macros/src/lib.rs index d967b4bf..e7d848d3 100644 --- a/sqlx-macros/src/lib.rs +++ b/sqlx-macros/src/lib.rs @@ -15,7 +15,6 @@ type Result = std::result::Result; mod database; mod derives; mod query; -mod runtime; fn macro_result(tokens: proc_macro2::TokenStream) -> TokenStream { quote!( diff --git a/sqlx-macros/src/query/mod.rs b/sqlx-macros/src/query/mod.rs index d2876eaa..196d29b2 100644 --- a/sqlx-macros/src/query/mod.rs +++ b/sqlx-macros/src/query/mod.rs @@ -10,11 +10,11 @@ use quote::{format_ident, quote}; use sqlx_core::connection::Connect; use sqlx_core::database::Database; use sqlx_core::describe::Describe; +use sqlx_rt::block_on; use crate::database::DatabaseExt; use crate::query::data::QueryData; use crate::query::input::RecordType; -use crate::runtime::block_on; mod args; mod data; diff --git a/sqlx-macros/src/runtime.rs b/sqlx-macros/src/runtime.rs deleted file mode 100644 index f31c634c..00000000 --- a/sqlx-macros/src/runtime.rs +++ /dev/null @@ -1,23 +0,0 @@ -// NOTE: this is separate from sqlx-rt because of the non-production nature of it - -#[cfg(feature = "runtime-async-std")] -pub(crate) use async_std::task::block_on; - -#[cfg(any(feature = "runtime-tokio", feature = "runtime-actix"))] -pub fn block_on(future: F) -> F::Output { - use once_cell::sync::Lazy; - use tokio::runtime::{self, Runtime}; - - // lazily initialize a global runtime once for multiple invocations of the macros - static RUNTIME: Lazy = Lazy::new(|| { - runtime::Builder::new() - // `.basic_scheduler()` requires calling `Runtime::block_on()` which needs mutability - .threaded_scheduler() - .enable_io() - .enable_time() - .build() - .expect("failed to initialize Tokio runtime") - }); - - RUNTIME.enter(|| futures::executor::block_on(future)) -} diff --git a/sqlx-rt/Cargo.toml b/sqlx-rt/Cargo.toml index 8bbdb67a..e8350211 100644 --- a/sqlx-rt/Cargo.toml +++ b/sqlx-rt/Cargo.toml @@ -6,9 +6,9 @@ license = "MIT OR Apache-2.0" edition = "2018" [features] -runtime-actix = [ "actix-rt", "actix-threadpool", "tokio", "tokio-native-tls" ] +runtime-actix = [ "actix-rt", "actix-threadpool", "tokio", "tokio-native-tls", "once_cell" ] runtime-async-std = [ "async-std", "async-native-tls" ] -runtime-tokio = [ "tokio", "tokio-native-tls" ] +runtime-tokio = [ "tokio", "tokio-native-tls", "once_cell" ] [dependencies] async-native-tls = { version = "0.3.3", optional = true } @@ -18,3 +18,4 @@ async-std = { version = "1.6.0", features = [ "unstable" ], optional = true } tokio = { version = "0.2.21", optional = true, features = [ "blocking", "fs", "tcp", "uds", "macros", "rt-core", "rt-threaded", "time", "dns", "io-util" ] } tokio-native-tls = { version = "0.1.0", optional = true } native-tls = "0.2.4" +once_cell = { version = "1.3", features = ["std"], optional = true } diff --git a/sqlx-rt/src/lib.rs b/sqlx-rt/src/lib.rs index 835f08eb..1e08de70 100644 --- a/sqlx-rt/src/lib.rs +++ b/sqlx-rt/src/lib.rs @@ -114,3 +114,56 @@ pub use async_std::os::unix::net::UnixStream; #[cfg(all(feature = "async-native-tls", not(feature = "tokio-native-tls")))] pub use async_native_tls::{Error as TlsError, TlsConnector, TlsStream}; + +#[cfg(all( + feature = "runtime-async-std", + not(any(feature = "runtime-actix", feature = "runtime-tokio")) +))] +pub(crate) use async_std::task::block_on; + +#[cfg(all( + feature = "runtime-async-std", + not(any(feature = "runtime-actix", feature = "runtime-tokio")) +))] +pub fn enter_runtime(f: F) -> R +where + F: FnOnce() -> R, +{ + // no-op for async-std + f() +} + +#[cfg(all( + any(feature = "runtime-tokio", feature = "runtime-actix"), + not(feature = "runtime-async-std") +))] +pub use tokio_runtime::{block_on, enter_runtime}; + +#[cfg(any(feature = "runtime-tokio", feature = "runtime-actix"))] +mod tokio_runtime { + use once_cell::sync::Lazy; + use tokio::runtime::{self, Runtime}; + + // lazily initialize a global runtime once for multiple invocations of the macros + static RUNTIME: Lazy = Lazy::new(|| { + runtime::Builder::new() + // `.basic_scheduler()` requires calling `Runtime::block_on()` which needs mutability + .threaded_scheduler() + .enable_io() + .enable_time() + .build() + .expect("failed to initialize Tokio runtime") + }); + + #[cfg(any(feature = "runtime-tokio", feature = "runtime-actix"))] + pub fn block_on(future: F) -> F::Output { + RUNTIME.enter(|| RUNTIME.handle().block_on(future)) + } + + pub fn enter_runtime(f: F) -> R + where + F: FnOnce() -> R, + { + RUNTIME.enter(f) + } +}