165 lines
5.4 KiB
Rust

use std::sync::{Arc, LazyLock, OnceLock};
pub use jobserver_crate::{Acquired, Client, HelperThread};
use jobserver_crate::{FromEnv, FromEnvErrorKind};
use parking_lot::{Condvar, Mutex};
// We can only call `from_env_ext` once per process
// We stick this in a global because there could be multiple rustc instances
// in this process, and the jobserver is per-process.
static GLOBAL_CLIENT: LazyLock<Result<Client, String>> = LazyLock::new(|| {
// Note that this is unsafe because it may misinterpret file descriptors
// on Unix as jobserver file descriptors. We hopefully execute this near
// the beginning of the process though to ensure we don't get false
// positives, or in other words we try to execute this before we open
// any file descriptors ourselves.
let FromEnv { client, var } = unsafe { Client::from_env_ext(true) };
let error = match client {
Ok(client) => return Ok(client),
Err(e) => e,
};
if matches!(
error.kind(),
FromEnvErrorKind::NoEnvVar
| FromEnvErrorKind::NoJobserver
| FromEnvErrorKind::NegativeFd
| FromEnvErrorKind::Unsupported
) {
return Ok(default_client());
}
// Environment specifies jobserver, but it looks incorrect.
// Safety: `error.kind()` should be `NoEnvVar` if `var == None`.
let (name, value) = var.unwrap();
Err(format!(
"failed to connect to jobserver from environment variable `{name}={:?}`: {error}",
value
))
});
// Create a new jobserver if there's no inherited one.
fn default_client() -> Client {
// Pick a "reasonable maximum" capping out at 32
// so we don't take everything down by hogging the process run queue.
// The fixed number is used to have deterministic compilation across machines.
let client = Client::new(32).expect("failed to create jobserver");
// Acquire a token for the main thread which we can release later
client.acquire_raw().ok();
client
}
static GLOBAL_CLIENT_CHECKED: OnceLock<Client> = OnceLock::new();
pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
let client_checked = match &*GLOBAL_CLIENT {
Ok(client) => client.clone(),
Err(e) => {
report_warning(e);
default_client()
}
};
GLOBAL_CLIENT_CHECKED.set(client_checked).ok();
}
const ACCESS_ERROR: &str = "jobserver check should have been called earlier";
pub fn client() -> Client {
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
}
struct ProxyData {
/// The number of tokens assigned to threads.
/// If this is 0, a single token is still assigned to this process, but is unused.
used: u16,
/// The number of threads requesting a token
pending: u16,
}
/// This is a jobserver proxy used to ensure that we hold on to at least one token.
pub struct Proxy {
client: Client,
data: Mutex<ProxyData>,
/// Threads which are waiting on a token will wait on this.
wake_pending: Condvar,
helper: OnceLock<HelperThread>,
}
impl Proxy {
pub fn new() -> Arc<Self> {
let proxy = Arc::new(Proxy {
client: client(),
data: Mutex::new(ProxyData { used: 1, pending: 0 }),
wake_pending: Condvar::new(),
helper: OnceLock::new(),
});
let proxy_ = Arc::clone(&proxy);
let helper = proxy
.client
.clone()
.into_helper_thread(move |token| {
if let Ok(token) = token {
let mut data = proxy_.data.lock();
if data.pending > 0 {
// Give the token to a waiting thread
token.drop_without_releasing();
assert!(data.used > 0);
data.used += 1;
data.pending -= 1;
proxy_.wake_pending.notify_one();
} else {
// The token is no longer needed, drop it.
drop(data);
drop(token);
}
}
})
.expect("failed to create helper thread");
proxy.helper.set(helper).unwrap();
proxy
}
pub fn acquire_thread(&self) {
let mut data = self.data.lock();
if data.used == 0 {
// There was a free token around. This can
// happen when all threads release their token.
assert_eq!(data.pending, 0);
data.used += 1;
} else {
// Request a token from the helper thread. We can't directly use `acquire_raw`
// as we also need to be able to wait for the final token in the process which
// does not get a corresponding `release_raw` call.
self.helper.get().unwrap().request_token();
data.pending += 1;
self.wake_pending.wait(&mut data);
}
}
pub fn release_thread(&self) {
let mut data = self.data.lock();
if data.pending > 0 {
// Give the token to a waiting thread
data.pending -= 1;
self.wake_pending.notify_one();
} else {
data.used -= 1;
// Release the token unless it's the last one in the process
if data.used > 0 {
drop(data);
self.client.release_raw().ok();
}
}
}
}