Async/await polish (#1058)

A general refresh of Tokio's experimental async / await support.
This commit is contained in:
Carl Lerche 2019-04-25 19:22:32 -07:00 committed by David Barsky
parent df702130d6
commit 0e400af78c
44 changed files with 400 additions and 211 deletions

View File

@ -31,13 +31,13 @@ task:
folder: $HOME/.cargo/registry
test_script:
- . $HOME/.cargo/env
- cargo test --all --no-fail-fast
- cargo test --all
- (cd tokio-trace/test-log-support && cargo test)
- (cd tokio-trace/test_static_max_level_features && cargo test)
- cargo doc --all
i686_test_script:
- . $HOME/.cargo/env
- |
cargo test --all --exclude tokio-tls --no-fail-fast --target i686-unknown-freebsd
cargo test --all --exclude tokio-tls --exclude tokio-macros --target i686-unknown-freebsd
before_cache_script:
- rm -rf $HOME/.cargo/registry/index

View File

@ -2,13 +2,14 @@
members = [
"tokio",
"tokio-async-await",
"tokio-buf",
"tokio-codec",
"tokio-current-thread",
"tokio-executor",
"tokio-fs",
"tokio-futures",
"tokio-io",
"tokio-macros",
"tokio-reactor",
"tokio-signal",
"tokio-sync",

View File

@ -126,10 +126,6 @@ have greater guarantees of stability.
The crates included as part of Tokio are:
* [`tokio-async-await`]: Experimental `async` / `await` support.
* [`tokio-codec`]: Utilities for encoding and decoding protocol frames.
* [`tokio-current-thread`]: Schedule the execution of futures on the current
thread.
@ -137,8 +133,14 @@ The crates included as part of Tokio are:
* [`tokio-fs`]: Filesystem (and standard in / out) APIs.
* [`tokio-futures`]: Experimental `std::future::Future` and `async` / `await` support.
* [`tokio-codec`]: Utilities for encoding and decoding protocol frames.
* [`tokio-io`]: Asynchronous I/O related traits and utilities.
* [`tokio-macros`]: Macros for usage with Tokio.
* [`tokio-reactor`]: Event loop that drives I/O resources (like TCP and UDP
sockets).
@ -154,12 +156,13 @@ The crates included as part of Tokio are:
* [`tokio-uds`]: Unix Domain Socket bindings for use with `tokio-io` and
`tokio-reactor`.
[`tokio-async-await`]: tokio-async-await
[`tokio-codec`]: tokio-codec
[`tokio-current-thread`]: tokio-current-thread
[`tokio-executor`]: tokio-executor
[`tokio-fs`]: tokio-fs
[`tokio-futures`]: tokio-futures
[`tokio-io`]: tokio-io
[`tokio-macros`]: tokio-macros
[`tokio-reactor`]: tokio-reactor
[`tokio-tcp`]: tokio-tcp
[`tokio-threadpool`]: tokio-threadpool

View File

@ -0,0 +1,2 @@
[build]
target-dir = "../target"

49
async-await/Cargo.toml Normal file
View File

@ -0,0 +1,49 @@
[package]
name = "examples"
edition = "2018"
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
license = "MIT"
# Break out of the parent workspace
[workspace]
[[bin]]
name = "chat"
path = "src/chat.rs"
[[bin]]
name = "echo_client"
path = "src/echo_client.rs"
[[bin]]
name = "echo_server"
path = "src/echo_server.rs"
[[bin]]
name = "hyper"
path = "src/hyper.rs"
[dependencies]
tokio = { version = "0.1.18", features = ["async-await-preview"] }
futures = "0.1.23"
bytes = "0.4.9"
hyper = "0.12.8"
# Avoid using crates.io for Tokio dependencies
[patch.crates-io]
tokio = { path = "../tokio" }
tokio-codec = { path = "../tokio-codec" }
tokio-current-thread = { path = "../tokio-current-thread" }
tokio-executor = { path = "../tokio-executor" }
tokio-fs = { path = "../tokio-fs" }
# tokio-futures = { path = "../" }
tokio-io = { path = "../tokio-io" }
tokio-reactor = { path = "../tokio-reactor" }
tokio-signal = { path = "../tokio-signal" }
tokio-tcp = { path = "../tokio-tcp" }
tokio-threadpool = { path = "../tokio-threadpool" }
tokio-timer = { path = "../tokio-timer" }
tokio-tls = { path = "../tokio-tls" }
tokio-udp = { path = "../tokio-udp" }
tokio-uds = { path = "../tokio-uds" }

View File

@ -1,9 +1,6 @@
#![feature(await_macro, async_await, futures_api)]
#[macro_use]
extern crate tokio;
extern crate futures; // v0.1
#![feature(await_macro, async_await)]
use tokio::await;
use tokio::codec::{LinesCodec, Decoder};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
@ -95,7 +92,8 @@ async fn process(stream: TcpStream, state: Arc<Mutex<Shared>>) -> io::Result<()>
Ok(())
}
fn main() {
#[tokio::main]
async fn main() {
// Create the shared state. This is how all the peers communicate.
//
// The server task will hold a handle to this. For every new client, the
@ -113,23 +111,21 @@ fn main() {
println!("server running on localhost:6142");
// Start the Tokio runtime.
tokio::run_async(async move {
let mut incoming = listener.incoming();
let mut incoming = listener.incoming();
while let Some(stream) = await!(incoming.next()) {
let stream = match stream {
Ok(stream) => stream,
Err(_) => continue,
};
while let Some(stream) = await!(incoming.next()) {
let stream = match stream {
Ok(stream) => stream,
Err(_) => continue,
};
let state = state.clone();
let state = state.clone();
tokio::spawn_async(async move {
if let Err(_) = await!(process(stream, state)) {
eprintln!("failed to process connection");
}
});
}
});
tokio::spawn_async(async move {
if let Err(_) = await!(process(stream, state)) {
eprintln!("failed to process connection");
}
});
}
}

View File

@ -1,8 +1,6 @@
#![feature(await_macro, async_await, futures_api)]
#[macro_use]
extern crate tokio;
#![feature(await_macro, async_await)]
use tokio::await;
use tokio::net::TcpStream;
use tokio::prelude::*;
@ -36,7 +34,8 @@ async fn run_client(addr: &SocketAddr) -> io::Result<()> {
Ok(())
}
fn main() {
#[tokio::main]
async fn main() {
use std::env;
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
@ -44,10 +43,8 @@ fn main() {
// Connect to the echo serveer
tokio::run_async(async move {
match await!(run_client(&addr)) {
Ok(_) => println!("done."),
Err(e) => eprintln!("echo client failed; error = {:?}", e),
}
});
match await!(run_client(&addr)) {
Ok(_) => println!("done."),
Err(e) => eprintln!("echo client failed; error = {:?}", e),
}
}

View File

@ -1,8 +1,6 @@
#![feature(await_macro, async_await, futures_api)]
#[macro_use]
extern crate tokio;
#![feature(await_macro, async_await)]
use tokio::await;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
@ -24,7 +22,8 @@ fn handle(mut stream: TcpStream) {
});
}
fn main() {
#[tokio::main]
async fn main() {
use std::env;
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
@ -34,12 +33,10 @@ fn main() {
let listener = TcpListener::bind(&addr).unwrap();
println!("Listening on: {}", addr);
tokio::run_async(async {
let mut incoming = listener.incoming();
let mut incoming = listener.incoming();
while let Some(stream) = await!(incoming.next()) {
let stream = stream.unwrap();
handle(stream);
}
});
while let Some(stream) = await!(incoming.next()) {
let stream = stream.unwrap();
handle(stream);
}
}

29
async-await/src/hyper.rs Normal file
View File

@ -0,0 +1,29 @@
#![feature(await_macro, async_await)]
use tokio::await;
use tokio::prelude::*;
use hyper::Client;
use std::time::Duration;
use std::str;
#[tokio::main]
async fn main() {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse().unwrap();
let response = await!({
client.get(uri)
.timeout(Duration::from_secs(10))
}).unwrap();
println!("Response: {}", response.status());
let mut body = response.into_body();
while let Some(chunk) = await!(body.next()) {
let chunk = chunk.unwrap();
println!("chunk = {}", str::from_utf8(&chunk[..]).unwrap());
}
}

View File

@ -76,7 +76,7 @@ jobs:
parameters:
name: async_await
displayName: Async / Await
rust: nightly-2019-04-22
rust: nightly-2019-04-25
noDefaultFeatures: ''
benches: true
crates:

View File

@ -2,12 +2,12 @@
# repository.
[patch.crates-io]
tokio = { path = "tokio" }
tokio-async-await = { path = "tokio-async-await" }
tokio-buf = { path = "tokio-buf" }
tokio-codec = { path = "tokio-codec" }
tokio-current-thread = { path = "tokio-current-thread" }
tokio-executor = { path = "tokio-executor" }
tokio-fs = { path = "tokio-fs" }
tokio-futures = { path = "tokio-futures" }
tokio-io = { path = "tokio-io" }
tokio-reactor = { path = "tokio-reactor" }
tokio-signal = { path = "tokio-signal" }

View File

@ -1,2 +0,0 @@
[build]
target-dir = "../../target"

View File

@ -1,49 +0,0 @@
[package]
name = "examples"
edition = "2018"
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
license = "MIT"
# Break out of the parent workspace
[workspace]
[[bin]]
name = "chat"
path = "src/chat.rs"
[[bin]]
name = "echo_client"
path = "src/echo_client.rs"
[[bin]]
name = "echo_server"
path = "src/echo_server.rs"
[[bin]]
name = "hyper"
path = "src/hyper.rs"
[dependencies]
tokio = { version = "0.1.18", features = ["async-await-preview"] }
futures = "0.1.23"
bytes = "0.4.9"
hyper = "0.12.8"
# Avoid using crates.io for Tokio dependencies
[patch.crates-io]
tokio = { path = "../../tokio" }
tokio-async-await = { path = "../" }
tokio-codec = { path = "../../tokio-codec" }
tokio-current-thread = { path = "../../tokio-current-thread" }
tokio-executor = { path = "../../tokio-executor" }
tokio-fs = { path = "../../tokio-fs" }
tokio-io = { path = "../../tokio-io" }
tokio-reactor = { path = "../../tokio-reactor" }
tokio-signal = { path = "../../tokio-signal" }
tokio-tcp = { path = "../../tokio-tcp" }
tokio-threadpool = { path = "../../tokio-threadpool" }
tokio-timer = { path = "../../tokio-timer" }
tokio-tls = { path = "../../tokio-tls" }
tokio-udp = { path = "../../tokio-udp" }
tokio-uds = { path = "../../tokio-uds" }

View File

@ -1,33 +0,0 @@
#![feature(await_macro, async_await, futures_api)]
#[macro_use]
extern crate tokio;
extern crate hyper;
use tokio::prelude::*;
use hyper::Client;
use std::time::Duration;
use std::str;
pub fn main() {
tokio::run_async(async {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse().unwrap();
let response = await!({
client.get(uri)
.timeout(Duration::from_secs(10))
}).unwrap();
println!("Response: {}", response.status());
let mut body = response.into_body();
while let Some(chunk) = await!(body.next()) {
let chunk = chunk.unwrap();
println!("chunk = {}", str::from_utf8(&chunk[..]).unwrap());
}
});
}

View File

@ -1,4 +0,0 @@
#![doc(hidden)]
pub mod backward;
pub mod forward;

View File

@ -1,16 +1,16 @@
[package]
name = "tokio-async-await"
name = "tokio-futures"
# When releasing to crates.io:
# - Update html_root_url.
version = "0.1.7"
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
documentation = "https://docs.rs/tokio-async-await/0.1.7"
documentation = "https://docs.rs/tokio-futures/0.1.0"
description = """
Experimental async/await support for Tokio
Experimental std::future::Future and async/await support for Tokio
"""
categories = ["asynchronous"]
@ -25,5 +25,5 @@ tokio-io = "0.1.7"
[dev-dependencies]
bytes = "0.4.9"
tokio = "0.1.8"
hyper = "0.12.8"
tokio = { version = "0.1.8", path = "../tokio" }

View File

@ -25,9 +25,9 @@ Then, get started. In your application, add:
```rust
// The nightly features that are commonly needed with async / await
#![feature(await_macro, async_await, futures_api)]
#![feature(await_macro, async_await)]
// This pulls in the `tokio-async-await` crate. While Rust 2018 doesn't require
// This pulls in the `tokio-futures` crate. While Rust 2018 doesn't require
// `extern crate`, we need to pull in the macros.
#[macro_use]
extern crate tokio;

View File

@ -1,3 +1,5 @@
//! Converts a `std::future::Future` into an 0.1 `Future.
use futures::{Future, Poll};
use std::future::Future as StdFuture;
@ -5,18 +7,18 @@ use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll as StdPoll, RawWaker, RawWakerVTable, Waker};
/// Convert an 0.3 `Future` to an 0.1 `Future`.
/// Converts a `std::future::Future` into an 0.1 `Future.
#[derive(Debug)]
pub struct Compat<T>(Pin<Box<T>>);
impl<T> Compat<T> {
/// Create a new `Compat` backed by `future`.
pub fn new(future: T) -> Compat<T> {
pub(crate) fn new(future: T) -> Compat<T> {
Compat(Box::pin(future))
}
}
/// Convert a value into one that can be used with `await!`.
#[doc(hidden)]
pub trait IntoAwaitable {
type Awaitable;
@ -74,7 +76,11 @@ unsafe fn clone_raw(_data: *const ()) -> RawWaker {
unsafe fn drop_raw(_data: *const ()) {}
unsafe fn wake(_data: *const ()) {
unimplemented!("async-await-preview currently only supports futures 0.1. Use the compatibility layer of futures 0.3 instead, if you want to use futures 0.3.");
unimplemented!(
"async-await-preview currently only supports futures 0.1. Use \
the compatibility layer of futures 0.3 instead, if you want \
to use futures 0.3."
);
}
const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(clone_raw, wake, wake, drop_raw);

View File

@ -1,10 +1,12 @@
//! Converts an 0.1 `Future` into a `std::future::Future`.
//!
use futures::{Async, Future};
use std::future::Future as StdFuture;
use std::pin::Pin;
use std::task::{Context, Poll as StdPoll};
/// Converts an 0.1 `Future` into an 0.3 `Future`.
/// Converts an 0.1 `Future` into a `std::future::Future`.
#[derive(Debug)]
pub struct Compat<T>(T);
@ -31,7 +33,7 @@ pub(crate) fn convert_poll_stream<T, E>(
}
}
/// Convert a value into one that can be used with `await!`.
#[doc(hidden)]
pub trait IntoAwaitable {
type Awaitable;

View File

@ -0,0 +1,42 @@
//! Compatibility layer between futures 0.1 and `std`.
pub mod backward;
pub mod forward;
/// Convert a `std::future::Future` yielding `Result` into an 0.1 `Future`.
pub fn into_01<T, Item, Error>(future: T) -> backward::Compat<T>
where
T: std::future::Future<Output = Result<Item, Error>>,
{
backward::Compat::new(future)
}
/// Convert a `std::future::Future` into an 0.1 `Future` with unit error.
pub fn infallible_into_01<T>(future: T) -> impl futures::Future<Item = T::Output, Error = ()>
where
T: std::future::Future,
{
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct Map<T>(T);
impl<T> Map<T> {
fn future<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut T> {
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) }
}
}
impl<T: std::future::Future> std::future::Future for Map<T> {
type Output = Result<T::Output, ()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.future().poll(cx) {
Poll::Ready(v) => Poll::Ready(Ok(v)),
Poll::Pending => Poll::Pending,
}
}
}
into_01(Map(future))
}

View File

@ -25,7 +25,7 @@ pub trait AsyncReadExt: AsyncRead {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro, futures_api)]
/// #![feature(async_await, await_macro)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
@ -59,7 +59,7 @@ pub trait AsyncReadExt: AsyncRead {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro, futures_api)]
/// #![feature(async_await, await_macro)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
@ -78,7 +78,7 @@ pub trait AsyncReadExt: AsyncRead {
/// ## EOF is hit before `buf` is filled
///
/// ```edition2018
/// #![feature(async_await, await_macro, futures_api)]
/// #![feature(async_await, await_macro)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
@ -110,7 +110,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro, futures_api)]
/// #![feature(async_await, await_macro)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
@ -139,7 +139,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro, futures_api)]
/// #![feature(async_await, await_macro)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
@ -163,7 +163,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro, futures_api)]
/// #![feature(async_await, await_macro)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.

View File

@ -1,6 +1,6 @@
#![cfg(feature = "async-await-preview")]
#![feature(rust_2018_preview, async_await, await_macro, futures_api)]
#![doc(html_root_url = "https://docs.rs/tokio-async-await/0.1.7")]
#![feature(await_macro)]
#![doc(html_root_url = "https://docs.rs/tokio-futures/0.1.0")]
#![deny(missing_docs, missing_debug_implementations)]
#![cfg_attr(test, deny(warnings))]

View File

@ -13,7 +13,7 @@ pub trait StreamExt: Stream {
/// # Examples
///
/// ```edition2018
/// #![feature(await_macro, async_await, futures_api)]
/// #![feature(await_macro, async_await)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.

19
tokio-macros/Cargo.toml Normal file
View File

@ -0,0 +1,19 @@
[package]
name = "tokio-macros"
version = "0.1.0"
authors = ["Tokio Contributors <team@tokio.rs>"]
edition = "2018"
publish = false
[lib]
proc-macro = true
[features]
# This feature comes with no promise of stability. Things will
# break with each patch release. Use at your own risk.
async-await-preview = []
[dependencies]
proc-macro2 = "0.4.27"
quote = "0.6.11"
syn = { version = "0.15.27", features = ["full", "extra-traits", "visit-mut"] }

47
tokio-macros/LICENSE Normal file
View File

@ -0,0 +1,47 @@
Copyright (c) 2019 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
The MIT License (MIT)
Copyright (c) 2019 Yoshua Wuyts
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

13
tokio-macros/README.md Normal file
View File

@ -0,0 +1,13 @@
# Tokio Macros
Procedural macros for use with Tokio
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tokio by you, shall be licensed as MIT, without any additional
terms or conditions.

79
tokio-macros/src/lib.rs Normal file
View File

@ -0,0 +1,79 @@
#![cfg(feature = "async-await-preview")]
extern crate proc_macro;
use proc_macro::TokenStream;
use quote::{quote, quote_spanned};
use syn::spanned::Spanned;
/// Define the program entry point
///
/// # Examples
///
/// ```
/// #[tokio::main]
/// async fn main() {
/// println!("Hello world");
/// }
#[proc_macro_attribute]
pub fn main(_attr: TokenStream, item: TokenStream) -> TokenStream {
let input = syn::parse_macro_input!(item as syn::ItemFn);
let ret = &input.decl.output;
let name = &input.ident;
let body = &input.block;
if input.asyncness.is_none() {
let tokens = quote_spanned! { input.span() =>
compile_error!("the async keyword is missing from the function declaration");
};
return TokenStream::from(tokens);
}
let result = quote! {
fn #name() #ret {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on_async(async { #body })
}
};
result.into()
}
/// Define a Tokio aware unit test
///
/// # Examples
///
/// ```
/// #[tokio::test]
/// async fn my_test() {
/// assert!(true);
/// }
/// ```
#[proc_macro_attribute]
pub fn test(_attr: TokenStream, item: TokenStream) -> TokenStream {
let input = syn::parse_macro_input!(item as syn::ItemFn);
let ret = &input.decl.output;
let name = &input.ident;
let body = &input.block;
if input.asyncness.is_none() {
let tokens = quote_spanned! { input.span() =>
compile_error!("the async keyword is missing from the function declaration");
};
return TokenStream::from(tokens);
}
let result = quote! {
#[test]
fn #name() #ret {
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
rt.block_on_async(async { #body })
}
};
result.into()
}

View File

@ -58,7 +58,8 @@ uds = ["tokio-uds"]
# This feature comes with no promise of stability. Things will
# break with each patch release. Use at your own risk.
async-await-preview = [
"tokio-async-await/async-await-preview",
"tokio-futures/async-await-preview",
"tokio-macros/async-await-preview",
]
[dependencies]
@ -73,6 +74,7 @@ tokio-current-thread = { version = "0.1.6", optional = true }
tokio-fs = { version = "0.1.6", optional = true }
tokio-io = { version = "0.1.6", optional = true }
tokio-executor = { version = "0.1.7", optional = true }
tokio-macros = { version = "0.1.0", optional = true, path = "../tokio-macros" }
tokio-reactor = { version = "0.1.1", optional = true }
tokio-sync = { version = "0.1.5", optional = true }
tokio-threadpool = { version = "0.1.13", optional = true }
@ -85,7 +87,7 @@ tokio-trace-core = { version = "0.1", optional = true }
mio = { version = "0.6.14", optional = true }
# Needed for async/await preview support
tokio-async-await = { version = "0.1.0", optional = true }
tokio-futures = { version = "0.1.0", optional = true, path = "../tokio-futures" }
[target.'cfg(unix)'.dependencies]
tokio-uds = { version = "0.2.1", optional = true }

View File

@ -1,48 +1,17 @@
use std::future::Future as StdFuture;
use std::pin::Pin;
use std::task::{Context, Poll};
fn map_ok<T: StdFuture>(future: T) -> impl StdFuture<Output = Result<(), ()>> {
MapOk(future)
}
struct MapOk<T>(T);
impl<T> MapOk<T> {
fn future<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut T> {
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) }
}
}
impl<T: StdFuture> StdFuture for MapOk<T> {
type Output = Result<(), ()>;
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
match self.future().poll(context) {
Poll::Ready(_) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
}
}
use tokio_futures::compat;
/// Like `tokio::run`, but takes an `async` block
pub fn run_async<F>(future: F)
where
F: StdFuture<Output = ()> + Send + 'static,
F: std::future::Future<Output = ()> + Send + 'static,
{
use tokio_async_await::compat::backward;
let future = backward::Compat::new(map_ok(future));
::run(future);
::run(compat::infallible_into_01(future));
}
/// Like `tokio::spawn`, but takes an `async` block
pub fn spawn_async<F>(future: F)
where
F: StdFuture<Output = ()> + Send + 'static,
F: std::future::Future<Output = ()> + Send + 'static,
{
use tokio_async_await::compat::backward;
let future = backward::Compat::new(map_ok(future));
::spawn(future);
::spawn(compat::infallible_into_01(future));
}

View File

@ -1,9 +1,6 @@
#![doc(html_root_url = "https://docs.rs/tokio/0.1.19")]
#![deny(missing_docs, warnings, missing_debug_implementations)]
#![cfg_attr(
feature = "async-await-preview",
feature(async_await, await_macro, futures_api,)
)]
#![cfg_attr(feature = "async-await-preview", feature(async_await, await_macro))]
//! A runtime for writing reliable, asynchronous, and slim applications.
//!
@ -108,9 +105,6 @@ extern crate tokio_timer;
#[cfg(feature = "udp")]
extern crate tokio_udp;
#[cfg(feature = "async-await-preview")]
extern crate tokio_async_await;
#[cfg(all(unix, feature = "uds"))]
extern crate tokio_uds;
@ -145,6 +139,12 @@ if_runtime! {
// ===== Experimental async/await support =====
#[cfg(feature = "async-await-preview")]
extern crate tokio_futures;
#[cfg(feature = "async-await-preview")]
extern crate tokio_macros;
#[cfg(feature = "async-await-preview")]
mod async_await;
@ -152,4 +152,7 @@ mod async_await;
pub use async_await::{run_async, spawn_async};
#[cfg(feature = "async-await-preview")]
pub use tokio_async_await::await;
pub use tokio_futures::await;
#[cfg(feature = "async-await-preview")]
pub use tokio_macros::{main, test};

View File

@ -21,7 +21,7 @@ pub use futures::{future, stream, task, Async, AsyncSink, Future, IntoFuture, Po
#[cfg(feature = "async-await-preview")]
#[doc(inline)]
pub use tokio_async_await::{
pub use tokio_futures::{
io::{AsyncReadExt, AsyncWriteExt},
sink::SinkExt,
stream::StreamExt as StreamAsyncExt,

View File

@ -0,0 +1,18 @@
use super::Runtime;
use std::future::Future;
impl Runtime {
/// Like `block_on`, but takes an `async` block
pub fn block_on_async<F>(&mut self, future: F) -> F::Output
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
use tokio_futures::compat;
match self.block_on(compat::infallible_into_01(future)) {
Ok(v) => v,
Err(_) => unreachable!(),
}
}
}

View File

@ -2,6 +2,9 @@ mod builder;
mod shutdown;
mod task_executor;
#[cfg(feature = "async-await-preview")]
mod async_await;
pub use self::builder::Builder;
pub use self::shutdown::Shutdown;
pub use self::task_executor::TaskExecutor;