From 4f6395b31c2e5e550188c8ae01de2c4eaee2937b Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Mon, 10 Jun 2019 12:54:27 -0700 Subject: [PATCH] Make threadpool::Runtime methods take &self (#1140) The runtime is inherently multi-threaded, so it's going to have to deal with synchronization when submitting new tasks anyway. This allows a runtime to be shared by multiple threads more easily when e.g. building a blocking facade over a tokio-based API. --- tokio-tls/examples/download-rust-lang.rs | 2 +- tokio-tls/tests/bad.rs | 2 +- tokio-tls/tests/google.rs | 4 ++-- tokio-tls/tests/smoke.rs | 6 +++--- tokio/src/runtime/threadpool/mod.rs | 16 ++++++---------- tokio/tests/clock.rs | 2 +- tokio/tests/global.rs | 2 +- tokio/tests/runtime.rs | 8 ++++---- 8 files changed, 19 insertions(+), 23 deletions(-) diff --git a/tokio-tls/examples/download-rust-lang.rs b/tokio-tls/examples/download-rust-lang.rs index 08e463bf1..62e91743b 100644 --- a/tokio-tls/examples/download-rust-lang.rs +++ b/tokio-tls/examples/download-rust-lang.rs @@ -10,7 +10,7 @@ use tokio_io; use tokio_tls; fn main() -> Result<(), Box> { - let mut runtime = Runtime::new()?; + let runtime = Runtime::new()?; let addr = "www.rust-lang.org:443" .to_socket_addrs()? .next() diff --git a/tokio-tls/tests/bad.rs b/tokio-tls/tests/bad.rs index e72b2d23a..954593b09 100644 --- a/tokio-tls/tests/bad.rs +++ b/tokio-tls/tests/bad.rs @@ -89,7 +89,7 @@ fn get_host(host: &'static str) -> Error { let addr = format!("{}:443", host); let addr = t!(addr.to_socket_addrs()).next().unwrap(); - let mut l = t!(Runtime::new()); + let l = t!(Runtime::new()); let client = TcpStream::connect(&addr); let data = client.and_then(move |socket| { let builder = TlsConnector::builder(); diff --git a/tokio-tls/tests/google.rs b/tokio-tls/tests/google.rs index 15346d4c8..b76c1085f 100644 --- a/tokio-tls/tests/google.rs +++ b/tokio-tls/tests/google.rs @@ -63,7 +63,7 @@ fn fetch_google() { let addr = t!("google.com:443".to_socket_addrs()).next().unwrap(); // Create an event loop and connect a socket to our resolved address.c - let mut l = t!(Runtime::new()); + let l = t!(Runtime::new()); let client = TcpStream::connect(&addr); // Send off the request by first negotiating an SSL handshake, then writing @@ -97,7 +97,7 @@ fn wrong_hostname_error() { let addr = t!("google.com:443".to_socket_addrs()).next().unwrap(); - let mut l = t!(Runtime::new()); + let l = t!(Runtime::new()); let client = TcpStream::connect(&addr); let data = client.and_then(move |socket| { let builder = TlsConnector::builder(); diff --git a/tokio-tls/tests/smoke.rs b/tokio-tls/tests/smoke.rs index 18d5dde8c..76564d9b2 100644 --- a/tokio-tls/tests/smoke.rs +++ b/tokio-tls/tests/smoke.rs @@ -507,7 +507,7 @@ const AMT: u64 = 128 * 1024; #[test] fn client_to_server() { drop(env_logger::try_init()); - let mut l = t!(Runtime::new()); + let l = t!(Runtime::new()); // Create a server listening on a port, then figure out what that port is let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); @@ -540,7 +540,7 @@ fn client_to_server() { #[test] fn server_to_client() { drop(env_logger::try_init()); - let mut l = t!(Runtime::new()); + let l = t!(Runtime::new()); // Create a server listening on a port, then figure out what that port is let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); @@ -597,7 +597,7 @@ impl AsyncWrite for OneByte { fn one_byte_at_a_time() { const AMT: u64 = 1024; drop(env_logger::try_init()); - let mut l = t!(Runtime::new()); + let l = t!(Runtime::new()); let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); let addr = t!(srv.local_addr()); diff --git a/tokio/src/runtime/threadpool/mod.rs b/tokio/src/runtime/threadpool/mod.rs index 4b865b4ae..22f0ff3af 100644 --- a/tokio/src/runtime/threadpool/mod.rs +++ b/tokio/src/runtime/threadpool/mod.rs @@ -98,7 +98,7 @@ where F: Future + Send + 'static, { // Check enter before creating a new Runtime... let mut entered = enter().expect("nested tokio::run"); - let mut runtime = Runtime::new().expect("failed to start new Runtime"); + let runtime = Runtime::new().expect("failed to start new Runtime"); runtime.spawn(future); entered .block_on(runtime.shutdown_on_idle()) @@ -216,7 +216,7 @@ impl Runtime { /// /// # fn dox() { /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// /// // Spawn a future onto the runtime /// rt.spawn(future::lazy(|| { @@ -231,10 +231,10 @@ impl Runtime { /// /// This function panics if the spawn fails. Failure occurs if the executor /// is currently at capacity and is unable to spawn a new future. - pub fn spawn(&mut self, future: F) -> &mut Self + pub fn spawn(&self, future: F) -> &Self where F: Future + Send + 'static, { - self.inner_mut().pool.spawn(future); + self.inner().pool.spawn(future); self } @@ -250,7 +250,7 @@ impl Runtime { /// /// This function panics if the executor is at capacity, if the provided /// future panics, or if called within an asynchronous execution context. - pub fn block_on(&mut self, future: F) -> Result + pub fn block_on(&self, future: F) -> Result where F: Send + 'static + Future, R: Send + 'static, @@ -276,7 +276,7 @@ impl Runtime { /// /// This function panics if the executor is at capacity, if the provided /// future panics, or if called within an asynchronous execution context. - pub fn block_on_all(mut self, future: F) -> Result + pub fn block_on_all(self, future: F) -> Result where F: Send + 'static + Future, R: Send + 'static, @@ -375,10 +375,6 @@ impl Runtime { fn inner(&self) -> &Inner { self.inner.as_ref().unwrap() } - - fn inner_mut(&mut self) -> &mut Inner { - self.inner.as_mut().unwrap() - } } impl Drop for Runtime { diff --git a/tokio/tests/clock.rs b/tokio/tests/clock.rs index 9d4b22544..65ec81098 100644 --- a/tokio/tests/clock.rs +++ b/tokio/tests/clock.rs @@ -24,7 +24,7 @@ fn clock_and_timer_concurrent() { let when = Instant::now() + Duration::from_millis(5_000); let clock = Clock::new_with_now(MockNow(when)); - let mut rt = runtime::Builder::new().clock(clock).build().unwrap(); + let rt = runtime::Builder::new().clock(clock).build().unwrap(); let (tx, rx) = mpsc::channel(); diff --git a/tokio/tests/global.rs b/tokio/tests/global.rs index 43052f12d..da6af2e77 100644 --- a/tokio/tests/global.rs +++ b/tokio/tests/global.rs @@ -89,7 +89,7 @@ fn hammer_split() { let cnt = Arc::new(AtomicUsize::new(0)); - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); fn split(socket: TcpStream, cnt: Arc) { let socket = Arc::new(socket); diff --git a/tokio/tests/runtime.rs b/tokio/tests/runtime.rs index 6f47cfa45..4ec292d95 100644 --- a/tokio/tests/runtime.rs +++ b/tokio/tests/runtime.rs @@ -229,7 +229,7 @@ fn block_on_timer() { Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x)) } - let mut runtime = Runtime::new().unwrap(); + let runtime = Runtime::new().unwrap(); assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42); runtime.shutdown_on_idle().wait().unwrap(); } @@ -244,7 +244,7 @@ mod from_block_on { let cnt = Arc::new(Mutex::new(0)); let c = cnt.clone(); - let mut runtime = Runtime::new().unwrap(); + let runtime = Runtime::new().unwrap(); let msg = runtime .block_on(lazy(move || { { @@ -300,7 +300,7 @@ fn block_waits() { let cnt = Arc::new(Mutex::new(0)); let c = cnt.clone(); - let mut runtime = Runtime::new().unwrap(); + let runtime = Runtime::new().unwrap(); runtime .block_on(rx.then(move |_| { { @@ -467,7 +467,7 @@ mod nested_enter { #[test] fn threadpool_block_on_in_run() { test(tokio::run, |fut| { - let mut rt = threadpool_new(); + let rt = threadpool_new(); rt.block_on(fut).unwrap(); }); }