diff --git a/sqlx-core/src/pool/connection_set.rs b/sqlx-core/src/pool/connection_set.rs index 8683f8a9..7a912f75 100644 --- a/sqlx-core/src/pool/connection_set.rs +++ b/sqlx-core/src/pool/connection_set.rs @@ -33,6 +33,7 @@ enum AcquirePreference { struct Global { unlock_event: Event, disconnect_event: Event, + locked_set: Box<[AtomicBool]>, num_connected: AtomicUsize, min_connections: usize, min_connections_event: Event<()>, @@ -63,6 +64,7 @@ impl ConnectionSet { let global = Arc::new(Global { unlock_event: Event::with_tag(), disconnect_event: Event::with_tag(), + locked_set: (0..*size.end()).map(|_| AtomicBool::new(false)).collect(), num_connected: AtomicUsize::new(0), min_connections: *size.start(), min_connections_event: Event::with_tag(), @@ -118,69 +120,41 @@ impl ConnectionSet { } async fn acquire_inner(&self, pref: AcquirePreference) -> SlotGuard { - /// Smallest time-step supported by [`tokio::time::sleep()`]. - /// - /// `async-io` doesn't document a minimum time-step, instead deferring to the platform. - const STEP_INTERVAL: Duration = Duration::from_millis(1); - - const SEARCH_LIMIT: usize = 5; - - let preferred_slot = current_thread_id() % self.slots.len(); + let preferred_slot = current_thread_id() % self.slots.len(); tracing::trace!(preferred_slot, ?pref, "acquire_inner"); // Always try to lock the connection associated with our thread ID let mut acquire_preferred = pin!(self.slots[preferred_slot].acquire(pref)); - let mut step_interval = pin!(rt::interval_after(STEP_INTERVAL)); - - let mut intervals_elapsed = 0usize; - - let mut search_slots = FuturesUnordered::new(); - let mut listen_global = pin!(self.global.listen(pref)); - let mut search_slot = self.next_slot(preferred_slot); + let mut yielded = false; - std::future::poll_fn(|cx| loop { + std::future::poll_fn(|cx| { if let Poll::Ready(locked) = acquire_preferred.as_mut().poll(cx) { return Poll::Ready(locked); } - // Don't push redundant futures for small sets. - let search_limit = cmp::min(SEARCH_LIMIT, self.slots.len()); - - if search_slots.len() < search_limit && step_interval.as_mut().poll_tick(cx).is_ready() - { - intervals_elapsed = intervals_elapsed.saturating_add(1); - - if search_slot != preferred_slot && self.slots[search_slot].matches_pref(pref) { - search_slots.push(self.slots[search_slot].lock()); - } - - search_slot = self.next_slot(search_slot); - } - - if let Poll::Ready(Some(locked)) = Pin::new(&mut search_slots).poll_next(cx) { - if locked.matches_pref(pref) { + if let Poll::Ready(slot) = listen_global.as_mut().poll(cx) { + if let Some(locked) = self.slots[slot].try_acquire(pref) { return Poll::Ready(locked); } - continue; + listen_global.as_mut().set(self.global.listen(pref)); } - if intervals_elapsed > search_limit && search_slots.len() < search_limit { - if let Poll::Ready(slot) = listen_global.as_mut().poll(cx) { - if self.slots[slot].matches_pref(pref) { - search_slots.push(self.slots[slot].lock()); - } - - listen_global.as_mut().set(self.global.listen(pref)); - continue; - } + if !yielded { + cx.waker().wake_by_ref(); + yielded = true; + return Poll::Pending; } - return Poll::Pending; + if let Some(locked) = self.try_acquire(pref) { + return Poll::Ready(locked); + } + + Poll::Pending }) .await }