From 59b5c180a67efd3195677cb9811c587120f6bb01 Mon Sep 17 00:00:00 2001 From: itsscb Date: Wed, 7 May 2025 22:45:48 +0200 Subject: [PATCH] feat: initial POC for batch of stdout --- src/main.rs | 165 +++++++++++++++++++++++++++++----------------------- 1 file changed, 92 insertions(+), 73 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5a27a65..016ace5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,95 +29,98 @@ async fn main() { let (tx, mut rx) = mpsc::channel(100); task::spawn(async move { - let cmd = "$count = 0; while($count -lt 100) {Write-Error 'test-error';write-warning 'test-warning';Write-Output \"$(Get-Date -Format 'yyyy-MM-dd_HH:mm:ss'): Hello World #$($count) from PowerShell!\"; $count += 1}"; + let cmd = "$count = 0; while($count -lt 10000) { + #Write-Error 'test-error' + #write-warning 'test-warning' + Write-Output \"$(Get-Date -Format 'yyyy-MM-dd_HH:mm:ss'): Hello World #$($count) from PowerShell!\" + $count += 1 + }"; spawn_and_capture(cmd, tx).await; }); // In batches - but this does not preserve the order. Or does it? It's possible that stdout is // faster than stderr. + task::spawn(async move { + loop { + let mut closed = vec![]; + + let mut messages = vec![]; + + if rx.recv_many(&mut messages, 2000).await > 0 { + let msg = Message::Text( + messages + .into_iter() + .map(|m| if m.1 { format!("[ERR]: {}", m.0) } else { m.0 }) + .collect::>() + .join("\n") + .into(), + ); + { + let mut lock = clients_clone.lock().unwrap(); + for (i, tx) in lock.iter().enumerate() { + if tx.send(msg.clone()).is_err() { + closed.push(i); + } + } + + for i in closed.iter().rev() { + lock.remove(*i); + } + } + //for message in messages { + // let msg = Message::Text(if message.1 { + // format!("[ERR]: {}", message.0).into() + // } else { + // message.0.into() + // }); + // { + // let mut lock = clients_clone.lock().unwrap(); + // for (i, tx) in lock.iter().enumerate() { + // if tx.send(msg.clone()).is_err() { + // closed.push(i); + // } + // } + // + // for i in closed.iter().rev() { + // lock.remove(*i); + // } + // } + //} + } + //time::sleep(Duration::from_millis(500)).await; + } + }); + //task::spawn(async move { // loop { - // //let now = chrono::Utc::now().to_rfc3339(); - // //let mut msg = Message::Text(now.into()); // let mut closed = vec![]; // - // let mut messages = vec![]; - // if rx.recv_many(&mut messages, 20).await > 0 { - // for message in messages { - // let msg = Message::Text(if message.1 { - // format!("[ERR]: {}", message.0).into() - // } else { - // message.0.into() - // }); - // { - // let mut lock = clients_clone.lock().unwrap(); - // for (i, tx) in lock.iter().enumerate() { - // if tx.send(msg.clone()).is_err() { - // closed.push(i); - // } - // } - // - // for i in closed.iter().rev() { - // lock.remove(*i); + // if let Some((line, is_err)) = rx.recv().await { + // let msg = Message::Text(if is_err { + // format!("[ERR]: {line}").into() + // } else { + // line.into() + // }); + // { + // let mut lock = clients_clone.lock().unwrap(); + // for (i, tx) in lock.iter().enumerate() { + // if tx.send(msg.clone()).is_err() { + // closed.push(i); // } // } // - // //if let Some((line, is_err)) = rx.recv().await { - // // msg = Message::Text(if is_err { - // // format!("[ERR]: {line}").into() - // // } else { - // // line.into() - // // }) - // //} - // //{ - // // let mut lock = clients_clone.lock().unwrap(); - // // for (i, tx) in lock.iter().enumerate() { - // // if tx.send(msg.clone()).is_err() { - // // closed.push(i); - // // } - // // } - // // - // // for i in closed.iter().rev() { - // // lock.remove(*i); - // // } - // //} - // // + // for i in closed.iter().rev() { + // lock.remove(*i); + // } // } + // + // //time::sleep(Duration::from_millis(200)).await; + // } else { + // break; // } - // time::sleep(Duration::from_millis(500)).await; // } //}); - task::spawn(async move { - loop { - let now = chrono::Utc::now().to_rfc3339(); - let mut msg = Message::Text(now.into()); - let mut closed = vec![]; - - if let Some((line, is_err)) = rx.recv().await { - msg = Message::Text(if is_err { - format!("[ERR]: {line}").into() - } else { - line.into() - }) - } - { - let mut lock = clients_clone.lock().unwrap(); - for (i, tx) in lock.iter().enumerate() { - if tx.send(msg.clone()).is_err() { - closed.push(i); - } - } - - for i in closed.iter().rev() { - lock.remove(*i); - } - } - - time::sleep(Duration::from_millis(200)).await; - } - }); - let app = Router::new().route( "/ws", get({ @@ -174,9 +177,25 @@ async fn spawn_and_capture>(command: T, tx: mpsc::Sender<(String, let tx_out = tx.clone(); // Spawn stdout task let stdout_task = tokio::spawn(async move { + const BATCH_SIZE: usize = 10; let mut lines = stdout.lines(); + let mut buffer = Vec::with_capacity(BATCH_SIZE); + while let Ok(Some(line)) = lines.next_line().await { - tx_out.send((line.clone(), false)).await.unwrap(); + buffer.push(line); + + if buffer.len() >= BATCH_SIZE { + if tx_out.send((buffer.join("\n"), false)).await.is_err() { + break; + } + + buffer = Vec::with_capacity(BATCH_SIZE); + } + //tx_out.send((line.clone(), false)).await.unwrap(); + } + + if !buffer.is_empty() { + let _ = tx_out.send((buffer.join("\n"), false)).await; } });