refactor(examples): use select pg_notify in postgres/listen

Signed-off-by: Austin Bonander <austin@launchbadge.com>
This commit is contained in:
Austin Bonander 2020-07-27 16:17:04 -07:00 committed by Ryan Leckey
parent 4e00ee050d
commit 685d70ba62

View File

@ -3,7 +3,7 @@ use futures::StreamExt;
use futures::TryStreamExt;
use sqlx::postgres::PgListener;
use sqlx::{Executor, PgPool};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;
#[async_std::main]
@ -48,22 +48,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
async fn notify(mut pool: &PgPool) {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
async fn notify(pool: &PgPool) {
static COUNTER: AtomicI64 = AtomicI64::new(0);
// Note that channel names are lower-cased by Postgres unless they are quoted
let res = pool
.execute(&*format!(
r#"
NOTIFY "chan0", '{{"payload": {}}}';
NOTIFY "chan1", '{{"payload": {}}}';
NOTIFY "chan2", '{{"payload": {}}}';
"#,
COUNTER.fetch_add(1, Ordering::SeqCst),
COUNTER.fetch_add(1, Ordering::SeqCst),
COUNTER.fetch_add(1, Ordering::SeqCst)
))
.await;
// There's two ways you can invoke `NOTIFY`:
//
// 1: `NOTIFY <channel>, '<payload>'` which cannot take bind parameters and
// <channel> is an identifier which is lowercased unless double-quoted
//
// 2: `SELECT pg_notify('<channel>', '<payload>')` which can take bind parameters
// and <channel> preserves its case
//
// We recommend #2 for consistency and usability.
// language=PostgreSQL
let res = sqlx::query(
r#"
-- this emits '{ "payload": N }' as the actual payload
select pg_notify(chan, json_build_object('payload', payload)::text)
from (
values ('chan0', $1),
('chan1', $2),
('chan2', $3)
) notifies(chan, payload)
"#,
)
.bind(&COUNTER.fetch_add(1, Ordering::SeqCst))
.bind(&COUNTER.fetch_add(1, Ordering::SeqCst))
.bind(&COUNTER.fetch_add(1, Ordering::SeqCst))
.execute(pool)
.await;
println!("[from notify]: {:?}", res);
}