diff --git a/Cargo.lock b/Cargo.lock index 6835183f..d2db0663 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -880,8 +880,8 @@ name = "listen-postgres" version = "0.1.0" dependencies = [ "async-std", - "futures 0.3.1", - "sqlx 0.2.5", + "futures 0.3.4", + "sqlx 0.2.6", ] [[package]] @@ -2291,4 +2291,4 @@ checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" dependencies = [ "winapi 0.2.8", "winapi-build", -] \ No newline at end of file +] diff --git a/examples/listen-postgres/src/main.rs b/examples/listen-postgres/src/main.rs index 3f144812..e7f3bec5 100644 --- a/examples/listen-postgres/src/main.rs +++ b/examples/listen-postgres/src/main.rs @@ -3,6 +3,7 @@ use std::time::Duration; use async_std::stream; use futures::stream::StreamExt; use sqlx::postgres::PgPoolExt; +use sqlx::Executor; #[async_std::main] async fn main() -> Result<(), Box> { @@ -44,14 +45,14 @@ async fn notify(pool: sqlx::PgPool) { Ok(conn) => conn, Err(err) => return println!("[from notify]: {:?}", err), }; - let res = sqlx::Executor::send( - &mut conn, - r#" + let res = conn + .execute( + r#" NOTIFY "chan0", '{"payload": 0}'; NOTIFY "chan1", '{"payload": 1}'; NOTIFY "chan2", '{"payload": 2}'; "#, - ) - .await; + ) + .await; println!("[from notify]: {:?}", res); } diff --git a/sqlx-core/src/postgres/listen.rs b/sqlx-core/src/postgres/listen.rs index 6ca19cac..d0080b16 100644 --- a/sqlx-core/src/postgres/listen.rs +++ b/sqlx-core/src/postgres/listen.rs @@ -74,25 +74,26 @@ where } return Err(err); } + self.needs_to_send_listen_cmd = false; } + // Await a notification from the DB. - match self.connection.receive().await? { + return match self.connection.stream.read().await? { // We've received an async notification, return it. - Some(Message::NotificationResponse(notification)) => { - return Ok(Some(notification.into())) + Message::NotificationResponse => { + let notification = NotificationResponse::read(self.connection.stream.buffer())?; + + Ok(Some(notification.into())) } + // Protocol error, return the error. - Some(msg) => { - return Err(protocol_err!( - "unexpected message received from database {:?}", - msg - ) - .into()) - } - // The connection is dead, return None. - None => return Ok(None), - } + message => Err(protocol_err!( + "unexpected message received from database {:?}", + message + ) + .into()), + }; } } @@ -186,25 +187,28 @@ impl PgPoolListener { self.needs_to_send_listen_cmd = false; } // Await a notification from the DB. - match conn.receive().await? { + // TODO: Handle connection dead here + match conn.stream.read().await? { // We've received an async notification, return it. - Some(Message::NotificationResponse(notification)) => { + Message::NotificationResponse => { + let notification = NotificationResponse::read(conn.stream.buffer())?; + return Ok(notification.into()); } + // Protocol error, return the error. - Some(msg) => { + msg => { return Err(protocol_err!( "unexpected message received from database {:?}", msg ) .into()) - } - // The connection is dead, ensure that it is dropped, update self state, and loop to try again. - None => { - self.close_conn().await; - self.needs_to_send_listen_cmd = true; - continue; - } + } // The connection is dead, ensure that it is dropped, update self state, and loop to try again. + // None => { + // self.close_conn().await; + // self.needs_to_send_listen_cmd = true; + // continue; + // } } } } @@ -243,8 +247,8 @@ pub struct PgNotification { pub payload: String, } -impl From> for PgNotification { - fn from(src: Box) -> Self { +impl From for PgNotification { + fn from(src: NotificationResponse) -> Self { Self { pid: src.pid, channel: src.channel_name, @@ -269,7 +273,9 @@ async fn send_listen_query>( channels: impl IntoIterator>, ) -> Result<()> { let cmd = build_listen_all_query(channels); - conn.send(cmd.as_str()).await + let _ = conn.execute(cmd.as_str()).await?; + + Ok(()) } #[cfg(test)] diff --git a/sqlx-core/src/postgres/mod.rs b/sqlx-core/src/postgres/mod.rs index f65b116f..b2392e5f 100644 --- a/sqlx-core/src/postgres/mod.rs +++ b/sqlx-core/src/postgres/mod.rs @@ -29,4 +29,4 @@ pub type PgPool = crate::pool::Pool; make_query_as!(PgQueryAs, Postgres, PgRow); impl_map_row_for_row!(Postgres, PgRow); impl_column_index_for_row!(Postgres); -impl_from_row_for_tuples!(Postgres, PgRow); \ No newline at end of file +impl_from_row_for_tuples!(Postgres, PgRow);