WIP feat: implement metrics-gathering support for Pool

TODO: tests
This commit is contained in:
Austin Bonander 2022-06-15 19:50:26 -07:00
parent 7d8ded9a1a
commit 99b900e146
No known key found for this signature in database
GPG Key ID: 461F7F0F45383F2B
6 changed files with 599 additions and 13 deletions

View File

@ -62,7 +62,7 @@ all-types = [
]
bigdecimal = ["bigdecimal_", "num-bigint"]
decimal = ["rust_decimal", "num-bigint"]
json = ["serde", "serde_json"]
json = ["serde", "serde_json", "enum-map/serde"]
# runtimes
runtime-actix-native-tls = [
@ -185,6 +185,7 @@ hashlink = "0.8.0"
indexmap = "1.6.0"
hkdf = { version = "0.12.0", optional = true }
event-listener = "2.5.2"
enum-map = "2.4.0"
[dev-dependencies]
sqlx = { version = "0.6.0", path = "..", features = ["postgres", "sqlite", "mysql"] }

View File

@ -13,6 +13,7 @@ use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use crate::pool::metrics::{AcquirePhase, PoolMetricsCollector};
use crate::pool::options::PoolConnectionMetadata;
use std::time::{Duration, Instant};
@ -178,13 +179,22 @@ impl<DB: Database> PoolInner<DB> {
return Err(Error::PoolClosed);
}
let deadline = Instant::now() + self.options.acquire_timeout;
let metrics = &self.options.metrics;
sqlx_rt::timeout(
let acquire_start = Instant::now();
let deadline = acquire_start + self.options.acquire_timeout;
let mut phase = AcquirePhase::Waiting;
let res = sqlx_rt::timeout(
self.options.acquire_timeout,
async {
loop {
phase = AcquirePhase::Waiting;
let waiting_start = Instant::now();
let permit = self.semaphore.acquire(1).await;
metrics.permit_wait_time(waiting_start.elapsed());
if self.is_closed() {
return Err(Error::PoolClosed);
@ -194,7 +204,7 @@ impl<DB: Database> PoolInner<DB> {
let guard = match self.pop_idle(permit) {
// Then, check that we can use it...
Ok(conn) => match check_idle_conn(conn, &self.options).await {
Ok(conn) => match check_idle_conn(conn, &self.options, &mut phase).await {
// All good!
Ok(live) => return Ok(live),
@ -207,24 +217,40 @@ impl<DB: Database> PoolInner<DB> {
// we can open a new connection
guard
} else {
// I can't imagine this occurring unless there's a race condition where
// the number of available permits can exceed the max size
// without the pool being closed.
//
// If this does happen, the safest thing to do is return to the top
// and wait for another permit.
log::debug!("woke but was unable to acquire idle connection or open new one; retrying");
continue;
}
};
// Attempt to connect...
return self.connect(deadline, guard).await;
return self.connect(deadline, guard, &mut phase).await;
}
}
)
.await
.map_err(|_| Error::PoolTimedOut)?
.map_err(|_| {
metrics.acquire_timed_out(phase);
Error::PoolTimedOut
})?;
if res.is_ok() {
metrics.connection_acquired(acquire_start.elapsed());
}
res
}
pub(super) async fn connect(
self: &Arc<Self>,
deadline: Instant,
guard: DecrementSizeGuard<DB>,
phase: &mut AcquirePhase,
) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
@ -238,16 +264,20 @@ impl<DB: Database> PoolInner<DB> {
// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
*phase = AcquirePhase::Connecting;
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
let meta = PoolConnectionMetadata {
age: Duration::ZERO,
idle_for: Duration::ZERO,
};
let res = if let Some(callback) = &self.options.after_connect {
*phase = AcquirePhase::AfterConnectCallback;
// See comment on `PoolOptions::after_connect`
let meta = PoolConnectionMetadata {
age: Duration::ZERO,
idle_for: Duration::ZERO,
};
callback(&mut raw, meta).await
} else {
Ok(())
@ -258,6 +288,7 @@ impl<DB: Database> PoolInner<DB> {
Err(e) => {
log::error!("error returned from after_connect: {:?}", e);
// The connection is broken, don't try to close nicely.
*phase = AcquirePhase::ClosingInvalidConnection;
let _ = raw.close_hard().await;
// Fall through to the backoff.
@ -279,6 +310,8 @@ impl<DB: Database> PoolInner<DB> {
Err(_) => return Err(Error::PoolTimedOut),
}
*phase = AcquirePhase::Backoff;
// If the connection is refused, wait in exponentially
// increasing steps for the server to come up,
// capped by a factor of the remaining time until the deadline
@ -310,7 +343,10 @@ impl<DB: Database> PoolInner<DB> {
// We skip `after_release` since the connection was never provided to user code
// besides `after_connect`, if they set it.
self.release(self.connect(deadline, guard).await?);
self.release(
self.connect(deadline, guard, &mut AcquirePhase::Connecting)
.await?,
);
}
Ok(())
@ -351,16 +387,22 @@ fn is_beyond_idle_timeout<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<D
async fn check_idle_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
phase: &mut AcquirePhase,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_max_lifetime(&conn, options) {
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close().await);
}
if options.test_before_acquire {
*phase = AcquirePhase::TestBeforeAcquire;
// Check that the connection is still live
if let Err(e) = conn.ping().await {
*phase = AcquirePhase::ClosingInvalidConnection;
// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
@ -371,16 +413,20 @@ async fn check_idle_conn<DB: Database>(
}
if let Some(test) = &options.before_acquire {
*phase = AcquirePhase::BeforeAcquireCallback;
let meta = conn.metadata();
match test(&mut conn.live.raw, meta).await {
Ok(false) => {
// connection was rejected by user-defined hook, close nicely
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close().await);
}
Err(error) => {
log::warn!("error from `before_acquire`: {}", error);
// connection is broken so don't try to close nicely
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close_hard().await);
}

View File

@ -0,0 +1,150 @@
//! Metrics collection utilities for [`Pool`][crate::pool::Pool].
//!
//!
use std::sync::Arc;
use std::time::Duration;
// Saves a bunch of redundant links in docs.
// Just `#[cfg(doc)]` doesn't work for some reason.
#[cfg_attr(not(doc), allow(unused_imports))]
use {
crate::connection::Connection,
crate::pool::{Pool, PoolOptions},
};
mod simple;
pub use simple::{
AcquireTimeoutsPerPhase, SimplePoolMetrics, SimplePoolMetricsSnapshot, SimpleTimingStats,
};
/// Describes a type that can collect metrics from [`Pool`].
///
/// You can set the metrics collector for a `Pool` instance using [`PoolOptions::metrics_collector`].
///
/// For an easy-start implementation, see [`SimplePoolMetrics`].
///
/// All methods on this trait have provided impls so you can override just the ones you care about.
pub trait PoolMetricsCollector: Send + Sync + 'static {
/// Record when [`Pool::acquire()`] is called.
fn acquire_called(&self) {}
/// Record how long a [`Pool::acquire()`] call waited for a semaphore permit.
///
/// This is the first stage of `acquire()` and gives the call the right-of-way to either
/// pop a connection from the idle queue or open a new one.
///
/// This time is likely to increase as the pool comes under higher and higher load,
/// and will asymptotically approach the [acquire timeout][PoolOptions::acquire_timeout].
///
/// If `acquire()` times out while waiting for a permit, this method will not be called.
/// You will get an <code>acquire_timed_out([AcquirePhase::Waiting])</code> call instead.
///
/// [acquire_timed_out]: Self::acquire_timed_out
fn permit_wait_time(&self, duration: Duration) {
drop(duration);
}
/// Record when [`Pool::acquire()`] times out as governed by [`PoolOptions::acquire_timeout`].
///
/// `acquire()` has several internal asynchronous operations that it may time out on.
/// The given [`AcquirePhase`] tells you which one timed out.
fn acquire_timed_out(&self, phase: AcquirePhase) {
drop(phase);
}
/// Record when a connection is successfully acquired.
fn connection_acquired(&self, total_wait: Duration) {
drop(total_wait);
}
}
macro_rules! opt_delegate {
($receiver:ident.$method:ident $( ( $($arg:expr),*) )?) => {
if let Some(this) = $receiver {
this.$method($( $($arg),* )?);
}
}
}
#[doc(hidden)]
impl PoolMetricsCollector for Option<Arc<dyn PoolMetricsCollector>> {
fn acquire_called(&self) {
opt_delegate!(self.acquire_called());
}
fn permit_wait_time(&self, duration: Duration) {
opt_delegate!(self.permit_wait_time(duration));
}
fn acquire_timed_out(&self, phase: AcquirePhase) {
opt_delegate!(self.acquire_timed_out(phase));
}
fn connection_acquired(&self, total_wait: Duration) {
opt_delegate!(self.connection_acquired(total_wait));
}
}
/// The phase that [`Pool::acquire()`] was in when it timed out.
///
/// [`Pool::acquire()`] has several internal asynchronous operations, any of which may lead
/// to it timing out. Which phases are executed depends on multiple things:
///
/// * The pool's configuration.
/// * If an idle connection was available or not.
/// * If there is room in the pool for a new connection.
///
/// ### Note: Some Trait impls are Unstable
/// The `enum_map` trait impls are *not* considered part of the stable API.
/// They would not be listed in documentation if it was possible to tell the derive to hide them.
///
/// We reserve the right to update `enum_map` to a non-compatible version if necessary.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, enum_map::Enum)]
#[cfg_attr(feature = "json", derive(serde::Serialize))]
#[non_exhaustive]
pub enum AcquirePhase {
/// Initial [`Pool::acquire()`] phase: waiting for a semaphore permit.
///
/// A permit represents the privilege to acquire a connection, either by popping one
/// from the idle queue or opening a new one.
Waiting,
/// `acquire()` found an idle connection. It then calls [`Connection::ping()`] on it.
///
/// Only done if [`PoolOptions::test_before_acquire`] is `true` (enabled by default).
TestBeforeAcquire,
/// `acquire()` found an idle connection and the `TestBeforeAcquire` phase succeeded
/// or was skipped.
///
/// It then invokes the user-defined [`before_acquire`][PoolOptions::before_acquire] callback, if set.
BeforeAcquireCallback,
/// `acquire()` found an idle connection but decided to close it.
///
/// This may have happened for any of the following reasons:
/// * The connection's age exceeded [`PoolOptions::max_lifetime`].
/// * The `TestBeforeAcquire` phase failed.
/// * The `BeforeAcquireCallback` errored or rejected the connection.
/// * A new connection was opened but the `AfterConnectCallback` phase errored.
ClosingInvalidConnection,
/// `acquire()` either did not find an idle connection or the connection it got failed
/// the `TestBeforeAcquire` or `BeforeAcquireCallback` phase and was closed.
///
/// It then attempted to open a new connection.
Connecting,
/// `acquire()` successfully opened a new connection.
///
/// It then invokes the user-defined [`after_connect`][PoolOptions::after_connect] callback, if set.
AfterConnectCallback,
/// `acquire()` failed to open a new connection or the connection failed the
/// `AfterConnectCallback` phase.
///
/// It then waits in a backoff loop before attempting to open another connection.
Backoff,
}

View File

@ -0,0 +1,369 @@
use std::cmp;
use std::fmt::{self, Formatter};
use std::ops::Index;
use std::sync::atomic::{self, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use enum_map::EnumMap;
// Saves a bunch of redundant links in docs.
// Just `#[cfg(doc)]` doesn't work for some reason.
use crate::pool::metrics::{AcquirePhase, PoolMetricsCollector};
#[cfg_attr(not(doc), allow(unused_imports))]
use crate::pool::{Pool, PoolOptions};
/// A simple but hopefully useful metrics collector for [`Pool`].
///
/// See [`SimplePoolMetricsSnapshot`] for the metrics collected by this implementation.
///
/// # Example
/// This example is written for PostgreSQL and Tokio but can trivially be adapted
/// to other databases and/or async-std.
///
/// ```no_run
/// # #[cfg(feature = "postgres")]
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use sqlx::Executor;
/// use sqlx::postgres::PgPoolOptions;
/// use sqlx::pool::metrics::SimplePoolMetrics;
///
/// let metrics = SimplePoolMetrics::new();
///
/// let pool = PgPoolOptions::new()
/// .metrics_collector(metrics.collector())
/// .connect("postgres:// …")
/// .await?;
///
/// tokio::spawn(async move {
/// // Post metrics every minute.
/// tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
///
/// // Warning: very verbose!
/// println!("current pool metrics: {:#?}", metrics.snapshot());
/// });
///
/// // use `pool`...
///
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct SimplePoolMetrics {
inner: Arc<SimpleMetricsInner>,
}
/// A snapshot of metrics returned by [`SimplePoolMetrics::snapshot()`].
///
/// If the `json` feature is enabled, this type implements `serde::Serialize`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "json", derive(serde::Serialize))]
#[non_exhaustive]
pub struct SimplePoolMetricsSnapshot {
/// Total number of calls to [`Pool::acquire()`] when the snapshot was taken.
pub acquire_calls: u64,
/// Statistics for the time [`Pool::acquire()`] spent in [`AcquirePhase::Waiting`].
pub permit_wait_time: SimpleTimingStats,
/// Statistics for the time [`Pool::acquire()`] takes to acquire a connection.
pub acquire_time: SimpleTimingStats,
/// Total number of times [`Pool::acquire()`] timed out.
pub acquire_timeouts: u64,
/// Total number of times [`Pool::acquire()`] timed out aggregated per [`AcquirePhase`] in which
/// the timeout occurred.
///
/// The value type can be indexed by `AcquirePhase`.
///
/// ```rust
/// use sqlx::pool::metrics::{AcquirePhase, SimplePoolMetrics, SimplePoolMetricsSnapshot};
///
/// let metrics: SimplePoolMetrics = SimplePoolMetrics::new();
///
/// // pass `metrics.collector()` to `PoolOptions::metrics_collector()`
/// // then construct and start using the `Pool`
///
/// // sometime later...
///
/// let snapshot: SimplePoolMetricsSnapshot = metrics.snapshot();
///
/// println!(
/// "number of times the pool timed out waiting for a permit = {}",
/// snapshot.acquire_timeouts_per_phase[AcquirePhase::Waiting]
/// );
/// ```
pub acquire_timeouts_per_phase: AcquireTimeoutsPerPhase,
}
/// The statistics for an individual [`Pool`] timing metric collected by [`SimplePoolMetrics`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "json", derive(serde::Serialize))]
#[non_exhaustive]
pub struct SimpleTimingStats {
/// The total count of samples collected for this metric.
pub sample_count: u64,
/// The minimum time for this metric. [`Duration::ZERO`] if no samples were collected.
pub min: Duration,
/// The average time for this metric, calculated as an [Exponential Moving Average].
///
/// [`Duration::ZERO`] if no samples were collected.
///
/// The EMA coefficient is set during construction of [`SimplePoolMetrics`].
/// See [`SimplePoolMetrics::with_ema_coefficient()`] for details.
///
/// [Exponential Moving Average]: https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
pub average: Duration,
/// The maximum time for this metric. [`Duration::ZERO`] if no samples were collected.
pub max: Duration,
}
/// Counts of [`Pool::acquire()`] timeouts aggregated per [`AcquirePhase`] in which the timeout occurred.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "json", derive(serde::Serialize))]
pub struct AcquireTimeoutsPerPhase(EnumMap<AcquirePhase, u64>);
#[derive(Default)]
struct SimpleMetricsInner {
ema_coefficient: f64,
acquire_calls: AtomicU64,
permit_wait_time: AtomicTimingStats,
acquire_time: AtomicTimingStats,
acquire_timeouts: AtomicU64,
acquire_timeouts_per_phase: EnumMap<AcquirePhase, AtomicU64>,
}
#[derive(Default)]
struct AtomicTimingStats {
sample_count: AtomicU64,
min_nanos: AtomicU64,
average_nanos: AtomicU64,
max_nanos: AtomicU64,
}
impl SimplePoolMetrics {
/// Construct with default settings.
pub fn new() -> SimplePoolMetrics {
// Arbitrarily chosen, but should give decent metrics.
// See the table in the docs below for details.
Self::with_ema_coefficient(0.01)
}
/// Construct with the given coefficient for calculating [Exponential Moving Averages].
///
/// `ema_coefficient` is the factor `α` in the following formula:
///
/// <img
/// src="https://wikimedia.org/api/rest_v1/media/math/render/svg/692012ed1c78d38cbbe6ad9935786c80ec7c24de"
/// style="filter: invert(100%)"></img>
///
/// Essentially, it determines how much new samples influence the average. A smaller coefficient
/// produces a more stable but more slowly moving average, a larger coefficient produces
/// a quickly moving but chaotic average.
///
/// The following table shows how much each sample contributes to the average
/// for some arbitrary coefficients, where the Nth sample is the latest:
///
// Got kinda nerd sniped calculating this table, tbh.
// I was trying to demonstrate how quickly each coefficient makes old samples irrelevant.
/// | α = | 0.01 | 0.05 | 0.1 | 0.2 | 0.25 | 0.5 |
/// |-------|-------|-------|----------|---------|--------|---------|
/// | N | 1% | 5% | 10% | 20% | 25% | 50% |
/// | N-1 | 0.99% | 4.75% | 9% | 16% | 18.75% | 25% |
/// | N-2 | 0.98% | 4.51% | 8.1% | 12.8% | 14.06% | 12.5% |
/// | N-3 | 0.97% | 4.29% | 7.29% | 10.24% | 10.54% | 6.25% |
/// | N-4 | 0.96% | 4.07% | 6.56% | 8.19% | 7.91% | 3.125% |
/// | ⋮ | | ⋮ | ⋮ | ⋮ | ⋮ | ⋮ |
/// | N-10 | 0.90% | 2.99% | 3.4% | 2.15% | 1.41% | 0.049% |
/// | ⋮ | | ⋮ | ⋮ | ⋮ | ⋮ | ⋮ |
/// | N-20 | 0.82% | 1.79% | 1.22% | 0.23% | 0.079% | 4.8 ppb |
/// | ⋮ | | ⋮ | ⋮ | ⋮ | ⋮ | ⋮ |
/// | N-100 | 0.36% | 0.03% | 26.5 ppb | 0.4 ppt | <1 ppt | <1 ppt |
///
/// For coefficients greater than ~0.19, the N-100th sample contributes less than
/// one part per trillion to the average.
/// Greater than ~0.13, less than one part per billion.
/// Greater than ~0.6, less than one part per million.
///
/// ### Panics
/// If `ema_coefficient` is outside the range `(0, 1)` or is non-normal.
///
/// A coefficient of zero causes the average to never change.
/// A coefficient of one causes the average to always be equal to the last sample.
/// In either case, it's no longer an average.
///
/// [Exponential Moving Averages]: https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
pub fn with_ema_coefficient(ema_coefficient: f64) -> Self {
assert!(ema_coefficient.is_normal());
assert!(ema_coefficient > 0.0);
assert!(ema_coefficient < 1.0);
SimplePoolMetrics {
inner: Arc::new(SimpleMetricsInner {
ema_coefficient,
acquire_calls: AtomicU64::new(0),
permit_wait_time: AtomicTimingStats::default(),
acquire_time: AtomicTimingStats::default(),
acquire_timeouts: AtomicU64::new(0),
acquire_timeouts_per_phase: EnumMap::default(),
}),
}
}
/// Get the collector instance to pass to [`PoolOptions::metrics_collector()`].
pub fn collector(&self) -> Arc<dyn PoolMetricsCollector> {
self.inner.clone()
}
/// Get the current count of calls to [`Pool::acquire()`].
///
/// If you want to inspect multiple statistics at once,
/// [`.snapshot()`][Self::snapshot] is more efficient.
pub fn acquire_calls(&self) -> u64 {
self.inner.acquire_calls.load(Ordering::Acquire)
}
/// Get the current statistics for the time [`Pool::acquire()`] spends in [`AcquirePhase::Waiting`].
///
/// If you want to inspect multiple statistics at once,
/// [`.snapshot()`][Self::snapshot] is more efficient.
pub fn permit_wait_time(&self) -> SimpleTimingStats {
atomic::fence(Ordering::Acquire);
self.inner.permit_wait_time.get()
}
/// Get the current statistics for the total time [`Pool::acquire()`] takes to get a connection.
///
/// If you want to inspect multiple statistics at once,
/// [`.snapshot()`][Self::snapshot] is more efficient.
pub fn acquire_time(&self) -> SimpleTimingStats {
atomic::fence(Ordering::Acquire);
self.inner.acquire_time.get()
}
/// Load the current values for all metrics.
///
/// More efficient than calling individual getters.
pub fn snapshot(&self) -> SimplePoolMetricsSnapshot {
use Ordering::*;
atomic::fence(Acquire);
SimplePoolMetricsSnapshot {
acquire_calls: self.inner.acquire_calls.load(Relaxed),
permit_wait_time: self.inner.permit_wait_time.get(),
acquire_time: self.inner.acquire_time.get(),
acquire_timeouts: self.inner.acquire_timeouts.load(Relaxed),
acquire_timeouts_per_phase: AcquireTimeoutsPerPhase(
self.inner
.acquire_timeouts_per_phase
.iter()
.map(|(phase, count)| (phase, count.load(Relaxed)))
.collect(),
),
}
}
}
/// Debug-prints the current metrics as determined by [`Self::snapshot()`].
impl fmt::Debug for SimplePoolMetrics {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("SimplePoolMetrics")
.field("current", &self.snapshot())
.finish()
}
}
impl PoolMetricsCollector for SimpleMetricsInner {
fn acquire_called(&self) {
self.acquire_calls.fetch_add(1, Ordering::AcqRel);
}
fn permit_wait_time(&self, duration: Duration) {
self.permit_wait_time.update(self.ema_coefficient, duration)
}
fn acquire_timed_out(&self, phase: AcquirePhase) {
self.acquire_timeouts.fetch_add(1, Ordering::AcqRel);
self.acquire_timeouts_per_phase[phase].fetch_add(1, Ordering::AcqRel);
}
fn connection_acquired(&self, total_wait: Duration) {
self.acquire_time.update(self.ema_coefficient, total_wait);
}
}
impl AtomicTimingStats {
fn update(&self, ema_coefficient: f64, time_sample: Duration) {
use Ordering::*;
// If your application triggers this assert then either an `.elapsed()` call overflowed or
// you somehow kept it running for ~585 years, so congratulate yourself on a job well done.
let nanos: u64 = time_sample
.as_nanos()
.try_into()
.expect("BUG: `duration` is too large!");
// Since this is just collecting some statistics, consistency isn't *too* important.
// We use relaxed orderings for all internal updates and just emit a single fence to
// get some semblance of synchronization.
atomic::fence(Acquire);
self.sample_count.fetch_add(1, Relaxed);
let _ = self.min_nanos.fetch_update(Relaxed, Relaxed, |prev| {
if prev == 0 {
// If our minimum is exactly zero, then we likely haven't collected any samples yet.
return Some(nanos);
}
Some(cmp::min(prev, nanos))
});
let _ = self
.average_nanos
.fetch_update(Relaxed, Relaxed, |average| {
if average == 0 {
// If we don't have an average, just use our first sample.
return Some(nanos);
}
// Exponential Moving Average algorithm
Some(
((nanos as f64 * ema_coefficient) + (average as f64 * (1.0 - ema_coefficient)))
as u64,
)
});
let _ = self
.max_nanos
.fetch_update(Relaxed, Relaxed, |prev| Some(cmp::max(prev, nanos)));
// Suggest that our update be published to main memory.
atomic::fence(Release);
}
/// Assumes an atomic fence is issued first.
fn get(&self) -> SimpleTimingStats {
use Ordering::*;
SimpleTimingStats {
sample_count: self.sample_count.load(Relaxed),
min: Duration::from_nanos(self.min_nanos.load(Relaxed)),
average: Duration::from_nanos(self.average_nanos.load(Relaxed)),
max: Duration::from_nanos(self.max_nanos.load(Relaxed)),
}
}
}
impl Index<AcquirePhase> for AcquireTimeoutsPerPhase {
type Output = u64;
fn index(&self, index: AcquirePhase) -> &u64 {
&self.0[index]
}
}

View File

@ -89,6 +89,8 @@ mod connection;
mod inner;
mod options;
pub mod metrics;
pub use self::connection::PoolConnection;
pub(crate) use self::maybe::MaybePoolConnection;
pub use self::options::{PoolConnectionMetadata, PoolOptions};

View File

@ -2,6 +2,7 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::pool::inner::PoolInner;
use crate::pool::metrics::PoolMetricsCollector;
use crate::pool::Pool;
use futures_core::future::BoxFuture;
use std::fmt::{self, Debug, Formatter};
@ -74,6 +75,7 @@ pub struct PoolOptions<DB: Database> {
+ Sync,
>,
>,
pub(crate) metrics: Option<Arc<dyn PoolMetricsCollector>>,
pub(crate) max_connections: u32,
pub(crate) acquire_timeout: Duration,
pub(crate) min_connections: u32,
@ -117,6 +119,7 @@ impl<DB: Database> PoolOptions<DB> {
after_connect: None,
before_acquire: None,
after_release: None,
metrics: None,
test_before_acquire: true,
// A production application will want to set a higher limit than this.
max_connections: 10,
@ -258,6 +261,7 @@ impl<DB: Database> PoolOptions<DB> {
/// This example is written for PostgreSQL but can likely be adapted to other databases.
///
/// ```no_run
/// # #[cfg(feature = "postgres")]
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use sqlx::Executor;
/// use sqlx::postgres::PgPoolOptions;
@ -312,6 +316,7 @@ impl<DB: Database> PoolOptions<DB> {
///
/// This example is written for Postgres but should be trivially adaptable to other databases.
/// ```no_run
/// # #[cfg(feature = "postgres")]
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use sqlx::{Connection, Executor};
/// use sqlx::postgres::PgPoolOptions;
@ -364,6 +369,7 @@ impl<DB: Database> PoolOptions<DB> {
/// which is only allowed for superusers.
///
/// ```no_run
/// # #[cfg(feature = "postgres")]
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use sqlx::{Connection, Executor};
/// use sqlx::postgres::PgPoolOptions;
@ -400,6 +406,18 @@ impl<DB: Database> PoolOptions<DB> {
self
}
/// Hook in a custom metrics collector for the pool.
///
/// See [`PoolMetricsCollector`] for details or [`SimplePoolMetrics`] for an easy start.
///
/// [`SimplePoolMetrics`]: crate::pool::metrics::SimplePoolMetrics
pub fn metrics_collector(self, collector: Arc<dyn PoolMetricsCollector>) -> Self {
Self {
metrics: Some(collector),
..self
}
}
/// Create a new pool from this `PoolOptions` and immediately open at least one connection.
///
/// This ensures the configuration is correct.