Wire up methods for informing job queue of rustc jobserver state

This commit is contained in:
Mark Rousskov 2019-12-19 13:40:29 -05:00
parent 9d11cd19a9
commit ec80cf90b0
3 changed files with 90 additions and 9 deletions

View File

@ -491,4 +491,23 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
pub fn rmeta_required(&self, unit: &Unit<'a>) -> bool { pub fn rmeta_required(&self, unit: &Unit<'a>) -> bool {
self.rmeta_required.contains(unit) || self.bcx.config.cli_unstable().timings.is_some() self.rmeta_required.contains(unit) || self.bcx.config.cli_unstable().timings.is_some()
} }
pub fn new_jobserver(&mut self) -> CargoResult<Client> {
let tokens = self.bcx.build_config.jobs as usize;
let client = Client::new(tokens).chain_err(|| "failed to create jobserver")?;
// Drain the client fully
for i in 0..tokens {
while let Err(e) = client.acquire_raw() {
anyhow::bail!(
"failed to fully drain {}/{} token from jobserver at startup: {:?}",
i,
tokens,
e,
);
}
}
Ok(client)
}
} }

View File

@ -9,7 +9,7 @@ use std::time::Duration;
use anyhow::format_err; use anyhow::format_err;
use crossbeam_utils::thread::Scope; use crossbeam_utils::thread::Scope;
use jobserver::{Acquired, HelperThread}; use jobserver::{Acquired, Client, HelperThread};
use log::{debug, info, trace}; use log::{debug, info, trace};
use super::context::OutputFile; use super::context::OutputFile;
@ -128,6 +128,22 @@ impl<'a> JobState<'a> {
.tx .tx
.send(Message::Finish(self.id, Artifact::Metadata, Ok(()))); .send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
} }
/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
/// on the passed client.
///
/// This should arrange for the passed client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self, _client: &Client) {
// ...
}
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
// ...
}
} }
impl<'a, 'cfg> JobQueue<'a, 'cfg> { impl<'a, 'cfg> JobQueue<'a, 'cfg> {

View File

@ -48,6 +48,7 @@ use crate::util::errors::{self, CargoResult, CargoResultExt, Internal, ProcessEr
use crate::util::machine_message::Message; use crate::util::machine_message::Message;
use crate::util::{self, machine_message, ProcessBuilder}; use crate::util::{self, machine_message, ProcessBuilder};
use crate::util::{internal, join_paths, paths, profile}; use crate::util::{internal, join_paths, paths, profile};
use jobserver::Client;
/// A glorified callback for executing calls to rustc. Rather than calling rustc /// A glorified callback for executing calls to rustc. Rather than calling rustc
/// directly, we'll use an `Executor`, giving clients an opportunity to intercept /// directly, we'll use an `Executor`, giving clients an opportunity to intercept
@ -171,7 +172,7 @@ fn rustc<'a, 'cfg>(
unit: &Unit<'a>, unit: &Unit<'a>,
exec: &Arc<dyn Executor>, exec: &Arc<dyn Executor>,
) -> CargoResult<Work> { ) -> CargoResult<Work> {
let mut rustc = prepare_rustc(cx, &unit.target.rustc_crate_types(), unit)?; let (rustc_client, mut rustc) = prepare_rustc(cx, &unit.target.rustc_crate_types(), unit)?;
let build_plan = cx.bcx.build_config.build_plan; let build_plan = cx.bcx.build_config.build_plan;
let name = unit.pkg.name().to_string(); let name = unit.pkg.name().to_string();
@ -286,7 +287,16 @@ fn rustc<'a, 'cfg>(
&target, &target,
mode, mode,
&mut |line| on_stdout_line(state, line, package_id, &target), &mut |line| on_stdout_line(state, line, package_id, &target),
&mut |line| on_stderr_line(state, line, package_id, &target, &mut output_options), &mut |line| {
on_stderr_line(
state,
line,
package_id,
&target,
&mut output_options,
Some(&rustc_client),
)
},
) )
.map_err(internal_if_simple_exit_code) .map_err(internal_if_simple_exit_code)
.chain_err(|| format!("could not compile `{}`.", name))?; .chain_err(|| format!("could not compile `{}`.", name))?;
@ -534,14 +544,15 @@ fn prepare_rustc<'a, 'cfg>(
cx: &mut Context<'a, 'cfg>, cx: &mut Context<'a, 'cfg>,
crate_types: &[&str], crate_types: &[&str],
unit: &Unit<'a>, unit: &Unit<'a>,
) -> CargoResult<ProcessBuilder> { ) -> CargoResult<(Client, ProcessBuilder)> {
let is_primary = cx.is_primary_package(unit); let is_primary = cx.is_primary_package(unit);
let mut base = cx.compilation.rustc_process(unit.pkg, is_primary)?; let mut base = cx.compilation.rustc_process(unit.pkg, is_primary)?;
base.inherit_jobserver(&cx.jobserver); let client = cx.new_jobserver()?;
base.inherit_jobserver(&client);
build_base_args(cx, &mut base, unit, crate_types)?; build_base_args(cx, &mut base, unit, crate_types)?;
build_deps_args(&mut base, cx, unit)?; build_deps_args(&mut base, cx, unit)?;
Ok(base) Ok((client, base))
} }
fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult<Work> { fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult<Work> {
@ -600,7 +611,9 @@ fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult
rustdoc rustdoc
.exec_with_streaming( .exec_with_streaming(
&mut |line| on_stdout_line(state, line, package_id, &target), &mut |line| on_stdout_line(state, line, package_id, &target),
&mut |line| on_stderr_line(state, line, package_id, &target, &mut output_options), &mut |line| {
on_stderr_line(state, line, package_id, &target, &mut output_options, None)
},
false, false,
) )
.chain_err(|| format!("Could not document `{}`.", name))?; .chain_err(|| format!("Could not document `{}`.", name))?;
@ -683,6 +696,7 @@ fn add_error_format_and_color(
_ => {} _ => {}
} }
cmd.arg(json); cmd.arg(json);
cmd.arg("-Zjobserver-token-requests");
Ok(()) Ok(())
} }
@ -1081,8 +1095,9 @@ fn on_stderr_line(
package_id: PackageId, package_id: PackageId,
target: &Target, target: &Target,
options: &mut OutputOptions, options: &mut OutputOptions,
rustc_client: Option<&Client>,
) -> CargoResult<()> { ) -> CargoResult<()> {
if on_stderr_line_inner(state, line, package_id, target, options)? { if on_stderr_line_inner(state, line, package_id, target, options, rustc_client)? {
// Check if caching is enabled. // Check if caching is enabled.
if let Some((path, cell)) = &mut options.cache_cell { if let Some((path, cell)) = &mut options.cache_cell {
// Cache the output, which will be replayed later when Fresh. // Cache the output, which will be replayed later when Fresh.
@ -1102,6 +1117,7 @@ fn on_stderr_line_inner(
package_id: PackageId, package_id: PackageId,
target: &Target, target: &Target,
options: &mut OutputOptions, options: &mut OutputOptions,
rustc_client: Option<&Client>,
) -> CargoResult<bool> { ) -> CargoResult<bool> {
// We primarily want to use this function to process JSON messages from // We primarily want to use this function to process JSON messages from
// rustc. The compiler should always print one JSON message per line, and // rustc. The compiler should always print one JSON message per line, and
@ -1210,6 +1226,36 @@ fn on_stderr_line_inner(
} }
} }
#[derive(serde::Deserialize)]
struct JobserverNotification {
jobserver_event: Event,
}
#[derive(Debug, serde::Deserialize)]
enum Event {
WillAcquire,
Release,
}
if let Ok(JobserverNotification { jobserver_event }) =
serde_json::from_str::<JobserverNotification>(compiler_message.get())
{
log::trace!(
"found jobserver directive from rustc: `{:?}`",
jobserver_event
);
match rustc_client {
Some(client) => match jobserver_event {
Event::WillAcquire => state.will_acquire(client),
Event::Release => state.release_token(),
},
None => {
panic!("Received jobserver event without a client");
}
}
return Ok(false);
}
// And failing all that above we should have a legitimate JSON diagnostic // And failing all that above we should have a legitimate JSON diagnostic
// from the compiler, so wrap it in an external Cargo JSON message // from the compiler, so wrap it in an external Cargo JSON message
// indicating which package it came from and then emit it. // indicating which package it came from and then emit it.
@ -1258,7 +1304,7 @@ fn replay_output_cache(
break; break;
} }
let trimmed = line.trim_end_matches(&['\n', '\r'][..]); let trimmed = line.trim_end_matches(&['\n', '\r'][..]);
on_stderr_line(state, trimmed, package_id, &target, &mut options)?; on_stderr_line(state, trimmed, package_id, &target, &mut options, None)?;
line.clear(); line.clear();
} }
Ok(()) Ok(())