Immediately print job output for fresh jobs

This prevents a deadlock where the message queue is filled with output
messages but not emptied as the job producing the messages runs on the
same thread as the message processing.
This commit is contained in:
bjorn3 2020-11-07 18:12:11 +01:00
parent 49b63b7caa
commit 0583081d2a
3 changed files with 51 additions and 37 deletions

View File

@ -128,7 +128,7 @@ fn emit_build_output(
output: &BuildOutput, output: &BuildOutput,
out_dir: &Path, out_dir: &Path,
package_id: PackageId, package_id: PackageId,
) { ) -> CargoResult<()> {
let library_paths = output let library_paths = output
.library_paths .library_paths
.iter() .iter()
@ -144,7 +144,8 @@ fn emit_build_output(
out_dir, out_dir,
} }
.to_json_string(); .to_json_string();
state.stdout(msg); state.stdout(msg)?;
Ok(())
} }
fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> { fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
@ -353,13 +354,13 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
warnings_in_case_of_panic.push(warning.to_owned()); warnings_in_case_of_panic.push(warning.to_owned());
} }
if extra_verbose { if extra_verbose {
state.stdout(format!("{}{}", prefix, stdout)); state.stdout(format!("{}{}", prefix, stdout))?;
} }
Ok(()) Ok(())
}, },
&mut |stderr| { &mut |stderr| {
if extra_verbose { if extra_verbose {
state.stderr(format!("{}{}", prefix, stderr)); state.stderr(format!("{}{}", prefix, stderr))?;
} }
Ok(()) Ok(())
}, },
@ -396,7 +397,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
BuildOutput::parse(&output.stdout, &pkg_name, &script_out_dir, &script_out_dir)?; BuildOutput::parse(&output.stdout, &pkg_name, &script_out_dir, &script_out_dir)?;
if json_messages { if json_messages {
emit_build_output(state, &parsed_output, script_out_dir.as_path(), id); emit_build_output(state, &parsed_output, script_out_dir.as_path(), id)?;
} }
build_script_outputs build_script_outputs
.lock() .lock()
@ -421,7 +422,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
}; };
if json_messages { if json_messages {
emit_build_output(state, &output, script_out_dir.as_path(), id); emit_build_output(state, &output, script_out_dir.as_path(), id)?;
} }
build_script_outputs build_script_outputs

View File

@ -166,10 +166,11 @@ pub struct JobState<'a> {
/// Channel back to the main thread to coordinate messages and such. /// Channel back to the main thread to coordinate messages and such.
messages: Arc<Queue<Message>>, messages: Arc<Queue<Message>>,
/// Normally messages are handled in a bounded way. When the job is fresh /// Normally output is sent to the job queue with backpressure. When the job is fresh
/// however we need to immediately return to prevent a deadlock as the messages /// however we need to immediately display the output to prevent a deadlock as the
/// are processed on the same thread as they are sent from. /// output messages are processed on the same thread as they are sent from. `output`
messages_bounded: bool, /// defines where to output in this case.
output: Option<&'a Config>,
/// The job id that this state is associated with, used when sending /// The job id that this state is associated with, used when sending
/// messages back to the main thread. /// messages back to the main thread.
@ -236,20 +237,24 @@ impl<'a> JobState<'a> {
.push(Message::BuildPlanMsg(module_name, cmd, filenames)); .push(Message::BuildPlanMsg(module_name, cmd, filenames));
} }
pub fn stdout(&self, stdout: String) { pub fn stdout(&self, stdout: String) -> CargoResult<()> {
if self.messages_bounded { if let Some(config) = self.output {
self.messages.push_bounded(Message::Stdout(stdout)); writeln!(config.shell().out(), "{}", stdout)?;
} else { } else {
self.messages.push(Message::Stdout(stdout)); self.messages.push_bounded(Message::Stdout(stdout));
} }
Ok(())
} }
pub fn stderr(&self, stderr: String) { pub fn stderr(&self, stderr: String) -> CargoResult<()> {
if self.messages_bounded { if let Some(config) = self.output {
self.messages.push_bounded(Message::Stderr(stderr)); let mut shell = config.shell();
shell.print_ansi(stderr.as_bytes())?;
shell.err().write_all(b"\n")?;
} else { } else {
self.messages.push(Message::Stderr(stderr)); self.messages.push_bounded(Message::Stderr(stderr));
} }
Ok(())
} }
/// A method used to signal to the coordinator thread that the rmeta file /// A method used to signal to the coordinator thread that the rmeta file
@ -839,17 +844,9 @@ impl<'cfg> DrainState<'cfg> {
self.note_working_on(cx.bcx.config, unit, fresh)?; self.note_working_on(cx.bcx.config, unit, fresh)?;
} }
let doit = move || { let doit = move |state: JobState<'_>| {
let state = JobState {
id,
messages: messages.clone(),
messages_bounded: job.freshness() == Freshness::Dirty,
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
};
let mut sender = FinishOnDrop { let mut sender = FinishOnDrop {
messages: &messages, messages: &state.messages,
id, id,
result: None, result: None,
}; };
@ -868,7 +865,9 @@ impl<'cfg> DrainState<'cfg> {
// we need to make sure that the metadata is flagged as produced so // we need to make sure that the metadata is flagged as produced so
// send a synthetic message here. // send a synthetic message here.
if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() { if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
messages.push(Message::Finish(id, Artifact::Metadata, Ok(()))); state
.messages
.push(Message::Finish(state.id, Artifact::Metadata, Ok(())));
} }
// Use a helper struct with a `Drop` implementation to guarantee // Use a helper struct with a `Drop` implementation to guarantee
@ -898,11 +897,25 @@ impl<'cfg> DrainState<'cfg> {
self.timings.add_fresh(); self.timings.add_fresh();
// Running a fresh job on the same thread is often much faster than spawning a new // Running a fresh job on the same thread is often much faster than spawning a new
// thread to run the job. // thread to run the job.
doit(); doit(JobState {
id,
messages: messages.clone(),
output: Some(cx.bcx.config),
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
});
} }
Freshness::Dirty => { Freshness::Dirty => {
self.timings.add_dirty(); self.timings.add_dirty();
scope.spawn(move |_| doit()); scope.spawn(move |_| {
doit(JobState {
id,
messages: messages.clone(),
output: None,
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
})
});
} }
} }

View File

@ -448,7 +448,7 @@ fn link_targets(cx: &mut Context<'_, '_>, unit: &Unit, fresh: bool) -> CargoResu
fresh, fresh,
} }
.to_json_string(); .to_json_string();
state.stdout(msg); state.stdout(msg)?;
} }
Ok(()) Ok(())
})) }))
@ -1139,7 +1139,7 @@ fn on_stdout_line(
_package_id: PackageId, _package_id: PackageId,
_target: &Target, _target: &Target,
) -> CargoResult<()> { ) -> CargoResult<()> {
state.stdout(line.to_string()); state.stdout(line.to_string())?;
Ok(()) Ok(())
} }
@ -1177,7 +1177,7 @@ fn on_stderr_line_inner(
// something like that), so skip over everything that doesn't look like a // something like that), so skip over everything that doesn't look like a
// JSON message. // JSON message.
if !line.starts_with('{') { if !line.starts_with('{') {
state.stderr(line.to_string()); state.stderr(line.to_string())?;
return Ok(true); return Ok(true);
} }
@ -1189,7 +1189,7 @@ fn on_stderr_line_inner(
// to stderr. // to stderr.
Err(e) => { Err(e) => {
debug!("failed to parse json: {:?}", e); debug!("failed to parse json: {:?}", e);
state.stderr(line.to_string()); state.stderr(line.to_string())?;
return Ok(true); return Ok(true);
} }
}; };
@ -1225,7 +1225,7 @@ fn on_stderr_line_inner(
.map(|v| String::from_utf8(v).expect("utf8")) .map(|v| String::from_utf8(v).expect("utf8"))
.expect("strip should never fail") .expect("strip should never fail")
}; };
state.stderr(rendered); state.stderr(rendered)?;
return Ok(true); return Ok(true);
} }
} }
@ -1316,7 +1316,7 @@ fn on_stderr_line_inner(
// Switch json lines from rustc/rustdoc that appear on stderr to stdout // Switch json lines from rustc/rustdoc that appear on stderr to stdout
// instead. We want the stdout of Cargo to always be machine parseable as // instead. We want the stdout of Cargo to always be machine parseable as
// stderr has our colorized human-readable messages. // stderr has our colorized human-readable messages.
state.stdout(msg); state.stdout(msg)?;
Ok(true) Ok(true)
} }