mirror of
https://github.com/launchbadge/sqlx.git
synced 2025-12-29 21:00:54 +00:00
fix: debug timeouts in benchmark
This commit is contained in:
parent
44e40b2816
commit
d905016923
@ -467,3 +467,6 @@ name = "any-pool"
|
||||
path = "benches/any/pool.rs"
|
||||
required-features = ["runtime-tokio", "any"]
|
||||
harness = false
|
||||
|
||||
[profile.bench]
|
||||
debug = true
|
||||
|
||||
@ -156,12 +156,6 @@ impl<DB: Database> PoolInner<DB> {
|
||||
return Poll::Ready(Err(Error::PoolClosed));
|
||||
}
|
||||
|
||||
if let Poll::Ready(()) = deadline.as_mut().poll(cx) {
|
||||
return Poll::Ready(Err(Error::PoolTimedOut {
|
||||
last_connect_error: connect_shared.take_error().map(Box::new),
|
||||
}));
|
||||
}
|
||||
|
||||
if let Poll::Ready(res) = acquire_connected.as_mut().poll(cx) {
|
||||
match res {
|
||||
Ok(conn) => {
|
||||
@ -205,6 +199,12 @@ impl<DB: Database> PoolInner<DB> {
|
||||
return Poll::Ready(res);
|
||||
}
|
||||
|
||||
if let Poll::Ready(()) = deadline.as_mut().poll(cx) {
|
||||
return Poll::Ready(Err(Error::PoolTimedOut {
|
||||
last_connect_error: connect_shared.take_error().map(Box::new),
|
||||
}));
|
||||
}
|
||||
|
||||
return Poll::Pending;
|
||||
})
|
||||
.await?;
|
||||
|
||||
@ -303,12 +303,6 @@ impl<T> Shard<T, [Arc<Mutex<Option<T>>>]> {
|
||||
Mask(!locked_set & connected_mask)
|
||||
}
|
||||
|
||||
/// Choose the first index that is unlocked with bit `connected`
|
||||
#[inline]
|
||||
fn next_unlocked(&self, connected: bool) -> Option<ConnectionIndex> {
|
||||
self.unlocked_mask(connected).next()
|
||||
}
|
||||
|
||||
async fn acquire(self: &Arc<Self>, connected: bool) -> SlotGuard<T> {
|
||||
// Attempt an unfair acquire first, before we modify the waitlist.
|
||||
if let Some(locked) = self.try_acquire(connected) {
|
||||
@ -323,18 +317,34 @@ impl<T> Shard<T, [Arc<Mutex<Option<T>>>]> {
|
||||
|
||||
event_listener::listener!(event_to_listen => listener);
|
||||
|
||||
// We need to check again after creating the event listener,
|
||||
// because in the meantime, a concurrent task may have seen that there were no listeners
|
||||
// and just unlocked its connection.
|
||||
if let Some(locked) = self.try_acquire(connected) {
|
||||
return locked;
|
||||
}
|
||||
let mut listener = pin!(listener);
|
||||
|
||||
listener.await
|
||||
loop {
|
||||
// We need to check again after creating the event listener,
|
||||
// because in the meantime, a concurrent task may have seen that there were no listeners
|
||||
// and just unlocked its connection.
|
||||
match rt::timeout(NON_LOCAL_ACQUIRE_DELAY, listener.as_mut()).await {
|
||||
Ok(slot) => return slot,
|
||||
Err(_) => {
|
||||
if let Some(slot) = self.try_acquire(connected) {
|
||||
return slot;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_acquire(self: &Arc<Self>, connected: bool) -> Option<SlotGuard<T>> {
|
||||
self.try_lock(self.next_unlocked(connected)?)
|
||||
// If `locked_set` is constantly changing, don't loop forever.
|
||||
for index in self.unlocked_mask(connected) {
|
||||
if let Some(slot) = self.try_lock(index) {
|
||||
return Some(slot);
|
||||
}
|
||||
|
||||
std::hint::spin_loop();
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn try_lock(self: &Arc<Self>, index: ConnectionIndex) -> Option<SlotGuard<T>> {
|
||||
@ -353,7 +363,7 @@ impl<T> Shard<T, [Arc<Mutex<Option<T>>>]> {
|
||||
}
|
||||
|
||||
fn iter_min_connections(self: &Arc<Self>) -> impl Iterator<Item = DisconnectedSlot<T>> + '_ {
|
||||
(0..self.connections.len())
|
||||
self.unlocked_mask(false)
|
||||
.filter_map(|index| {
|
||||
let slot = self.try_lock(index)?;
|
||||
|
||||
@ -493,7 +503,7 @@ impl<T> DisconnectedSlot<T> {
|
||||
&self.0.shard.leaked_set,
|
||||
self.0.index,
|
||||
true,
|
||||
Ordering::Release,
|
||||
Ordering::AcqRel,
|
||||
);
|
||||
|
||||
self.0.shard.leak_event.notify(usize::MAX.tag(self.0.index));
|
||||
@ -627,7 +637,7 @@ impl<T> Drop for SlotGuard<T> {
|
||||
// but then fail to lock the mutex for it.
|
||||
drop(locked);
|
||||
|
||||
atomic_set(&self.shard.locked_set, self.index, false, Ordering::Release);
|
||||
atomic_set(&self.shard.locked_set, self.index, false, Ordering::AcqRel);
|
||||
}
|
||||
}
|
||||
|
||||
@ -737,7 +747,7 @@ impl Iterator for Mask {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{Params, MAX_SHARD_SIZE};
|
||||
use super::{Mask, Params, MAX_SHARD_SIZE};
|
||||
|
||||
#[test]
|
||||
fn test_params() {
|
||||
@ -762,4 +772,27 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mask() {
|
||||
let inputs: &[(usize, &[usize])] = &[
|
||||
(0b0, &[]),
|
||||
(0b1, &[0]),
|
||||
(0b11, &[0, 1]),
|
||||
(0b111, &[0, 1, 2]),
|
||||
(0b1000, &[3]),
|
||||
(0b1001, &[0, 3]),
|
||||
(0b1001001, &[0, 3, 6]),
|
||||
];
|
||||
|
||||
for (mask, expected_indices) in inputs {
|
||||
let actual_indices = Mask(*mask).collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(
|
||||
actual_indices[..],
|
||||
expected_indices[..],
|
||||
"invalid mask: {mask:b}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user