feat: initial POC for batching stdout lines before sending to clients

This commit is contained in:
itsscb 2025-05-07 22:45:48 +02:00
parent 5ff157d0bf
commit 59b5c180a6

View File

@ -29,20 +29,44 @@ async fn main() {
let (tx, mut rx) = mpsc::channel(100);
task::spawn(async move {
let cmd = "$count = 0; while($count -lt 100) {Write-Error 'test-error';write-warning 'test-warning';Write-Output \"$(Get-Date -Format 'yyyy-MM-dd_HH:mm:ss'): Hello World #$($count) from PowerShell!\"; $count += 1}";
let cmd = "$count = 0; while($count -lt 10000) {
#Write-Error 'test-error'
#write-warning 'test-warning'
Write-Output \"$(Get-Date -Format 'yyyy-MM-dd_HH:mm:ss'): Hello World #$($count) from PowerShell!\"
$count += 1
}";
spawn_and_capture(cmd, tx).await;
});
// Batched variant. NOTE(review): ordering between stdout and stderr is not
// guaranteed here — the two streams are read by separate tasks, so stdout
// lines may arrive in the channel ahead of earlier stderr lines. Confirm
// whether cross-stream ordering matters before relying on this.
//task::spawn(async move {
// loop {
// //let now = chrono::Utc::now().to_rfc3339();
// //let mut msg = Message::Text(now.into());
// let mut closed = vec![];
//
// let mut messages = vec![];
// if rx.recv_many(&mut messages, 20).await > 0 {
task::spawn(async move {
loop {
let mut closed = vec![];
let mut messages = vec![];
if rx.recv_many(&mut messages, 2000).await > 0 {
let msg = Message::Text(
messages
.into_iter()
.map(|m| if m.1 { format!("[ERR]: {}", m.0) } else { m.0 })
.collect::<Vec<_>>()
.join("\n")
.into(),
);
{
let mut lock = clients_clone.lock().unwrap();
for (i, tx) in lock.iter().enumerate() {
if tx.send(msg.clone()).is_err() {
closed.push(i);
}
}
for i in closed.iter().rev() {
lock.remove(*i);
}
}
//for message in messages {
// let msg = Message::Text(if message.1 {
// format!("[ERR]: {}", message.0).into()
@ -61,63 +85,42 @@ async fn main() {
// lock.remove(*i);
// }
// }
//
// //if let Some((line, is_err)) = rx.recv().await {
// // msg = Message::Text(if is_err {
// // format!("[ERR]: {line}").into()
// // } else {
// // line.into()
// // })
// //}
// //{
// // let mut lock = clients_clone.lock().unwrap();
// // for (i, tx) in lock.iter().enumerate() {
// // if tx.send(msg.clone()).is_err() {
// // closed.push(i);
// // }
// // }
// //
// // for i in closed.iter().rev() {
// // lock.remove(*i);
// // }
// //}
// //
// }
//}
}
//time::sleep(Duration::from_millis(500)).await;
// }
//});
task::spawn(async move {
loop {
let now = chrono::Utc::now().to_rfc3339();
let mut msg = Message::Text(now.into());
let mut closed = vec![];
if let Some((line, is_err)) = rx.recv().await {
msg = Message::Text(if is_err {
format!("[ERR]: {line}").into()
} else {
line.into()
})
}
{
let mut lock = clients_clone.lock().unwrap();
for (i, tx) in lock.iter().enumerate() {
if tx.send(msg.clone()).is_err() {
closed.push(i);
}
}
for i in closed.iter().rev() {
lock.remove(*i);
}
}
time::sleep(Duration::from_millis(200)).await;
}
});
//task::spawn(async move {
// loop {
// let mut closed = vec![];
//
// if let Some((line, is_err)) = rx.recv().await {
// let msg = Message::Text(if is_err {
// format!("[ERR]: {line}").into()
// } else {
// line.into()
// });
// {
// let mut lock = clients_clone.lock().unwrap();
// for (i, tx) in lock.iter().enumerate() {
// if tx.send(msg.clone()).is_err() {
// closed.push(i);
// }
// }
//
// for i in closed.iter().rev() {
// lock.remove(*i);
// }
// }
//
// //time::sleep(Duration::from_millis(200)).await;
// } else {
// break;
// }
// }
//});
let app = Router::new().route(
"/ws",
get({
@ -174,9 +177,25 @@ async fn spawn_and_capture<T: AsRef<str>>(command: T, tx: mpsc::Sender<(String,
let tx_out = tx.clone();
// Spawn stdout task
let stdout_task = tokio::spawn(async move {
const BATCH_SIZE: usize = 10;
let mut lines = stdout.lines();
let mut buffer = Vec::with_capacity(BATCH_SIZE);
while let Ok(Some(line)) = lines.next_line().await {
tx_out.send((line.clone(), false)).await.unwrap();
buffer.push(line);
if buffer.len() >= BATCH_SIZE {
if tx_out.send((buffer.join("\n"), false)).await.is_err() {
break;
}
buffer = Vec::with_capacity(BATCH_SIZE);
}
//tx_out.send((line.clone(), false)).await.unwrap();
}
if !buffer.is_empty() {
let _ = tx_out.send((buffer.join("\n"), false)).await;
}
});