Communicate jobserver information with each rustc

This commit is contained in:
Mark Rousskov 2019-12-20 20:11:10 -05:00
parent ec80cf90b0
commit 4a8237f6ad
2 changed files with 70 additions and 7 deletions

View File

@ -91,6 +91,12 @@ enum Message {
FixDiagnostic(diagnostic_server::Message),
Token(io::Result<Acquired>),
Finish(u32, Artifact, CargoResult<()>),
// This client should get release_raw called on it with one of our tokens
NeedsToken(u32, Client),
// A token previously passed to a NeedsToken client is being released.
ReleaseToken(u32),
}
impl<'a> JobState<'a> {
@ -134,15 +140,15 @@ impl<'a> JobState<'a> {
///
/// This should arrange for the passed client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self, _client: &Client) {
// ...
pub fn will_acquire(&self, client: &Client) {
let _ = self.tx.send(Message::NeedsToken(self.id, client.clone()));
}
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
// ...
let _ = self.tx.send(Message::ReleaseToken(self.id));
}
}
@ -285,6 +291,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
jobserver_helper: &HelperThread,
) -> CargoResult<()> {
let mut tokens = Vec::new();
let mut rustc_tokens = Vec::new();
let mut to_send_clients: Vec<(u32, Client)> = Vec::new();
let mut queue = Vec::new();
let mut print = DiagnosticPrinter::new(cx.bcx.config);
trace!("queue: {:#?}", self.queue);
@ -322,6 +330,13 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
self.run(&unit, job, cx, scope)?;
}
info!(
"tokens: {}, rustc_tokens: {}, waiting_rustcs: {}",
tokens.len(),
rustc_tokens.len(),
to_send_clients.len()
);
// If after all that we're not actually running anything then we're
// done!
if self.active.is_empty() {
@ -333,6 +348,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
let extra_tokens = tokens.len() - (self.active.len() - 1);
for _ in 0..extra_tokens {
if let Some((id, client)) = to_send_clients.pop() {
let token = tokens.pop().expect("an extra token");
rustc_tokens.push((id, token));
client
.release_raw()
.chain_err(|| "failed to release jobserver token")?;
}
}
tokens.truncate(self.active.len() - 1);
// Record some timing information if `-Ztimings` is enabled, and
@ -389,6 +414,19 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
Artifact::All => {
info!("end: {:?}", id);
finished += 1;
while let Some(pos) =
rustc_tokens.iter().position(|(i, _)| *i == id)
{
// push all the leftover tokens back into
// the token list
tokens.push(rustc_tokens.remove(pos).1);
}
while let Some(pos) =
to_send_clients.iter().position(|(i, _)| *i == id)
{
// drain all the pending clients
to_send_clients.remove(pos);
}
self.active.remove(&id).unwrap()
}
// ... otherwise if it hasn't finished we leave it
@ -419,9 +457,34 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
}
}
Message::Token(acquired_token) => {
tokens.push(
acquired_token.chain_err(|| "failed to acquire jobserver token")?,
);
let token =
acquired_token.chain_err(|| "failed to acquire jobserver token")?;
if let Some((id, client)) = to_send_clients.pop() {
rustc_tokens.push((id, token));
client
.release_raw()
.chain_err(|| "failed to release jobserver token")?;
} else {
tokens.push(token);
}
}
Message::NeedsToken(id, client) => {
log::info!("queue token request");
jobserver_helper.request_token();
to_send_clients.push((id, client));
}
Message::ReleaseToken(id) => {
// Note that this pops off potentially a completely
// different token, but all tokens of the same job are
// conceptually the same so that's fine.
if let Some(pos) = rustc_tokens.iter().position(|(i, _)| *i == id) {
tokens.push(rustc_tokens.remove(pos).1);
} else {
panic!(
"This job (id={}) does not have tokens associated with it",
id
);
}
}
}
}

View File

@ -1240,7 +1240,7 @@ fn on_stderr_line_inner(
if let Ok(JobserverNotification { jobserver_event }) =
serde_json::from_str::<JobserverNotification>(compiler_message.get())
{
log::trace!(
log::info!(
"found jobserver directive from rustc: `{:?}`",
jobserver_event
);