From 685d70ba62dbd3d33dd07fb95a189232af9b93a8 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Mon, 27 Jul 2020 16:17:04 -0700 Subject: [PATCH] refactor(examples): use `select pg_notify` in postgres/listen Signed-off-by: Austin Bonander --- examples/postgres/listen/src/main.rs | 46 ++++++++++++++++++---------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/examples/postgres/listen/src/main.rs b/examples/postgres/listen/src/main.rs index ed779b5c..1e3698e2 100644 --- a/examples/postgres/listen/src/main.rs +++ b/examples/postgres/listen/src/main.rs @@ -3,7 +3,7 @@ use futures::StreamExt; use futures::TryStreamExt; use sqlx::postgres::PgListener; use sqlx::{Executor, PgPool}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicI64, Ordering}; use std::time::Duration; #[async_std::main] @@ -48,22 +48,36 @@ async fn main() -> Result<(), Box> { Ok(()) } -async fn notify(mut pool: &PgPool) { - static COUNTER: AtomicUsize = AtomicUsize::new(0); +async fn notify(pool: &PgPool) { + static COUNTER: AtomicI64 = AtomicI64::new(0); - // Note that channel names are lower-cased by Postgres unless they are quoted - let res = pool - .execute(&*format!( - r#" -NOTIFY "chan0", '{{"payload": {}}}'; -NOTIFY "chan1", '{{"payload": {}}}'; -NOTIFY "chan2", '{{"payload": {}}}'; - "#, - COUNTER.fetch_add(1, Ordering::SeqCst), - COUNTER.fetch_add(1, Ordering::SeqCst), - COUNTER.fetch_add(1, Ordering::SeqCst) - )) - .await; + // There's two ways you can invoke `NOTIFY`: + // + // 1: `NOTIFY , ''` which cannot take bind parameters and + // is an identifier which is lowercased unless double-quoted + // + // 2: `SELECT pg_notify('', '')` which can take bind parameters + // and preserves its case + // + // We recommend #2 for consistency and usability. + + // language=PostgreSQL + let res = sqlx::query( + r#" +-- this emits '{ "payload": N }' as the actual payload +select pg_notify(chan, json_build_object('payload', payload)::text) +from ( + values ('chan0', $1), + ('chan1', $2), + ('chan2', $3) + ) notifies(chan, payload) + "#, + ) + .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) + .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) + .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) + .execute(pool) + .await; println!("[from notify]: {:?}", res); }