diff --git a/sqlx-core/src/postgres/listener.rs b/sqlx-core/src/postgres/listener.rs index a8dfd0ed..38c92011 100644 --- a/sqlx-core/src/postgres/listener.rs +++ b/sqlx-core/src/postgres/listener.rs @@ -135,11 +135,71 @@ impl PgListener { } /// Receives the next notification available from any of the subscribed channels. + /// + /// If the connection to PostgreSQL is lost, it is automatically reconnected on the next + /// call to `recv()`, and should be entirely transparent (as long as it was just an + /// intermittent network failure or long-lived connection reaper). + /// + /// As notifications are transient, any received while the connection was lost, will not + /// be returned. If you'd prefer the reconnection to be explicit and have a chance to + /// do something before, please see [`try_recv`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx_core::postgres::PgListener; + /// # use sqlx_core::error::Error; + /// # + /// # sqlx_rt::block_on::<_, Result<(), Error>>(async move { + /// # let mut listener = PgListener::connect("postgres:// ...").await?; + /// loop { + /// // ask for next notification, re-connecting (transparently) if needed + /// let notification = listener.recv().await?; + /// + /// // handle notification, do something interesting + /// } + /// # Ok(()) + /// # }).unwrap(); + /// ``` + /// + /// [`try_recv`]: #method.try_recv pub async fn recv(&mut self) -> Result { + loop { + if let Some(notification) = self.try_recv().await? { + return Ok(notification); + } + } + } + + /// Receives the next notification available from any of the subscribed channels. + /// + /// If the connection to PostgreSQL is lost, `None` is returned, and the connection is + /// reconnected on the next call to `try_recv()`. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx_core::postgres::PgListener; + /// # use sqlx_core::error::Error; + /// # + /// # sqlx_rt::block_on::<_, Result<(), Error>>(async move { + /// # let mut listener = PgListener::connect("postgres:// ...").await?; + /// loop { + /// // start handling notifications, connecting if needed + /// while let Some(notification) = listener.try_recv().await? { + /// // handle notification + /// } + /// + /// // connection lost, do something interesting + /// } + /// # Ok(()) + /// # }).unwrap(); + /// ``` + pub async fn try_recv(&mut self) -> Result, Error> { // Flush the buffer first, if anything // This would only fill up if this listener is used as a connection if let Ok(Some(notification)) = self.buffer_rx.try_next() { - return Ok(PgNotification(notification)); + return Ok(Some(PgNotification(notification))); } loop { @@ -155,7 +215,8 @@ impl PgListener { self.buffer_tx = self.connection().stream.notifications.take(); self.connection = None; - continue; + // lost connection + return Ok(None); } // Forward other errors @@ -167,7 +228,7 @@ impl PgListener { match message.format { // We've received an async notification, return it. MessageFormat::NotificationResponse => { - return Ok(PgNotification(message.decode()?)); + return Ok(Some(PgNotification(message.decode()?))); } // Mark the connection as ready for another query @@ -182,6 +243,12 @@ impl PgListener { } /// Consume this listener, returning a `Stream` of notifications. + /// + /// The backing connection will be automatically reconnected should it be lost. + /// + /// This has the same potential drawbacks as [`recv`]. + /// + /// [`recv`]: #method.recv pub fn into_stream(mut self) -> impl Stream> + Unpin { Box::pin(try_stream! { loop {