diff --git a/Cargo.lock b/Cargo.lock index 313afd5a..5f58c8e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -278,6 +278,15 @@ dependencies = [ "crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-queue" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-utils" version = "0.6.6" @@ -1163,7 +1172,10 @@ dependencies = [ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1556,6 +1568,7 @@ dependencies = [ "checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca" "checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac" "checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b" +"checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db" "checksum crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" "checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4" "checksum data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4f47ca1860a761136924ddd2422ba77b2ea54fe8cc75b9040804a0d9d32ad97" diff --git a/examples/realworld-postgres/src/main.rs b/examples/realworld-postgres/src/main.rs index dc20385c..2a6c4281 100644 --- a/examples/realworld-postgres/src/main.rs +++ b/examples/realworld-postgres/src/main.rs @@ -12,12 +12,7 @@ const SECRET_KEY: &str = "this-is-the-most-secret-key-ever-secreted"; #[async_std::main] async fn main() -> anyhow::Result<()> { - env_logger::try_init()?; - - let pool = PgPool::builder() - .max_size(100) - .test_on_acquire(false) - .build(&env::var("DATABASE_URL")?).await?; + let pool = PgPool::new(&env::var("DATABASE_URL")?).await?; let mut server = tide::with_state(pool); @@ -25,7 +20,7 @@ async fn main() -> anyhow::Result<()> { server.at("/api/user").get(get_current_user); - server.listen(("0.0.0.0", 8080)).await?; + server.listen(("localhost", 8080)).await?; Ok(()) } diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 229fc6fa..d1479c41 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -24,8 +24,11 @@ async-stream = { version = "0.2.0", default-features = false } base64 = { version = "0.11.0", default-features = false, optional = true, features = [ "std" ] } bitflags = { version = "1.2.1", default-features = false } byteorder = { version = "1.3.2", default-features = false } +crossbeam-queue = "0.2.1" +crossbeam-utils = { version = "0.7.0", default-features = false } chrono = { version = "0.4.10", default-features = false, features = [ "clock" ], optional = true } digest = { version = "0.8.1", default-features = false, optional = true, features = [ "std" ] } +futures-channel = { version = "0.3.1", default-features = false } futures-core = { version = "0.3.1", default-features = false } futures-util = { version = "0.3.1", default-features = false } generic-array = { version = "0.12.3", default-features = false, optional = true } diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index c3e0350a..d105c47b 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -3,12 +3,12 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::time::Instant; +use crossbeam_queue::{ArrayQueue, SegQueue}; use async_std::prelude::FutureExt as _; -use async_std::sync::{channel, Receiver, Sender}; +use futures_channel::oneshot::{channel, Sender}; use async_std::task; -use futures_util::future::FutureExt as _; -use super::{Idle, Options, Raw}; +use super::{Idle, Options, Live}; use crate::{error::Error, Connection, Database}; pub(super) struct SharedPool @@ -16,9 +16,10 @@ where DB: Database, { url: String, - pool_rx: Receiver>, + idle: ArrayQueue>, + waiters: SegQueue>>, size: AtomicU32, - closed: AtomicBool, + is_closed: AtomicBool, options: Options, } @@ -30,33 +31,35 @@ where pub(super) async fn new_arc( url: &str, options: Options, - ) -> crate::Result<(Arc, Sender>)> { - let (pool_tx, pool_rx) = channel(options.max_size as usize); - + ) -> crate::Result> { let pool = Arc::new(Self { url: url.to_owned(), - pool_rx, + idle: ArrayQueue::new(options.max_size as usize), + waiters: SegQueue::new(), size: AtomicU32::new(0), - closed: AtomicBool::new(false), + is_closed: AtomicBool::new(false), options, }); + // If a minimum size was configured for the pool, + // establish N connections + // TODO: Should we do this in the background? for _ in 0..pool.options.min_size { - let raw = pool + let live = pool .eventually_connect(Instant::now() + pool.options.connect_timeout) .await?; - pool_tx - .send(Idle { - raw, - since: Instant::now(), - }) - .await; + // Ignore error here, we are capping this loop by min_size which we + // already should make sure is less than max_size + let _ = pool.idle.push(Idle { + live, + since: Instant::now(), + }); } - conn_reaper(&pool, &pool_tx); + spawn_reaper(&pool); - Ok((pool, pool_tx)) + Ok(pool) } pub fn options(&self) -> &Options { @@ -72,66 +75,82 @@ where } pub(super) fn num_idle(&self) -> usize { - self.pool_rx.len() + // NOTE: This is very expensive + self.waiters.len() } - pub(super) fn closed(&self) -> bool { - self.closed.load(Ordering::SeqCst) + pub(super) fn is_closed(&self) -> bool { + self.is_closed.load(Ordering::Acquire) } pub(super) async fn close(&self) { - self.closed.store(true, Ordering::Release); + self.is_closed.store(true, Ordering::Release); while self.size.load(Ordering::Acquire) > 0 { // don't block on the receiver because we own one Sender so it should never return // `None`; a `select!()` would also work but that produces more complicated code // and a timeout isn't necessarily appropriate - match self.pool_rx.recv().now_or_never() { - Some(Some(idle)) => { - idle.close().await; - self.size.fetch_sub(1, Ordering::AcqRel); - } - Some(None) => { - log::warn!("was not able to close all connections"); - break; - } - None => task::yield_now().await, + while let Ok(idle) = self.idle.pop() { + idle.close().await; + self.size.fetch_sub(1, Ordering::AcqRel); } + + task::yield_now().await } } #[inline] - pub(super) fn try_acquire(&self) -> Option> { - if self.closed.load(Ordering::Acquire) { + pub(super) fn try_acquire(&self) -> Option> { + if self.is_closed.load(Ordering::Acquire) { return None; } - Some(self.pool_rx.recv().now_or_never()??.raw) + Some(self.idle.pop().ok()?.live) } - pub(super) async fn acquire(&self) -> crate::Result> { + pub(super) fn release(&self, mut live: Live) { + // Try waiters in (FIFO) order until one is still waiting .. + while let Ok(waiter) = self.waiters.pop() { + live = match waiter.send(live) { + // successfully released + Ok(()) => return, + + Err(live) => { + live + }, + }; + } + + // .. if there were no waiters still waiting, just push the connection + // back to the idle queue + let _ = self.idle.push(Idle { + live, + since: Instant::now(), + }); + } + + pub(super) async fn acquire(&self) -> crate::Result> { let start = Instant::now(); let deadline = start + self.options.connect_timeout; // Unless the pool has been closed ... - while !self.closed.load(Ordering::Acquire) { - let size = self.size.load(Ordering::Acquire); - + while !self.is_closed.load(Ordering::Acquire) { // Attempt to immediately acquire a connection. This will return Some // if there is an idle connection in our channel. - let mut idle = if let Some(idle) = self.pool_rx.recv().now_or_never() { - let idle = match idle { - Some(idle) => idle, + if let Some(idle) = self.idle.pop().ok() { + if let Some(live) = check_live(idle.live, &self.options).await { + return Ok(live); + } + } - // This isn't possible. [Pool] owns the sender and [SharedPool] - // owns the receiver. - None => unreachable!(), - }; - - idle - } else if size >= self.options.max_size { + let size = self.size.load(Ordering::Acquire); + + if size >= self.options.max_size { // Too many open connections // Wait until one is available + let (tx, rx) = channel(); + + self.waiters.push(tx); // get the time between the deadline and now and use that as our timeout let until = deadline @@ -139,13 +158,12 @@ where .ok_or(Error::PoolTimedOut(None))?; // don't sleep forever - let idle = match self.pool_rx.recv().timeout(until).await { + let live = match rx.timeout(until).await { // A connection was returned to the pool - Ok(Some(idle)) => idle, + Ok(Ok(live)) => live, - // This isn't possible. [Pool] owns the sender and [SharedPool] - // owns the receiver. - Ok(None) => unreachable!(), + // Pool dropped without dropping waiter + Ok(Err(_)) => unreachable!(), // Timed out waiting for a connection // Error is not forwarded as its useless context @@ -154,55 +172,27 @@ where } }; - idle - } else if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size { - // pool has slots available; open a new connection - match self.connect(deadline).await { - Ok(Some(conn)) => return Ok(conn), - // [size] is internally decremented on _retry_ and _error_ - Ok(None) => continue, - Err(e) => return Err(e), + // If pool was closed while waiting for a connection, + // release the connection + if self.is_closed.load(Ordering::Acquire) { + live.close().await; + self.size.fetch_sub(1, Ordering::AcqRel); + + return Err(Error::PoolClosed); } - } else { + + match check_live(live, &self.options).await { + Some(live) => return Ok(live), + + // Need to re-connect + None => {} + } + } else if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) != size { + // size was incremented while we compared it just above continue; - }; - - // If pool was closed while waiting for a connection, - // release the connection - if self.closed.load(Ordering::Acquire) { - idle.close().await; - self.size.fetch_sub(1, Ordering::AcqRel); - - return Err(Error::PoolClosed); } - // If the connection we pulled has expired, close the connection and - // immediately create a new connection - if is_beyond_lifetime(&idle.raw, &self.options) { - // close the connection but don't really care about the result - let _ = idle.close().await; - } else if self.options.test_on_acquire { - // TODO: Check on acquire should be a configuration setting - // Check that the connection is still live - match idle.raw.inner.ping().await { - // Connection still seems to respond - Ok(_) => return Ok(idle.raw), - - // an error here means the other end has hung up or we lost connectivity - // either way we're fine to just discard the connection - // the error itself here isn't necessarily unexpected so WARN is too strong - Err(e) => log::info!("ping on idle connection returned error: {}", e), - } - - // make sure the idle connection is gone explicitly before we open one - // this will close the resources for the stream on our side - drop(idle); - } else { - // No need to re-connect - return Ok(idle.raw); - } - - // while there is still room in the pool, acquire a new connection + // pool has slots available; open a new connection match self.connect(deadline).await { Ok(Some(conn)) => return Ok(conn), // [size] is internally decremented on _retry_ and _error_ @@ -214,7 +204,7 @@ where Err(Error::PoolClosed) } - async fn eventually_connect(&self, deadline: Instant) -> crate::Result> { + async fn eventually_connect(&self, deadline: Instant) -> crate::Result> { loop { // [connect] will raise an error when past deadline // [connect] returns None if its okay to retry @@ -224,7 +214,7 @@ where } } - async fn connect(&self, deadline: Instant) -> crate::Result>> { + async fn connect(&self, deadline: Instant) -> crate::Result>> { // FIXME: Code between `-` is duplicate with [acquire] // --------------------------------- @@ -235,7 +225,7 @@ where // If pool was closed while waiting for a connection, // release the connection - if self.closed.load(Ordering::Acquire) { + if self.is_closed.load(Ordering::Acquire) { self.size.fetch_sub(1, Ordering::AcqRel); // ? return Err(Error::PoolClosed); @@ -246,9 +236,9 @@ where // result here is `Result, TimeoutError>` match DB::Connection::open(&self.url).timeout(until).await { // successfully established connection - Ok(Ok(inner)) => { - Ok(Some(Raw { - inner, + Ok(Ok(raw)) => { + Ok(Some(Live { + raw, // remember when it was created so we can expire it // if there is a [max_lifetime] set created: Instant::now(), @@ -281,17 +271,26 @@ where DB::Connection: Connection, { async fn close(self) { - let _ = self.raw.inner.close().await; + self.live.close().await; + } +} + +impl Live +where + DB::Connection: Connection, +{ + async fn close(self) { + let _ = self.raw.close().await; } } // NOTE: Function names here are bizzare. Helpful help would be appreciated. -fn is_beyond_lifetime(raw: &Raw, options: &Options) -> bool { +fn is_beyond_lifetime(live: &Live, options: &Options) -> bool { // check if connection was within max lifetime (or not set) options .max_lifetime - .map_or(false, |max| raw.created.elapsed() > max) + .map_or(false, |max| live.created.elapsed() > max) } fn is_beyond_idle(idle: &Idle, options: &Options) -> bool { @@ -301,8 +300,38 @@ fn is_beyond_idle(idle: &Idle, options: &Options) -> bool { .map_or(false, |timeout| idle.since.elapsed() > timeout) } +async fn check_live(mut live: Live, options: &Options) -> Option> { + // If the connection we pulled has expired, close the connection and + // immediately create a new connection + if is_beyond_lifetime(&live, options) { + // close the connection but don't really care about the result + let _ = live.close().await; + } else if options.test_on_acquire { + // TODO: Check on acquire should be a configuration setting + // Check that the connection is still live + match live.raw.ping().await { + // Connection still seems to respond + Ok(_) => return Some(live), + + // an error here means the other end has hung up or we lost connectivity + // either way we're fine to just discard the connection + // the error itself here isn't necessarily unexpected so WARN is too strong + Err(e) => log::info!("ping on idle connection returned error: {}", e), + } + + // make sure the idle connection is gone explicitly before we open one + // this will close the resources for the stream on our side + drop(live); + } else { + // No need to re-connect + return Some(live); + } + + None +} + /// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections -fn conn_reaper(pool: &Arc>, pool_tx: &Sender>) +fn spawn_reaper(pool: &Arc>) where DB::Connection: Connection, { @@ -314,11 +343,10 @@ where (None, None) => return, }; - let pool = pool.clone(); - let pool_tx = pool_tx.clone(); + let pool = Arc::clone(&pool); task::spawn(async move { - while !pool.closed.load(Ordering::Acquire) { + while !pool.is_closed.load(Ordering::Acquire) { // reap at most the current size minus the minimum idle let max_reaped = pool .size @@ -328,15 +356,15 @@ where // collect connections to reap let (reap, keep) = (0..max_reaped) // only connections waiting in the queue - .filter_map(|_| pool.pool_rx.recv().now_or_never()?) + .filter_map(|_| pool.idle.pop().ok()) .partition::, _>(|conn| { is_beyond_idle(conn, &pool.options) - || is_beyond_lifetime(&conn.raw, &pool.options) + || is_beyond_lifetime(&conn.live, &pool.options) }); for conn in keep { // return these connections to the pool first - pool_tx.send(conn).await; + pool.idle.push(conn).expect("unreachable: pool overflowed"); } for conn in reap { diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index 94081f12..f68aefb5 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -7,9 +7,6 @@ use std::{ time::{Duration, Instant}, }; -use async_std::sync::Sender; -use futures_util::future::FutureExt; - use crate::Database; use self::inner::SharedPool; @@ -21,26 +18,22 @@ mod inner; mod options; /// A pool of database connections. -pub struct Pool +pub struct Pool(Arc>) where - DB: Database, -{ - inner: Arc>, - pool_tx: Sender>, -} + DB: Database; struct Connection { - raw: Option>, - pool_tx: Sender>, + live: Option>, + pool: Arc>, } -struct Raw { - inner: DB::Connection, +struct Live { + raw: DB::Connection, created: Instant, } struct Idle { - raw: Raw, + live: Live, since: Instant, } @@ -55,9 +48,9 @@ where } async fn with_options(url: &str, options: Options) -> crate::Result { - let (inner, pool_tx) = SharedPool::new_arc(url, options).await?; + let inner = SharedPool::new_arc(url, options).await?; - Ok(Pool { inner, pool_tx }) + Ok(Pool(inner)) } /// Returns a [Builder] to configure a new connection pool. @@ -69,9 +62,9 @@ where /// /// Waits for at most the configured connection timeout before returning an error. pub async fn acquire(&self) -> crate::Result> { - self.inner.acquire().await.map(|conn| Connection { - raw: Some(conn), - pool_tx: self.pool_tx.clone(), + self.0.acquire().await.map(|conn| Connection { + live: Some(conn), + pool: Arc::clone(&self.0), }) } @@ -79,9 +72,9 @@ where /// /// Returns `None` immediately if there are no idle connections available in the pool. pub fn try_acquire(&self) -> Option> { - self.inner.try_acquire().map(|conn| Connection { - raw: Some(conn), - pool_tx: self.pool_tx.clone(), + self.0.try_acquire().map(|conn| Connection { + live: Some(conn), + pool: Arc::clone(&self.0), }) } @@ -90,42 +83,42 @@ where /// /// Does not resolve until all connections are closed. pub async fn close(&self) { - self.inner.close().await; + self.0.close().await; } /// Returns the number of connections currently being managed by the pool. pub fn size(&self) -> u32 { - self.inner.size() + self.0.size() } /// Returns the number of idle connections. pub fn idle(&self) -> usize { - self.inner.num_idle() + self.0.num_idle() } /// Returns the configured maximum pool size. pub fn max_size(&self) -> u32 { - self.inner.options().max_size + self.0.options().max_size } /// Returns the maximum time spent acquiring a new connection before an error is returned. pub fn connect_timeout(&self) -> Duration { - self.inner.options().connect_timeout + self.0.options().connect_timeout } /// Returns the configured minimum idle connection count. pub fn min_size(&self) -> u32 { - self.inner.options().min_size + self.0.options().min_size } /// Returns the configured maximum connection lifetime. pub fn max_lifetime(&self) -> Option { - self.inner.options().max_lifetime + self.0.options().max_lifetime } /// Returns the configured idle connection timeout. pub fn idle_timeout(&self) -> Option { - self.inner.options().idle_timeout + self.0.options().idle_timeout } } @@ -135,21 +128,18 @@ where DB: Database, { fn clone(&self) -> Self { - Self { - inner: Arc::clone(&self.inner), - pool_tx: self.pool_tx.clone(), - } + Self(Arc::clone(&self.0)) } } impl fmt::Debug for Pool { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Pool") - .field("url", &self.inner.url()) - .field("size", &self.inner.size()) - .field("num_idle", &self.inner.num_idle()) - .field("closed", &self.inner.closed()) - .field("options", self.inner.options()) + .field("url", &self.0.url()) + .field("size", &self.0.size()) + .field("num_idle", &self.0.num_idle()) + .field("is_closed", &self.0.is_closed()) + .field("options", self.0.options()) .finish() } } @@ -160,26 +150,20 @@ impl Deref for Connection { type Target = DB::Connection; fn deref(&self) -> &Self::Target { - &self.raw.as_ref().expect(DEREF_ERR).inner + &self.live.as_ref().expect(DEREF_ERR).raw } } impl DerefMut for Connection { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.raw.as_mut().expect(DEREF_ERR).inner + &mut self.live.as_mut().expect(DEREF_ERR).raw } } impl Drop for Connection { fn drop(&mut self) { - if let Some(conn) = self.raw.take() { - self.pool_tx - .send(Idle { - raw: conn, - since: Instant::now(), - }) - .now_or_never() - .expect("(bug) connection released into a full pool") + if let Some(live) = self.live.take() { + self.pool.release(live); } } }