update postgres/listen.rs for internal changes

This commit is contained in:
Ryan Leckey 2020-03-14 20:10:37 -07:00
parent 3db54dd724
commit 4419aea619
4 changed files with 42 additions and 35 deletions

6
Cargo.lock generated
View File

@ -880,8 +880,8 @@ name = "listen-postgres"
version = "0.1.0"
dependencies = [
"async-std",
"futures 0.3.1",
"sqlx 0.2.5",
"futures 0.3.4",
"sqlx 0.2.6",
]
[[package]]
@ -2291,4 +2291,4 @@ checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
]

View File

@ -3,6 +3,7 @@ use std::time::Duration;
use async_std::stream;
use futures::stream::StreamExt;
use sqlx::postgres::PgPoolExt;
use sqlx::Executor;
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -44,14 +45,14 @@ async fn notify(pool: sqlx::PgPool) {
Ok(conn) => conn,
Err(err) => return println!("[from notify]: {:?}", err),
};
let res = sqlx::Executor::send(
&mut conn,
r#"
let res = conn
.execute(
r#"
NOTIFY "chan0", '{"payload": 0}';
NOTIFY "chan1", '{"payload": 1}';
NOTIFY "chan2", '{"payload": 2}';
"#,
)
.await;
)
.await;
println!("[from notify]: {:?}", res);
}

View File

@ -74,25 +74,26 @@ where
}
return Err(err);
}
self.needs_to_send_listen_cmd = false;
}
// Await a notification from the DB.
match self.connection.receive().await? {
return match self.connection.stream.read().await? {
// We've received an async notification, return it.
Some(Message::NotificationResponse(notification)) => {
return Ok(Some(notification.into()))
Message::NotificationResponse => {
let notification = NotificationResponse::read(self.connection.stream.buffer())?;
Ok(Some(notification.into()))
}
// Protocol error, return the error.
Some(msg) => {
return Err(protocol_err!(
"unexpected message received from database {:?}",
msg
)
.into())
}
// The connection is dead, return None.
None => return Ok(None),
}
message => Err(protocol_err!(
"unexpected message received from database {:?}",
message
)
.into()),
};
}
}
@ -186,25 +187,28 @@ impl PgPoolListener {
self.needs_to_send_listen_cmd = false;
}
// Await a notification from the DB.
match conn.receive().await? {
// TODO: Handle connection dead here
match conn.stream.read().await? {
// We've received an async notification, return it.
Some(Message::NotificationResponse(notification)) => {
Message::NotificationResponse => {
let notification = NotificationResponse::read(conn.stream.buffer())?;
return Ok(notification.into());
}
// Protocol error, return the error.
Some(msg) => {
msg => {
return Err(protocol_err!(
"unexpected message received from database {:?}",
msg
)
.into())
}
// The connection is dead, ensure that it is dropped, update self state, and loop to try again.
None => {
self.close_conn().await;
self.needs_to_send_listen_cmd = true;
continue;
}
} // The connection is dead, ensure that it is dropped, update self state, and loop to try again.
// None => {
// self.close_conn().await;
// self.needs_to_send_listen_cmd = true;
// continue;
// }
}
}
}
@ -243,8 +247,8 @@ pub struct PgNotification {
pub payload: String,
}
impl From<Box<NotificationResponse>> for PgNotification {
fn from(src: Box<NotificationResponse>) -> Self {
impl From<NotificationResponse> for PgNotification {
fn from(src: NotificationResponse) -> Self {
Self {
pid: src.pid,
channel: src.channel_name,
@ -269,7 +273,9 @@ async fn send_listen_query<C: DerefMut<Target = PgConnection>>(
channels: impl IntoIterator<Item = impl AsRef<str>>,
) -> Result<()> {
let cmd = build_listen_all_query(channels);
conn.send(cmd.as_str()).await
let _ = conn.execute(cmd.as_str()).await?;
Ok(())
}
#[cfg(test)]

View File

@ -29,4 +29,4 @@ pub type PgPool = crate::pool::Pool<PgConnection>;
make_query_as!(PgQueryAs, Postgres, PgRow);
impl_map_row_for_row!(Postgres, PgRow);
impl_column_index_for_row!(Postgres);
impl_from_row_for_tuples!(Postgres, PgRow);
impl_from_row_for_tuples!(Postgres, PgRow);