From 2f690d30bc061a8595744e34fde371882b676a86 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 26 Sep 2018 10:10:47 -0700 Subject: [PATCH] async-await: track nightly changes (#661) The `tokio-async-await` crate is no longer a facade. Instead, the `tokio` crate provides a feature flag to enable async/await support. --- Cargo.toml | 11 ++ README.md | 3 + src/async_await.rs | 26 ++++ src/lib.rs | 32 +++++ tokio-async-await/Cargo.toml | 23 +--- tokio-async-await/examples/.cargo/config | 2 + tokio-async-await/examples/Cargo.toml | 35 ++++-- tokio-async-await/examples/src/chat.rs | 4 +- .../src/async_await/compat/backward.rs | 112 ------------------ .../src/async_await/compat/forward.rs | 69 ----------- tokio-async-await/src/async_await/mod.rs | 8 -- .../src/{async_await => }/await.rs | 6 +- tokio-async-await/src/compat/backward.rs | 89 ++++++++++++++ tokio-async-await/src/compat/forward.rs | 68 +++++++++++ .../src/{async_await => }/compat/mod.rs | 0 .../src/{async_await => }/io/flush.rs | 12 +- .../src/{async_await => }/io/mod.rs | 0 .../src/{async_await => }/io/read.rs | 10 +- .../src/{async_await => }/io/read_exact.rs | 12 +- .../src/{async_await => }/io/write.rs | 10 +- .../src/{async_await => }/io/write_all.rs | 12 +- tokio-async-await/src/lib.rs | 102 +++++++++------- .../src/{async_await => }/sink/mod.rs | 0 .../src/{async_await => }/sink/send.rs | 21 ++-- .../src/{async_await => }/stream/mod.rs | 0 .../src/{async_await => }/stream/next.rs | 13 +- tokio-channel/Cargo.toml | 5 - tokio-channel/src/async_await.rs | 10 -- tokio-channel/src/lib.rs | 8 -- tokio-reactor/Cargo.toml | 5 - tokio-reactor/src/async_await.rs | 5 - tokio-reactor/src/lib.rs | 6 - 32 files changed, 370 insertions(+), 349 deletions(-) create mode 100644 src/async_await.rs create mode 100644 tokio-async-await/examples/.cargo/config delete mode 100644 tokio-async-await/src/async_await/compat/backward.rs delete mode 100644 tokio-async-await/src/async_await/compat/forward.rs delete mode 100644 tokio-async-await/src/async_await/mod.rs rename tokio-async-await/src/{async_await => }/await.rs (52%) create mode 100644 tokio-async-await/src/compat/backward.rs create mode 100644 tokio-async-await/src/compat/forward.rs rename tokio-async-await/src/{async_await => }/compat/mod.rs (100%) rename tokio-async-await/src/{async_await => }/io/flush.rs (65%) rename tokio-async-await/src/{async_await => }/io/mod.rs (100%) rename tokio-async-await/src/{async_await => }/io/read.rs (73%) rename tokio-async-await/src/{async_await => }/io/read_exact.rs (81%) rename tokio-async-await/src/{async_await => }/io/write.rs (73%) rename tokio-async-await/src/{async_await => }/io/write_all.rs (80%) rename tokio-async-await/src/{async_await => }/sink/mod.rs (100%) rename tokio-async-await/src/{async_await => }/sink/send.rs (68%) rename tokio-async-await/src/{async_await => }/stream/mod.rs (100%) rename tokio-async-await/src/{async_await => }/stream/next.rs (59%) delete mode 100644 tokio-channel/src/async_await.rs delete mode 100644 tokio-reactor/src/async_await.rs diff --git a/Cargo.toml b/Cargo.toml index 22049861f..d380efec8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures"] members = [ "./", + "tokio-async-await", "tokio-channel", "tokio-codec", "tokio-current-thread", @@ -40,6 +41,13 @@ members = [ "tokio-uds", ] +[features] +# This feature comes with no promise of stability. Things will +# break with each patch release. Use at your own risk. +async-await-preview = [ + "tokio-async-await/async-await-preview", +] + [badges] travis-ci = { repository = "tokio-rs/tokio" } appveyor = { repository = "carllerche/tokio", id = "s83yxhy9qeb58va7" } @@ -65,6 +73,9 @@ mio = "0.6.14" [target.'cfg(unix)'.dependencies] tokio-uds = { version = "0.2.1", path = "tokio-uds" } +# Needed for async/await preview support +tokio-async-await = { version = "0.1.0", path = "tokio-async-await", optional = true } + [dev-dependencies] env_logger = { version = "0.5", default-features = false } flate2 = { version = "1", features = ["tokio"] } diff --git a/README.md b/README.md index 9438aec4a..32ec4968e 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,8 @@ have greater guarantees of stability. The crates included as part of Tokio are: +* [`tokio-async-await`]: Experimental `async` / `await` support. + * [`tokio-codec`]: Utilities for encoding and decoding protocol frames. * [`tokio-current-thread`]: Schedule the execution of futures on the current @@ -155,6 +157,7 @@ The crates included as part of Tokio are: * [`tokio-uds`]: Unix Domain Socket bindings for use with `tokio-io` and `tokio-reactor`. +[`tokio-async-await`]: tokio-async-await [`tokio-codec`]: tokio-codec [`tokio-current-thread`]: tokio-current-thread [`tokio-executor`]: tokio-executor diff --git a/src/async_await.rs b/src/async_await.rs new file mode 100644 index 000000000..88903643f --- /dev/null +++ b/src/async_await.rs @@ -0,0 +1,26 @@ +use std::future::{Future as StdFuture}; + +async fn map_ok(future: T) -> Result<(), ()> { + let _ = await!(future); + Ok(()) +} + +/// Like `tokio::run`, but takes an `async` block +pub fn run_async(future: F) +where F: StdFuture + Send + 'static, +{ + use tokio_async_await::compat::backward; + let future = backward::Compat::new(map_ok(future)); + + ::run(future); +} + +/// Like `tokio::spawn`, but takes an `async` block +pub fn spawn_async(future: F) +where F: StdFuture + Send + 'static, +{ + use tokio_async_await::compat::backward; + let future = backward::Compat::new(map_ok(future)); + + ::spawn(future); +} diff --git a/src/lib.rs b/src/lib.rs index 3d3ab483f..cbe8e0860 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,11 @@ #![doc(html_root_url = "https://docs.rs/tokio/0.1.5")] #![deny(missing_docs, warnings, missing_debug_implementations)] +#![cfg_attr(feature = "async-await-preview", feature( + async_await, + await_macro, + futures_api, + ))] extern crate bytes; #[macro_use] @@ -82,6 +87,9 @@ extern crate tokio_timer; extern crate tokio_tcp; extern crate tokio_udp; +#[cfg(feature = "async-await-preview")] +extern crate tokio_async_await; + #[cfg(unix)] extern crate tokio_uds; @@ -619,4 +627,28 @@ pub mod prelude { Poll, task, }; + + #[cfg(feature = "async-await-preview")] + #[doc(inline)] + pub use tokio_async_await::{ + io::{ + AsyncReadExt, + AsyncWriteExt, + }, + sink::{ + SinkExt, + }, + stream::{ + StreamExt as StreamAsyncExt, + }, + }; } + +#[cfg(feature = "async-await-preview")] +mod async_await; + +#[cfg(feature = "async-await-preview")] +pub use async_await::{run_async, spawn_async}; + +#[cfg(feature = "async-await-preview")] +pub use tokio_async_await::await; diff --git a/tokio-async-await/Cargo.toml b/tokio-async-await/Cargo.toml index 0e3739e0e..cd31b3510 100644 --- a/tokio-async-await/Cargo.toml +++ b/tokio-async-await/Cargo.toml @@ -1,8 +1,5 @@ -cargo-features = ["rename-dependency"] - [package] name = "tokio-async-await" -edition = "2018" # When releasing to crates.io: # - Update html_root_url. @@ -17,25 +14,17 @@ Experimental async/await support for Tokio """ categories = ["asynchronous"] -[workspace] -members = [ - "./", - "examples", -] - -[lib] -name = "tokio" +[features] +# This feature comes with no promise of stability. Things will +# break with each patch release. Use at your own risk. +async-await-preview = ["futures/nightly"] [dependencies] futures = "0.1.23" -tokio_main = { package = "tokio", version = "0.1.8", path = ".." } tokio-io = { version = "0.1.7", path = "../tokio-io" } -tokio-channel = { version = "0.1.0", path = "../tokio-channel", features = ["async-await-preview"] } -tokio-reactor = { version = "0.1.5", path = "../tokio-reactor", features = ["async-await-preview"] } -futures-core-preview = { version = "0.3.0-alpha.6" } -futures-util-preview = { version = "0.3.0-alpha.6" } [dev-dependencies] bytes = "0.4.9" -tokio-codec = { version = "0.1.0", path = "../tokio-codec" } +tokio = { version = "0.1.8", path = ".." } +# tokio-codec = { version = "0.1.0", path = "../tokio-codec" } hyper = "0.12.8" diff --git a/tokio-async-await/examples/.cargo/config b/tokio-async-await/examples/.cargo/config new file mode 100644 index 000000000..9766b8111 --- /dev/null +++ b/tokio-async-await/examples/.cargo/config @@ -0,0 +1,2 @@ +[build] +target-dir = "../../target" diff --git a/tokio-async-await/examples/Cargo.toml b/tokio-async-await/examples/Cargo.toml index 204c91dec..058ecd574 100644 --- a/tokio-async-await/examples/Cargo.toml +++ b/tokio-async-await/examples/Cargo.toml @@ -1,28 +1,49 @@ -cargo-features = ["edition"] - [package] name = "examples" edition = "2018" - -# When releasing to crates.io: -# - Update html_root_url. -version = "0.1.1" +version = "0.1.0" authors = ["Carl Lerche "] license = "MIT" +# Break out of the parent workspace +[workspace] + +[[bin]] +name = "chat" +path = "src/chat.rs" + +[[bin]] +name = "echo_client" +path = "src/echo_client.rs" + +[[bin]] +name = "echo_server" +path = "src/echo_server.rs" + [[bin]] name = "hyper" path = "src/hyper.rs" [dependencies] -tokio-async-await = { version = "0.1.0", path = "../" } +tokio = { version = "0.1.0", path = "../..", features = ["async-await-preview"] } +futures = "0.1.23" bytes = "0.4.9" hyper = "0.12.8" +# Avoid using crates.io for Tokio dependencies [patch.crates-io] tokio = { path = "../.." } +tokio-async-await = { path = "../" } +tokio-codec = { path = "../../tokio-codec" } +tokio-current-thread = { path = "../../tokio-current-thread" } tokio-executor = { path = "../../tokio-executor" } +tokio-fs = { path = "../../tokio-fs" } tokio-io = { path = "../../tokio-io" } tokio-reactor = { path = "../../tokio-reactor" } +tokio-signal = { path = "../../tokio-signal" } tokio-tcp = { path = "../../tokio-tcp" } +tokio-threadpool = { path = "../../tokio-threadpool" } tokio-timer = { path = "../../tokio-timer" } +tokio-tls = { path = "../../tokio-tls" } +tokio-udp = { path = "../../tokio-udp" } +tokio-uds = { path = "../../tokio-uds" } diff --git a/tokio-async-await/examples/src/chat.rs b/tokio-async-await/examples/src/chat.rs index 672e245f6..53de28b33 100644 --- a/tokio-async-await/examples/src/chat.rs +++ b/tokio-async-await/examples/src/chat.rs @@ -2,11 +2,13 @@ #[macro_use] extern crate tokio; +extern crate futures; // v0.1 use tokio::codec::{LinesCodec, Decoder}; use tokio::net::{TcpListener, TcpStream}; use tokio::prelude::*; -use tokio::sync::mpsc; + +use futures::sync::mpsc; use std::collections::HashMap; use std::io; diff --git a/tokio-async-await/src/async_await/compat/backward.rs b/tokio-async-await/src/async_await/compat/backward.rs deleted file mode 100644 index f7625845f..000000000 --- a/tokio-async-await/src/async_await/compat/backward.rs +++ /dev/null @@ -1,112 +0,0 @@ -use futures::{ - Future as Future01, - Poll as Poll01, -}; -use futures_core::{Future as Future03}; - -use std::pin::PinBox; -use std::future::FutureObj; -use std::ptr::NonNull; -use std::task::{ - Context, - Spawn, - UnsafeWake, - LocalWaker, - Poll as Poll03, - Waker, - SpawnObjError, -}; - -/// Convert an 0.3 `Future` to an 0.1 `Future`. -#[derive(Debug)] -pub struct Compat(PinBox); - -impl Compat { - pub fn new(data: T) -> Compat { - Compat(PinBox::new(data)) - } -} - -/// Convert a value into one that can be used with `await!`. -pub trait IntoAwaitable { - type Awaitable; - - fn into_awaitable(self) -> Self::Awaitable; -} - -impl IntoAwaitable for T -where T: Future03, -{ - type Awaitable = Self; - - fn into_awaitable(self) -> Self { - self - } -} - -impl Future01 for Compat -where T: Future03>, -{ - type Item = Item; - type Error = Error; - - fn poll(&mut self) -> Poll01 { - use futures::Async::*; - - let local_waker = noop_local_waker(); - let mut executor = NoopExecutor; - - let mut cx = Context::new(&local_waker, &mut executor); - - let res = self.0.as_pin_mut().poll(&mut cx); - - match res { - Poll03::Ready(Ok(val)) => Ok(Ready(val)), - Poll03::Ready(Err(err)) => Err(err), - Poll03::Pending => Ok(NotReady), - } - } -} - -// ===== NoopWaker ===== - -struct NoopWaker; - -fn noop_local_waker() -> LocalWaker { - let w: NonNull = NonNull::dangling(); - unsafe { LocalWaker::new(w) } -} - -fn noop_waker() -> Waker { - let w: NonNull = NonNull::dangling(); - unsafe { Waker::new(w) } -} - -unsafe impl UnsafeWake for NoopWaker { - unsafe fn clone_raw(&self) -> Waker { - noop_waker() - } - - unsafe fn drop_raw(&self) { - } - - unsafe fn wake(&self) { - panic!("NoopWake cannot wake"); - } -} - -// ===== NoopExecutor ===== - -struct NoopExecutor; - -impl Spawn for NoopExecutor { - fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnObjError> { - use std::task::SpawnErrorKind; - - // NoopExecutor cannot execute - Err(SpawnObjError { - kind: SpawnErrorKind::shutdown(), - future, - }) - } -} diff --git a/tokio-async-await/src/async_await/compat/forward.rs b/tokio-async-await/src/async_await/compat/forward.rs deleted file mode 100644 index fb712b50c..000000000 --- a/tokio-async-await/src/async_await/compat/forward.rs +++ /dev/null @@ -1,69 +0,0 @@ - -use futures::{Future, Async}; -use futures_core::future::Future as Future03; -use futures_core::task::Poll as Poll03; - -use std::marker::Unpin; -use std::pin::PinMut; -use std::task::Context; - -/// Converts an 0.1 `Future` into an 0.3 `Future`. -#[derive(Debug)] -pub struct Compat(T); - -pub(crate) fn convert_poll(poll: Result, E>) -> Poll03> { - use futures::Async::{Ready, NotReady}; - - match poll { - Ok(Ready(val)) => Poll03::Ready(Ok(val)), - Ok(NotReady) => Poll03::Pending, - Err(err) => Poll03::Ready(Err(err)), - } -} - -pub(crate) fn convert_poll_stream( - poll: Result>, E>) -> Poll03>> -{ - use futures::Async::{Ready, NotReady}; - - match poll { - Ok(Ready(Some(val))) => Poll03::Ready(Some(Ok(val))), - Ok(Ready(None)) => Poll03::Ready(None), - Ok(NotReady) => Poll03::Pending, - Err(err) => Poll03::Ready(Some(Err(err))), - } -} - -/// Convert a value into one that can be used with `await!`. -pub trait IntoAwaitable { - type Awaitable; - - /// Convert `self` into a value that can be used with `await!`. - fn into_awaitable(self) -> Self::Awaitable; -} - -impl IntoAwaitable for T { - type Awaitable = Compat; - - fn into_awaitable(self) -> Self::Awaitable { - Compat(self) - } -} - -impl Future03 for Compat -where T: Future + Unpin -{ - type Output = Result; - - fn poll(self: PinMut, _cx: &mut Context) -> Poll03 { - use futures::Async::{Ready, NotReady}; - - // TODO: wire in cx - - match PinMut::get_mut(self).0.poll() { - Ok(Ready(val)) => Poll03::Ready(Ok(val)), - Ok(NotReady) => Poll03::Pending, - Err(e) => Poll03::Ready(Err(e)), - } - } -} diff --git a/tokio-async-await/src/async_await/mod.rs b/tokio-async-await/src/async_await/mod.rs deleted file mode 100644 index 8731fa3dd..000000000 --- a/tokio-async-await/src/async_await/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -//! Utilities for working with `async` / `await`. - -#[macro_use] -mod await; -pub mod compat; -pub mod io; -pub mod sink; -pub mod stream; diff --git a/tokio-async-await/src/async_await/await.rs b/tokio-async-await/src/await.rs similarity index 52% rename from tokio-async-await/src/async_await/await.rs rename to tokio-async-await/src/await.rs index a2d4c09d0..7cc7f8133 100644 --- a/tokio-async-await/src/async_await/await.rs +++ b/tokio-async-await/src/await.rs @@ -3,8 +3,10 @@ macro_rules! await { ($e:expr) => {{ use $crate::std_await; - use $crate::async_await::compat::forward::IntoAwaitable as IntoAwaitableForward; - use $crate::async_await::compat::backward::IntoAwaitable as IntoAwaitableBackward; + #[allow(unused_imports)] + use $crate::compat::forward::IntoAwaitable as IntoAwaitableForward; + #[allow(unused_imports)] + use $crate::compat::backward::IntoAwaitable as IntoAwaitableBackward; #[allow(unused_mut)] let mut e = $e; diff --git a/tokio-async-await/src/compat/backward.rs b/tokio-async-await/src/compat/backward.rs new file mode 100644 index 000000000..b4e725472 --- /dev/null +++ b/tokio-async-await/src/compat/backward.rs @@ -0,0 +1,89 @@ +use futures::{Future, Poll}; + +use std::pin::Pin; +use std::future::{ + Future as StdFuture, +}; +use std::ptr::NonNull; +use std::task::{ + LocalWaker, + Poll as StdPoll, + UnsafeWake, + Waker, +}; + +/// Convert an 0.3 `Future` to an 0.1 `Future`. +#[derive(Debug)] +pub struct Compat(Pin>); + +impl Compat { + /// Create a new `Compat` backed by `future`. + pub fn new(future: T) -> Compat { + Compat(Box::pinned(future)) + } +} + +/// Convert a value into one that can be used with `await!`. +pub trait IntoAwaitable { + type Awaitable; + + fn into_awaitable(self) -> Self::Awaitable; +} + +impl IntoAwaitable for T +where T: StdFuture, +{ + type Awaitable = Self; + + fn into_awaitable(self) -> Self { + self + } +} + +impl Future for Compat +where T: StdFuture>, +{ + type Item = Item; + type Error = Error; + + fn poll(&mut self) -> Poll { + use futures::Async::*; + + let local_waker = noop_local_waker(); + + let res = self.0.as_mut().poll(&local_waker); + + match res { + StdPoll::Ready(Ok(val)) => Ok(Ready(val)), + StdPoll::Ready(Err(err)) => Err(err), + StdPoll::Pending => Ok(NotReady), + } + } +} + +// ===== NoopWaker ===== + +struct NoopWaker; + +fn noop_local_waker() -> LocalWaker { + let w: NonNull = NonNull::dangling(); + unsafe { LocalWaker::new(w) } +} + +fn noop_waker() -> Waker { + let w: NonNull = NonNull::dangling(); + unsafe { Waker::new(w) } +} + +unsafe impl UnsafeWake for NoopWaker { + unsafe fn clone_raw(&self) -> Waker { + noop_waker() + } + + unsafe fn drop_raw(&self) { + } + + unsafe fn wake(&self) { + panic!("NoopWake cannot wake"); + } +} diff --git a/tokio-async-await/src/compat/forward.rs b/tokio-async-await/src/compat/forward.rs new file mode 100644 index 000000000..65b1351b9 --- /dev/null +++ b/tokio-async-await/src/compat/forward.rs @@ -0,0 +1,68 @@ + +use futures::{Future, Async}; + +use std::marker::Unpin; +use std::future::Future as StdFuture; +use std::pin::Pin; +use std::task::{LocalWaker, Poll as StdPoll}; + +/// Converts an 0.1 `Future` into an 0.3 `Future`. +#[derive(Debug)] +pub struct Compat(T); + +pub(crate) fn convert_poll(poll: Result, E>) -> StdPoll> { + use futures::Async::{Ready, NotReady}; + + match poll { + Ok(Ready(val)) => StdPoll::Ready(Ok(val)), + Ok(NotReady) => StdPoll::Pending, + Err(err) => StdPoll::Ready(Err(err)), + } +} + +pub(crate) fn convert_poll_stream( + poll: Result>, E>) -> StdPoll>> +{ + use futures::Async::{Ready, NotReady}; + + match poll { + Ok(Ready(Some(val))) => StdPoll::Ready(Some(Ok(val))), + Ok(Ready(None)) => StdPoll::Ready(None), + Ok(NotReady) => StdPoll::Pending, + Err(err) => StdPoll::Ready(Some(Err(err))), + } +} + +/// Convert a value into one that can be used with `await!`. +pub trait IntoAwaitable { + type Awaitable; + + /// Convert `self` into a value that can be used with `await!`. + fn into_awaitable(self) -> Self::Awaitable; +} + +impl IntoAwaitable for T { + type Awaitable = Compat; + + fn into_awaitable(self) -> Self::Awaitable { + Compat(self) + } +} + +impl StdFuture for Compat +where T: Future + Unpin +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, _lw: &LocalWaker) -> StdPoll { + use futures::Async::{Ready, NotReady}; + + // TODO: wire in cx + + match self.0.poll() { + Ok(Ready(val)) => StdPoll::Ready(Ok(val)), + Ok(NotReady) => StdPoll::Pending, + Err(e) => StdPoll::Ready(Err(e)), + } + } +} diff --git a/tokio-async-await/src/async_await/compat/mod.rs b/tokio-async-await/src/compat/mod.rs similarity index 100% rename from tokio-async-await/src/async_await/compat/mod.rs rename to tokio-async-await/src/compat/mod.rs diff --git a/tokio-async-await/src/async_await/io/flush.rs b/tokio-async-await/src/io/flush.rs similarity index 65% rename from tokio-async-await/src/async_await/io/flush.rs rename to tokio-async-await/src/io/flush.rs index 49813c041..db0dbb05b 100644 --- a/tokio-async-await/src/async_await/io/flush.rs +++ b/tokio-async-await/src/io/flush.rs @@ -1,11 +1,11 @@ use tokio_io::AsyncWrite; -use futures_core::future::Future; -use futures_core::task::{self, Poll}; use std::io; +use std::future::Future; use std::marker::Unpin; -use std::pin::PinMut; +use std::pin::Pin; +use std::task::{LocalWaker, Poll}; /// A future used to fully flush an I/O object. #[derive(Debug)] @@ -13,7 +13,7 @@ pub struct Flush<'a, T: ?Sized + 'a> { writer: &'a mut T, } -// PinMut is never projected to fields +// Pin is never projected to fields impl<'a, T: ?Sized> Unpin for Flush<'a, T> {} impl<'a, T: AsyncWrite + ?Sized> Flush<'a, T> { @@ -25,8 +25,8 @@ impl<'a, T: AsyncWrite + ?Sized> Flush<'a, T> { impl<'a, T: AsyncWrite + ?Sized> Future for Flush<'a, T> { type Output = io::Result<()>; - fn poll(mut self: PinMut, _cx: &mut task::Context) -> Poll { - use crate::async_await::compat::forward::convert_poll; + fn poll(mut self: Pin<&mut Self>, _wx: &LocalWaker) -> Poll { + use crate::compat::forward::convert_poll; convert_poll(self.writer.poll_flush()) } } diff --git a/tokio-async-await/src/async_await/io/mod.rs b/tokio-async-await/src/io/mod.rs similarity index 100% rename from tokio-async-await/src/async_await/io/mod.rs rename to tokio-async-await/src/io/mod.rs diff --git a/tokio-async-await/src/async_await/io/read.rs b/tokio-async-await/src/io/read.rs similarity index 73% rename from tokio-async-await/src/async_await/io/read.rs rename to tokio-async-await/src/io/read.rs index 006f87f39..909a4d60b 100644 --- a/tokio-async-await/src/async_await/io/read.rs +++ b/tokio-async-await/src/io/read.rs @@ -1,11 +1,11 @@ use tokio_io::AsyncRead; -use futures_core::future::Future; -use futures_core::task::{self, Poll}; +use std::future::Future; +use std::task::{self, Poll}; use std::io; use std::marker::Unpin; -use std::pin::PinMut; +use std::pin::Pin; /// A future which can be used to read bytes. #[derive(Debug)] @@ -29,8 +29,8 @@ impl<'a, T: AsyncRead + ?Sized> Read<'a, T> { impl<'a, T: AsyncRead + ?Sized> Future for Read<'a, T> { type Output = io::Result; - fn poll(mut self: PinMut, _cx: &mut task::Context) -> Poll { - use crate::async_await::compat::forward::convert_poll; + fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll { + use crate::compat::forward::convert_poll; let this = &mut *self; convert_poll(this.reader.poll_read(this.buf)) diff --git a/tokio-async-await/src/async_await/io/read_exact.rs b/tokio-async-await/src/io/read_exact.rs similarity index 81% rename from tokio-async-await/src/async_await/io/read_exact.rs rename to tokio-async-await/src/io/read_exact.rs index 9bba54f51..ca9cd4d09 100644 --- a/tokio-async-await/src/async_await/io/read_exact.rs +++ b/tokio-async-await/src/io/read_exact.rs @@ -1,14 +1,12 @@ use tokio_io::AsyncRead; -use futures_core::future::Future; -use futures_core::task::{self, Poll}; -use futures_util::try_ready; +use std::future::Future; +use std::task::{self, Poll}; use std::io; use std::marker::Unpin; use std::mem; - -use core::pin::PinMut; +use std::pin::Pin; /// A future which can be used to read exactly enough bytes to fill a buffer. #[derive(Debug)] @@ -36,8 +34,8 @@ fn eof() -> io::Error { impl<'a, T: AsyncRead + ?Sized> Future for ReadExact<'a, T> { type Output = io::Result<()>; - fn poll(mut self: PinMut, _cx: &mut task::Context) -> Poll { - use crate::async_await::compat::forward::convert_poll; + fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll { + use crate::compat::forward::convert_poll; let this = &mut *self; diff --git a/tokio-async-await/src/async_await/io/write.rs b/tokio-async-await/src/io/write.rs similarity index 73% rename from tokio-async-await/src/async_await/io/write.rs rename to tokio-async-await/src/io/write.rs index 6a933fbb4..4e5dd354d 100644 --- a/tokio-async-await/src/async_await/io/write.rs +++ b/tokio-async-await/src/io/write.rs @@ -1,11 +1,11 @@ use tokio_io::AsyncWrite; -use futures_core::future::Future; -use futures_core::task::{self, Poll}; +use std::future::Future; +use std::task::{self, Poll}; use std::io; use std::marker::Unpin; -use std::pin::PinMut; +use std::pin::Pin; /// A future used to write data. #[derive(Debug)] @@ -29,8 +29,8 @@ impl<'a, T: AsyncWrite + ?Sized> Write<'a, T> { impl<'a, T: AsyncWrite + ?Sized> Future for Write<'a, T> { type Output = io::Result; - fn poll(mut self: PinMut, _cx: &mut task::Context) -> Poll> { - use crate::async_await::compat::forward::convert_poll; + fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll> { + use crate::compat::forward::convert_poll; let this = &mut *self; convert_poll(this.writer.poll_write(this.buf)) diff --git a/tokio-async-await/src/async_await/io/write_all.rs b/tokio-async-await/src/io/write_all.rs similarity index 80% rename from tokio-async-await/src/async_await/io/write_all.rs rename to tokio-async-await/src/io/write_all.rs index ffede24e8..cfc3e7ef0 100644 --- a/tokio-async-await/src/async_await/io/write_all.rs +++ b/tokio-async-await/src/io/write_all.rs @@ -1,14 +1,12 @@ use tokio_io::AsyncWrite; -use futures_core::future::Future; -use futures_core::task::{self, Poll}; -use futures_util::try_ready; +use std::future::Future; +use std::task::{self, Poll}; use std::io; use std::marker::Unpin; use std::mem; - -use core::pin::PinMut; +use std::pin::Pin; /// A future used to write the entire contents of a buffer. #[derive(Debug)] @@ -36,8 +34,8 @@ fn zero_write() -> io::Error { impl<'a, T: AsyncWrite + ?Sized> Future for WriteAll<'a, T> { type Output = io::Result<()>; - fn poll(mut self: PinMut, _cx: &mut task::Context) -> Poll> { - use crate::async_await::compat::forward::convert_poll; + fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll> { + use crate::compat::forward::convert_poll; let this = &mut *self; diff --git a/tokio-async-await/src/lib.rs b/tokio-async-await/src/lib.rs index c4364fcfe..7850059bc 100644 --- a/tokio-async-await/src/lib.rs +++ b/tokio-async-await/src/lib.rs @@ -1,4 +1,12 @@ -#![feature(futures_api, await_macro, pin, arbitrary_self_types)] +#![cfg(feature = "async-await-preview")] +#![feature( + rust_2018_preview, + arbitrary_self_types, + async_await, + await_macro, + futures_api, + pin, + )] #![doc(html_root_url = "https://docs.rs/tokio-async-await/0.1.3")] #![deny(missing_docs, missing_debug_implementations)] @@ -7,39 +15,31 @@ //! A preview of Tokio w/ `async` / `await` support. extern crate futures; -extern crate futures_core; -extern crate futures_util; +extern crate tokio_io; -// Re-export all of Tokio -pub use tokio_main::{ - // Modules - clock, - codec, - executor, - fs, - io, - net, - reactor, - runtime, - timer, - util, - - // Functions - run, - spawn, -}; - -pub mod sync { - //! Asynchronous aware synchronization - - pub use tokio_channel::{ - mpsc, - oneshot, - }; +/// Extracts the successful type of a `Poll>`. +/// +/// This macro bakes in propagation of `Pending` and `Err` signals by returning early. +macro_rules! try_ready { + ($x:expr) => { + match $x { + std::task::Poll::Ready(Ok(x)) => x, + std::task::Poll::Ready(Err(e)) => + return std::task::Poll::Ready(Err(e.into())), + std::task::Poll::Pending => + return std::task::Poll::Pending, + } + } } -pub mod async_await; +#[macro_use] +mod await; +pub mod compat; +pub mod io; +pub mod sink; +pub mod stream; +/* pub mod prelude { //! A "prelude" for users of the `tokio` crate. //! @@ -69,33 +69,47 @@ pub mod prelude { }, }; } +*/ -use futures_core::{ - Future as Future03, -}; - -// Rename the `await` macro in `std` +// Rename the `await` macro in `std`. This is used by the redefined +// `await` macro in this crate. #[doc(hidden)] pub use std::await as std_await; +/* +use std::future::{Future as StdFuture}; + +fn run>(t: T) { + drop(t); +} + +async fn map_ok(future: T) -> Result<(), ()> { + let _ = await!(future); + Ok(()) +} + /// Like `tokio::run`, but takes an `async` block pub fn run_async(future: F) -where F: Future03 + Send + 'static, +where F: StdFuture + Send + 'static, { - use futures_util::future::FutureExt; - use crate::async_await::compat::backward; + use async_await::compat::backward; + let future = backward::Compat::new(map_ok(future)); - let future = future.map(|_| Ok(())); - run(backward::Compat::new(future)) + run(future); + unimplemented!(); } +*/ +/* /// Like `tokio::spawn`, but takes an `async` block pub fn spawn_async(future: F) -where F: Future03 + Send + 'static, +where F: StdFuture + Send + 'static, { - use futures_util::future::FutureExt; use crate::async_await::compat::backward; - let future = future.map(|_| Ok(())); - spawn(backward::Compat::new(future)); + spawn(backward::Compat::new(async || { + let _ = await!(future); + Ok(()) + })); } +*/ diff --git a/tokio-async-await/src/async_await/sink/mod.rs b/tokio-async-await/src/sink/mod.rs similarity index 100% rename from tokio-async-await/src/async_await/sink/mod.rs rename to tokio-async-await/src/sink/mod.rs diff --git a/tokio-async-await/src/async_await/sink/send.rs b/tokio-async-await/src/sink/send.rs similarity index 68% rename from tokio-async-await/src/async_await/sink/send.rs rename to tokio-async-await/src/sink/send.rs index f283d5d81..f643ffc6c 100644 --- a/tokio-async-await/src/async_await/sink/send.rs +++ b/tokio-async-await/src/sink/send.rs @@ -1,10 +1,10 @@ use futures::Sink; -use futures_core::future::Future; -use futures_core::task::{self, Poll}; +use std::future::Future; +use std::task::{self, Poll}; use std::marker::Unpin; -use std::pin::PinMut; +use std::pin::Pin; /// Future for the `SinkExt::send_async` combinator, which sends a value to a /// sink and then waits until the sink has fully flushed. @@ -28,17 +28,12 @@ impl<'a, T: Sink + Unpin + ?Sized> Send<'a, T> { impl Future for Send<'_, T> { type Output = Result<(), T::SinkError>; - fn poll(mut self: PinMut, _cx: &mut task::Context) -> Poll { - use crate::async_await::compat::forward::convert_poll; + fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll { + use crate::compat::forward::convert_poll; use futures::AsyncSink::{Ready, NotReady}; - use futures_util::try_ready; - // use crate::compat::forward::convert_poll; - - let this = &mut *self; - - if let Some(item) = this.item.take() { - match this.sink.start_send(item) { + if let Some(item) = self.item.take() { + match self.sink.start_send(item) { Ok(Ready) => {} Ok(NotReady(val)) => { self.item = Some(val); @@ -52,7 +47,7 @@ impl Future for Send<'_, T> { // we're done sending the item, but want to block on flushing the // sink - try_ready!(convert_poll(this.sink.poll_complete())); + try_ready!(convert_poll(self.sink.poll_complete())); Poll::Ready(Ok(())) } diff --git a/tokio-async-await/src/async_await/stream/mod.rs b/tokio-async-await/src/stream/mod.rs similarity index 100% rename from tokio-async-await/src/async_await/stream/mod.rs rename to tokio-async-await/src/stream/mod.rs diff --git a/tokio-async-await/src/async_await/stream/next.rs b/tokio-async-await/src/stream/next.rs similarity index 59% rename from tokio-async-await/src/async_await/stream/next.rs rename to tokio-async-await/src/stream/next.rs index 8fdf86e6a..1ca245372 100644 --- a/tokio-async-await/src/async_await/stream/next.rs +++ b/tokio-async-await/src/stream/next.rs @@ -1,9 +1,9 @@ use futures::Stream; -use futures_core::future::Future; -use futures_core::task::{self, Poll}; +use std::future::Future; use std::marker::Unpin; -use std::pin::PinMut; +use std::pin::Pin; +use std::task::{LocalWaker, Poll}; /// A future of the next element of a stream. #[derive(Debug)] @@ -22,10 +22,9 @@ impl<'a, T: Stream + Unpin> Next<'a, T> { impl<'a, T: Stream + Unpin> Future for Next<'a, T> { type Output = Option>; - fn poll(self: PinMut, _cx: &mut task::Context) -> Poll { - use crate::async_await::compat::forward::convert_poll_stream; + fn poll(mut self: Pin<&mut Self>, _lw: &LocalWaker) -> Poll { + use crate::compat::forward::convert_poll_stream; - convert_poll_stream( - PinMut::get_mut(self).stream.poll()) + convert_poll_stream(self.stream.poll()) } } diff --git a/tokio-channel/Cargo.toml b/tokio-channel/Cargo.toml index d38ac3753..d596f9ea6 100644 --- a/tokio-channel/Cargo.toml +++ b/tokio-channel/Cargo.toml @@ -16,10 +16,5 @@ Channels for asynchronous communication using Tokio. """ categories = ["asynchronous"] -[features] -# This feature comes with no promise of stability. Things will break with each -# patch release. Use at your own risk. -async-await-preview = [] - [dependencies] futures = "0.1.23" diff --git a/tokio-channel/src/async_await.rs b/tokio-channel/src/async_await.rs deleted file mode 100644 index ff501d683..000000000 --- a/tokio-channel/src/async_await.rs +++ /dev/null @@ -1,10 +0,0 @@ -use {oneshot, mpsc}; - -use std::marker::Unpin; - -impl Unpin for oneshot::Sender {} -impl Unpin for oneshot::Receiver {} - -impl Unpin for mpsc::Sender {} -impl Unpin for mpsc::UnboundedSender {} -impl Unpin for mpsc::Receiver {} diff --git a/tokio-channel/src/lib.rs b/tokio-channel/src/lib.rs index ae385530c..9b1b7b52a 100644 --- a/tokio-channel/src/lib.rs +++ b/tokio-channel/src/lib.rs @@ -1,8 +1,5 @@ #![doc(html_root_url = "https://docs.rs/tokio-channel/0.1.0")] #![deny(missing_docs, warnings, missing_debug_implementations)] -#![cfg_attr(feature = "async-await-preview", feature( - pin, - ))] //! Asynchronous channels. //! @@ -15,8 +12,3 @@ pub mod mpsc; pub mod oneshot; mod lock; - -// ===== EXPERIMENTAL async / await support ===== - -#[cfg(feature = "async-await-preview")] -mod async_await; diff --git a/tokio-reactor/Cargo.toml b/tokio-reactor/Cargo.toml index a7117ca77..d041968a3 100644 --- a/tokio-reactor/Cargo.toml +++ b/tokio-reactor/Cargo.toml @@ -18,11 +18,6 @@ Event loop that drives Tokio I/O resources. """ categories = ["asynchronous", "network-programming"] -[features] -# This feature comes with no promise of stability. Things will break with each -# patch release. Use at your own risk. -async-await-preview = [] - [dependencies] crossbeam-utils = "0.5.0" futures = "0.1.19" diff --git a/tokio-reactor/src/async_await.rs b/tokio-reactor/src/async_await.rs deleted file mode 100644 index 698f285ce..000000000 --- a/tokio-reactor/src/async_await.rs +++ /dev/null @@ -1,5 +0,0 @@ -use Registration; - -use std::marker::Unpin; - -impl Unpin for Registration {} diff --git a/tokio-reactor/src/lib.rs b/tokio-reactor/src/lib.rs index 9a7f112da..dc2c0c262 100644 --- a/tokio-reactor/src/lib.rs +++ b/tokio-reactor/src/lib.rs @@ -1,6 +1,5 @@ #![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.5")] #![deny(missing_docs, warnings, missing_debug_implementations)] -#![cfg_attr(feature = "async-await-preview", feature(pin))] //! Event loop that drives Tokio I/O resources. //! @@ -757,8 +756,3 @@ impl Error for SetFallbackError { "attempted to set fallback reactor while already configured" } } - -// ===== EXPERIMENTAL async / await support ===== - -#[cfg(feature = "async-await-preview")] -mod async_await;