From a7b053372f220f66963fefe91690b0284d0921a0 Mon Sep 17 00:00:00 2001 From: Flux Xu Date: Thu, 30 Aug 2018 19:53:05 -0400 Subject: [PATCH] Add ThreadPool::spawn_handle (#602) ## Motivation `tokio_threadpool::ThreadPool::spawn` has no return value. ## Solution Add `ThreadPool::spawn_handle` which calls `futures::sync::oneshot::spawn` to return a future represents the return value. --- tokio-threadpool/src/thread_pool.rs | 62 ++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/tokio-threadpool/src/thread_pool.rs b/tokio-threadpool/src/thread_pool.rs index cb2e960a2..0bebd183e 100644 --- a/tokio-threadpool/src/thread_pool.rs +++ b/tokio-threadpool/src/thread_pool.rs @@ -3,7 +3,8 @@ use pool::Pool; use sender::Sender; use shutdown::Shutdown; -use futures::Future; +use futures::{Future, Poll}; +use futures::sync::oneshot; /// Work-stealing based thread pool for executing futures. /// @@ -64,6 +65,47 @@ impl ThreadPool { self.sender().spawn(future).unwrap(); } + /// Spawn a future on to the thread pool, return a future representing + /// the produced value. + /// + /// The SpawnHandle returned is a future that is a proxy for future itself. + /// When future completes on this thread pool then the SpawnHandle will itself + /// be resolved. + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::ThreadPool; + /// use futures::future::{Future, lazy}; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = ThreadPool::new(); + /// + /// let handle = thread_pool.spawn_handle(lazy(|| Ok::<_, ()>(42))); + /// + /// let value = handle.wait().unwrap(); + /// assert_eq!(value, 42); + /// + /// // Gracefully shutdown the threadpool + /// thread_pool.shutdown().wait().unwrap(); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. + pub fn spawn_handle(&self, future: F) -> SpawnHandle + where + F: Future + Send + 'static, + F::Item: Send + 'static, + F::Error: Send + 'static, + { + SpawnHandle(oneshot::spawn(future, self.sender())) + } + /// Return a reference to the sender handle /// /// The handle is used to spawn futures onto the thread pool. It also @@ -132,3 +174,21 @@ impl Drop for ThreadPool { } } } + +/// Handle returned from ThreadPool::spawn_handle. +/// +/// This handle is a future representing the completion of a different future +/// spawned on to the thread pool. Created through the ThreadPool::spawn_handle +/// function this handle will resolve when the future provided resolves on the +/// thread pool. +#[derive(Debug)] +pub struct SpawnHandle(oneshot::SpawnHandle); + +impl Future for SpawnHandle { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll { + self.0.poll() + } +} \ No newline at end of file