1use std::{
5 ffi::OsString,
6 fmt,
7 io::{self, BufWriter, Write},
8 marker::PhantomData,
9 path::PathBuf,
10 process::{ChildStderr, ChildStdout, Command, Stdio},
11};
12
13use anyhow::Context;
14use crossbeam_channel::Sender;
15use paths::Utf8PathBuf;
16use process_wrap::std::{StdChildWrapper, StdCommandWrap};
17use stdx::process::streaming_output;
18
/// Turns individual lines of a command's output into messages of type `T`.
///
/// Implementors must be `Send + 'static`: `CommandActor` stores the parser
/// boxed, and `CommandHandle::spawn` moves the actor onto a separate thread.
pub(crate) trait JsonLinesParser<T>: Send + 'static {
    /// Called by `CommandActor::run` for every line read from the child's
    /// stdout or stderr. A returned `Some` is forwarded over the actor's
    /// channel; `None` means the line produced no message.
    ///
    /// `error` is a per-stream text buffer whose contents end up in the
    /// failure message reported by `CommandHandle::join`. Presumably
    /// implementors append unparsable input to it — verify against the
    /// implementors.
    fn from_line(&self, line: &str, error: &mut String) -> Option<T>;
    /// Called from the third callback that `CommandActor::run` hands to
    /// `streaming_output` (presumably invoked once the output ends — confirm
    /// in `stdx`). A returned `Some` is forwarded like a parsed line.
    fn from_eof(&self) -> Option<T>;
}
28
29struct CommandActor<T> {
30 parser: Box<dyn JsonLinesParser<T>>,
31 sender: Sender<T>,
32 stdout: ChildStdout,
33 stderr: ChildStderr,
34}
35
36impl<T: Sized + Send + 'static> CommandActor<T> {
37 fn new(
38 parser: impl JsonLinesParser<T>,
39 sender: Sender<T>,
40 stdout: ChildStdout,
41 stderr: ChildStderr,
42 ) -> Self {
43 let parser = Box::new(parser);
44 CommandActor { parser, sender, stdout, stderr }
45 }
46}
47
48impl<T: Sized + Send + 'static> CommandActor<T> {
49 fn run(self, outfile: Option<Utf8PathBuf>) -> io::Result<(bool, String)> {
50 let mut stdout = outfile.as_ref().and_then(|path| {
60 _ = std::fs::create_dir_all(path);
61 Some(BufWriter::new(std::fs::File::create(path.join("stdout")).ok()?))
62 });
63 let mut stderr = outfile.as_ref().and_then(|path| {
64 _ = std::fs::create_dir_all(path);
65 Some(BufWriter::new(std::fs::File::create(path.join("stderr")).ok()?))
66 });
67
68 let mut stdout_errors = String::new();
69 let mut stderr_errors = String::new();
70 let mut read_at_least_one_stdout_message = false;
71 let mut read_at_least_one_stderr_message = false;
72 let process_line = |line: &str, error: &mut String| {
73 if let Some(t) = self.parser.from_line(line, error) {
75 self.sender.send(t).unwrap();
76 true
77 } else {
78 false
79 }
80 };
81 let output = streaming_output(
82 self.stdout,
83 self.stderr,
84 &mut |line| {
85 if let Some(stdout) = &mut stdout {
86 _ = stdout.write_all(line.as_bytes());
87 _ = stdout.write_all(b"\n");
88 }
89 if process_line(line, &mut stdout_errors) {
90 read_at_least_one_stdout_message = true;
91 }
92 },
93 &mut |line| {
94 if let Some(stderr) = &mut stderr {
95 _ = stderr.write_all(line.as_bytes());
96 _ = stderr.write_all(b"\n");
97 }
98 if process_line(line, &mut stderr_errors) {
99 read_at_least_one_stderr_message = true;
100 }
101 },
102 &mut || {
103 if let Some(t) = self.parser.from_eof() {
104 self.sender.send(t).unwrap();
105 }
106 },
107 );
108
109 let read_at_least_one_message =
110 read_at_least_one_stdout_message || read_at_least_one_stderr_message;
111 let mut error = stdout_errors;
112 error.push_str(&stderr_errors);
113 match output {
114 Ok(_) => Ok((read_at_least_one_message, error)),
115 Err(e) => Err(io::Error::new(e.kind(), format!("{e:?}: {error}"))),
116 }
117 }
118}
119
120struct JodGroupChild(Box<dyn StdChildWrapper>);
124
125impl Drop for JodGroupChild {
126 fn drop(&mut self) {
127 _ = self.0.kill();
128 _ = self.0.wait();
129 }
130}
131
132pub(crate) struct CommandHandle<T> {
134 child: JodGroupChild,
137 thread: stdx::thread::JoinHandle<io::Result<(bool, String)>>,
138 program: OsString,
139 arguments: Vec<OsString>,
140 current_dir: Option<PathBuf>,
141 _phantom: PhantomData<T>,
142}
143
144impl<T> fmt::Debug for CommandHandle<T> {
145 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146 f.debug_struct("CommandHandle")
147 .field("program", &self.program)
148 .field("arguments", &self.arguments)
149 .field("current_dir", &self.current_dir)
150 .finish()
151 }
152}
153
154impl<T: Sized + Send + 'static> CommandHandle<T> {
155 pub(crate) fn spawn(
156 mut command: Command,
157 parser: impl JsonLinesParser<T>,
158 sender: Sender<T>,
159 out_file: Option<Utf8PathBuf>,
160 ) -> anyhow::Result<Self> {
161 command.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null());
162
163 let program = command.get_program().into();
164 let arguments = command.get_args().map(|arg| arg.into()).collect::<Vec<OsString>>();
165 let current_dir = command.get_current_dir().map(|arg| arg.to_path_buf());
166
167 let mut child = StdCommandWrap::from(command);
168 #[cfg(unix)]
169 child.wrap(process_wrap::std::ProcessSession);
170 #[cfg(windows)]
171 child.wrap(process_wrap::std::JobObject);
172 let mut child = child
173 .spawn()
174 .map(JodGroupChild)
175 .with_context(|| "Failed to spawn command: {child:?}")?;
176
177 let stdout = child.0.stdout().take().unwrap();
178 let stderr = child.0.stderr().take().unwrap();
179
180 let actor = CommandActor::<T>::new(parser, sender, stdout, stderr);
181 let thread =
182 stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker, "CommandHandle")
183 .spawn(move || actor.run(out_file))
184 .expect("failed to spawn thread");
185 Ok(CommandHandle { program, arguments, current_dir, child, thread, _phantom: PhantomData })
186 }
187
188 pub(crate) fn cancel(mut self) {
189 let _ = self.child.0.kill();
190 let _ = self.child.0.wait();
191 }
192
193 pub(crate) fn join(mut self) -> io::Result<()> {
194 let exit_status = self.child.0.wait()?;
195 let (read_at_least_one_message, error) = self.thread.join()?;
196 if read_at_least_one_message || exit_status.success() {
197 Ok(())
198 } else {
199 Err(io::Error::other(format!(
200 "Cargo watcher failed, the command produced no valid metadata (exit code: {exit_status:?}):\n{error}"
201 )))
202 }
203 }
204
205 pub(crate) fn has_exited(&mut self) -> bool {
206 match self.child.0.try_wait() {
207 Ok(Some(_exit_code)) => {
208 true
210 }
211 Ok(None) => {
212 false
214 }
215 Err(_) => {
216 true
219 }
220 }
221 }
222}