#![allow(clippy::unwrap_used, clippy::expect_used)] use axum::{ Router, extract::ws::{Message, WebSocket, WebSocketUpgrade}, response::IntoResponse, routing::get, }; use futures::{SinkExt, StreamExt}; use std::{ sync::{Arc, Mutex}, time::Duration, }; use tokio::{task, time}; use tracing::info; use std::process::Stdio; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; use tokio::sync::mpsc; // Use std::process::Stdio type Tx = tokio::sync::mpsc::UnboundedSender; #[tokio::main] async fn main() { tracing_subscriber::fmt().pretty().init(); let clients = Arc::new(Mutex::new(Vec::::new())); let clients_clone = Arc::clone(&clients); let (tx, mut rx) = mpsc::channel(100); task::spawn(async move { let cmd = "$count = 0; while($count -lt 100) {Write-Error 'test-error';write-warning 'test-warning';Write-Output \"$(Get-Date -Format 'yyyy-MM-dd_HH:mm:ss'): Hello World #$($count) from PowerShell!\"; $count += 1}"; spawn_and_capture(cmd, tx).await; }); // In batches - but this does not preserve the order. Or does it? It's possible that stdout is // faster than stderr. //task::spawn(async move { // loop { // //let now = chrono::Utc::now().to_rfc3339(); // //let mut msg = Message::Text(now.into()); // let mut closed = vec![]; // // let mut messages = vec![]; // if rx.recv_many(&mut messages, 20).await > 0 { // for message in messages { // let msg = Message::Text(if message.1 { // format!("[ERR]: {}", message.0).into() // } else { // message.0.into() // }); // { // let mut lock = clients_clone.lock().unwrap(); // for (i, tx) in lock.iter().enumerate() { // if tx.send(msg.clone()).is_err() { // closed.push(i); // } // } // // for i in closed.iter().rev() { // lock.remove(*i); // } // } // // //if let Some((line, is_err)) = rx.recv().await { // // msg = Message::Text(if is_err { // // format!("[ERR]: {line}").into() // // } else { // // line.into() // // }) // //} // //{ // // let mut lock = clients_clone.lock().unwrap(); // // for (i, tx) in lock.iter().enumerate() { // // if tx.send(msg.clone()).is_err() { // // closed.push(i); // // } // // } // // // // for i in closed.iter().rev() { // // lock.remove(*i); // // } // //} // // // } // } // time::sleep(Duration::from_millis(500)).await; // } //}); task::spawn(async move { loop { let now = chrono::Utc::now().to_rfc3339(); let mut msg = Message::Text(now.into()); let mut closed = vec![]; if let Some((line, is_err)) = rx.recv().await { msg = Message::Text(if is_err { format!("[ERR]: {line}").into() } else { line.into() }) } { let mut lock = clients_clone.lock().unwrap(); for (i, tx) in lock.iter().enumerate() { if tx.send(msg.clone()).is_err() { closed.push(i); } } for i in closed.iter().rev() { lock.remove(*i); } } time::sleep(Duration::from_millis(200)).await; } }); let app = Router::new().route( "/ws", get({ let clients = Arc::clone(&clients); move |ws| handle_socket(ws, Arc::clone(&clients)) }), ); let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); axum::serve(listener, app).await.unwrap(); } async fn handle_socket(ws: WebSocketUpgrade, clients: Arc>>) -> impl IntoResponse { ws.on_upgrade(move |socket| handle_ws(socket, clients)) } async fn handle_ws(socket: WebSocket, clients: Arc>>) { info!(websocket = "connected"); let (mut sender, mut receiver) = socket.split(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); { let mut lock = clients.lock().unwrap(); lock.push(tx); } let send_task = task::spawn(async move { while let Some(msg) = rx.recv().await { if sender.send(msg).await.is_err() { break; } } }); while let Some(Ok(_)) = receiver.next().await {} send_task.abort(); } async fn spawn_and_capture>(command: T, tx: mpsc::Sender<(String, bool)>) { let mut child = Command::new("pwsh") .arg("-NoProfile") .arg("-Command") .arg(command.as_ref()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .expect("Failed to spawn command"); let stdout = BufReader::new(child.stdout.take().expect("No stdout")); let stderr = BufReader::new(child.stderr.take().expect("No stderr")); // Create an unbounded channel for communication let tx_out = tx.clone(); // Spawn stdout task let stdout_task = tokio::spawn(async move { let mut lines = stdout.lines(); while let Ok(Some(line)) = lines.next_line().await { tx_out.send((line.clone(), false)).await.unwrap(); } }); let tx_err = tx.clone(); // Spawn stderr task let stderr_task = tokio::spawn(async move { let mut lines = stderr.lines(); while let Ok(Some(line)) = lines.next_line().await { tx_err.send((line.clone(), true)).await.unwrap(); } }); // Wait for the command to finish let status = child.wait().await.expect("Failed to wait on child"); // Wait for all tasks to finish stdout_task.await.unwrap(); stderr_task.await.unwrap(); info!("Command exited with status: {}", status); }