From 82923a1aaa887ffe60ac08ebfdf0b63ce3b75a78 Mon Sep 17 00:00:00 2001 From: Anthony Dodd Date: Thu, 20 Feb 2020 10:05:01 -0600 Subject: [PATCH] Update stream impls. The basic PgListener stream impl now yields `Result` elements without an `Option` in the result. The option condition originally represented the closure of the underlying connection. Now such conditions will terminate the stream, as one would expect. The `PgListener.recv()` method signature has not been changed. PgPoolListener has also been updated. The interfaces on this struct will never yield an inner `Option` as it will instead acquire a new connection and continue its work. Both the stream impl & the `recv` method have received an update to their signatures. --- sqlx-core/src/postgres/listen.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sqlx-core/src/postgres/listen.rs b/sqlx-core/src/postgres/listen.rs index b502e2e43..6b8c37f56 100644 --- a/sqlx-core/src/postgres/listen.rs +++ b/sqlx-core/src/postgres/listen.rs @@ -98,10 +98,14 @@ where } /// Consume this listener, returning a `Stream` of notifications. - pub fn into_stream(mut self) -> impl Stream>> { + pub fn into_stream(mut self) -> impl Stream> { stream! { loop { - yield self.recv().await + match self.recv().await { + Ok(Some(msg)) => yield Ok(msg), + Ok(None) => break, + Err(err) => yield Err(err), + } } } } @@ -159,7 +163,7 @@ impl PgPoolListener { impl PgPoolListener { /// Receives the next notification available from any of the subscribed channels. - pub async fn recv(&mut self) -> Result> { + pub async fn recv(&mut self) -> Result { loop { // Ensure we have an active connection to work with. let conn = match &mut self.connection { @@ -186,7 +190,7 @@ impl PgPoolListener { match conn.receive().await? { // We've received an async notification, return it. Some(Message::NotificationResponse(notification)) => { - return Ok(Some(notification.into())); + return Ok(notification.into()); } // Protocol error, return the error. Some(msg) => { @@ -207,7 +211,7 @@ impl PgPoolListener { } /// Consume this listener, returning a `Stream` of notifications. - pub fn into_stream(mut self) -> impl Stream>> { + pub fn into_stream(mut self) -> impl Stream> { stream! { loop { yield self.recv().await