From 0d722a4d83b43fb1534c487c2bd4867ead5862f2 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 15 Jan 2020 20:24:05 -0500 Subject: [PATCH] Split waiting for an event out of the primary drainer This has the slight behavior change where we won't ask for new dependencies and so forth if no events have been received but I believe that there's no activity that can happen if an event hasn't occurred (i.e., no state change has occurred) so there's no need for us to actually do anything in practice. To make sure we still record CPU usage and such sufficiently often that is also moved into the inner "waiting for events" loop. --- src/cargo/core/compiler/job_queue.rs | 69 +++++++++++++++------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index adefd6183..ff29bfaa3 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -510,6 +510,28 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { Ok(None) } + // This will also tick the progress bar as appropriate + fn wait_for_events(&mut self) -> Vec { + // Drain all events at once to avoid displaying the progress bar + // unnecessarily. If there's no events we actually block waiting for + // an event, but we keep a "heartbeat" going to allow `record_cpu` + // to run above to calculate CPU usage over time. To do this we + // listen for a message with a timeout, and on timeout we run the + // previous parts of the loop again. + let events: Vec<_> = self.rx.try_iter().collect(); + if events.is_empty() { + loop { + self.tick_progress(); + match self.rx.recv_timeout(Duration::from_millis(500)) { + Ok(message) => break vec![message], + Err(_) => continue, + } + } + } else { + events + } + } + fn drain_the_queue( &mut self, cx: &mut Context<'a, '_>, @@ -553,36 +575,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // don't actually use, and if this happens just relinquish it back // to the jobserver itself. self.tokens.truncate(self.active.len() - 1); - - // Record some timing information if `-Ztimings` is enabled, and - // this'll end up being a noop if we're not recording this - // information. - self.timings.mark_concurrency( - self.active.len(), - self.pending_queue.len(), - self.queue.len(), - self.rustc_tokens.len(), - ); - self.timings.record_cpu(); - - // Drain all events at once to avoid displaying the progress bar - // unnecessarily. If there's no events we actually block waiting for - // an event, but we keep a "heartbeat" going to allow `record_cpu` - // to run above to calculate CPU usage over time. To do this we - // listen for a message with a timeout, and on timeout we run the - // previous parts of the loop again. - let events: Vec<_> = self.rx.try_iter().collect(); - let events = if events.is_empty() { - self.show_progress(); - match self.rx.recv_timeout(Duration::from_millis(500)) { - Ok(message) => vec![message], - Err(_) => continue, - } - } else { - events - }; - - for event in events { + for event in self.wait_for_events() { if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? { error = Some(err); } @@ -628,7 +621,21 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { } } - fn show_progress(&mut self) { + // This also records CPU usage and marks concurrency; we roughly want to do + // this as often as we spin on the events receiver (at least every 500ms or + // so). + fn tick_progress(&mut self) { + // Record some timing information if `-Ztimings` is enabled, and + // this'll end up being a noop if we're not recording this + // information. + self.timings.mark_concurrency( + self.active.len(), + self.pending_queue.len(), + self.queue.len(), + self.rustc_tokens.len(), + ); + self.timings.record_cpu(); + let active_names = self .active .values()