async-await: track nightly changes (#661)

The `tokio-async-await` crate is no longer a facade. Instead, the `tokio` crate
provides a feature flag to enable async/await support.
This commit is contained in:
Carl Lerche 2018-09-26 10:10:47 -07:00 committed by GitHub
parent 331a88cee6
commit 2f690d30bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 370 additions and 349 deletions

View File

@ -24,6 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures"]
members = [
"./",
"tokio-async-await",
"tokio-channel",
"tokio-codec",
"tokio-current-thread",
@ -40,6 +41,13 @@ members = [
"tokio-uds",
]
[features]
# This feature comes with no promise of stability. Things will
# break with each patch release. Use at your own risk.
async-await-preview = [
"tokio-async-await/async-await-preview",
]
[badges]
travis-ci = { repository = "tokio-rs/tokio" }
appveyor = { repository = "carllerche/tokio", id = "s83yxhy9qeb58va7" }
@ -65,6 +73,9 @@ mio = "0.6.14"
[target.'cfg(unix)'.dependencies]
tokio-uds = { version = "0.2.1", path = "tokio-uds" }
# Needed for async/await preview support
tokio-async-await = { version = "0.1.0", path = "tokio-async-await", optional = true }
[dev-dependencies]
env_logger = { version = "0.5", default-features = false }
flate2 = { version = "1", features = ["tokio"] }

View File

@ -129,6 +129,8 @@ have greater guarantees of stability.
The crates included as part of Tokio are:
* [`tokio-async-await`]: Experimental `async` / `await` support.
* [`tokio-codec`]: Utilities for encoding and decoding protocol frames.
* [`tokio-current-thread`]: Schedule the execution of futures on the current
@ -155,6 +157,7 @@ The crates included as part of Tokio are:
* [`tokio-uds`]: Unix Domain Socket bindings for use with `tokio-io` and
`tokio-reactor`.
[`tokio-async-await`]: tokio-async-await
[`tokio-codec`]: tokio-codec
[`tokio-current-thread`]: tokio-current-thread
[`tokio-executor`]: tokio-executor

26
src/async_await.rs Normal file
View File

@ -0,0 +1,26 @@
use std::future::{Future as StdFuture};
async fn map_ok<T: StdFuture>(future: T) -> Result<(), ()> {
let _ = await!(future);
Ok(())
}
/// Like `tokio::run`, but takes an `async` block
pub fn run_async<F>(future: F)
where F: StdFuture<Output = ()> + Send + 'static,
{
use tokio_async_await::compat::backward;
let future = backward::Compat::new(map_ok(future));
::run(future);
}
/// Like `tokio::spawn`, but takes an `async` block
pub fn spawn_async<F>(future: F)
where F: StdFuture<Output = ()> + Send + 'static,
{
use tokio_async_await::compat::backward;
let future = backward::Compat::new(map_ok(future));
::spawn(future);
}

View File

@ -66,6 +66,11 @@
#![doc(html_root_url = "https://docs.rs/tokio/0.1.5")]
#![deny(missing_docs, warnings, missing_debug_implementations)]
#![cfg_attr(feature = "async-await-preview", feature(
async_await,
await_macro,
futures_api,
))]
extern crate bytes;
#[macro_use]
@ -82,6 +87,9 @@ extern crate tokio_timer;
extern crate tokio_tcp;
extern crate tokio_udp;
#[cfg(feature = "async-await-preview")]
extern crate tokio_async_await;
#[cfg(unix)]
extern crate tokio_uds;
@ -619,4 +627,28 @@ pub mod prelude {
Poll,
task,
};
#[cfg(feature = "async-await-preview")]
#[doc(inline)]
pub use tokio_async_await::{
io::{
AsyncReadExt,
AsyncWriteExt,
},
sink::{
SinkExt,
},
stream::{
StreamExt as StreamAsyncExt,
},
};
}
#[cfg(feature = "async-await-preview")]
mod async_await;
#[cfg(feature = "async-await-preview")]
pub use async_await::{run_async, spawn_async};
#[cfg(feature = "async-await-preview")]
pub use tokio_async_await::await;

View File

@ -1,8 +1,5 @@
cargo-features = ["rename-dependency"]
[package]
name = "tokio-async-await"
edition = "2018"
# When releasing to crates.io:
# - Update html_root_url.
@ -17,25 +14,17 @@ Experimental async/await support for Tokio
"""
categories = ["asynchronous"]
[workspace]
members = [
"./",
"examples",
]
[lib]
name = "tokio"
[features]
# This feature comes with no promise of stability. Things will
# break with each patch release. Use at your own risk.
async-await-preview = ["futures/nightly"]
[dependencies]
futures = "0.1.23"
tokio_main = { package = "tokio", version = "0.1.8", path = ".." }
tokio-io = { version = "0.1.7", path = "../tokio-io" }
tokio-channel = { version = "0.1.0", path = "../tokio-channel", features = ["async-await-preview"] }
tokio-reactor = { version = "0.1.5", path = "../tokio-reactor", features = ["async-await-preview"] }
futures-core-preview = { version = "0.3.0-alpha.6" }
futures-util-preview = { version = "0.3.0-alpha.6" }
[dev-dependencies]
bytes = "0.4.9"
tokio-codec = { version = "0.1.0", path = "../tokio-codec" }
tokio = { version = "0.1.8", path = ".." }
# tokio-codec = { version = "0.1.0", path = "../tokio-codec" }
hyper = "0.12.8"

View File

@ -0,0 +1,2 @@
[build]
target-dir = "../../target"

View File

@ -1,28 +1,49 @@
cargo-features = ["edition"]
[package]
name = "examples"
edition = "2018"
# When releasing to crates.io:
# - Update html_root_url.
version = "0.1.1"
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
license = "MIT"
# Break out of the parent workspace
[workspace]
[[bin]]
name = "chat"
path = "src/chat.rs"
[[bin]]
name = "echo_client"
path = "src/echo_client.rs"
[[bin]]
name = "echo_server"
path = "src/echo_server.rs"
[[bin]]
name = "hyper"
path = "src/hyper.rs"
[dependencies]
tokio-async-await = { version = "0.1.0", path = "../" }
tokio = { version = "0.1.0", path = "../..", features = ["async-await-preview"] }
futures = "0.1.23"
bytes = "0.4.9"
hyper = "0.12.8"
# Avoid using crates.io for Tokio dependencies
[patch.crates-io]
tokio = { path = "../.." }
tokio-async-await = { path = "../" }
tokio-codec = { path = "../../tokio-codec" }
tokio-current-thread = { path = "../../tokio-current-thread" }
tokio-executor = { path = "../../tokio-executor" }
tokio-fs = { path = "../../tokio-fs" }
tokio-io = { path = "../../tokio-io" }
tokio-reactor = { path = "../../tokio-reactor" }
tokio-signal = { path = "../../tokio-signal" }
tokio-tcp = { path = "../../tokio-tcp" }
tokio-threadpool = { path = "../../tokio-threadpool" }
tokio-timer = { path = "../../tokio-timer" }
tokio-tls = { path = "../../tokio-tls" }
tokio-udp = { path = "../../tokio-udp" }
tokio-uds = { path = "../../tokio-uds" }

View File

@ -2,11 +2,13 @@
#[macro_use]
extern crate tokio;
extern crate futures; // v0.1
use tokio::codec::{LinesCodec, Decoder};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use tokio::sync::mpsc;
use futures::sync::mpsc;
use std::collections::HashMap;
use std::io;

View File

@ -1,112 +0,0 @@
use futures::{
Future as Future01,
Poll as Poll01,
};
use futures_core::{Future as Future03};
use std::pin::PinBox;
use std::future::FutureObj;
use std::ptr::NonNull;
use std::task::{
Context,
Spawn,
UnsafeWake,
LocalWaker,
Poll as Poll03,
Waker,
SpawnObjError,
};
/// Convert an 0.3 `Future` to an 0.1 `Future`.
#[derive(Debug)]
pub struct Compat<T>(PinBox<T>);
impl<T> Compat<T> {
pub fn new(data: T) -> Compat<T> {
Compat(PinBox::new(data))
}
}
/// Convert a value into one that can be used with `await!`.
pub trait IntoAwaitable {
type Awaitable;
fn into_awaitable(self) -> Self::Awaitable;
}
impl<T> IntoAwaitable for T
where T: Future03,
{
type Awaitable = Self;
fn into_awaitable(self) -> Self {
self
}
}
impl<T, Item, Error> Future01 for Compat<T>
where T: Future03<Output = Result<Item, Error>>,
{
type Item = Item;
type Error = Error;
fn poll(&mut self) -> Poll01<Item, Error> {
use futures::Async::*;
let local_waker = noop_local_waker();
let mut executor = NoopExecutor;
let mut cx = Context::new(&local_waker, &mut executor);
let res = self.0.as_pin_mut().poll(&mut cx);
match res {
Poll03::Ready(Ok(val)) => Ok(Ready(val)),
Poll03::Ready(Err(err)) => Err(err),
Poll03::Pending => Ok(NotReady),
}
}
}
// ===== NoopWaker =====
struct NoopWaker;
fn noop_local_waker() -> LocalWaker {
let w: NonNull<NoopWaker> = NonNull::dangling();
unsafe { LocalWaker::new(w) }
}
fn noop_waker() -> Waker {
let w: NonNull<NoopWaker> = NonNull::dangling();
unsafe { Waker::new(w) }
}
unsafe impl UnsafeWake for NoopWaker {
unsafe fn clone_raw(&self) -> Waker {
noop_waker()
}
unsafe fn drop_raw(&self) {
}
unsafe fn wake(&self) {
panic!("NoopWake cannot wake");
}
}
// ===== NoopExecutor =====
struct NoopExecutor;
impl Spawn for NoopExecutor {
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnObjError> {
use std::task::SpawnErrorKind;
// NoopExecutor cannot execute
Err(SpawnObjError {
kind: SpawnErrorKind::shutdown(),
future,
})
}
}

View File

@ -1,69 +0,0 @@
use futures::{Future, Async};
use futures_core::future::Future as Future03;
use futures_core::task::Poll as Poll03;
use std::marker::Unpin;
use std::pin::PinMut;
use std::task::Context;
/// Converts an 0.1 `Future` into an 0.3 `Future`.
#[derive(Debug)]
pub struct Compat<T>(T);
pub(crate) fn convert_poll<T, E>(poll: Result<Async<T>, E>) -> Poll03<Result<T, E>> {
use futures::Async::{Ready, NotReady};
match poll {
Ok(Ready(val)) => Poll03::Ready(Ok(val)),
Ok(NotReady) => Poll03::Pending,
Err(err) => Poll03::Ready(Err(err)),
}
}
pub(crate) fn convert_poll_stream<T, E>(
poll: Result<Async<Option<T>>, E>) -> Poll03<Option<Result<T, E>>>
{
use futures::Async::{Ready, NotReady};
match poll {
Ok(Ready(Some(val))) => Poll03::Ready(Some(Ok(val))),
Ok(Ready(None)) => Poll03::Ready(None),
Ok(NotReady) => Poll03::Pending,
Err(err) => Poll03::Ready(Some(Err(err))),
}
}
/// Convert a value into one that can be used with `await!`.
pub trait IntoAwaitable {
type Awaitable;
/// Convert `self` into a value that can be used with `await!`.
fn into_awaitable(self) -> Self::Awaitable;
}
impl<T: Future + Unpin> IntoAwaitable for T {
type Awaitable = Compat<T>;
fn into_awaitable(self) -> Self::Awaitable {
Compat(self)
}
}
impl<T> Future03 for Compat<T>
where T: Future + Unpin
{
type Output = Result<T::Item, T::Error>;
fn poll(self: PinMut<Self>, _cx: &mut Context) -> Poll03<Self::Output> {
use futures::Async::{Ready, NotReady};
// TODO: wire in cx
match PinMut::get_mut(self).0.poll() {
Ok(Ready(val)) => Poll03::Ready(Ok(val)),
Ok(NotReady) => Poll03::Pending,
Err(e) => Poll03::Ready(Err(e)),
}
}
}

View File

@ -1,8 +0,0 @@
//! Utilities for working with `async` / `await`.
#[macro_use]
mod await;
pub mod compat;
pub mod io;
pub mod sink;
pub mod stream;

View File

@ -3,8 +3,10 @@
macro_rules! await {
($e:expr) => {{
use $crate::std_await;
use $crate::async_await::compat::forward::IntoAwaitable as IntoAwaitableForward;
use $crate::async_await::compat::backward::IntoAwaitable as IntoAwaitableBackward;
#[allow(unused_imports)]
use $crate::compat::forward::IntoAwaitable as IntoAwaitableForward;
#[allow(unused_imports)]
use $crate::compat::backward::IntoAwaitable as IntoAwaitableBackward;
#[allow(unused_mut)]
let mut e = $e;

View File

@ -0,0 +1,89 @@
use futures::{Future, Poll};
use std::pin::Pin;
use std::future::{
Future as StdFuture,
};
use std::ptr::NonNull;
use std::task::{
LocalWaker,
Poll as StdPoll,
UnsafeWake,
Waker,
};
/// Convert an 0.3 `Future` to an 0.1 `Future`.
#[derive(Debug)]
pub struct Compat<T>(Pin<Box<T>>);
impl<T> Compat<T> {
/// Create a new `Compat` backed by `future`.
pub fn new(future: T) -> Compat<T> {
Compat(Box::pinned(future))
}
}
/// Convert a value into one that can be used with `await!`.
pub trait IntoAwaitable {
type Awaitable;
fn into_awaitable(self) -> Self::Awaitable;
}
impl<T> IntoAwaitable for T
where T: StdFuture,
{
type Awaitable = Self;
fn into_awaitable(self) -> Self {
self
}
}
impl<T, Item, Error> Future for Compat<T>
where T: StdFuture<Output = Result<Item, Error>>,
{
type Item = Item;
type Error = Error;
fn poll(&mut self) -> Poll<Item, Error> {
use futures::Async::*;
let local_waker = noop_local_waker();
let res = self.0.as_mut().poll(&local_waker);
match res {
StdPoll::Ready(Ok(val)) => Ok(Ready(val)),
StdPoll::Ready(Err(err)) => Err(err),
StdPoll::Pending => Ok(NotReady),
}
}
}
// ===== NoopWaker =====
struct NoopWaker;
fn noop_local_waker() -> LocalWaker {
let w: NonNull<NoopWaker> = NonNull::dangling();
unsafe { LocalWaker::new(w) }
}
fn noop_waker() -> Waker {
let w: NonNull<NoopWaker> = NonNull::dangling();
unsafe { Waker::new(w) }
}
unsafe impl UnsafeWake for NoopWaker {
unsafe fn clone_raw(&self) -> Waker {
noop_waker()
}
unsafe fn drop_raw(&self) {
}
unsafe fn wake(&self) {
panic!("NoopWake cannot wake");
}
}

View File

@ -0,0 +1,68 @@
use futures::{Future, Async};
use std::marker::Unpin;
use std::future::Future as StdFuture;
use std::pin::Pin;
use std::task::{LocalWaker, Poll as StdPoll};
/// Converts an 0.1 `Future` into an 0.3 `Future`.
#[derive(Debug)]
pub struct Compat<T>(T);
pub(crate) fn convert_poll<T, E>(poll: Result<Async<T>, E>) -> StdPoll<Result<T, E>> {
use futures::Async::{Ready, NotReady};
match poll {
Ok(Ready(val)) => StdPoll::Ready(Ok(val)),
Ok(NotReady) => StdPoll::Pending,
Err(err) => StdPoll::Ready(Err(err)),
}
}
pub(crate) fn convert_poll_stream<T, E>(
poll: Result<Async<Option<T>>, E>) -> StdPoll<Option<Result<T, E>>>
{
use futures::Async::{Ready, NotReady};
match poll {
Ok(Ready(Some(val))) => StdPoll::Ready(Some(Ok(val))),
Ok(Ready(None)) => StdPoll::Ready(None),
Ok(NotReady) => StdPoll::Pending,
Err(err) => StdPoll::Ready(Some(Err(err))),
}
}
/// Convert a value into one that can be used with `await!`.
pub trait IntoAwaitable {
type Awaitable;
/// Convert `self` into a value that can be used with `await!`.
fn into_awaitable(self) -> Self::Awaitable;
}
impl<T: Future + Unpin> IntoAwaitable for T {
type Awaitable = Compat<T>;
fn into_awaitable(self) -> Self::Awaitable {
Compat(self)
}
}
impl<T> StdFuture for Compat<T>
where T: Future + Unpin
{
type Output = Result<T::Item, T::Error>;
fn poll(mut self: Pin<&mut Self>, _lw: &LocalWaker) -> StdPoll<Self::Output> {
use futures::Async::{Ready, NotReady};
// TODO: wire in cx
match self.0.poll() {
Ok(Ready(val)) => StdPoll::Ready(Ok(val)),
Ok(NotReady) => StdPoll::Pending,
Err(e) => StdPoll::Ready(Err(e)),
}
}
}

View File

@ -1,11 +1,11 @@
use tokio_io::AsyncWrite;
use futures_core::future::Future;
use futures_core::task::{self, Poll};
use std::io;
use std::future::Future;
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
/// A future used to fully flush an I/O object.
#[derive(Debug)]
@ -13,7 +13,7 @@ pub struct Flush<'a, T: ?Sized + 'a> {
writer: &'a mut T,
}
// PinMut is never projected to fields
// Pin is never projected to fields
impl<'a, T: ?Sized> Unpin for Flush<'a, T> {}
impl<'a, T: AsyncWrite + ?Sized> Flush<'a, T> {
@ -25,8 +25,8 @@ impl<'a, T: AsyncWrite + ?Sized> Flush<'a, T> {
impl<'a, T: AsyncWrite + ?Sized> Future for Flush<'a, T> {
type Output = io::Result<()>;
fn poll(mut self: PinMut<Self>, _cx: &mut task::Context) -> Poll<Self::Output> {
use crate::async_await::compat::forward::convert_poll;
fn poll(mut self: Pin<&mut Self>, _wx: &LocalWaker) -> Poll<Self::Output> {
use crate::compat::forward::convert_poll;
convert_poll(self.writer.poll_flush())
}
}

View File

@ -1,11 +1,11 @@
use tokio_io::AsyncRead;
use futures_core::future::Future;
use futures_core::task::{self, Poll};
use std::future::Future;
use std::task::{self, Poll};
use std::io;
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
/// A future which can be used to read bytes.
#[derive(Debug)]
@ -29,8 +29,8 @@ impl<'a, T: AsyncRead + ?Sized> Read<'a, T> {
impl<'a, T: AsyncRead + ?Sized> Future for Read<'a, T> {
type Output = io::Result<usize>;
fn poll(mut self: PinMut<Self>, _cx: &mut task::Context) -> Poll<Self::Output> {
use crate::async_await::compat::forward::convert_poll;
fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll<Self::Output> {
use crate::compat::forward::convert_poll;
let this = &mut *self;
convert_poll(this.reader.poll_read(this.buf))

View File

@ -1,14 +1,12 @@
use tokio_io::AsyncRead;
use futures_core::future::Future;
use futures_core::task::{self, Poll};
use futures_util::try_ready;
use std::future::Future;
use std::task::{self, Poll};
use std::io;
use std::marker::Unpin;
use std::mem;
use core::pin::PinMut;
use std::pin::Pin;
/// A future which can be used to read exactly enough bytes to fill a buffer.
#[derive(Debug)]
@ -36,8 +34,8 @@ fn eof() -> io::Error {
impl<'a, T: AsyncRead + ?Sized> Future for ReadExact<'a, T> {
type Output = io::Result<()>;
fn poll(mut self: PinMut<Self>, _cx: &mut task::Context) -> Poll<Self::Output> {
use crate::async_await::compat::forward::convert_poll;
fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll<Self::Output> {
use crate::compat::forward::convert_poll;
let this = &mut *self;

View File

@ -1,11 +1,11 @@
use tokio_io::AsyncWrite;
use futures_core::future::Future;
use futures_core::task::{self, Poll};
use std::future::Future;
use std::task::{self, Poll};
use std::io;
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
/// A future used to write data.
#[derive(Debug)]
@ -29,8 +29,8 @@ impl<'a, T: AsyncWrite + ?Sized> Write<'a, T> {
impl<'a, T: AsyncWrite + ?Sized> Future for Write<'a, T> {
type Output = io::Result<usize>;
fn poll(mut self: PinMut<Self>, _cx: &mut task::Context) -> Poll<io::Result<usize>> {
use crate::async_await::compat::forward::convert_poll;
fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll<io::Result<usize>> {
use crate::compat::forward::convert_poll;
let this = &mut *self;
convert_poll(this.writer.poll_write(this.buf))

View File

@ -1,14 +1,12 @@
use tokio_io::AsyncWrite;
use futures_core::future::Future;
use futures_core::task::{self, Poll};
use futures_util::try_ready;
use std::future::Future;
use std::task::{self, Poll};
use std::io;
use std::marker::Unpin;
use std::mem;
use core::pin::PinMut;
use std::pin::Pin;
/// A future used to write the entire contents of a buffer.
#[derive(Debug)]
@ -36,8 +34,8 @@ fn zero_write() -> io::Error {
impl<'a, T: AsyncWrite + ?Sized> Future for WriteAll<'a, T> {
type Output = io::Result<()>;
fn poll(mut self: PinMut<Self>, _cx: &mut task::Context) -> Poll<io::Result<()>> {
use crate::async_await::compat::forward::convert_poll;
fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll<io::Result<()>> {
use crate::compat::forward::convert_poll;
let this = &mut *self;

View File

@ -1,4 +1,12 @@
#![feature(futures_api, await_macro, pin, arbitrary_self_types)]
#![cfg(feature = "async-await-preview")]
#![feature(
rust_2018_preview,
arbitrary_self_types,
async_await,
await_macro,
futures_api,
pin,
)]
#![doc(html_root_url = "https://docs.rs/tokio-async-await/0.1.3")]
#![deny(missing_docs, missing_debug_implementations)]
@ -7,39 +15,31 @@
//! A preview of Tokio w/ `async` / `await` support.
extern crate futures;
extern crate futures_core;
extern crate futures_util;
extern crate tokio_io;
// Re-export all of Tokio
pub use tokio_main::{
// Modules
clock,
codec,
executor,
fs,
io,
net,
reactor,
runtime,
timer,
util,
// Functions
run,
spawn,
};
pub mod sync {
//! Asynchronous aware synchronization
pub use tokio_channel::{
mpsc,
oneshot,
};
/// Extracts the successful type of a `Poll<Result<T, E>>`.
///
/// This macro bakes in propagation of `Pending` and `Err` signals by returning early.
macro_rules! try_ready {
($x:expr) => {
match $x {
std::task::Poll::Ready(Ok(x)) => x,
std::task::Poll::Ready(Err(e)) =>
return std::task::Poll::Ready(Err(e.into())),
std::task::Poll::Pending =>
return std::task::Poll::Pending,
}
}
}
pub mod async_await;
#[macro_use]
mod await;
pub mod compat;
pub mod io;
pub mod sink;
pub mod stream;
/*
pub mod prelude {
//! A "prelude" for users of the `tokio` crate.
//!
@ -69,33 +69,47 @@ pub mod prelude {
},
};
}
*/
use futures_core::{
Future as Future03,
};
// Rename the `await` macro in `std`
// Rename the `await` macro in `std`. This is used by the redefined
// `await` macro in this crate.
#[doc(hidden)]
pub use std::await as std_await;
/*
use std::future::{Future as StdFuture};
fn run<T: futures::Future<Item = (), Error = ()>>(t: T) {
drop(t);
}
async fn map_ok<T: StdFuture>(future: T) -> Result<(), ()> {
let _ = await!(future);
Ok(())
}
/// Like `tokio::run`, but takes an `async` block
pub fn run_async<F>(future: F)
where F: Future03<Output = ()> + Send + 'static,
where F: StdFuture<Output = ()> + Send + 'static,
{
use futures_util::future::FutureExt;
use crate::async_await::compat::backward;
use async_await::compat::backward;
let future = backward::Compat::new(map_ok(future));
let future = future.map(|_| Ok(()));
run(backward::Compat::new(future))
run(future);
unimplemented!();
}
*/
/*
/// Like `tokio::spawn`, but takes an `async` block
pub fn spawn_async<F>(future: F)
where F: Future03<Output = ()> + Send + 'static,
where F: StdFuture<Output = ()> + Send + 'static,
{
use futures_util::future::FutureExt;
use crate::async_await::compat::backward;
let future = future.map(|_| Ok(()));
spawn(backward::Compat::new(future));
spawn(backward::Compat::new(async || {
let _ = await!(future);
Ok(())
}));
}
*/

View File

@ -1,10 +1,10 @@
use futures::Sink;
use futures_core::future::Future;
use futures_core::task::{self, Poll};
use std::future::Future;
use std::task::{self, Poll};
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
/// Future for the `SinkExt::send_async` combinator, which sends a value to a
/// sink and then waits until the sink has fully flushed.
@ -28,17 +28,12 @@ impl<'a, T: Sink + Unpin + ?Sized> Send<'a, T> {
impl<T: Sink + Unpin + ?Sized> Future for Send<'_, T> {
type Output = Result<(), T::SinkError>;
fn poll(mut self: PinMut<Self>, _cx: &mut task::Context) -> Poll<Self::Output> {
use crate::async_await::compat::forward::convert_poll;
fn poll(mut self: Pin<&mut Self>, _lw: &task::LocalWaker) -> Poll<Self::Output> {
use crate::compat::forward::convert_poll;
use futures::AsyncSink::{Ready, NotReady};
use futures_util::try_ready;
// use crate::compat::forward::convert_poll;
let this = &mut *self;
if let Some(item) = this.item.take() {
match this.sink.start_send(item) {
if let Some(item) = self.item.take() {
match self.sink.start_send(item) {
Ok(Ready) => {}
Ok(NotReady(val)) => {
self.item = Some(val);
@ -52,7 +47,7 @@ impl<T: Sink + Unpin + ?Sized> Future for Send<'_, T> {
// we're done sending the item, but want to block on flushing the
// sink
try_ready!(convert_poll(this.sink.poll_complete()));
try_ready!(convert_poll(self.sink.poll_complete()));
Poll::Ready(Ok(()))
}

View File

@ -1,9 +1,9 @@
use futures::Stream;
use futures_core::future::Future;
use futures_core::task::{self, Poll};
use std::future::Future;
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
/// A future of the next element of a stream.
#[derive(Debug)]
@ -22,10 +22,9 @@ impl<'a, T: Stream + Unpin> Next<'a, T> {
impl<'a, T: Stream + Unpin> Future for Next<'a, T> {
type Output = Option<Result<T::Item, T::Error>>;
fn poll(self: PinMut<Self>, _cx: &mut task::Context) -> Poll<Self::Output> {
use crate::async_await::compat::forward::convert_poll_stream;
fn poll(mut self: Pin<&mut Self>, _lw: &LocalWaker) -> Poll<Self::Output> {
use crate::compat::forward::convert_poll_stream;
convert_poll_stream(
PinMut::get_mut(self).stream.poll())
convert_poll_stream(self.stream.poll())
}
}

View File

@ -16,10 +16,5 @@ Channels for asynchronous communication using Tokio.
"""
categories = ["asynchronous"]
[features]
# This feature comes with no promise of stability. Things will break with each
# patch release. Use at your own risk.
async-await-preview = []
[dependencies]
futures = "0.1.23"

View File

@ -1,10 +0,0 @@
use {oneshot, mpsc};
use std::marker::Unpin;
impl<T> Unpin for oneshot::Sender<T> {}
impl<T> Unpin for oneshot::Receiver<T> {}
impl<T> Unpin for mpsc::Sender<T> {}
impl<T> Unpin for mpsc::UnboundedSender<T> {}
impl<T> Unpin for mpsc::Receiver<T> {}

View File

@ -1,8 +1,5 @@
#![doc(html_root_url = "https://docs.rs/tokio-channel/0.1.0")]
#![deny(missing_docs, warnings, missing_debug_implementations)]
#![cfg_attr(feature = "async-await-preview", feature(
pin,
))]
//! Asynchronous channels.
//!
@ -15,8 +12,3 @@ pub mod mpsc;
pub mod oneshot;
mod lock;
// ===== EXPERIMENTAL async / await support =====
#[cfg(feature = "async-await-preview")]
mod async_await;

View File

@ -18,11 +18,6 @@ Event loop that drives Tokio I/O resources.
"""
categories = ["asynchronous", "network-programming"]
[features]
# This feature comes with no promise of stability. Things will break with each
# patch release. Use at your own risk.
async-await-preview = []
[dependencies]
crossbeam-utils = "0.5.0"
futures = "0.1.19"

View File

@ -1,5 +0,0 @@
use Registration;
use std::marker::Unpin;
impl Unpin for Registration {}

View File

@ -1,6 +1,5 @@
#![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.5")]
#![deny(missing_docs, warnings, missing_debug_implementations)]
#![cfg_attr(feature = "async-await-preview", feature(pin))]
//! Event loop that drives Tokio I/O resources.
//!
@ -757,8 +756,3 @@ impl Error for SetFallbackError {
"attempted to set fallback reactor while already configured"
}
}
// ===== EXPERIMENTAL async / await support =====
#[cfg(feature = "async-await-preview")]
mod async_await;