#![allow(dead_code, clippy::unwrap_used, clippy::expect_used)] use axum::{ // Router, extract::ws::{Message, WebSocket, WebSocketUpgrade}, response::IntoResponse, // routing::get, }; use futures::{SinkExt, StreamExt}; use std::sync::{Arc, Mutex}; use tokio::task; use tracing::info; use std::process::Stdio; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; use tokio::sync::mpsc; // Use std::process::Stdio type Tx = tokio::sync::mpsc::UnboundedSender; fn main() { let db_path = "db_dev.db"; let script_path = "powershell"; befehlswerk::run(db_path, script_path); } // #[tokio::main] // async fn main() { // tracing_subscriber::fmt().pretty().init(); // let clients = Arc::new(Mutex::new(Vec::::new())); // let clients_clone = Arc::clone(&clients); // // let (tx, mut rx) = mpsc::channel(100); // // task::spawn(async move { // let cmd = "$count = 0; while($count -lt 10000) { // #Write-Error 'test-error' // #write-warning 'test-warning' // Write-Output \"$(Get-Date -Format 'yyyy-MM-dd_HH:mm:ss'): Hello World #$($count) from PowerShell!\" // $count += 1 // }"; // spawn_and_capture(cmd, tx).await; // }); // // // In batches - but this does not preserve the order. Or does it? It's possible that stdout is // // faster than stderr. // task::spawn(async move { // loop { // let mut closed = vec![]; // // let mut messages = vec![]; // // if rx.recv_many(&mut messages, 2000).await > 0 { // let msg = Message::Text( // messages // .into_iter() // .map(|m| if m.1 { format!("[ERR]: {}", m.0) } else { m.0 }) // .collect::>() // .join("\n") // .into(), // ); // { // let mut lock = clients_clone.lock().unwrap(); // for (i, tx) in lock.iter().enumerate() { // if tx.send(msg.clone()).is_err() { // closed.push(i); // } // } // // for i in closed.iter().rev() { // lock.remove(*i); // } // } // } // } // }); // // let app = Router::new().route( // "/ws", // get({ // let clients = Arc::clone(&clients); // move |ws| handle_socket(ws, Arc::clone(&clients)) // }), // ); // // let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); // axum::serve(listener, app).await.unwrap(); // } async fn handle_socket(ws: WebSocketUpgrade, clients: Arc>>) -> impl IntoResponse { ws.on_upgrade(move |socket| handle_ws(socket, clients)) } async fn handle_ws(socket: WebSocket, clients: Arc>>) { info!(websocket = "connected"); let (mut sender, mut receiver) = socket.split(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); { let mut lock = clients.lock().unwrap(); lock.push(tx); } let send_task = task::spawn(async move { while let Some(msg) = rx.recv().await { if sender.send(msg).await.is_err() { break; } } }); while let Some(Ok(_)) = receiver.next().await {} send_task.abort(); } async fn spawn_and_capture>(command: T, tx: mpsc::Sender<(String, bool)>) { let mut child = Command::new("pwsh") .arg("-NoProfile") .arg("-Command") .arg(command.as_ref()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .expect("Failed to spawn command"); let stdout = BufReader::new(child.stdout.take().expect("No stdout")); let stderr = BufReader::new(child.stderr.take().expect("No stderr")); // Create an unbounded channel for communication let tx_out = tx.clone(); // Spawn stdout task let stdout_task = tokio::spawn(async move { const BATCH_SIZE: usize = 10; let mut lines = stdout.lines(); let mut buffer = Vec::with_capacity(BATCH_SIZE); while let Ok(Some(line)) = lines.next_line().await { buffer.push(line); if buffer.len() >= BATCH_SIZE { if tx_out.send((buffer.join("\n"), false)).await.is_err() { break; } buffer = Vec::with_capacity(BATCH_SIZE); } //tx_out.send((line.clone(), false)).await.unwrap(); } if !buffer.is_empty() { let _ = tx_out.send((buffer.join("\n"), false)).await; } }); let tx_err = tx.clone(); // Spawn stderr task let stderr_task = tokio::spawn(async move { let mut lines = stderr.lines(); while let Ok(Some(line)) = lines.next_line().await { tx_err.send((line.clone(), true)).await.unwrap(); } }); // Wait for the command to finish let status = child.wait().await.expect("Failed to wait on child"); // Wait for all tasks to finish stdout_task.await.unwrap(); stderr_task.await.unwrap(); info!("Command exited with status: {}", status); }