diff --git a/src/cargo/core/compiler/context/mod.rs b/src/cargo/core/compiler/context/mod.rs index 652acaba8..e57bab18d 100644 --- a/src/cargo/core/compiler/context/mod.rs +++ b/src/cargo/core/compiler/context/mod.rs @@ -491,4 +491,23 @@ impl<'a, 'cfg> Context<'a, 'cfg> { pub fn rmeta_required(&self, unit: &Unit<'a>) -> bool { self.rmeta_required.contains(unit) || self.bcx.config.cli_unstable().timings.is_some() } + + pub fn new_jobserver(&mut self) -> CargoResult { + let tokens = self.bcx.build_config.jobs as usize; + let client = Client::new(tokens).chain_err(|| "failed to create jobserver")?; + + // Drain the client fully + for i in 0..tokens { + while let Err(e) = client.acquire_raw() { + anyhow::bail!( + "failed to fully drain {}/{} token from jobserver at startup: {:?}", + i, + tokens, + e, + ); + } + } + + Ok(client) + } } diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index bdde12972..311fa2d77 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -9,7 +9,7 @@ use std::time::Duration; use anyhow::format_err; use crossbeam_utils::thread::Scope; -use jobserver::{Acquired, HelperThread}; +use jobserver::{Acquired, Client, HelperThread}; use log::{debug, info, trace}; use super::context::OutputFile; @@ -128,6 +128,22 @@ impl<'a> JobState<'a> { .tx .send(Message::Finish(self.id, Artifact::Metadata, Ok(()))); } + + /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block) + /// on the passed client. + /// + /// This should arrange for the passed client to eventually get a token via + /// `client.release_raw()`. + pub fn will_acquire(&self, _client: &Client) { + // ... + } + + /// The rustc underlying this Job is informing us that it is done with a jobserver token. + /// + /// Note that it does *not* write that token back anywhere. + pub fn release_token(&self) { + // ... + } } impl<'a, 'cfg> JobQueue<'a, 'cfg> { diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 67c5496e9..66f170a6f 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -48,6 +48,7 @@ use crate::util::errors::{self, CargoResult, CargoResultExt, Internal, ProcessEr use crate::util::machine_message::Message; use crate::util::{self, machine_message, ProcessBuilder}; use crate::util::{internal, join_paths, paths, profile}; +use jobserver::Client; /// A glorified callback for executing calls to rustc. Rather than calling rustc /// directly, we'll use an `Executor`, giving clients an opportunity to intercept @@ -171,7 +172,7 @@ fn rustc<'a, 'cfg>( unit: &Unit<'a>, exec: &Arc, ) -> CargoResult { - let mut rustc = prepare_rustc(cx, &unit.target.rustc_crate_types(), unit)?; + let (rustc_client, mut rustc) = prepare_rustc(cx, &unit.target.rustc_crate_types(), unit)?; let build_plan = cx.bcx.build_config.build_plan; let name = unit.pkg.name().to_string(); @@ -286,7 +287,16 @@ fn rustc<'a, 'cfg>( &target, mode, &mut |line| on_stdout_line(state, line, package_id, &target), - &mut |line| on_stderr_line(state, line, package_id, &target, &mut output_options), + &mut |line| { + on_stderr_line( + state, + line, + package_id, + &target, + &mut output_options, + Some(&rustc_client), + ) + }, ) .map_err(internal_if_simple_exit_code) .chain_err(|| format!("could not compile `{}`.", name))?; @@ -534,14 +544,15 @@ fn prepare_rustc<'a, 'cfg>( cx: &mut Context<'a, 'cfg>, crate_types: &[&str], unit: &Unit<'a>, -) -> CargoResult { +) -> CargoResult<(Client, ProcessBuilder)> { let is_primary = cx.is_primary_package(unit); let mut base = cx.compilation.rustc_process(unit.pkg, is_primary)?; - base.inherit_jobserver(&cx.jobserver); + let client = cx.new_jobserver()?; + base.inherit_jobserver(&client); build_base_args(cx, &mut base, unit, crate_types)?; build_deps_args(&mut base, cx, unit)?; - Ok(base) + Ok((client, base)) } fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult { @@ -600,7 +611,9 @@ fn rustdoc<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>) -> CargoResult rustdoc .exec_with_streaming( &mut |line| on_stdout_line(state, line, package_id, &target), - &mut |line| on_stderr_line(state, line, package_id, &target, &mut output_options), + &mut |line| { + on_stderr_line(state, line, package_id, &target, &mut output_options, None) + }, false, ) .chain_err(|| format!("Could not document `{}`.", name))?; @@ -683,6 +696,7 @@ fn add_error_format_and_color( _ => {} } cmd.arg(json); + cmd.arg("-Zjobserver-token-requests"); Ok(()) } @@ -1081,8 +1095,9 @@ fn on_stderr_line( package_id: PackageId, target: &Target, options: &mut OutputOptions, + rustc_client: Option<&Client>, ) -> CargoResult<()> { - if on_stderr_line_inner(state, line, package_id, target, options)? { + if on_stderr_line_inner(state, line, package_id, target, options, rustc_client)? { // Check if caching is enabled. if let Some((path, cell)) = &mut options.cache_cell { // Cache the output, which will be replayed later when Fresh. @@ -1102,6 +1117,7 @@ fn on_stderr_line_inner( package_id: PackageId, target: &Target, options: &mut OutputOptions, + rustc_client: Option<&Client>, ) -> CargoResult { // We primarily want to use this function to process JSON messages from // rustc. The compiler should always print one JSON message per line, and @@ -1210,6 +1226,36 @@ fn on_stderr_line_inner( } } + #[derive(serde::Deserialize)] + struct JobserverNotification { + jobserver_event: Event, + } + + #[derive(Debug, serde::Deserialize)] + enum Event { + WillAcquire, + Release, + } + + if let Ok(JobserverNotification { jobserver_event }) = + serde_json::from_str::(compiler_message.get()) + { + log::trace!( + "found jobserver directive from rustc: `{:?}`", + jobserver_event + ); + match rustc_client { + Some(client) => match jobserver_event { + Event::WillAcquire => state.will_acquire(client), + Event::Release => state.release_token(), + }, + None => { + panic!("Received jobserver event without a client"); + } + } + return Ok(false); + } + // And failing all that above we should have a legitimate JSON diagnostic // from the compiler, so wrap it in an external Cargo JSON message // indicating which package it came from and then emit it. @@ -1258,7 +1304,7 @@ fn replay_output_cache( break; } let trimmed = line.trim_end_matches(&['\n', '\r'][..]); - on_stderr_line(state, trimmed, package_id, &target, &mut options)?; + on_stderr_line(state, trimmed, package_id, &target, &mut options, None)?; line.clear(); } Ok(())