diff --git a/Cargo.lock b/Cargo.lock index d69a78e3..935ec7a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2282,4 +2282,4 @@ checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" dependencies = [ "winapi 0.2.8", "winapi-build", -] +] \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index a8151d7b..2cce05d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "sqlx-core", "sqlx-macros", "sqlx-test", + "examples/postgres-listen", "examples/realworld-postgres", "examples/todos-postgres", ] diff --git a/examples/postgres-listen/Cargo.toml b/examples/postgres-listen/Cargo.toml new file mode 100644 index 00000000..9c0463d2 --- /dev/null +++ b/examples/postgres-listen/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "postgres-listen" +version = "0.1.0" +edition = "2018" +workspace = "../.." + +[dependencies] +async-std = { version = "1.4.0", features = [ "attributes", "unstable" ] } +sqlx = { path = "../..", features = [ "postgres", "tls" ] } +futures = "0.3.1" diff --git a/examples/postgres-listen/README.md b/examples/postgres-listen/README.md new file mode 100644 index 00000000..7a0c39a7 --- /dev/null +++ b/examples/postgres-listen/README.md @@ -0,0 +1,18 @@ +Postgres LISTEN/NOTIFY +====================== + +## Usage + +Declare the database URL. This example does not include any reading or writing of data. + +``` +export DATABASE_URL="postgres://postgres@localhost/postgres" +``` + +Run. + +``` +cargo run +``` + +The example program should connect to the database, and create a LISTEN loop on a predefined set of channels. A NOTIFY task will be spawned which will connect to the same database and will emit notifications on a 5 second interval. diff --git a/examples/postgres-listen/src/main.rs b/examples/postgres-listen/src/main.rs new file mode 100644 index 00000000..3f144812 --- /dev/null +++ b/examples/postgres-listen/src/main.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use async_std::stream; +use futures::stream::StreamExt; +use sqlx::postgres::PgPoolExt; + +#[async_std::main] +async fn main() -> Result<(), Box> { + println!("Building PG pool."); + let conn_str = + std::env::var("DATABASE_URL").expect("Env var DATABASE_URL is required for this example."); + let pool = sqlx::PgPool::new(&conn_str).await?; + + let notify_pool = pool.clone(); + let _t = async_std::task::spawn(async move { + stream::interval(Duration::from_secs(5)) + .for_each(move |_| notify(notify_pool.clone())) + .await + }); + + println!("Starting LISTEN loop."); + let mut listener = pool.listen(&["chan0", "chan1", "chan2"]); + let mut counter = 0usize; + loop { + let res = listener.recv().await; + println!("[from recv]: {:?}", res); + counter += 1; + if counter >= 3 { + break; + } + } + + let stream = listener.into_stream(); + futures::pin_mut!(stream); + while let Some(res) = stream.next().await { + println!("[from stream]: {:?}", res); + } + + Ok(()) +} + +async fn notify(pool: sqlx::PgPool) { + let mut conn = match pool.acquire().await { + Ok(conn) => conn, + Err(err) => return println!("[from notify]: {:?}", err), + }; + let res = sqlx::Executor::send( + &mut conn, + r#" + NOTIFY "chan0", '{"payload": 0}'; + NOTIFY "chan1", '{"payload": 1}'; + NOTIFY "chan2", '{"payload": 2}'; + "#, + ) + .await; + println!("[from notify]: {:?}", res); +} diff --git a/sqlx-core/src/postgres/listen.rs b/sqlx-core/src/postgres/listen.rs index d53911db..9ed64d4c 100644 --- a/sqlx-core/src/postgres/listen.rs +++ b/sqlx-core/src/postgres/listen.rs @@ -34,7 +34,7 @@ impl PgConnectionExt for PgPoolConnection { /// A stream of async database notifications. /// -/// Notifications will always correspond to the channel(s) specified this object is created. +/// Notifications will always correspond to the channel(s) specified when this object was created. /// /// This listener is bound to the lifetime of its underlying connection. If the connection ever /// dies, this listener will terminate and will no longer yield any notifications. @@ -130,7 +130,7 @@ impl PgPoolExt for PgPool { /// A stream of async database notifications. /// -/// Notifications will always correspond to the channel(s) specified this object is created. +/// Notifications will always correspond to the channel(s) specified when this object was created. /// /// This listener, as it is built from a `PgPool`, supports auto-reconnect. If the active /// connection being used ever dies, this listener will detect that event, acquire a new connection