Move local variables to struct fields

This will facilitate splitting drain_the_queue into methods
This commit is contained in:
Mark Rousskov 2020-01-15 20:03:58 -05:00
parent e2f4ce114d
commit 90ef289c70

View File

@ -93,6 +93,12 @@ pub struct JobQueue<'a, 'cfg> {
progress: Progress<'cfg>,
next_id: u32,
timings: Timings<'a, 'cfg>,
tokens: Vec<Acquired>,
rustc_tokens: Vec<(JobId, Acquired)>,
to_send_clients: Vec<(JobId, Client)>,
pending_queue: Vec<(Unit<'a>, Job)>,
print: DiagnosticPrinter<'cfg>,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
@ -227,6 +233,12 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
progress,
next_id: 0,
timings,
tokens: Vec::new(),
rustc_tokens: Vec::new(),
to_send_clients: Vec::new(),
pending_queue: Vec::new(),
print: DiagnosticPrinter::new(bcx.config),
}
}
@ -341,11 +353,6 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
plan: &mut BuildPlan,
jobserver_helper: &HelperThread,
) -> CargoResult<()> {
let mut tokens = Vec::new();
let mut rustc_tokens = Vec::new();
let mut to_send_clients: Vec<(JobId, Client)> = Vec::new();
let mut queue = Vec::new();
let mut print = DiagnosticPrinter::new(cx.bcx.config);
trace!("queue: {:#?}", self.queue);
// Iteratively execute the entire dependency graph. Each turn of the
@ -367,8 +374,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
// start requesting job tokens. Each job after the first needs to
// request a token.
while let Some((unit, job)) = self.queue.dequeue() {
queue.push((unit, job));
if self.active.len() + queue.len() > 1 {
self.pending_queue.push((unit, job));
if self.active.len() + self.pending_queue.len() > 1 {
jobserver_helper.request_token();
}
}
@ -376,16 +383,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
// Now that we've learned of all possible work that we can execute
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() {
let (unit, job) = queue.remove(0);
while error.is_none() && self.active.len() < self.tokens.len() + 1 && !self.pending_queue.is_empty() {
let (unit, job) = self.pending_queue.remove(0);
self.run(&unit, job, cx)?;
}
info!(
"tokens: {}, rustc_tokens: {}, waiting_rustcs: {}",
tokens.len(),
rustc_tokens.len(),
to_send_clients.len()
self.tokens.len(),
self.rustc_tokens.len(),
self.to_send_clients.len()
);
// If after all that we're not actually running anything then we're
@ -395,11 +402,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
}
// If we managed to acquire some extra tokens, send them off to a waiting rustc.
let extra_tokens = tokens.len() - (self.active.len() - 1);
let extra_tokens = self.tokens.len() - (self.active.len() - 1);
for _ in 0..extra_tokens {
if let Some((id, client)) = to_send_clients.pop() {
let token = tokens.pop().expect("an extra token");
rustc_tokens.push((id, token));
if let Some((id, client)) = self.to_send_clients.pop() {
let token = self.tokens.pop().expect("an extra token");
self.rustc_tokens.push((id, token));
client
.release_raw()
.chain_err(|| "failed to release jobserver token")?;
@ -411,16 +418,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
tokens.truncate(self.active.len() - 1);
self.tokens.truncate(self.active.len() - 1);
// Record some timing information if `-Ztimings` is enabled, and
// this'll end up being a noop if we're not recording this
// information.
self.timings.mark_concurrency(
self.active.len(),
queue.len(),
self.pending_queue.len(),
self.queue.len(),
rustc_tokens.len(),
self.rustc_tokens.len(),
);
self.timings.record_cpu();
@ -462,7 +469,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
shell.err().write_all(b"\n")?;
}
Message::FixDiagnostic(msg) => {
print.print(&msg)?;
self.print.print(&msg)?;
}
Message::Finish(id, artifact, result) => {
let unit = match artifact {
@ -472,17 +479,17 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
info!("end: {:?}", id);
finished += 1;
while let Some(pos) =
rustc_tokens.iter().position(|(i, _)| *i == id)
self.rustc_tokens.iter().position(|(i, _)| *i == id)
{
// push all the leftover tokens back into
// the token list
tokens.push(rustc_tokens.remove(pos).1);
self.tokens.push(self.rustc_tokens.remove(pos).1);
}
while let Some(pos) =
to_send_clients.iter().position(|(i, _)| *i == id)
self.to_send_clients.iter().position(|(i, _)| *i == id)
{
// drain all the pending clients
to_send_clients.remove(pos);
self.to_send_clients.remove(pos);
}
self.active.remove(&id).unwrap()
}
@ -516,26 +523,26 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
Message::Token(acquired_token) => {
let token =
acquired_token.chain_err(|| "failed to acquire jobserver token")?;
if let Some((id, client)) = to_send_clients.pop() {
rustc_tokens.push((id, token));
if let Some((id, client)) = self.to_send_clients.pop() {
self.rustc_tokens.push((id, token));
client
.release_raw()
.chain_err(|| "failed to release jobserver token")?;
} else {
tokens.push(token);
self.tokens.push(token);
}
}
Message::NeedsToken(id, client) => {
log::info!("queue token request");
jobserver_helper.request_token();
to_send_clients.push((id, client));
self.to_send_clients.push((id, client));
}
Message::ReleaseToken(id) => {
// Note that this pops off potentially a completely
// different token, but all tokens of the same job are
// conceptually the same so that's fine.
if let Some(pos) = rustc_tokens.iter().position(|(i, _)| *i == id) {
tokens.push(rustc_tokens.remove(pos).1);
if let Some(pos) = self.rustc_tokens.iter().position(|(i, _)| *i == id) {
self.tokens.push(self.rustc_tokens.remove(pos).1);
} else {
panic!(
"This job (id={}) does not have tokens associated with it",
@ -570,7 +577,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
if let Some(e) = error {
Err(e)
} else if self.queue.is_empty() && queue.is_empty() {
} else if self.queue.is_empty() && self.pending_queue.is_empty() {
let message = format!(
"{} [{}] target(s) in {}",
profile_name, opt_type, time_elapsed