diff --git a/Cargo.lock b/Cargo.lock index 7df78920..65b2f23d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2193,6 +2193,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.5.0" @@ -2343,12 +2352,11 @@ checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" [[package]] name = "nu-ansi-term" -version = "0.46.0" +version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "overload", - "winapi", + "windows-sys 0.60.2", ] [[package]] @@ -2495,12 +2503,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "owo-colors" version = "4.2.2" @@ -3492,7 +3494,6 @@ dependencies = [ "async-std", "criterion", "dotenvy", - "env_logger", "futures-util", "hex", "libsqlite3-sys", @@ -3511,6 +3512,8 @@ dependencies = [ "time", "tokio", "tracing", + "tracing-flame", + "tracing-subscriber", "trybuild", "url", ] @@ -4005,6 +4008,7 @@ dependencies = [ "dotenvy", "env_logger", "sqlx", + "tracing-subscriber", ] [[package]] @@ -4451,6 +4455,17 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-flame" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9" +dependencies = [ + "lazy_static", + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -4464,14 +4479,18 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.19" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex-automata", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml index 3c3db27d..9faa8b96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -241,6 +241,7 @@ criterion = { version = "0.7.0", features = ["async_tokio"] } libsqlite3-sys = { version = "0.30.1" } tracing = "0.1.41" +tracing-flame = "0.2.0" tracing-subscriber = "0.3.20" # If this is an unconditional dev-dependency then Cargo will *always* try to build `libsqlite3-sys`, diff --git a/benches/any/pool.rs b/benches/any/pool.rs index 423b2ce0..0163590f 100644 --- a/benches/any/pool.rs +++ b/benches/any/pool.rs @@ -4,6 +4,10 @@ use std::fmt::{Display, Formatter}; use std::thread; use std::time::{Duration, Instant}; use tracing::Instrument; +use tracing_flame::FlameLayer; +use tracing_subscriber::{EnvFilter, Layer}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; #[derive(Debug)] struct Input { @@ -24,7 +28,27 @@ impl Display for Input { fn bench_pool(c: &mut Criterion) { sqlx::any::install_default_drivers(); - tracing_subscriber::fmt::try_init().ok(); + + let _guard = if let Ok(path) = dotenvy::var("FLAMEGRAPH_OUT") { + let (layer, guard) = FlameLayer::with_file(&path) + .expect(&format!("error opening path {path:?} (`FLAMEGRAPH_OUT`)")); + + tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()) + ) + .with(layer.with_threads_collapsed(true)) + .try_init() + .ok(); + + tracing::info!("Writing flamegraph to {path:?}"); + + Some(guard) + } else { + tracing_subscriber::fmt::try_init().ok(); + + None + }; let database_url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set"); @@ -136,4 +160,4 @@ fn bench_pool_with(b: &mut Bencher, input: &Input, database_url: &str) { } criterion_group!(benches, bench_pool,); -criterion_main!(benches); +criterion_main!(benches); \ No newline at end of file diff --git a/sqlx-core/src/pool/connection_set.rs b/sqlx-core/src/pool/connection_set.rs index 7a912f75..8ddf5f2b 100644 --- a/sqlx-core/src/pool/connection_set.rs +++ b/sqlx-core/src/pool/connection_set.rs @@ -13,6 +13,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; use std::time::Duration; +use tracing::Instrument; pub struct ConnectionSet { global: Arc, @@ -120,22 +121,37 @@ impl ConnectionSet { } async fn acquire_inner(&self, pref: AcquirePreference) -> SlotGuard { - let preferred_slot = current_thread_id() % self.slots.len(); - - tracing::trace!(preferred_slot, ?pref, "acquire_inner"); + let preferred_slot = current_thread_id() % self.slots.len(); // Always try to lock the connection associated with our thread ID let mut acquire_preferred = pin!(self.slots[preferred_slot].acquire(pref)); + let alternate_slot = (preferred_slot + 547usize.wrapping_mul( + Arc::strong_count(&self.slots[preferred_slot].connection) + )) % self.slots.len(); + + let mut acquire_alternate = pin!(self.slots[alternate_slot].acquire(pref)); + let mut listen_global = pin!(self.global.listen(pref)); - let mut yielded = false; + let mut yielded_1 = false; + let mut yielded_2 = false; std::future::poll_fn(|cx| { if let Poll::Ready(locked) = acquire_preferred.as_mut().poll(cx) { return Poll::Ready(locked); } + if let Poll::Ready(locked) = acquire_alternate.as_mut().poll(cx) { + return Poll::Ready(locked); + } + + // if !yielded_1 { + // cx.waker().wake_by_ref(); + // yielded_1 = true; + // return Poll::Pending; + // } + if let Poll::Ready(slot) = listen_global.as_mut().poll(cx) { if let Some(locked) = self.slots[slot].try_acquire(pref) { return Poll::Ready(locked); @@ -144,9 +160,9 @@ impl ConnectionSet { listen_global.as_mut().set(self.global.listen(pref)); } - if !yielded { + if !yielded_2 { cx.waker().wake_by_ref(); - yielded = true; + yielded_2 = true; return Poll::Pending; } @@ -156,6 +172,12 @@ impl ConnectionSet { Poll::Pending }) + .instrument(tracing::trace_span!( + target: "sqlx::pool::connection_set", + "acquire_inner", + preferred_slot, + ?pref, + )) .await } @@ -174,14 +196,24 @@ impl ConnectionSet { } fn try_acquire(&self, pref: AcquirePreference) -> Option> { - let mut search_slot = current_thread_id() % self.slots.len(); + let preferred_slot = current_thread_id() % self.slots.len(); - for _ in 0..self.slots.len() { - if let Some(locked) = self.slots[search_slot].try_acquire(pref) { - return Some(locked); + let (slots_before, slots_after) = self.slots.split_at(preferred_slot); + + let (preferred_slot, slots_after) = slots_after.split_first().unwrap(); + + if let Some(locked) = preferred_slot.try_acquire(pref) { + return Some(locked); + } + + for slot in slots_before.iter().chain(slots_after).rev() { + if self.global.locked_set[slot.index].load(Ordering::Relaxed) { + continue; } - search_slot = self.next_slot(search_slot); + if let Some(locked) = slot.try_acquire(pref) { + return Some(locked); + } } None @@ -398,6 +430,7 @@ impl SlotGuard { let connected = locked.is_some(); self.slot.set_is_connected(connected); self.slot.locked.store(false, Ordering::Release); + self.slot.global.locked_set[self.slot.index].store(false, Ordering::Relaxed); connected }) } diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index 1ae687f1..775b8976 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -20,7 +20,7 @@ use crate::{private_tracing_dynamic_event, rt}; use event_listener::listener; use futures_util::future::{self}; use std::time::{Duration, Instant}; -use tracing::Level; +use tracing::{Instrument, Level}; const GRACEFUL_CLOSE_TIMEOUT: Duration = Duration::from_secs(5); const TEST_BEFORE_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(60); @@ -181,7 +181,7 @@ impl PoolInner { tracing::trace!("waiting for any connection"); let disconnected = match self.connections.acquire_any().await { - Ok(conn) => match finish_acquire(self, conn).await { + Ok(conn) => match self.finish_acquire(conn).await { Ok(conn) => return Ok(conn), Err(slot) => slot, }, @@ -199,7 +199,7 @@ impl PoolInner { match race(&mut connect_task, self.connections.acquire_connected()).await { Ok(Ok(conn)) => return Ok(conn), Ok(Err(e)) => return Err(e), - Err(conn) => match finish_acquire(self, conn).await { + Err(conn) => match self.finish_acquire(conn).await { Ok(conn) => return Ok(conn), Err(_) => continue, }, @@ -207,6 +207,15 @@ impl PoolInner { } } + #[inline(always)] + async fn finish_acquire(self: &Arc, conn: ConnectedSlot>) -> Result, DisconnectedSlot>> { + let span = tracing::trace_span!(target: "sqlx::pool", "finish_acquire", connection_id=?conn.id); + + finish_acquire(self, conn) + .instrument(span) + .await + } + pub(crate) async fn try_min_connections( self: &Arc, deadline: Option,