Added demo program to show use of LISTEN/NOTIFY.

This commit is contained in:
Anthony Dodd 2020-01-31 16:42:17 -06:00 committed by Ryan Leckey
parent a0da99e128
commit ae7e15cbe3
6 changed files with 89 additions and 3 deletions

2
Cargo.lock generated
View File

@ -2282,4 +2282,4 @@ checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
]

View File

@ -4,6 +4,7 @@ members = [
"sqlx-core",
"sqlx-macros",
"sqlx-test",
"examples/postgres-listen",
"examples/realworld-postgres",
"examples/todos-postgres",
]

View File

@ -0,0 +1,10 @@
[package]
name = "postgres-listen"
version = "0.1.0"
edition = "2018"
workspace = "../.."
[dependencies]
async-std = { version = "1.4.0", features = [ "attributes", "unstable" ] }
sqlx = { path = "../..", features = [ "postgres", "tls" ] }
futures = "0.3.1"

View File

@ -0,0 +1,18 @@
Postgres LISTEN/NOTIFY
======================
## Usage
Declare the database URL. This example does not include any reading or writing of data.
```
export DATABASE_URL="postgres://postgres@localhost/postgres"
```
Run.
```
cargo run
```
The example program should connect to the database, and create a LISTEN loop on a predefined set of channels. A NOTIFY task will be spawned which will connect to the same database and will emit notifications on a 5 second interval.

View File

@ -0,0 +1,57 @@
use std::time::Duration;
use async_std::stream;
use futures::stream::StreamExt;
use sqlx::postgres::PgPoolExt;
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Building PG pool.");
let conn_str =
std::env::var("DATABASE_URL").expect("Env var DATABASE_URL is required for this example.");
let pool = sqlx::PgPool::new(&conn_str).await?;
let notify_pool = pool.clone();
let _t = async_std::task::spawn(async move {
stream::interval(Duration::from_secs(5))
.for_each(move |_| notify(notify_pool.clone()))
.await
});
println!("Starting LISTEN loop.");
let mut listener = pool.listen(&["chan0", "chan1", "chan2"]);
let mut counter = 0usize;
loop {
let res = listener.recv().await;
println!("[from recv]: {:?}", res);
counter += 1;
if counter >= 3 {
break;
}
}
let stream = listener.into_stream();
futures::pin_mut!(stream);
while let Some(res) = stream.next().await {
println!("[from stream]: {:?}", res);
}
Ok(())
}
async fn notify(pool: sqlx::PgPool) {
let mut conn = match pool.acquire().await {
Ok(conn) => conn,
Err(err) => return println!("[from notify]: {:?}", err),
};
let res = sqlx::Executor::send(
&mut conn,
r#"
NOTIFY "chan0", '{"payload": 0}';
NOTIFY "chan1", '{"payload": 1}';
NOTIFY "chan2", '{"payload": 2}';
"#,
)
.await;
println!("[from notify]: {:?}", res);
}

View File

@ -34,7 +34,7 @@ impl PgConnectionExt<PgPoolConnection> for PgPoolConnection {
/// A stream of async database notifications.
///
/// Notifications will always correspond to the channel(s) specified this object is created.
/// Notifications will always correspond to the channel(s) specified when this object was created.
///
/// This listener is bound to the lifetime of its underlying connection. If the connection ever
/// dies, this listener will terminate and will no longer yield any notifications.
@ -130,7 +130,7 @@ impl PgPoolExt for PgPool {
/// A stream of async database notifications.
///
/// Notifications will always correspond to the channel(s) specified this object is created.
/// Notifications will always correspond to the channel(s) specified when this object was created.
///
/// This listener, as it is built from a `PgPool`, supports auto-reconnect. If the active
/// connection being used ever dies, this listener will detect that event, acquire a new connection