mirror of
				https://github.com/rust-lang/rust.git
				synced 2025-10-31 04:57:19 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			165 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			165 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use std::sync::{Arc, LazyLock, OnceLock};
 | |
| 
 | |
| pub use jobserver_crate::{Acquired, Client, HelperThread};
 | |
| use jobserver_crate::{FromEnv, FromEnvErrorKind};
 | |
| use parking_lot::{Condvar, Mutex};
 | |
| 
 | |
| // We can only call `from_env_ext` once per process
 | |
| 
 | |
| // We stick this in a global because there could be multiple rustc instances
 | |
| // in this process, and the jobserver is per-process.
 | |
| static GLOBAL_CLIENT: LazyLock<Result<Client, String>> = LazyLock::new(|| {
 | |
|     // Note that this is unsafe because it may misinterpret file descriptors
 | |
|     // on Unix as jobserver file descriptors. We hopefully execute this near
 | |
|     // the beginning of the process though to ensure we don't get false
 | |
|     // positives, or in other words we try to execute this before we open
 | |
|     // any file descriptors ourselves.
 | |
|     let FromEnv { client, var } = unsafe { Client::from_env_ext(true) };
 | |
| 
 | |
|     let error = match client {
 | |
|         Ok(client) => return Ok(client),
 | |
|         Err(e) => e,
 | |
|     };
 | |
| 
 | |
|     if matches!(
 | |
|         error.kind(),
 | |
|         FromEnvErrorKind::NoEnvVar
 | |
|             | FromEnvErrorKind::NoJobserver
 | |
|             | FromEnvErrorKind::NegativeFd
 | |
|             | FromEnvErrorKind::Unsupported
 | |
|     ) {
 | |
|         return Ok(default_client());
 | |
|     }
 | |
| 
 | |
|     // Environment specifies jobserver, but it looks incorrect.
 | |
|     // Safety: `error.kind()` should be `NoEnvVar` if `var == None`.
 | |
|     let (name, value) = var.unwrap();
 | |
|     Err(format!(
 | |
|         "failed to connect to jobserver from environment variable `{name}={:?}`: {error}",
 | |
|         value
 | |
|     ))
 | |
| });
 | |
| 
 | |
| // Create a new jobserver if there's no inherited one.
 | |
| fn default_client() -> Client {
 | |
|     // Pick a "reasonable maximum" capping out at 32
 | |
|     // so we don't take everything down by hogging the process run queue.
 | |
|     // The fixed number is used to have deterministic compilation across machines.
 | |
|     let client = Client::new(32).expect("failed to create jobserver");
 | |
| 
 | |
|     // Acquire a token for the main thread which we can release later
 | |
|     client.acquire_raw().ok();
 | |
| 
 | |
|     client
 | |
| }
 | |
| 
 | |
| static GLOBAL_CLIENT_CHECKED: OnceLock<Client> = OnceLock::new();
 | |
| 
 | |
| pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
 | |
|     let client_checked = match &*GLOBAL_CLIENT {
 | |
|         Ok(client) => client.clone(),
 | |
|         Err(e) => {
 | |
|             report_warning(e);
 | |
|             default_client()
 | |
|         }
 | |
|     };
 | |
|     GLOBAL_CLIENT_CHECKED.set(client_checked).ok();
 | |
| }
 | |
| 
 | |
| const ACCESS_ERROR: &str = "jobserver check should have been called earlier";
 | |
| 
 | |
| pub fn client() -> Client {
 | |
|     GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
 | |
| }
 | |
| 
 | |
| struct ProxyData {
 | |
|     /// The number of tokens assigned to threads.
 | |
|     /// If this is 0, a single token is still assigned to this process, but is unused.
 | |
|     used: u16,
 | |
| 
 | |
|     /// The number of threads requesting a token
 | |
|     pending: u16,
 | |
| }
 | |
| 
 | |
| /// This is a jobserver proxy used to ensure that we hold on to at least one token.
 | |
| pub struct Proxy {
 | |
|     client: Client,
 | |
|     data: Mutex<ProxyData>,
 | |
| 
 | |
|     /// Threads which are waiting on a token will wait on this.
 | |
|     wake_pending: Condvar,
 | |
| 
 | |
|     helper: OnceLock<HelperThread>,
 | |
| }
 | |
| 
 | |
| impl Proxy {
 | |
|     pub fn new() -> Arc<Self> {
 | |
|         let proxy = Arc::new(Proxy {
 | |
|             client: client(),
 | |
|             data: Mutex::new(ProxyData { used: 1, pending: 0 }),
 | |
|             wake_pending: Condvar::new(),
 | |
|             helper: OnceLock::new(),
 | |
|         });
 | |
|         let proxy_ = Arc::clone(&proxy);
 | |
|         let helper = proxy
 | |
|             .client
 | |
|             .clone()
 | |
|             .into_helper_thread(move |token| {
 | |
|                 if let Ok(token) = token {
 | |
|                     let mut data = proxy_.data.lock();
 | |
|                     if data.pending > 0 {
 | |
|                         // Give the token to a waiting thread
 | |
|                         token.drop_without_releasing();
 | |
|                         assert!(data.used > 0);
 | |
|                         data.used += 1;
 | |
|                         data.pending -= 1;
 | |
|                         proxy_.wake_pending.notify_one();
 | |
|                     } else {
 | |
|                         // The token is no longer needed, drop it.
 | |
|                         drop(data);
 | |
|                         drop(token);
 | |
|                     }
 | |
|                 }
 | |
|             })
 | |
|             .expect("failed to create helper thread");
 | |
|         proxy.helper.set(helper).unwrap();
 | |
|         proxy
 | |
|     }
 | |
| 
 | |
|     pub fn acquire_thread(&self) {
 | |
|         let mut data = self.data.lock();
 | |
| 
 | |
|         if data.used == 0 {
 | |
|             // There was a free token around. This can
 | |
|             // happen when all threads release their token.
 | |
|             assert_eq!(data.pending, 0);
 | |
|             data.used += 1;
 | |
|         } else {
 | |
|             // Request a token from the helper thread. We can't directly use `acquire_raw`
 | |
|             // as we also need to be able to wait for the final token in the process which
 | |
|             // does not get a corresponding `release_raw` call.
 | |
|             self.helper.get().unwrap().request_token();
 | |
|             data.pending += 1;
 | |
|             self.wake_pending.wait(&mut data);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     pub fn release_thread(&self) {
 | |
|         let mut data = self.data.lock();
 | |
| 
 | |
|         if data.pending > 0 {
 | |
|             // Give the token to a waiting thread
 | |
|             data.pending -= 1;
 | |
|             self.wake_pending.notify_one();
 | |
|         } else {
 | |
|             data.used -= 1;
 | |
| 
 | |
|             // Release the token unless it's the last one in the process
 | |
|             if data.used > 0 {
 | |
|                 drop(data);
 | |
|                 self.client.release_raw().ok();
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 | 
