Tweak the tokio::spawn function (#171)

Currently, `tokio::spawn` matched the `spawn` function from futures 0.2.
However, this adds additional ergonomic overhead and removes the ability
to spawn from a drop fn. See rust-lang-nursery/futures-rs#830.

This patch switches the behavior to access the thread-local variable
referencing the default executor directly in the `spawn` function.
This commit is contained in:
Carl Lerche 2018-03-02 15:45:35 -08:00 committed by GitHub
parent 21c0f3a9d8
commit 7db7719419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 24 deletions

View File

@ -134,16 +134,23 @@ pub mod thread_pool {
pub use tokio_executor::{Executor, DefaultExecutor, SpawnError};
use futures::{Future, Poll, Async};
use futures::{Future, IntoFuture};
use futures::future::{self, FutureResult};
/// Future, returned by `spawn`, that completes once the future is spawned.
/// Return value from the `spawn` function.
///
/// Currently this value doesn't actually provide any functionality. However, it
/// provides a way to add functionality later without breaking backwards
/// compatibility.
///
/// This also implements `IntoFuture` so that it can be used as the return value
/// in a `for_each` loop.
///
/// See [`spawn`] for more details.
///
/// [`spawn`]: fn.spawn.html
#[derive(Debug)]
#[must_use = "Spawn does nothing unless polled"]
pub struct Spawn<F>(Option<F>);
pub struct Spawn(());
/// Spawns a future on the default executor.
///
@ -154,10 +161,6 @@ pub struct Spawn<F>(Option<F>);
///
/// The default executor is **usually** a thread pool.
///
/// Note that the function doesn't immediately spawn the future. Instead, it
/// returns `Spawn`, which itself is a future that completes once the spawn has
/// succeeded.
///
/// # Examples
///
/// In this example, a server is started and `spawn` is used to start a new task
@ -188,20 +191,27 @@ pub struct Spawn<F>(Option<F>);
/// ```
///
/// [default executor]: struct.DefaultExecutor.html
pub fn spawn<F>(f: F) -> Spawn<F>
///
/// # Panics
///
/// This function will panic if the default executor is not set or if spawning
/// onto the default executor returns an error. To avoid the panic, use
/// [`DefaultExecutor`].
///
/// [`DefaultExecutor`]: #
pub fn spawn<F>(f: F) -> Spawn
where F: Future<Item = (), Error = ()> + 'static + Send
{
Spawn(Some(f))
::tokio_executor::spawn(f);
Spawn(())
}
impl<F> Future for Spawn<F>
where F: Future<Item = (), Error = ()> + Send + 'static
{
impl IntoFuture for Spawn {
type Future = FutureResult<(), ()>;
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
::tokio_executor::spawn(self.0.take().unwrap());
Ok(Async::Ready(()))
fn into_future(self) -> Self::Future {
future::ok(())
}
}

View File

@ -81,7 +81,7 @@ fn hammer_split() {
let mut rt = Runtime::new().unwrap();
fn split(socket: TcpStream) -> Box<Future<Item = (), Error = ()> + Send> {
fn split(socket: TcpStream) {
let socket = Arc::new(socket);
let rd = Rd(socket.clone());
let wr = Wr(socket);
@ -94,25 +94,25 @@ fn hammer_split() {
.map(|_| ())
.map_err(|e| panic!("write error = {:?}", e));
Box::new({
tokio::spawn(rd)
.join(tokio::spawn(wr))
.map(|_| ())
})
tokio::spawn(rd);
tokio::spawn(wr);
}
rt.spawn({
srv.incoming()
.map_err(|e| panic!("accept error = {:?}", e))
.take(N as u64)
.for_each(|socket| split(socket))
.for_each(|socket| {
split(socket);
Ok(())
})
});
for _ in 0..N {
rt.spawn({
TcpStream::connect(&addr)
.map_err(|e| panic!("connect error = {:?}", e))
.and_then(|socket| split(socket))
.map(|socket| split(socket))
});
}