Make threadpool::Runtime methods take &self (#1140)

The runtime is inherently multi-threaded, so it's going to have to deal
with synchronization when submitting new tasks anyway. This allows a
runtime to be shared by multiple threads more easily when e.g. building
a blocking facade over a tokio-based API.
This commit is contained in:
Steven Fackler 2019-06-10 12:54:27 -07:00 committed by Carl Lerche
parent 5c0b56278b
commit 4f6395b31c
8 changed files with 19 additions and 23 deletions

View File

@ -10,7 +10,7 @@ use tokio_io;
use tokio_tls; use tokio_tls;
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut runtime = Runtime::new()?; let runtime = Runtime::new()?;
let addr = "www.rust-lang.org:443" let addr = "www.rust-lang.org:443"
.to_socket_addrs()? .to_socket_addrs()?
.next() .next()

View File

@ -89,7 +89,7 @@ fn get_host(host: &'static str) -> Error {
let addr = format!("{}:443", host); let addr = format!("{}:443", host);
let addr = t!(addr.to_socket_addrs()).next().unwrap(); let addr = t!(addr.to_socket_addrs()).next().unwrap();
let mut l = t!(Runtime::new()); let l = t!(Runtime::new());
let client = TcpStream::connect(&addr); let client = TcpStream::connect(&addr);
let data = client.and_then(move |socket| { let data = client.and_then(move |socket| {
let builder = TlsConnector::builder(); let builder = TlsConnector::builder();

View File

@ -63,7 +63,7 @@ fn fetch_google() {
let addr = t!("google.com:443".to_socket_addrs()).next().unwrap(); let addr = t!("google.com:443".to_socket_addrs()).next().unwrap();
// Create an event loop and connect a socket to our resolved address.c // Create an event loop and connect a socket to our resolved address.c
let mut l = t!(Runtime::new()); let l = t!(Runtime::new());
let client = TcpStream::connect(&addr); let client = TcpStream::connect(&addr);
// Send off the request by first negotiating an SSL handshake, then writing // Send off the request by first negotiating an SSL handshake, then writing
@ -97,7 +97,7 @@ fn wrong_hostname_error() {
let addr = t!("google.com:443".to_socket_addrs()).next().unwrap(); let addr = t!("google.com:443".to_socket_addrs()).next().unwrap();
let mut l = t!(Runtime::new()); let l = t!(Runtime::new());
let client = TcpStream::connect(&addr); let client = TcpStream::connect(&addr);
let data = client.and_then(move |socket| { let data = client.and_then(move |socket| {
let builder = TlsConnector::builder(); let builder = TlsConnector::builder();

View File

@ -507,7 +507,7 @@ const AMT: u64 = 128 * 1024;
#[test] #[test]
fn client_to_server() { fn client_to_server() {
drop(env_logger::try_init()); drop(env_logger::try_init());
let mut l = t!(Runtime::new()); let l = t!(Runtime::new());
// Create a server listening on a port, then figure out what that port is // Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse())));
@ -540,7 +540,7 @@ fn client_to_server() {
#[test] #[test]
fn server_to_client() { fn server_to_client() {
drop(env_logger::try_init()); drop(env_logger::try_init());
let mut l = t!(Runtime::new()); let l = t!(Runtime::new());
// Create a server listening on a port, then figure out what that port is // Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse())));
@ -597,7 +597,7 @@ impl<S: AsyncWrite> AsyncWrite for OneByte<S> {
fn one_byte_at_a_time() { fn one_byte_at_a_time() {
const AMT: u64 = 1024; const AMT: u64 = 1024;
drop(env_logger::try_init()); drop(env_logger::try_init());
let mut l = t!(Runtime::new()); let l = t!(Runtime::new());
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse())));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());

View File

@ -98,7 +98,7 @@ where F: Future<Item = (), Error = ()> + Send + 'static,
{ {
// Check enter before creating a new Runtime... // Check enter before creating a new Runtime...
let mut entered = enter().expect("nested tokio::run"); let mut entered = enter().expect("nested tokio::run");
let mut runtime = Runtime::new().expect("failed to start new Runtime"); let runtime = Runtime::new().expect("failed to start new Runtime");
runtime.spawn(future); runtime.spawn(future);
entered entered
.block_on(runtime.shutdown_on_idle()) .block_on(runtime.shutdown_on_idle())
@ -216,7 +216,7 @@ impl Runtime {
/// ///
/// # fn dox() { /// # fn dox() {
/// // Create the runtime /// // Create the runtime
/// let mut rt = Runtime::new().unwrap(); /// let rt = Runtime::new().unwrap();
/// ///
/// // Spawn a future onto the runtime /// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|| { /// rt.spawn(future::lazy(|| {
@ -231,10 +231,10 @@ impl Runtime {
/// ///
/// This function panics if the spawn fails. Failure occurs if the executor /// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future. /// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self pub fn spawn<F>(&self, future: F) -> &Self
where F: Future<Item = (), Error = ()> + Send + 'static, where F: Future<Item = (), Error = ()> + Send + 'static,
{ {
self.inner_mut().pool.spawn(future); self.inner().pool.spawn(future);
self self
} }
@ -250,7 +250,7 @@ impl Runtime {
/// ///
/// This function panics if the executor is at capacity, if the provided /// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context. /// future panics, or if called within an asynchronous execution context.
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> pub fn block_on<F, R, E>(&self, future: F) -> Result<R, E>
where where
F: Send + 'static + Future<Item = R, Error = E>, F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static, R: Send + 'static,
@ -276,7 +276,7 @@ impl Runtime {
/// ///
/// This function panics if the executor is at capacity, if the provided /// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context. /// future panics, or if called within an asynchronous execution context.
pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E> pub fn block_on_all<F, R, E>(self, future: F) -> Result<R, E>
where where
F: Send + 'static + Future<Item = R, Error = E>, F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static, R: Send + 'static,
@ -375,10 +375,6 @@ impl Runtime {
fn inner(&self) -> &Inner { fn inner(&self) -> &Inner {
self.inner.as_ref().unwrap() self.inner.as_ref().unwrap()
} }
fn inner_mut(&mut self) -> &mut Inner {
self.inner.as_mut().unwrap()
}
} }
impl Drop for Runtime { impl Drop for Runtime {

View File

@ -24,7 +24,7 @@ fn clock_and_timer_concurrent() {
let when = Instant::now() + Duration::from_millis(5_000); let when = Instant::now() + Duration::from_millis(5_000);
let clock = Clock::new_with_now(MockNow(when)); let clock = Clock::new_with_now(MockNow(when));
let mut rt = runtime::Builder::new().clock(clock).build().unwrap(); let rt = runtime::Builder::new().clock(clock).build().unwrap();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();

View File

@ -89,7 +89,7 @@ fn hammer_split() {
let cnt = Arc::new(AtomicUsize::new(0)); let cnt = Arc::new(AtomicUsize::new(0));
let mut rt = Runtime::new().unwrap(); let rt = Runtime::new().unwrap();
fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) { fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) {
let socket = Arc::new(socket); let socket = Arc::new(socket);

View File

@ -229,7 +229,7 @@ fn block_on_timer() {
Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x)) Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x))
} }
let mut runtime = Runtime::new().unwrap(); let runtime = Runtime::new().unwrap();
assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42); assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42);
runtime.shutdown_on_idle().wait().unwrap(); runtime.shutdown_on_idle().wait().unwrap();
} }
@ -244,7 +244,7 @@ mod from_block_on {
let cnt = Arc::new(Mutex::new(0)); let cnt = Arc::new(Mutex::new(0));
let c = cnt.clone(); let c = cnt.clone();
let mut runtime = Runtime::new().unwrap(); let runtime = Runtime::new().unwrap();
let msg = runtime let msg = runtime
.block_on(lazy(move || { .block_on(lazy(move || {
{ {
@ -300,7 +300,7 @@ fn block_waits() {
let cnt = Arc::new(Mutex::new(0)); let cnt = Arc::new(Mutex::new(0));
let c = cnt.clone(); let c = cnt.clone();
let mut runtime = Runtime::new().unwrap(); let runtime = Runtime::new().unwrap();
runtime runtime
.block_on(rx.then(move |_| { .block_on(rx.then(move |_| {
{ {
@ -467,7 +467,7 @@ mod nested_enter {
#[test] #[test]
fn threadpool_block_on_in_run() { fn threadpool_block_on_in_run() {
test(tokio::run, |fut| { test(tokio::run, |fut| {
let mut rt = threadpool_new(); let rt = threadpool_new();
rt.block_on(fut).unwrap(); rt.block_on(fut).unwrap();
}); });
} }