diff --git a/index.html b/index.html new file mode 100644 index 0000000..a5013e4 --- /dev/null +++ b/index.html @@ -0,0 +1,25 @@ + + + +WebSocket Example + + + +
+ + + + diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..5a27a65 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,200 @@ +#![allow(clippy::unwrap_used, clippy::expect_used)] +use axum::{ + Router, + extract::ws::{Message, WebSocket, WebSocketUpgrade}, + response::IntoResponse, + routing::get, +}; +use futures::{SinkExt, StreamExt}; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::{task, time}; +use tracing::info; + +use std::process::Stdio; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; +use tokio::sync::mpsc; // Use std::process::Stdio + +type Tx = tokio::sync::mpsc::UnboundedSender; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt().pretty().init(); + let clients = Arc::new(Mutex::new(Vec::::new())); + let clients_clone = Arc::clone(&clients); + + let (tx, mut rx) = mpsc::channel(100); + + task::spawn(async move { + let cmd = "$count = 0; while($count -lt 100) {Write-Error 'test-error';write-warning 'test-warning';Write-Output \"$(Get-Date -Format 'yyyy-MM-dd_HH:mm:ss'): Hello World #$($count) from PowerShell!\"; $count += 1}"; + spawn_and_capture(cmd, tx).await; + }); + + // In batches - but this does not preserve the order. Or does it? It's possible that stdout is + // faster than stderr. + //task::spawn(async move { + // loop { + // //let now = chrono::Utc::now().to_rfc3339(); + // //let mut msg = Message::Text(now.into()); + // let mut closed = vec![]; + // + // let mut messages = vec![]; + // if rx.recv_many(&mut messages, 20).await > 0 { + // for message in messages { + // let msg = Message::Text(if message.1 { + // format!("[ERR]: {}", message.0).into() + // } else { + // message.0.into() + // }); + // { + // let mut lock = clients_clone.lock().unwrap(); + // for (i, tx) in lock.iter().enumerate() { + // if tx.send(msg.clone()).is_err() { + // closed.push(i); + // } + // } + // + // for i in closed.iter().rev() { + // lock.remove(*i); + // } + // } + // + // //if let Some((line, is_err)) = rx.recv().await { + // // msg = Message::Text(if is_err { + // // format!("[ERR]: {line}").into() + // // } else { + // // line.into() + // // }) + // //} + // //{ + // // let mut lock = clients_clone.lock().unwrap(); + // // for (i, tx) in lock.iter().enumerate() { + // // if tx.send(msg.clone()).is_err() { + // // closed.push(i); + // // } + // // } + // // + // // for i in closed.iter().rev() { + // // lock.remove(*i); + // // } + // //} + // // + // } + // } + // time::sleep(Duration::from_millis(500)).await; + // } + //}); + + task::spawn(async move { + loop { + let now = chrono::Utc::now().to_rfc3339(); + let mut msg = Message::Text(now.into()); + let mut closed = vec![]; + + if let Some((line, is_err)) = rx.recv().await { + msg = Message::Text(if is_err { + format!("[ERR]: {line}").into() + } else { + line.into() + }) + } + { + let mut lock = clients_clone.lock().unwrap(); + for (i, tx) in lock.iter().enumerate() { + if tx.send(msg.clone()).is_err() { + closed.push(i); + } + } + + for i in closed.iter().rev() { + lock.remove(*i); + } + } + + time::sleep(Duration::from_millis(200)).await; + } + }); + + let app = Router::new().route( + "/ws", + get({ + let clients = Arc::clone(&clients); + move |ws| handle_socket(ws, Arc::clone(&clients)) + }), + ); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} + +async fn handle_socket(ws: WebSocketUpgrade, clients: Arc>>) -> impl IntoResponse { + ws.on_upgrade(move |socket| handle_ws(socket, clients)) +} + +async fn handle_ws(socket: WebSocket, clients: Arc>>) { + info!(websocket = "connected"); + let (mut sender, mut receiver) = socket.split(); + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + { + let mut lock = clients.lock().unwrap(); + lock.push(tx); + } + + let send_task = task::spawn(async move { + while let Some(msg) = rx.recv().await { + if sender.send(msg).await.is_err() { + break; + } + } + }); + + while let Some(Ok(_)) = receiver.next().await {} + + send_task.abort(); +} + +async fn spawn_and_capture>(command: T, tx: mpsc::Sender<(String, bool)>) { + let mut child = Command::new("pwsh") + .arg("-NoProfile") + .arg("-Command") + .arg(command.as_ref()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("Failed to spawn command"); + + let stdout = BufReader::new(child.stdout.take().expect("No stdout")); + let stderr = BufReader::new(child.stderr.take().expect("No stderr")); + + // Create an unbounded channel for communication + let tx_out = tx.clone(); + // Spawn stdout task + let stdout_task = tokio::spawn(async move { + let mut lines = stdout.lines(); + while let Ok(Some(line)) = lines.next_line().await { + tx_out.send((line.clone(), false)).await.unwrap(); + } + }); + + let tx_err = tx.clone(); + // Spawn stderr task + let stderr_task = tokio::spawn(async move { + let mut lines = stderr.lines(); + while let Ok(Some(line)) = lines.next_line().await { + tx_err.send((line.clone(), true)).await.unwrap(); + } + }); + + // Wait for the command to finish + let status = child.wait().await.expect("Failed to wait on child"); + + // Wait for all tasks to finish + stdout_task.await.unwrap(); + stderr_task.await.unwrap(); + + info!("Command exited with status: {}", status); +}