diff --git a/sqlx-postgres/src/listener.rs b/sqlx-postgres/src/listener.rs index 9998a832..b50003b4 100644 --- a/sqlx-postgres/src/listener.rs +++ b/sqlx-postgres/src/listener.rs @@ -255,8 +255,8 @@ impl PgListener { pub async fn try_recv(&mut self) -> Result, Error> { // Flush the buffer first, if anything // This would only fill up if this listener is used as a connection - if let Ok(Some(notification)) = self.buffer_rx.try_next() { - return Ok(Some(PgNotification(notification))); + if let Some(notification) = self.next_buffered() { + return Ok(Some(notification)); } // Fetch our `CloseEvent` listener, if applicable. @@ -319,6 +319,19 @@ impl PgListener { } } + /// Receives the next notification that already exists in the connection buffer, if any. + /// + /// This is similar to `try_recv`, except it will not wait if the connection has not yet received a notification. + /// + /// This is helpful if you want to retrieve all buffered notifications and process them in batches. + pub fn next_buffered(&mut self) -> Option { + if let Ok(Some(notification)) = self.buffer_rx.try_next() { + Some(PgNotification(notification)) + } else { + None + } + } + /// Consume this listener, returning a `Stream` of notifications. /// /// The backing connection will be automatically reconnected should it be lost. diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs index e6c397d9..87a18db5 100644 --- a/tests/postgres/postgres.rs +++ b/tests/postgres/postgres.rs @@ -1057,6 +1057,75 @@ async fn test_listener_cleanup() -> anyhow::Result<()> { Ok(()) } +#[sqlx_macros::test] +async fn test_listener_try_recv_buffered() -> anyhow::Result<()> { + use sqlx_core::rt::timeout; + + use sqlx::pool::PoolOptions; + use sqlx::postgres::PgListener; + + // Create a connection on which to send notifications + let mut notify_conn = new::().await?; + + let pool = PoolOptions::::new() + .min_connections(1) + .max_connections(1) + .test_before_acquire(true) + .connect(&env::var("DATABASE_URL")?) + .await?; + + let mut listener = PgListener::connect_with(&pool).await?; + listener.listen("test_channel2").await?; + + // Checks for a notification on the test channel + async fn try_recv(listener: &mut PgListener) -> anyhow::Result { + match timeout(Duration::from_millis(100), listener.recv()).await { + Ok(res) => { + res?; + Ok(true) + } + Err(_) => Ok(false), + } + } + + // Check no notification is buffered, since we haven't sent one. + assert!(listener.next_buffered().is_none()); + + // Send five notifications transactionally, so they all arrive at once. + { + let mut txn = notify_conn.begin().await?; + for i in 0..5 { + txn.execute(format!("NOTIFY test_channel2, 'payload {i}'").as_str()) + .await?; + } + txn.commit().await?; + } + + // Still no notifications buffered, since we haven't awaited the listener yet. + assert!(listener.next_buffered().is_none()); + + // Activate connection. + sqlx::query!("SELECT 1 AS one") + .fetch_all(&mut listener) + .await?; + + // The next five notifications should now be buffered. + for i in 0..5 { + assert!( + listener.next_buffered().is_some(), + "Notification {i} was not buffered" + ); + } + + // Should be no more. + assert!(listener.next_buffered().is_none()); + + // Even if we wait. + assert!(!try_recv(&mut listener).await?, "Notification received"); + + Ok(()) +} + #[sqlx_macros::test] async fn test_pg_listener_allows_pool_to_close() -> anyhow::Result<()> { let pool = pool::().await?;