From 4a8237f6ad57fc959615432f49b8ca1b1e706ad3 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Fri, 20 Dec 2019 20:11:10 -0500 Subject: [PATCH] Communicate jobserver information with each rustc --- src/cargo/core/compiler/job_queue.rs | 75 +++++++++++++++++++++++++--- src/cargo/core/compiler/mod.rs | 2 +- 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 311fa2d77..868331161 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -91,6 +91,12 @@ enum Message { FixDiagnostic(diagnostic_server::Message), Token(io::Result), Finish(u32, Artifact, CargoResult<()>), + + // This client should get release_raw called on it with one of our tokens + NeedsToken(u32, Client), + + // A token previously passed to a NeedsToken client is being released. + ReleaseToken(u32), } impl<'a> JobState<'a> { @@ -134,15 +140,15 @@ impl<'a> JobState<'a> { /// /// This should arrange for the passed client to eventually get a token via /// `client.release_raw()`. - pub fn will_acquire(&self, _client: &Client) { - // ... + pub fn will_acquire(&self, client: &Client) { + let _ = self.tx.send(Message::NeedsToken(self.id, client.clone())); } /// The rustc underlying this Job is informing us that it is done with a jobserver token. /// /// Note that it does *not* write that token back anywhere. pub fn release_token(&self) { - // ... + let _ = self.tx.send(Message::ReleaseToken(self.id)); } } @@ -285,6 +291,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { jobserver_helper: &HelperThread, ) -> CargoResult<()> { let mut tokens = Vec::new(); + let mut rustc_tokens = Vec::new(); + let mut to_send_clients: Vec<(u32, Client)> = Vec::new(); let mut queue = Vec::new(); let mut print = DiagnosticPrinter::new(cx.bcx.config); trace!("queue: {:#?}", self.queue); @@ -322,6 +330,13 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { self.run(&unit, job, cx, scope)?; } + info!( + "tokens: {}, rustc_tokens: {}, waiting_rustcs: {}", + tokens.len(), + rustc_tokens.len(), + to_send_clients.len() + ); + // If after all that we're not actually running anything then we're // done! if self.active.is_empty() { @@ -333,6 +348,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // jobserver interface is architected we may acquire a token that we // don't actually use, and if this happens just relinquish it back // to the jobserver itself. + let extra_tokens = tokens.len() - (self.active.len() - 1); + for _ in 0..extra_tokens { + if let Some((id, client)) = to_send_clients.pop() { + let token = tokens.pop().expect("an extra token"); + rustc_tokens.push((id, token)); + client + .release_raw() + .chain_err(|| "failed to release jobserver token")?; + } + } tokens.truncate(self.active.len() - 1); // Record some timing information if `-Ztimings` is enabled, and @@ -389,6 +414,19 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { Artifact::All => { info!("end: {:?}", id); finished += 1; + while let Some(pos) = + rustc_tokens.iter().position(|(i, _)| *i == id) + { + // push all the leftover tokens back into + // the token list + tokens.push(rustc_tokens.remove(pos).1); + } + while let Some(pos) = + to_send_clients.iter().position(|(i, _)| *i == id) + { + // drain all the pending clients + to_send_clients.remove(pos); + } self.active.remove(&id).unwrap() } // ... otherwise if it hasn't finished we leave it @@ -419,9 +457,34 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } } Message::Token(acquired_token) => { - tokens.push( - acquired_token.chain_err(|| "failed to acquire jobserver token")?, - ); + let token = + acquired_token.chain_err(|| "failed to acquire jobserver token")?; + if let Some((id, client)) = to_send_clients.pop() { + rustc_tokens.push((id, token)); + client + .release_raw() + .chain_err(|| "failed to release jobserver token")?; + } else { + tokens.push(token); + } + } + Message::NeedsToken(id, client) => { + log::info!("queue token request"); + jobserver_helper.request_token(); + to_send_clients.push((id, client)); + } + Message::ReleaseToken(id) => { + // Note that this pops off potentially a completely + // different token, but all tokens of the same job are + // conceptually the same so that's fine. + if let Some(pos) = rustc_tokens.iter().position(|(i, _)| *i == id) { + tokens.push(rustc_tokens.remove(pos).1); + } else { + panic!( + "This job (id={}) does not have tokens associated with it", + id + ); + } } } } diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 66f170a6f..2b9dc64ff 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -1240,7 +1240,7 @@ fn on_stderr_line_inner( if let Ok(JobserverNotification { jobserver_event }) = serde_json::from_str::(compiler_message.get()) { - log::trace!( + log::info!( "found jobserver directive from rustc: `{:?}`", jobserver_event );