diff --git a/Cargo.toml b/Cargo.toml index a92c67d8..3c3db27d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -467,3 +467,6 @@ name = "any-pool" path = "benches/any/pool.rs" required-features = ["runtime-tokio", "any"] harness = false + +[profile.bench] +debug = true diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index eb6e827e..046834a4 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -156,12 +156,6 @@ impl PoolInner { return Poll::Ready(Err(Error::PoolClosed)); } - if let Poll::Ready(()) = deadline.as_mut().poll(cx) { - return Poll::Ready(Err(Error::PoolTimedOut { - last_connect_error: connect_shared.take_error().map(Box::new), - })); - } - if let Poll::Ready(res) = acquire_connected.as_mut().poll(cx) { match res { Ok(conn) => { @@ -205,6 +199,12 @@ impl PoolInner { return Poll::Ready(res); } + if let Poll::Ready(()) = deadline.as_mut().poll(cx) { + return Poll::Ready(Err(Error::PoolTimedOut { + last_connect_error: connect_shared.take_error().map(Box::new), + })); + } + return Poll::Pending; }) .await?; diff --git a/sqlx-core/src/pool/shard.rs b/sqlx-core/src/pool/shard.rs index 2385c998..c1964c7c 100644 --- a/sqlx-core/src/pool/shard.rs +++ b/sqlx-core/src/pool/shard.rs @@ -303,12 +303,6 @@ impl Shard>>]> { Mask(!locked_set & connected_mask) } - /// Choose the first index that is unlocked with bit `connected` - #[inline] - fn next_unlocked(&self, connected: bool) -> Option { - self.unlocked_mask(connected).next() - } - async fn acquire(self: &Arc, connected: bool) -> SlotGuard { // Attempt an unfair acquire first, before we modify the waitlist. if let Some(locked) = self.try_acquire(connected) { @@ -323,18 +317,34 @@ impl Shard>>]> { event_listener::listener!(event_to_listen => listener); - // We need to check again after creating the event listener, - // because in the meantime, a concurrent task may have seen that there were no listeners - // and just unlocked its connection. - if let Some(locked) = self.try_acquire(connected) { - return locked; - } + let mut listener = pin!(listener); - listener.await + loop { + // We need to check again after creating the event listener, + // because in the meantime, a concurrent task may have seen that there were no listeners + // and just unlocked its connection. + match rt::timeout(NON_LOCAL_ACQUIRE_DELAY, listener.as_mut()).await { + Ok(slot) => return slot, + Err(_) => { + if let Some(slot) = self.try_acquire(connected) { + return slot; + } + } + } + } } fn try_acquire(self: &Arc, connected: bool) -> Option> { - self.try_lock(self.next_unlocked(connected)?) + // If `locked_set` is constantly changing, don't loop forever. + for index in self.unlocked_mask(connected) { + if let Some(slot) = self.try_lock(index) { + return Some(slot); + } + + std::hint::spin_loop(); + } + + None } fn try_lock(self: &Arc, index: ConnectionIndex) -> Option> { @@ -353,7 +363,7 @@ impl Shard>>]> { } fn iter_min_connections(self: &Arc) -> impl Iterator> + '_ { - (0..self.connections.len()) + self.unlocked_mask(false) .filter_map(|index| { let slot = self.try_lock(index)?; @@ -493,7 +503,7 @@ impl DisconnectedSlot { &self.0.shard.leaked_set, self.0.index, true, - Ordering::Release, + Ordering::AcqRel, ); self.0.shard.leak_event.notify(usize::MAX.tag(self.0.index)); @@ -627,7 +637,7 @@ impl Drop for SlotGuard { // but then fail to lock the mutex for it. drop(locked); - atomic_set(&self.shard.locked_set, self.index, false, Ordering::Release); + atomic_set(&self.shard.locked_set, self.index, false, Ordering::AcqRel); } } @@ -737,7 +747,7 @@ impl Iterator for Mask { #[cfg(test)] mod tests { - use super::{Params, MAX_SHARD_SIZE}; + use super::{Mask, Params, MAX_SHARD_SIZE}; #[test] fn test_params() { @@ -762,4 +772,27 @@ mod tests { } } } + + #[test] + fn test_mask() { + let inputs: &[(usize, &[usize])] = &[ + (0b0, &[]), + (0b1, &[0]), + (0b11, &[0, 1]), + (0b111, &[0, 1, 2]), + (0b1000, &[3]), + (0b1001, &[0, 3]), + (0b1001001, &[0, 3, 6]), + ]; + + for (mask, expected_indices) in inputs { + let actual_indices = Mask(*mask).collect::>(); + + assert_eq!( + actual_indices[..], + expected_indices[..], + "invalid mask: {mask:b}" + ); + } + } }