feat: add PgListener#try_recv to give users a chance to act on lost connection if desired

closes #447
This commit is contained in:
Ryan Leckey 2020-07-18 21:05:36 -07:00
parent 47772b0850
commit 49ffcf6f46

View File

@ -135,11 +135,71 @@ impl PgListener {
}
/// Receives the next notification available from any of the subscribed channels.
///
/// If the connection to PostgreSQL is lost, it is automatically reconnected on the next
/// call to `recv()`, and should be entirely transparent (as long as it was just an
/// intermittent network failure or long-lived connection reaper).
///
/// As notifications are transient, any received while the connection was lost, will not
/// be returned. If you'd prefer the reconnection to be explicit and have a chance to
/// do something before, please see [`try_recv`].
///
/// # Example
///
/// ```rust,no_run
/// # use sqlx_core::postgres::PgListener;
/// # use sqlx_core::error::Error;
/// #
/// # sqlx_rt::block_on::<_, Result<(), Error>>(async move {
/// # let mut listener = PgListener::connect("postgres:// ...").await?;
/// loop {
/// // ask for next notification, re-connecting (transparently) if needed
/// let notification = listener.recv().await?;
///
/// // handle notification, do something interesting
/// }
/// # Ok(())
/// # }).unwrap();
/// ```
///
/// [`try_recv`]: #method.try_recv
pub async fn recv(&mut self) -> Result<PgNotification, Error> {
loop {
if let Some(notification) = self.try_recv().await? {
return Ok(notification);
}
}
}
/// Receives the next notification available from any of the subscribed channels.
///
/// If the connection to PostgreSQL is lost, `None` is returned, and the connection is
/// reconnected on the next call to `try_recv()`.
///
/// # Example
///
/// ```rust,no_run
/// # use sqlx_core::postgres::PgListener;
/// # use sqlx_core::error::Error;
/// #
/// # sqlx_rt::block_on::<_, Result<(), Error>>(async move {
/// # let mut listener = PgListener::connect("postgres:// ...").await?;
/// loop {
/// // start handling notifications, connecting if needed
/// while let Some(notification) = listener.try_recv().await? {
/// // handle notification
/// }
///
/// // connection lost, do something interesting
/// }
/// # Ok(())
/// # }).unwrap();
/// ```
pub async fn try_recv(&mut self) -> Result<Option<PgNotification>, Error> {
// Flush the buffer first, if anything
// This would only fill up if this listener is used as a connection
if let Ok(Some(notification)) = self.buffer_rx.try_next() {
return Ok(PgNotification(notification));
return Ok(Some(PgNotification(notification)));
}
loop {
@ -155,7 +215,8 @@ impl PgListener {
self.buffer_tx = self.connection().stream.notifications.take();
self.connection = None;
continue;
// lost connection
return Ok(None);
}
// Forward other errors
@ -167,7 +228,7 @@ impl PgListener {
match message.format {
// We've received an async notification, return it.
MessageFormat::NotificationResponse => {
return Ok(PgNotification(message.decode()?));
return Ok(Some(PgNotification(message.decode()?)));
}
// Mark the connection as ready for another query
@ -182,6 +243,12 @@ impl PgListener {
}
/// Consume this listener, returning a `Stream` of notifications.
///
/// The backing connection will be automatically reconnected should it be lost.
///
/// This has the same potential drawbacks as [`recv`].
///
/// [`recv`]: #method.recv
pub fn into_stream(mut self) -> impl Stream<Item = Result<PgNotification, Error>> + Unpin {
Box::pin(try_stream! {
loop {